Project Icon

scalable-concurrent-containers

Rust并发编程高性能容器和工具库

scalable-concurrent-containers是一个Rust实现的高性能并发编程库。它提供了HashMap、HashSet、TreeIndex等并发容器,以及Queue、Stack等工具。这些组件支持异步操作,具有近线性扩展性,无自旋锁和忙等待。该库还支持Loom和Serde集成,并利用SIMD指令集优化查找性能。适用于要求高并发吞吐量的Rust项目。

可扩展并发容器

Cargo Crates.io GitHub Workflow Status

一个用于并发和异步编程的高性能容器和实用工具集合。

特性

  • 阻塞和同步方法的异步对应版本。
  • 支持LoomSerdefeatures = ["loom", "serde"]
  • 接近线性的可扩展性。
  • 无自旋锁和忙等循环。
  • 使用SIMD查找并行扫描多个条目1

并发和异步容器

  • HashMap是一个并发和异步哈希映射。
  • HashSet是一个并发和异步哈希集合。
  • HashIndex是一个针对读取优化的并发和异步哈希映射。
  • HashCache是一个由HashMap支持的32路关联缓存。
  • TreeIndex是一个针对读取优化的并发和异步B+树。

并发编程实用工具

  • LinkedList是一个实现无锁并发单向链表的类型特征。
  • Queue是一个并发无锁先进先出容器。
  • Stack是一个并发无锁后进先出容器。
  • Bag是一个并发无锁无序不透明容器。

HashMap

HashMap是一个并发哈希映射,针对高度并行的写入密集型工作负载进行了优化。HashMap的结构是一个无锁的条目桶数组栈。条目桶数组由sdd管理,从而实现无锁访问和非阻塞容器大小调整。每个桶是一个固定大小的条目数组,由一个特殊的读写锁保护,该锁提供阻塞和异步方法。

锁定行为

条目访问:细粒度锁定

对条目的读/写访问由包含该条目的桶中的读写锁序列化。没有容器级锁,因此,容器越大,桶级锁发生争用的机会就越小。

大小调整:无锁

HashMap的大小调整完全是非阻塞和无锁的;大小调整不会阻塞对容器的任何其他读/写访问或大小调整尝试。大小调整类似于将新的桶数组推入无锁栈。旧桶数组中的每个条目将在未来访问容器时逐步重新定位到新的桶数组,旧桶数组最终会在变空后被丢弃。

示例

如果键是唯一的,可以插入一个条目。插入的条目可以同步或异步地更新、读取和删除。

use scc::HashMap;

let hashmap: HashMap<u64, u32> = HashMap::default();

assert!(hashmap.insert(1, 0).is_ok());
assert!(hashmap.insert(1, 1).is_err());
assert_eq!(hashmap.upsert(1, 1).unwrap(), 0);
assert_eq!(hashmap.update(&1, |_, v| { *v = 3; *v }).unwrap(), 3);
assert_eq!(hashmap.read(&1, |_, v| *v).unwrap(), 3);
assert_eq!(hashmap.remove(&1).unwrap(), (1, 3));

hashmap.entry(7).or_insert(17);
assert_eq!(hashmap.read(&7, |_, v| *v).unwrap(), 17);

let future_insert = hashmap.insert_async(2, 1);
let future_remove = hashmap.remove_async(&1);

如果工作流程比较复杂,HashMapEntry API会很有用。

use scc::HashMap;

let hashmap: HashMap<u64, u32> = HashMap::default();

hashmap.entry(3).or_insert(7);
assert_eq!(hashmap.read(&3, |_, v| *v), Some(7));

hashmap.entry(4).and_modify(|v| { *v += 1 }).or_insert(5);
assert_eq!(hashmap.read(&4, |_, v| *v), Some(5));

let future_entry = hashmap.entry_async(3);

HashMap不提供Iterator,因为不可能将Iterator::Item的生命周期限制在Iterator内。这个限制可以通过依赖内部可变性来规避,例如让返回的引用持有一个锁,但如果使用不当很容易导致死锁,而频繁获取锁可能会影响性能。因此,没有实现Iterator,相反,HashMap提供了一些方法来同步或异步地迭代条目:anyany_asyncpruneprune_asyncretainretain_asyncscanscan_asyncOccupiedEntry::nextOccupiedEntry::next_async

use scc::HashMap;

let hashmap: HashMap<u64, u32> = HashMap::default();

