Project Icon

disruptor-rs

Rust实现的高性能跨线程通信库

disruptor-rs是一个Rust实现的高性能跨线程通信库。支持多种生产者-消费者模式,提供批量事件处理功能。专注低延迟设计,基准测试优于Crossbeam。支持线程亲和性设置和处理器线程命名,适用于对延迟敏感的应用场景。

Crates.io Crates.io 构建 codecov

Disruptor

这是一个用Rust编写的低延迟线程间通信库。

它深受出色的LMAX Disruptor库的启发。

目录

入门

在你的Cargo.toml文件中添加以下内容:

disruptor = "3.2.0"

要了解如何使用该库的详细信息,请查看docs.rs/disruptor上的文档。

以下是一个最小示例,演示了单个和批量发布。注意,为了获得最佳延迟和吞吐量,应尽可能使用批量发布(参见下面的基准测试)。

use disruptor::*;

// 环形缓冲区中的事件。
struct Event {
    price: f64
}

fn main() {
    // 用于初始化环形缓冲区中事件的工厂闭包。
    let factory = || { Event { price: 0.0 }};

    // 用于处理事件的闭包。
    let processor = |e: &Event, sequence: Sequence, end_of_batch: bool| {
        // 在这里添加你的处理逻辑。
    };

    let size = 64;
    let mut producer = disruptor::build_single_producer(size, factory, BusySpin)
        .handle_events_with(processor)
        .build();

    // 通过`Producer`句柄将单个事件发布到Disruptor中。
    for i in 0..10 {
        producer.publish(|e| {
            e.price = i as f64;
        });
    }

    // 将一批事件发布到Disruptor中。
    producer.batch_publish(5, |iter| {
        for e in iter { // `iter`保证会产生5个事件。
            e.price = 42.0;
        }
    });
}// 在此处,Producer实例超出作用域,当处理器完成处理所有事件后,
 // Disruptor也会被释放。

该库还支持将线程固定在核心上,以避免上下文切换引起的延迟。 一个更高级的用法,演示了这一点以及多个生产者和多个相互依赖的消费者,可能如下所示:

use disruptor::*;
use std::thread;

struct Event {
    price: f64
}

fn main() {
    let factory = || { Event { price: 0.0 }};

    // 用于处理事件的闭包。
    let h1 = |e: &Event, sequence: Sequence, end_of_batch: bool| {
        // 在这里添加处理逻辑。
    };
    let h2 = |e: &Event, sequence: Sequence, end_of_batch: bool| {
        // 在这里添加一些处理逻辑。
    };
    let h3 = |e: &Event, sequence: Sequence, end_of_batch: bool| {
        // 在这里添加更多处理逻辑。
    };

    let mut producer1 = disruptor::build_multi_producer(64, factory, BusySpin)
        // `h2`与`h1`并发处理事件。
        .pin_at_core(1).handle_events_with(h1)
        .pin_at_core(2).handle_events_with(h2)
            .and_then()
            // `h3`在`h1`和`h2`之后处理事件。
            .pin_at_core(3).handle_events_with(h3)
        .build();

    // 创建另一个生产者。
    let mut producer2 = producer1.clone();

    // 发布到Disruptor。
    thread::scope(|s| {
        s.spawn(move || {
            for i in 0..10 {
                producer1.publish(|e| {
                    e.price = i as f64;
                });
            }
        });
        s.spawn(move || {
            for i in 10..20 {
                producer2.publish(|e| {
                    e.price = i as f64;
                });
            }
        });
    });
}// 在此处,Producer实例超出作用域,当处理器完成处理所有事件后,
 // Disruptor也会被释放。

如果你需要在处理器线程中存储一些既不是Send也不是Sync的状态,例如Rc<RefCell<i32>>,那么你可以创建一个用于初始化该状态的闭包,并在构建Disruptor时将其与处理闭包一起传递。然后Disruptor将在每个事件上传递一个指向你的状态的可变引用。例如:

use std::{cell::RefCell, rc::Rc};
use disruptor::*;

