可扩展并发容器
一个用于并发和异步编程的高性能容器和实用工具集合。
特性
并发和异步容器
HashMap
是一个并发和异步哈希映射。HashSet
是一个并发和异步哈希集合。HashIndex
是一个针对读取优化的并发和异步哈希映射。HashCache
是一个由HashMap
支持的32路关联缓存。TreeIndex
是一个针对读取优化的并发和异步B+树。
并发编程实用工具
LinkedList
是一个实现无锁并发单向链表的类型特征。Queue
是一个并发无锁先进先出容器。Stack
是一个并发无锁后进先出容器。Bag
是一个并发无锁无序不透明容器。
HashMap
HashMap
是一个并发哈希映射,针对高度并行的写入密集型工作负载进行了优化。HashMap
的结构是一个无锁的条目桶数组栈。条目桶数组由sdd
管理,从而实现无锁访问和非阻塞容器大小调整。每个桶是一个固定大小的条目数组,由一个特殊的读写锁保护,该锁提供阻塞和异步方法。
锁定行为
条目访问:细粒度锁定
对条目的读/写访问由包含该条目的桶中的读写锁序列化。没有容器级锁,因此,容器越大,桶级锁发生争用的机会就越小。
大小调整:无锁
HashMap
的大小调整完全是非阻塞和无锁的;大小调整不会阻塞对容器的任何其他读/写访问或大小调整尝试。大小调整类似于将新的桶数组推入无锁栈。旧桶数组中的每个条目将在未来访问容器时逐步重新定位到新的桶数组,旧桶数组最终会在变空后被丢弃。
示例
如果键是唯一的,可以插入一个条目。插入的条目可以同步或异步地更新、读取和删除。
use scc::HashMap;
let hashmap: HashMap<u64, u32> = HashMap::default();
assert!(hashmap.insert(1, 0).is_ok());
assert!(hashmap.insert(1, 1).is_err());
assert_eq!(hashmap.upsert(1, 1).unwrap(), 0);
assert_eq!(hashmap.update(&1, |_, v| { *v = 3; *v }).unwrap(), 3);
assert_eq!(hashmap.read(&1, |_, v| *v).unwrap(), 3);
assert_eq!(hashmap.remove(&1).unwrap(), (1, 3));
hashmap.entry(7).or_insert(17);
assert_eq!(hashmap.read(&7, |_, v| *v).unwrap(), 17);
let future_insert = hashmap.insert_async(2, 1);
let future_remove = hashmap.remove_async(&1);
如果工作流程比较复杂,HashMap
的Entry
API会很有用。
use scc::HashMap;
let hashmap: HashMap<u64, u32> = HashMap::default();
hashmap.entry(3).or_insert(7);
assert_eq!(hashmap.read(&3, |_, v| *v), Some(7));
hashmap.entry(4).and_modify(|v| { *v += 1 }).or_insert(5);
assert_eq!(hashmap.read(&4, |_, v| *v), Some(5));
let future_entry = hashmap.entry_async(3);
HashMap
不提供Iterator
,因为不可能将Iterator::Item
的生命周期限制在Iterator内。这个限制可以通过依赖内部可变性来规避,例如让返回的引用持有一个锁,但如果使用不当很容易导致死锁,而频繁获取锁可能会影响性能。因此,没有实现Iterator
,相反,HashMap
提供了一些方法来同步或异步地迭代条目:any
、any_async
、prune
、prune_async
、retain
、retain_async
、scan
、scan_async
、OccupiedEntry::next
和OccupiedEntry::next_async
。
use scc::HashMap;
let hashmap: HashMap<u64, u32> = HashMap::default();
assert!(hashmap.insert(1, 0).is_ok());
assert!(hashmap.insert(2, 1).is_ok());
// 可以通过`retain`修改或删除条目。
let mut acc = 0;
hashmap.retain(|k, v_mut| { acc += *k; *v_mut = 2; true });
assert_eq!(acc, 3);
assert_eq!(hashmap.read(&1, |_, v| *v).unwrap(), 2);
assert_eq!(hashmap.read(&2, |_, v| *v).unwrap(), 2);
// 一旦找到满足谓词的条目,`any`就会返回`true`。
assert!(hashmap.insert(3, 2).is_ok());
assert!(hashmap.any(|k, _| *k == 3));
// 可以通过`retain`删除多个条目。
hashmap.retain(|k, v| *k == 1 && *v == 2);
// `hash_map::OccupiedEntry`也可以返回下一个最近的已占用条目。
let first_entry = hashmap.first_entry();
assert!(first_entry.is_some());
let second_entry = first_entry.and_then(|e| e.next());
assert!(second_entry.is_none());
// 使用`scan_async`异步迭代条目。
let future_scan = hashmap.scan_async(|k, v| println!("{k} {v}"));
HashSet
HashSet
是HashMap
的一个特殊版本,其中值类型为()
。
示例
大多数HashSet
方法与HashMap
相同,只是它们不接收值参数,并且一些用于修改值的HashMap
方法在HashSet
中没有实现。
use scc::HashSet;
let hashset: HashSet<u64> = HashSet::default();
assert!(hashset.read(&1, |_| true).is_none());
assert!(hashset.insert(1).is_ok());
assert!(hashset.read(&1, |_| true).unwrap());
let future_insert = hashset.insert_async(2);
let future_remove = hashset.remove_async(&1);
HashIndex
HashIndex
是HashMap
的一个针对读取优化的版本。在HashIndex
中,不仅桶数组的内存由sdd
管理,条目桶的内存也受sdd
保护,从而实现对单个条目的无锁读取访问。
条目生命周期
HashIndex
不会立即删除被移除的条目,而是在满足以下条件之一时才删除:
- 自上次在桶中移除条目以来,
Epoch
达到下一代,且桶被写入访问。 HashIndex
被清空或调整大小。- 充满已移除条目的桶占用了容量的50%。
这些条件不保证被移除的条目会在确定的时间内被删除,因此如果工作负载以写入为主且条目大小较大,HashIndex
可能不是最佳选择。
示例
peek
和peek_with
方法完全无锁。
use scc::HashIndex;
let hashindex: HashIndex<u64, u32> = HashIndex::default();
assert!(hashindex.insert(1, 0).is_ok());
// `peek`和`peek_with`是无锁的。
assert_eq!(hashindex.peek_with(&1, |_, v| *v).unwrap(), 0);
let future_insert = hashindex.insert_async(2, 1);
let future_remove = hashindex.remove_if_async(&1, |_| true);
HashIndex
的Entry
API可用于原地更新条目。
use scc::HashIndex;
let hashindex: HashIndex<u64, u32> = HashIndex::default();
assert!(hashindex.insert(1, 1).is_ok());
if let Some(mut o) = hashindex.get(&1) {
// 创建条目的新版本。
o.update(2);
};
if let Some(mut o) = hashindex.get(&1) {
// 原地更新条目。
unsafe { *o.get_mut() = 3; }
};
为HashIndex
实现了Iterator
,因为任何衍生的引用都可以在关联的ebr::Guard
存在期间存活。
use scc::ebr::Guard;
use scc::HashIndex;
let hashindex: HashIndex<u64, u32> = HashIndex::default();
assert!(hashindex.insert(1, 0).is_ok());
// 现有值可以被新值替换。
hashindex.get(&1).unwrap().update(1);
let guard = Guard::new();
// 必须为`iter`提供一个`Guard`。
let mut iter = hashindex.iter(&guard);
// 衍生的引用可以与`guard`同存。
let entry_ref = iter.next().unwrap();
assert_eq!(iter.next(), None);
drop(hashindex);
// 在`hashindex`被丢弃后仍可读取条目。
assert_eq!(entry_ref, (&1, &1));
HashCache
HashCache
是一个基于HashMap
实现的32路组相联并发缓存。HashCache
不跟踪整个缓存中最近最少使用的条目,而是每个桶维护一个已占用条目的双向链表,该链表在访问条目时更新,以跟踪桶内最近最少使用的条目。
示例
当插入新条目且桶已满时,会驱逐桶中最近最少使用的条目。
use scc::HashCache;
let hashcache: HashCache<u64, u32> = HashCache::with_capacity(100, 2000);
/// 容量不能超过最大容量。
assert_eq!(hashcache.capacity_range(), 128..=2048);
/// 如果对应`1`或`2`的桶已满,最近最少使用的条目将被驱逐。
assert!(hashcache.put(1, 0).is_ok());
assert!(hashcache.put(2, 0).is_ok());
/// `1`成为桶中最近访问的条目。
assert!(hashcache.get(&1).is_some());
/// 可以正常移除条目。
assert_eq!(hashcache.remove(&2).unwrap(), (2, 0));
TreeIndex
TreeIndex
是一个针对读取操作优化的B+树变体。sdd
保护单个条目使用的内存,从而实现对它们的无锁读取访问。
锁定行为
读取访问始终是无锁且非阻塞的。只要不需要结构变化,对条目的写入访问也是无锁且非阻塞的。然而,当写入操作正在分裂或合并节点时,受影响范围内的键的其他写入操作会被阻塞。
条目生命周期
TreeIndex
不会立即删除被移除的条目,而是在叶节点被清空或分裂时才删除,这使得TreeIndex
在写入密集的工作负载下可能不是最佳选择。
示例
如果键是唯一的,可以插入条目,然后可以读取和移除它。只有在内部节点分裂或合并时才会获取或等待锁。
use scc::TreeIndex;
let treeindex: TreeIndex<u64, u32> = TreeIndex::new();
assert!(treeindex.insert(1, 2).is_ok());
// `peek`和`peek_with`是无锁操作。
assert_eq!(treeindex.peek_with(&1, |_, v| *v).unwrap(), 2);
assert!(treeindex.remove(&1));
let future_insert = treeindex.insert_async(2, 3);
let future_remove = treeindex.remove_if_async(&1, |v| *v == 2);
可以在不获取任何锁的情况下扫描条目。
use scc::TreeIndex;
use sdd::Guard;
let treeindex: TreeIndex<u64, u32> = TreeIndex::new();
assert!(treeindex.insert(1, 10).is_ok());
assert!(treeindex.insert(2, 11).is_ok());
assert!(treeindex.insert(3, 13).is_ok());
let guard = Guard::new();
// `visitor`在不获取锁的情况下迭代条目。
let mut visitor = treeindex.iter(&guard);
assert_eq!(visitor.next().unwrap(), (&1, &10));
assert_eq!(visitor.next().unwrap(), (&2, &11));
assert_eq!(visitor.next().unwrap(), (&3, &13));
assert!(visitor.next().is_none());
可以扫描特定范围的键。
use scc::ebr::Guard;
use scc::TreeIndex;
let treeindex: TreeIndex<u64, u32> = TreeIndex::new();
for i in 0..10 {
assert!(treeindex.insert(i, 10).is_ok());
}
let guard = Guard::new();
assert_eq!(treeindex.range(1..1, &guard).count(), 0);
assert_eq!(treeindex.range(4..8, &guard).count(), 4);
assert_eq!(treeindex.range(4..=8, &guard).count(), 5);
Bag
Bag
是一个并发无锁的无序容器。Bag
完全不透明,在弹出之前不允许访问其中包含的实例。如果所包含实例的数量可以保持在ARRAY_LEN(默认:usize::BITS / 2)
以下,Bag
特别高效。
示例
use scc::Bag;
let bag: Bag<usize> = Bag::default();
bag.push(1);
assert!(!bag.is_empty());
assert_eq!(bag.pop(), Some(1));
assert!(bag.is_empty());
Queue
示例
use scc::Queue;
let queue: Queue<usize> = Queue::default();
queue.push(1);
assert!(queue.push_if(2, |e| e.map_or(false, |x| **x == 1)).is_ok());
assert!(queue.push_if(3, |e| e.map_or(false, |x| **x == 1)).is_err());
assert_eq!(queue.pop().map(|e| **e), Some(1));
assert_eq!(queue.pop().map(|e| **e), Some(2));
assert!(queue.pop().is_none());
Stack
示例
use scc::Stack;
let stack: Stack<usize> = Stack::default();
stack.push(1);
stack.push(2);
assert_eq!(stack.pop().map(|e| **e), Some(2));
assert_eq!(stack.pop().map(|e| **e), Some(1));
assert!(stack.pop().is_none());
LinkedList
LinkedList
是一个实现无锁并发单向链表操作的类型特征,由sdd
支持。它还提供了一种方法来标记链表的一个条目以表示用户定义的状态。
示例
use scc::ebr::{AtomicShared, Guard, Shared};
use scc::LinkedList;
use std::sync::atomic::Ordering::Relaxed;
#[derive(Default)]
struct L(AtomicShared<L>, usize);
impl LinkedList for L {
fn link_ref(&self) -> &AtomicShared<L> {
&self.0
}
}
let guard = Guard::new();
let head: L = L::default();
let tail: Shared<L> = Shared::new(L(AtomicShared::null(), 1));
// 推入一个新条目。
assert!(head.push_back(tail.clone(), false, Relaxed, &guard).is_ok());
assert!(!head.is_marked(Relaxed));
// 用户可以在条目上标记一个标志。
head.mark(Relaxed);
assert!(head.is_marked(Relaxed));
// `next_ptr`遍历链表。
let next_ptr = head.next_ptr(Relaxed, &guard);
assert_eq!(next_ptr.as_ref().unwrap().1, 1);
// 一旦`tail`被删除,它就变得不可见。
tail.delete_self(Relaxed);
assert!(head.next_ptr(Relaxed, &guard).is_null());
性能
HashMap
尾部延迟
在Apple M2 Max上,1048576次插入操作(K = u64, V = u64
)的延迟分布的预期尾部延迟范围从180微秒到200微秒。
HashMap
和 HashIndex
吞吐量
- Apple M2 Max(12核)上的结果。
- Intel Xeon(32核,avx2)上的结果。
- 谨慎解读结果,因为基准测试通常不能代表真实世界的工作负载。
更新日志
Footnotes
-
只有在启用相应目标特性时才会使用高级SIMD指令,例如
-C target_feature=+avx2
。 ↩