Project Icon

EnumerableAsyncProcessor

多模式异步任务处理库 灵活控制执行方式和速率

EnumerableAsyncProcessor是一个.NET异步任务处理库,提供单线程、批处理、并行和速率限制等多种执行模式。该库能够帮助管理大量异步操作,优化资源利用,并提供精细控制。适用于API请求限流、批量数据处理和并发性能优化等场景,简化了异步编程的复杂性。

EnumerableAsyncProcessor

以各种方式处理多个异步任务 - 一次一个/批量/速率限制/并发

nuget Codacy Badge CodeFactor

支持

如果这个库对你有帮助,考虑给我买杯咖啡

Buy Me A Coffee

安装

需要 .NET 6

通过 Nuget 安装 Install-Package EnumerableAsyncProcessor

为什么我要构建这个

因为我遇到过需要微调操作速率的情况。 也许你想要快速。 也许你想要缓慢。 也许你想要安全平衡。 也许你只是不想编写所有管理异步操作的样板代码!

速率限制并行处理器

类型

类型源对象返回对象方法 1方法 2
RateLimitedParallelAsyncProcessor.WithExecutionCount(int).ForEachAsync(delegate)
RateLimitedParallelAsyncProcessor<TInput>.WithItems(IEnumerable<TInput>).ForEachAsync(delegate)
ResultRateLimitedParallelAsyncProcessor<TOutput>.WithExecutionCount(int).SelectAsync(delegate)
ResultRateLimitedParallelAsyncProcessor<TInput, TOutput>.WithItems(IEnumerable<TInput>).SelectAsync(delegate)

工作原理
并行处理你的异步任务,但遵守你设置的限制。当一个完成时,另一个将开始。

例如,如果你设置了100的限制,任何时候都应该只有100个在运行

这是并行处理器和批处理器(见下文)的混合体 - 试图解决两者的缺点。提高批处理的速度,但不会通过使用完全并行化而使系统负担过重。

用法

var ids = Enumerable.Range(0, 5000).ToList();

// 如果你想返回something,使用SelectAsync
var results = await AsyncProcessorBuilder.WithItems(ids) // 或扩展方法: await ids.ToAsyncProcessorBuilder()
        .SelectAsync(id => DoSomethingAndReturnSomethingAsync(id), CancellationToken.None)
        .ProcessInParallel(levelOfParallelism: 100);

// 当你没有什么要返回时使用ForEachAsync
await AsyncProcessorBuilder.WithItems(ids) // 或扩展方法: await ids.ToAsyncProcessorBuilder()
        .ForEachAsync(id => DoSomethingAsync(id), CancellationToken.None) 
        .ProcessInParallel(levelOfParallelism: 100);

定时速率限制并行处理器(例如,限制RPS)

类型

类型源对象返回对象方法 1方法 2
TimedRateLimitedParallelAsyncProcessor.WithExecutionCount(int).ForEachAsync(delegate)
TimedRateLimitedParallelAsyncProcessor<TInput>.WithItems(IEnumerable<TInput>).ForEachAsync(delegate)
ResultTimedRateLimitedParallelAsyncProcessor<TOutput>.WithExecutionCount(int).SelectAsync(delegate)
ResultTimedRateLimitedParallelAsyncProcessor<TInput, TOutput>.WithItems(IEnumerable<TInput>).SelectAsync(delegate)

工作原理
并行处理你的异步任务,但遵守你在设置的时间跨度内设置的限制。当一个完成时,另一个将开始,除非你已经达到当前时间跨度内允许的最大值。

例如,如果你设置了100的限制,时间跨度为1秒,那么在1秒的过程中任何时候都应该只有100个操作在运行。如果操作在1秒(或你提供的时间跨度)之前完成,它会等待,然后在该时间跨度过去后开始下一个操作。

这在某些场景中很有用,例如,你有一个API,但它有每秒请求限制

用法

var ids = Enumerable.Range(0, 5000).ToList();

// 如果你想返回something,使用SelectAsync
var results = await AsyncProcessorBuilder.WithItems(ids) // 或扩展方法: await ids.ToAsyncProcessorBuilder()
        .SelectAsync(id => DoSomethingAndReturnSomethingAsync(id), CancellationToken.None)
        .ProcessInParallel(levelOfParallelism: 100, TimeSpan.FromSeconds(1));

// 当你没有什么要返回时使用ForEachAsync
await AsyncProcessorBuilder.WithItems(ids) // 或扩展方法: await ids.ToAsyncProcessorBuilder()
        .ForEachAsync(id => DoSomethingAsync(id), CancellationToken.None) 
        .ProcessInParallel(levelOfParallelism: 100, TimeSpan.FromSeconds(1));

注意事项

  • 如果你的操作耗时超过你提供的TimeSpan,你可能无法获得所需的吞吐量。这个处理器确保你不会超过你的速率限制,但如果你低于限制,它不会增加并行执行。