assert!(hashmap.insert(1, 0).is_ok());
assert!(hashmap.insert(2, 1).is_ok());

// 可以通过`retain`修改或删除条目。
let mut acc = 0;
hashmap.retain(|k, v_mut| { acc += *k; *v_mut = 2; true });
assert_eq!(acc, 3);
assert_eq!(hashmap.read(&1, |_, v| *v).unwrap(), 2);
assert_eq!(hashmap.read(&2, |_, v| *v).unwrap(), 2);

// 一旦找到满足谓词的条目,`any`就会返回`true`。
assert!(hashmap.insert(3, 2).is_ok());
assert!(hashmap.any(|k, _| *k == 3));

// 可以通过`retain`删除多个条目。
hashmap.retain(|k, v| *k == 1 && *v == 2);
// `hash_map::OccupiedEntry`也可以返回下一个最近的已占用条目。
let first_entry = hashmap.first_entry();
assert!(first_entry.is_some());
let second_entry = first_entry.and_then(|e| e.next());
assert!(second_entry.is_none());

// 使用`scan_async`异步迭代条目。
let future_scan = hashmap.scan_async(|k, v| println!("{k} {v}"));

HashSet

HashSetHashMap的一个特殊版本,其中值类型为()

示例

大多数HashSet方法与HashMap相同,只是它们不接收值参数,并且一些用于修改值的HashMap方法在HashSet中没有实现。

use scc::HashSet;

let hashset: HashSet<u64> = HashSet::default();

assert!(hashset.read(&1, |_| true).is_none());
assert!(hashset.insert(1).is_ok());
assert!(hashset.read(&1, |_| true).unwrap());

let future_insert = hashset.insert_async(2);
let future_remove = hashset.remove_async(&1);

HashIndex

HashIndexHashMap的一个针对读取优化的版本。在HashIndex中,不仅桶数组的内存由sdd管理,条目桶的内存也受sdd保护,从而实现对单个条目的无锁读取访问。

条目生命周期

HashIndex不会立即删除被移除的条目,而是在满足以下条件之一时才删除:

  1. 自上次在桶中移除条目以来,Epoch达到下一代,且桶被写入访问。
  2. HashIndex被清空或调整大小。
  3. 充满已移除条目的桶占用了容量的50%。

这些条件不保证被移除的条目会在确定的时间内被删除,因此如果工作负载以写入为主且条目大小较大,HashIndex可能不是最佳选择。

示例

peekpeek_with方法完全无锁。

use scc::HashIndex;

let hashindex: HashIndex<u64, u32> = HashIndex::default();

assert!(hashindex.insert(1, 0).is_ok());

// `peek`和`peek_with`是无锁的。
assert_eq!(hashindex.peek_with(&1, |_, v| *v).unwrap(), 0);

let future_insert = hashindex.insert_async(2, 1);
let future_remove = hashindex.remove_if_async(&1, |_| true);

HashIndexEntry API可用于原地更新条目。

use scc::HashIndex;

let hashindex: HashIndex<u64, u32> = HashIndex::default();
assert!(hashindex.insert(1, 1).is_ok());

if let Some(mut o) = hashindex.get(&1) {
    // 创建条目的新版本。
    o.update(2);
};

if let Some(mut o) = hashindex.get(&1) {
    // 原地更新条目。
    unsafe { *o.get_mut() = 3; }
};

HashIndex实现了Iterator,因为任何衍生的引用都可以在关联的ebr::Guard存在期间存活。

use scc::ebr::Guard;
use scc::HashIndex;

let hashindex: HashIndex<u64, u32> = HashIndex::default();

assert!(hashindex.insert(1, 0).is_ok());

// 现有值可以被新值替换。
hashindex.get(&1).unwrap().update(1);

let guard = Guard::new();

// 必须为`iter`提供一个`Guard`。
let mut iter = hashindex.iter(&guard);

// 衍生的引用可以与`guard`同存。
let entry_ref = iter.next().unwrap();
assert_eq!(iter.next(), None);

drop(hashindex);

// 在`hashindex`被丢弃后仍可读取条目。
assert_eq!(entry_ref, (&1, &1));

HashCache

HashCache是一个基于HashMap实现的32路组相联并发缓存。HashCache不跟踪整个缓存中最近最少使用的条目,而是每个桶维护一个已占用条目的双向链表,该链表在访问条目时更新,以跟踪桶内最近最少使用的条目。

示例

当插入新条目且桶已满时,会驱逐桶中最近最少使用的条目。