struct Event {
    price: f64
}

#[derive(Default)]
struct State {
    data: Rc<RefCell<i32>>
}

fn main() {
    let factory = || { Event { price: 0.0 }};
    let initial_state = || { State::default() };

    // 用于处理事件*和*状态的闭包。
    let processor = |s: &mut State, e: &Event, _: Sequence, _: bool| {
        // 修改你的自定义状态:
        *s.data.borrow_mut() += 1;
    };

    let size = 64;
    let mut producer = disruptor::build_single_producer(size, factory, BusySpin)
        .handle_events_and_state_with(processor, initial_state)
        .build();

    for i in 0..10 {
        producer.publish(|e| {
            e.price = i as f64;
        });
    }
}

特性

  • 单生产者单消费者(SPSC)。
  • 单生产者多消费者(SPMC)与消费者相互依赖。
  • 多生产者单消费者(MPSC)。
  • 多生产者多消费者(MPMC)与消费者相互依赖。
  • 忙等待策略。
  • 事件批量发布。
  • 事件批量消费。
  • 可以为事件处理器线程设置线程亲和性。
  • 设置每个事件处理器线程的线程名称。

设计选择

该库中的一切都是关于低延迟的,这极大地影响了该库中的所有选择。 例如,你不能分配一个事件并将其移动到环形缓冲区中。相反,事件在启动时就被分配,以确保它们在内存中共同定位,从而提高缓存一致性。 然而,你仍然可以在堆上分配一个结构体,并将其所有权移动到环形缓冲区上的事件字段中。 只要你意识到这可能会增加延迟,因为结构体是由一个线程分配而由另一个线程释放的。 因此,在分配器中会发生同步。

也没有使用动态分发 - 一切都是单态的。

正确性

这个库需要使用Unsafe来实现低延迟。 虽然不能保证没有错误,但已经使用了以下方法来消除错误:

  • 最小化Unsafe块的使用。
  • 高测试覆盖率。
  • 所有测试都在CI/CD中通过Miri运行。
  • 在TLA+中进行验证(参见verification/文件夹)。

性能

SPSC和MPSC Disruptor变体已经进行了基准测试,并与Crossbeam进行了比较。请参见benches/spsc.rsbenches/mpsc.rs文件中的代码。 以下是在 2016 年的 MacBook Pro(配备 2.6 GHz 四核 Intel Core i7 处理器)上运行 SPSC 基准测试的结果。在现代 Intel Xeon 处理器上,这些数字应该会更好。此外,在 Mac 上无法隔离核心并固定线程,这本可以产生更稳定的结果。这是未来的工作。

如果您有任何改进基准测试的建议,请随时提出问题。

为了提供一个相对真实的基准测试,不仅考虑了不同大小的突发,还考虑了突发之间的不同暂停时间:0 毫秒、1 毫秒和 10 毫秒。

以下延迟是每个元素的平均延迟,置信区间为 95%(标准 criterion 设置)。捕获所有延迟并计算各种百分位数(特别是最大延迟)是未来的工作。然而,我预计下面的测量结果能代表您在实际应用中可以达到的性能。

突发之间无暂停

延迟:

突发大小CrossbeamDisruptor改进
165 ns32 ns51%
1068 ns9 ns87%
10029 ns8 ns72%

吞吐量:

突发大小CrossbeamDisruptor改进
115.2M / s31.7M / s109%
1014.5M / s117.3M / s709%
10034.3M / s119.7M / s249%

突发之间暂停 1 毫秒

延迟:

突发大小CrossbeamDisruptor改进
163 ns33 ns48%
1067 ns8 ns88%
10030 ns9 ns70%

吞吐量:

突发大小CrossbeamDisruptor改进
115.9M / s30.7M / s93%
1014.9M / s117.7M / s690%
10033.8M / s105.0M / s211%

突发之间暂停 10 毫秒

延迟:

突发大小CrossbeamDisruptor改进
151 ns32 ns37%
1067 ns9 ns87%
10030 ns10 ns67%