一次一个

类型

类型源对象返回对象方法1方法2
OneAtATimeAsyncProcessor.WithExecutionCount(int).ForEachAsync(delegate)
OneAtATimeAsyncProcessor<TInput>.WithItems(IEnumerable<TInput>).ForEachAsync(delegate)
ResultOneAtATimeAsyncProcessor<TOutput>.WithExecutionCount(int).SelectAsync(delegate)
ResultOneAtATimeAsyncProcessor<TInput, TOutput>.WithItems(IEnumerable<TInput>).SelectAsync(delegate)

工作原理
一次处理一个异步任务。同一时间只有一个任务在进行。当一个任务完成时,另一个任务开始。

用法

var ids = Enumerable.Range(0, 5000).ToList();

// 如果你想返回某些内容,使用SelectAsync
var results = await AsyncProcessorBuilder.WithItems(ids) // 或扩展方法: await ids.ToAsyncProcessorBuilder()
        .SelectAsync(id => DoSomethingAndReturnSomethingAsync(id), CancellationToken.None)
        .ProcessOneAtATime();

// 如果你不需要返回任何内容,使用ForEachAsync
await AsyncProcessorBuilder.WithItems(ids) // 或扩展方法: await ids.ToAsyncProcessorBuilder()
        .ForEachAsync(id => DoSomethingAsync(id), CancellationToken.None) 
        .ProcessOneAtATime();

注意事项

  • 最慢的方法

批处理

类型

类型源对象返回对象方法1方法2
BatchAsyncProcessor.WithExecutionCount(int).ForEachAsync(delegate)
BatchAsyncProcessor<TInput>.WithItems(IEnumerable<TInput>).ForEachAsync(delegate)
ResultBatchAsyncProcessor<TOutput>.WithExecutionCount(int).SelectAsync(delegate)
ResultBatchAsyncProcessor<TInput, TOutput>.WithItems(IEnumerable<TInput>).SelectAsync(delegate)

工作原理
分批处理异步任务。下一批任务不会开始,直到前一批中的每个任务都完成。

用法

var ids = Enumerable.Range(0, 5000).ToList();

// 如果你想返回某些内容,使用SelectAsync
var results = await AsyncProcessorBuilder.WithItems(ids) // 或扩展方法: await ids.ToAsyncProcessorBuilder()
        .SelectAsync(id => DoSomethingAndReturnSomethingAsync(id), CancellationToken.None)
        .ProcessInBatches(batchSize: 100);

// 如果你不需要返回任何内容,使用ForEachAsync
await AsyncProcessorBuilder.WithItems(ids) // 或扩展方法: await ids.ToAsyncProcessorBuilder()
        .ForEachAsync(id => DoSomethingAsync(id), CancellationToken.None) 
        .ProcessInBatches(batchSize: 100);

注意事项

  • 如果一批中的任何一个任务变慢或挂起,这将阻止下一批开始
  • 如果你设置了100个任务的批次,而70个已经完成,你只剩下30个在执行。这可能会减慢处理速度

并行

类型

类型源对象返回对象方法1方法2
ParallelAsyncProcessor.WithExecutionCount(int).ForEachAsync(delegate)
ParallelAsyncProcessor<TInput>.WithItems(IEnumerable<TInput>).ForEachAsync(delegate)
ResultParallelAsyncProcessor<TOutput>.WithExecutionCount(int).SelectAsync(delegate)
ResultParallelAsyncProcessor<TInput, TOutput>.WithItems(IEnumerable<TInput>).SelectAsync(delegate)

工作原理
尽可能快地处理异步任务。如果可能的话,同时处理所有任务。

用法

var ids = Enumerable.Range(0, 5000).ToList();

// 如果你想返回某些内容,使用SelectAsync
var results = await AsyncProcessorBuilder.WithItems(ids) // 或扩展方法: await ids.ToAsyncProcessorBuilder()
        .SelectAsync(id => DoSomethingAndReturnSomethingAsync(id), CancellationToken.None)
        .ProcessInParallel();

// 如果你不需要返回任何内容,使用ForEachAsync
await AsyncProcessorBuilder.WithItems(ids) // 或扩展方法: await ids.ToAsyncProcessorBuilder()
        .ForEachAsync(id => DoSomethingAsync(id), CancellationToken.None) 
        .ProcessInParallel();

注意事项

  • 根据你的操作数量,你可能会使系统负担过重。内存、CPU和网络使用可能会激增,导致瓶颈/崩溃/异常

处理器方法

如上所示,你可以直接在处理器上使用await来获取结果。 以下展示了使用处理器对象和各种可用方法的示例。