use scc::HashCache;

let hashcache: HashCache<u64, u32> = HashCache::with_capacity(100, 2000);

/// 容量不能超过最大容量。
assert_eq!(hashcache.capacity_range(), 128..=2048);

/// 如果对应`1`或`2`的桶已满,最近最少使用的条目将被驱逐。
assert!(hashcache.put(1, 0).is_ok());
assert!(hashcache.put(2, 0).is_ok());

/// `1`成为桶中最近访问的条目。
assert!(hashcache.get(&1).is_some());

/// 可以正常移除条目。
assert_eq!(hashcache.remove(&2).unwrap(), (2, 0));

TreeIndex

TreeIndex是一个针对读取操作优化的B+树变体。sdd保护单个条目使用的内存,从而实现对它们的无锁读取访问。

锁定行为

读取访问始终是无锁且非阻塞的。只要不需要结构变化,对条目的写入访问也是无锁且非阻塞的。然而,当写入操作正在分裂或合并节点时,受影响范围内的键的其他写入操作会被阻塞。

条目生命周期

TreeIndex不会立即删除被移除的条目,而是在叶节点被清空或分裂时才删除,这使得TreeIndex在写入密集的工作负载下可能不是最佳选择。

示例

如果键是唯一的,可以插入条目,然后可以读取和移除它。只有在内部节点分裂或合并时才会获取或等待锁。

use scc::TreeIndex;

let treeindex: TreeIndex<u64, u32> = TreeIndex::new();

assert!(treeindex.insert(1, 2).is_ok());

// `peek`和`peek_with`是无锁操作。
assert_eq!(treeindex.peek_with(&1, |_, v| *v).unwrap(), 2);
assert!(treeindex.remove(&1));

let future_insert = treeindex.insert_async(2, 3);
let future_remove = treeindex.remove_if_async(&1, |v| *v == 2);

可以在不获取任何锁的情况下扫描条目。

use scc::TreeIndex;
use sdd::Guard;

let treeindex: TreeIndex<u64, u32> = TreeIndex::new();

assert!(treeindex.insert(1, 10).is_ok());
assert!(treeindex.insert(2, 11).is_ok());
assert!(treeindex.insert(3, 13).is_ok());

let guard = Guard::new();

// `visitor`在不获取锁的情况下迭代条目。
let mut visitor = treeindex.iter(&guard);
assert_eq!(visitor.next().unwrap(), (&1, &10));
assert_eq!(visitor.next().unwrap(), (&2, &11));
assert_eq!(visitor.next().unwrap(), (&3, &13));
assert!(visitor.next().is_none());

可以扫描特定范围的键。

use scc::ebr::Guard;
use scc::TreeIndex;

let treeindex: TreeIndex<u64, u32> = TreeIndex::new();

for i in 0..10 {
    assert!(treeindex.insert(i, 10).is_ok());
}

let guard = Guard::new();

assert_eq!(treeindex.range(1..1, &guard).count(), 0);
assert_eq!(treeindex.range(4..8, &guard).count(), 4);
assert_eq!(treeindex.range(4..=8, &guard).count(), 5);

Bag

Bag是一个并发无锁的无序容器。Bag完全不透明,在弹出之前不允许访问其中包含的实例。如果所包含实例的数量可以保持在ARRAY_LEN(默认:usize::BITS / 2)以下,Bag特别高效。

示例

use scc::Bag;

let bag: Bag<usize> = Bag::default();

bag.push(1);
assert!(!bag.is_empty());
assert_eq!(bag.pop(), Some(1));
assert!(bag.is_empty());

Queue

Queue是一个由sdd支持的并发无锁先进先出容器。

示例

use scc::Queue;

let queue: Queue<usize> = Queue::default();

queue.push(1);
assert!(queue.push_if(2, |e| e.map_or(false, |x| **x == 1)).is_ok());
assert!(queue.push_if(3, |e| e.map_or(false, |x| **x == 1)).is_err());
assert_eq!(queue.pop().map(|e| **e), Some(1));
assert_eq!(queue.pop().map(|e| **e), Some(2));
assert!(queue.pop().is_none());

Stack

Stack是一个由sdd支持的并发无锁后进先出容器。

示例

use scc::Stack;

let stack: Stack<usize> = Stack::default();

stack.push(1);
stack.push(2);
assert_eq!(stack.pop().map(|e| **e), Some(2));
assert_eq!(stack.pop().map(|e| **e), Some(1));
assert!(stack.pop().is_none());