吞吐量:

突发大小CrossbeamDisruptor改进
119.5M / s31.6M / s62%
1014.9M / s114.5M / s668%
10033.6M / s105.0M / s213%

结论

Disruptor 和 Crossbeam 库之间显然存在差异。然而,这并不是因为 Crossbeam 库不是一个优秀的软件。事实上它很优秀。Disruptor 通过牺牲 CPU 和内存资源来换取更低的延迟和更高的吞吐量,这就是它能够达到这些结果的原因。如基准测试中 10 和 100 个事件的突发所示,Disruptor 在发布事件批次时表现更为出色。

随着突发大小的增加,两个库的性能都有很大提升,但 Disruptor 的性能对突发之间的暂停更具韧性,这也是其设计目标之一。

相关工作

有多个其他 Rust 项目模仿了 LMAX Disruptor 库:

  1. Turbine
  2. Disrustor

该库支持的一个关键特性是来自不同线程的多个生产者,而上述库都不支持这一特性(在撰写本文时)。

贡献

欢迎您创建拉取请求或提出问题,提出改进建议。

我将全权决定是否接受更改,并将重点关注这些更改是否适合本 crate 的目的和设计。

路线图

空白!所有项目都已实现。

项目侧边栏1项目侧边栏2
推荐项目
Project Cover

豆包MarsCode

豆包 MarsCode 是一款革命性的编程助手,通过AI技术提供代码补全、单测生成、代码解释和智能问答等功能,支持100+编程语言,与主流编辑器无缝集成,显著提升开发效率和代码质量。

Project Cover

AI写歌

Suno AI是一个革命性的AI音乐创作平台,能在短短30秒内帮助用户创作出一首完整的歌曲。无论是寻找创作灵感还是需要快速制作音乐,Suno AI都是音乐爱好者和专业人士的理想选择。

Project Cover

白日梦AI

白日梦AI提供专注于AI视频生成的多样化功能,包括文生视频、动态画面和形象生成等,帮助用户快速上手,创造专业级内容。

Project Cover

有言AI

有言平台提供一站式AIGC视频创作解决方案,通过智能技术简化视频制作流程。无论是企业宣传还是个人分享,有言都能帮助用户快速、轻松地制作出专业级别的视频内容。

Project Cover

Kimi

Kimi AI助手提供多语言对话支持,能够阅读和理解用户上传的文件内容,解析网页信息,并结合搜索结果为用户提供详尽的答案。无论是日常咨询还是专业问题,Kimi都能以友好、专业的方式提供帮助。

Project Cover

讯飞绘镜

讯飞绘镜是一个支持从创意到完整视频创作的智能平台,用户可以快速生成视频素材并创作独特的音乐视频和故事。平台提供多样化的主题和精选作品,帮助用户探索创意灵感。

Project Cover

讯飞文书

讯飞文书依托讯飞星火大模型,为文书写作者提供从素材筹备到稿件撰写及审稿的全程支持。通过录音智记和以稿写稿等功能,满足事务性工作的高频需求,帮助撰稿人节省精力,提高效率,优化工作与生活。

Project Cover

阿里绘蛙

绘蛙是阿里巴巴集团推出的革命性AI电商营销平台。利用尖端人工智能技术,为商家提供一键生成商品图和营销文案的服务,显著提升内容创作效率和营销效果。适用于淘宝、天猫等电商平台,让商品第一时间被种草。

Project Cover

AIWritePaper论文写作

AIWritePaper论文写作是一站式AI论文写作辅助工具,简化了选题、文献检索至论文撰写的整个过程。通过简单设定,平台可快速生成高质量论文大纲和全文,配合图表、参考文献等一应俱全,同时提供开题报告和答辩PPT等增值服务,保障数据安全,有效提升写作效率和论文质量。

投诉举报邮箱: service@vectorlightyear.com
@2024 懂AI·鲁ICP备2024100362号-6·鲁公网安备37021002001498号