这适用于当你需要枚举一些对象并在操作中使用它们时。例如,向特定ID发送通知

    var httpClient = new HttpClient();

    var ids = Enumerable.Range(0, 5000).ToList();

    // 这适用于当你需要枚举一些对象并在操作中使用它们时
    
    var itemProcessor = Enumerable.Range(0, 5000).ToAsyncProcessorBuilder()
        .SelectAsync(NotifyAsync)
        .ProcessInParallel(100);

    // 或
    // var itemProcessor = AsyncProcessorBuilder.WithItems(ids)
    //     .SelectAsync(NotifyAsync, CancellationToken.None)
    //     .ProcessInParallel(100);

// GetEnumerableTasks()返回IEnumerable<Task<TOutput>> - 这些任务可能已完成,或可能仍在等待完成。
    var tasks = itemProcessor.GetEnumerableTasks();

// 或调用GetResultsAsyncEnumerable()以获取IAsyncEnumerable<TOutput>,以便在任务完成时实时处理它们。
    await foreach (var httpResponseMessage in itemProcessor.GetResultsAsyncEnumerable())
    {
        // 执行某些操作
    }

// 或调用GetResultsAsync()以获取包含所有已完成结果的Task<TOutput[]>
    var results = await itemProcessor.GetResultsAsync();

// 我的示例方法
    Task<HttpResponseMessage> NotifyAsync(int id)
    {
        return httpClient.GetAsync($"https://localhost:8080/notify/{id}");
    }

这适用于当你不需要任何对象 - 只是想执行某事一定次数时。例如,ping一个站点以预热多个实例

    var httpClient = new HttpClient();

    var itemProcessor = AsyncProcessorBuilder.WithExecutionCount(100)
        .SelectAsync(PingAsync, CancellationToken.None)
        .ProcessInParallel(10);

// GetEnumerableTasks()返回IEnumerable<Task> - 这些任务可能已完成,或者可能仍在等待完成。 var tasks = itemProcessor.GetEnumerableTasks();

// 或调用GetResultsAsyncEnumerable()以获取IAsyncEnumerable,这样您就可以在它们完成时实时处理它们。 await foreach (var httpResponseMessage in itemProcessor.GetResultsAsyncEnumerable()) { // 执行某些操作 }

// 或调用GetResultsAsync()以获取包含所有已完成结果的Task<TOutput[]> var results = await itemProcessor.GetResultsAsync();

// 我的示例方法 Task PingAsync() { return httpClient.GetAsync("https://localhost:8080/ping"); }

项目侧边栏1项目侧边栏2
推荐项目
Project Cover

豆包MarsCode

豆包 MarsCode 是一款革命性的编程助手,通过AI技术提供代码补全、单测生成、代码解释和智能问答等功能,支持100+编程语言,与主流编辑器无缝集成,显著提升开发效率和代码质量。

Project Cover

AI写歌

Suno AI是一个革命性的AI音乐创作平台,能在短短30秒内帮助用户创作出一首完整的歌曲。无论是寻找创作灵感还是需要快速制作音乐,Suno AI都是音乐爱好者和专业人士的理想选择。

Project Cover

白日梦AI

白日梦AI提供专注于AI视频生成的多样化功能,包括文生视频、动态画面和形象生成等,帮助用户快速上手,创造专业级内容。

Project Cover

有言AI

有言平台提供一站式AIGC视频创作解决方案,通过智能技术简化视频制作流程。无论是企业宣传还是个人分享,有言都能帮助用户快速、轻松地制作出专业级别的视频内容。

Project Cover

Kimi

Kimi AI助手提供多语言对话支持,能够阅读和理解用户上传的文件内容,解析网页信息,并结合搜索结果为用户提供详尽的答案。无论是日常咨询还是专业问题,Kimi都能以友好、专业的方式提供帮助。

Project Cover

讯飞绘镜

讯飞绘镜是一个支持从创意到完整视频创作的智能平台,用户可以快速生成视频素材并创作独特的音乐视频和故事。平台提供多样化的主题和精选作品,帮助用户探索创意灵感。

Project Cover

讯飞文书

讯飞文书依托讯飞星火大模型,为文书写作者提供从素材筹备到稿件撰写及审稿的全程支持。通过录音智记和以稿写稿等功能,满足事务性工作的高频需求,帮助撰稿人节省精力,提高效率,优化工作与生活。

Project Cover

阿里绘蛙

绘蛙是阿里巴巴集团推出的革命性AI电商营销平台。利用尖端人工智能技术,为商家提供一键生成商品图和营销文案的服务,显著提升内容创作效率和营销效果。适用于淘宝、天猫等电商平台,让商品第一时间被种草。

Project Cover

AIWritePaper论文写作

AIWritePaper论文写作是一站式AI论文写作辅助工具,简化了选题、文献检索至论文撰写的整个过程。通过简单设定,平台可快速生成高质量论文大纲和全文,配合图表、参考文献等一应俱全,同时提供开题报告和答辩PPT等增值服务,保障数据安全,有效提升写作效率和论文质量。

投诉举报邮箱: service@vectorlightyear.com
@2024 懂AI·鲁ICP备2024100362号-6·鲁公网安备37021002001498号