LinkedList

LinkedList是一个实现无锁并发单向链表操作的类型特征,由sdd支持。它还提供了一种方法来标记链表的一个条目以表示用户定义的状态。

示例

use scc::ebr::{AtomicShared, Guard, Shared};
use scc::LinkedList;
use std::sync::atomic::Ordering::Relaxed;

#[derive(Default)]
struct L(AtomicShared<L>, usize);
impl LinkedList for L {
    fn link_ref(&self) -> &AtomicShared<L> {
        &self.0
    }
}

let guard = Guard::new();

let head: L = L::default();
let tail: Shared<L> = Shared::new(L(AtomicShared::null(), 1));

// 推入一个新条目。
assert!(head.push_back(tail.clone(), false, Relaxed, &guard).is_ok());
assert!(!head.is_marked(Relaxed));

// 用户可以在条目上标记一个标志。
head.mark(Relaxed);
assert!(head.is_marked(Relaxed));

// `next_ptr`遍历链表。
let next_ptr = head.next_ptr(Relaxed, &guard);
assert_eq!(next_ptr.as_ref().unwrap().1, 1);

// 一旦`tail`被删除,它就变得不可见。
tail.delete_self(Relaxed);
assert!(head.next_ptr(Relaxed, &guard).is_null());

性能

HashMap 尾部延迟

在Apple M2 Max上,1048576次插入操作(K = u64, V = u64)的延迟分布的预期尾部延迟范围从180微秒到200微秒。

HashMapHashIndex 吞吐量

更新日志

Footnotes

  1. 只有在启用相应目标特性时才会使用高级SIMD指令,例如-C target_feature=+avx2

项目侧边栏1项目侧边栏2
推荐项目
Project Cover

豆包MarsCode

豆包 MarsCode 是一款革命性的编程助手,通过AI技术提供代码补全、单测生成、代码解释和智能问答等功能,支持100+编程语言,与主流编辑器无缝集成,显著提升开发效率和代码质量。

Project Cover

AI写歌

Suno AI是一个革命性的AI音乐创作平台,能在短短30秒内帮助用户创作出一首完整的歌曲。无论是寻找创作灵感还是需要快速制作音乐,Suno AI都是音乐爱好者和专业人士的理想选择。

Project Cover

有言AI

有言平台提供一站式AIGC视频创作解决方案,通过智能技术简化视频制作流程。无论是企业宣传还是个人分享,有言都能帮助用户快速、轻松地制作出专业级别的视频内容。

Project Cover

Kimi

Kimi AI助手提供多语言对话支持,能够阅读和理解用户上传的文件内容,解析网页信息,并结合搜索结果为用户提供详尽的答案。无论是日常咨询还是专业问题,Kimi都能以友好、专业的方式提供帮助。

Project Cover

阿里绘蛙

绘蛙是阿里巴巴集团推出的革命性AI电商营销平台。利用尖端人工智能技术,为商家提供一键生成商品图和营销文案的服务,显著提升内容创作效率和营销效果。适用于淘宝、天猫等电商平台,让商品第一时间被种草。

Project Cover

吐司

探索Tensor.Art平台的独特AI模型,免费访问各种图像生成与AI训练工具,从Stable Diffusion等基础模型开始,轻松实现创新图像生成。体验前沿的AI技术,推动个人和企业的创新发展。

Project Cover

SubCat字幕猫

SubCat字幕猫APP是一款创新的视频播放器,它将改变您观看视频的方式!SubCat结合了先进的人工智能技术,为您提供即时视频字幕翻译,无论是本地视频还是网络流媒体,让您轻松享受各种语言的内容。

Project Cover

美间AI

美间AI创意设计平台,利用前沿AI技术,为设计师和营销人员提供一站式设计解决方案。从智能海报到3D效果图,再到文案生成,美间让创意设计更简单、更高效。

Project Cover

AIWritePaper论文写作

AIWritePaper论文写作是一站式AI论文写作辅助工具,简化了选题、文献检索至论文撰写的整个过程。通过简单设定,平台可快速生成高质量论文大纲和全文,配合图表、参考文献等一应俱全,同时提供开题报告和答辩PPT等增值服务,保障数据安全,有效提升写作效率和论文质量。

投诉举报邮箱: service@vectorlightyear.com
@2024 懂AI·鲁ICP备2024100362号-6·鲁公网安备37021002001498号