Project Icon

tokio-cron-scheduler

Rust异步Cron调度库

tokio-cron-scheduler是一个Rust异步Cron调度库,基于Tokio构建。它提供Cron风格的任务调度,支持即时执行、固定间隔重复任务及任务数据持久化。该库支持PostgreSQL和Nats存储,具备灵活的时区设置和任务状态通知功能。适合需要在异步环境中进行精确任务调度的Rust项目。

tokio-cron-scheduler

在异步 tokio 环境中使用类似 cron 的调度。 还可以立即调度任务或以固定时间间隔重复执行任务。 任务数据可以选择使用 PostgreSQL 或 Nats 进行持久化。

灵感来自 https://github.com/lholden/job_scheduler

使用方法

更多详细信息请参阅文档

请确保在 Cargo.toml 中添加 job_scheduler crate:

[dependencies]
tokio-cron-scheduler = "*"

使用 cron 库的 Schedule 类型的 FromStr 实现来为作业创建调度。

调度格式如下:

秒   分   时   日   月   星期   年
*    *    *    *    *    *      *

时间以 UTC 指定,而非您的本地时区。请注意,年份可以省略。如果您想使用您的时区,请在作业创建调用后附加 _tz(例如 Job::new_async 对比 Job::new_async_tz)。

以逗号分隔的值(如 5,8,10)表示多个时间值。例如,调度 0 2,14,26 * * * * 将在每小时的第 2、14 和 26 分钟执行。

范围可以用破折号指定。调度 0 0 * 5-10 * * 将每小时执行一次,但仅在每月的 5 日至 10 日执行。

星期几可以用缩写或全名指定。调度 0 0 6 * * Sun,Sat 将在周日和周六的早上 6 点执行。

对于每个作业,您可以在作业开始、停止和移除时收到通知。由于这些通知是使用 tokio::spawn 调度的,如果任务快速完成,则无法保证这些通知的顺序。

一个简单的使用示例:

use std::time::Duration;
use tokio_cron_scheduler::{Job, JobScheduler, JobSchedulerError};

#[tokio::main]
async fn main() -> Result<(), JobSchedulerError> {
    let mut sched = JobScheduler::new().await?;

    // 添加基本 cron 作业
    sched.add(
        Job::new("1/10 * * * * *", |_uuid, _l| {
            println!("我每 10 秒运行一次");
        })?
    ).await?;

    // 添加异步作业
    sched.add(
        Job::new_async("1/7 * * * * *", |uuid, mut l| {
            Box::pin(async move {
                println!("我每 7 秒异步运行一次");

                // 查询此作业的下一次执行时间
                let next_tick = l.next_tick_for_job(uuid).await;
                match next_tick {
                    Ok(Some(ts)) => println!("7 秒作业的下一次运行时间是 {:?}", ts),
                    _ => println!("无法获取 7 秒作业的下一次运行时间"),
                }
            })
        })?
    ).await?;

    // 添加指定持续时间的一次性作业
    sched.add(
        Job::new_one_shot(Duration::from_secs(18), |_uuid, _l| {
            println!("我只运行一次");
        })?
    ).await?;

    // 创建指定持续时间的重复作业,使其可变以便之后编辑
    let mut jj = Job::new_repeated(Duration::from_secs(8), |_uuid, _l| {
        println!("我每 8 秒重复运行一次");
    })?;

    // 添加在作业开始/停止等时执行的操作
    jj.on_start_notification_add(&sched, Box::new(|job_id, notification_id, type_of_notification| {
        Box::pin(async move {
            println!("作业 {:?} 已启动,通知 {:?} 已运行({:?})", job_id, notification_id, type_of_notification);
        })
    })).await?;

    jj.on_stop_notification_add(&sched, Box::new(|job_id, notification_id, type_of_notification| {
        Box::pin(async move {
            println!("作业 {:?} 已完成,通知 {:?} 已运行({:?})", job_id, notification_id, type_of_notification);
        })
    })).await?;

    jj.on_removed_notification_add(&sched, Box::new(|job_id, notification_id, type_of_notification| {
        Box::pin(async move {
            println!("作业 {:?} 已移除,通知 {:?} 已运行({:?})", job_id, notification_id, type_of_notification);
        })
    })).await?;
    sched.add(jj).await?;

    // 必须启用 'signal' 功能
    sched.shutdown_on_ctrl_c();

    // 添加在关闭期间/之后运行的代码
    sched.set_shutdown_handler(Box::new(|| {
        Box::pin(async move {
            println!("关闭完成");
        })
    }));

    // 启动调度器
    sched.start().await?;

    // 等待作业运行
    tokio::time::sleep(Duration::from_secs(100)).await;

    Ok(())
}

时区变更

您可以使用 JobBuilder API 创建使用特定时区的作业。 chrono-tz 不包含在依赖项中,因此如果您想轻松创建 Timezone 结构,需要将其添加到您的 Cargo.toml 中。

    let job = JobBuilder::new()
.with_timezone(chrono_tz::Africa::Johannesburg)
.with_cron_job_type()
.with_schedule("*/2 * * * * *")
.unwrap()
.with_run_async(Box::new( | uuid, mut l| {
Box::pin(async move {
info ! ("JHB 每 2 秒异步运行一次,id {:?}", uuid);
let next_tick = l.next_tick_for_job(uuid).await;
match next_tick {
Ok(Some(ts)) => info !("JHB 2 秒作业的下一次运行时间是 {:?}", ts),
_ => warn !("无法获取 2 秒作业的下一次运行时间"),
}
})
}))
.build()
.unwrap();

类似的库

  • job_scheduler 启发这个 crate 的项目
  • cron 我们使用的 cron 表达式解析器
  • schedule-rs 是一个类似的 Rust 库,实现了自己的 cron 表达式解析器

许可证

TokioCronScheduler 使用以下任一许可证

自定义存储

MetadataStore和NotificationStore特质可以被实现并在JobScheduler中使用。

默认提供了基于易失性哈希映射的SimpleMetadataStore和SimpleNotificationStore版本。使用Nats的持久版本由NatsMetadataStore和NatsNotificationStore提供。

贡献

除非您另有明确声明,否则您有意提交以包含在作品中的任何贡献,按照Apache-2.0许可证的定义,均应按上述方式双重许可,无任何附加条款或条件。

更多信息请参阅CONTRIBUTING文件。

特性

has_bytes

自0.7版本起

使Prost生成的数据结构可被需要获取数据结构字节的存储使用。Nats和Postgres存储依赖于启用此特性。

postgres_storage

自0.6版本起

添加Postgres元数据存储和通知存储(PostgresMetadataStore, PostgresNotificationStore)。使用Postgres数据库存储元数据和通知数据。

参见PostgreSQL文档

postgres_native_tls

自0.6版本起

使用postgres-native-tls crate作为PostgreSQL连接的TLS提供程序。

postgres_openssl

自0.6版本起

使用postgres-openssl crate作为PostgreSQL连接的TLS提供程序。

nats_storage

自0.6版本起

添加Nats元数据存储和通知存储(NatsMetadataStore, NatsNotificationStore)。使用Nats系统作为存储元数据和通知的方式。

参见Nats文档

signal

自0.5版本起

为调度器添加shutdown_on_signalshutdown_on_ctrl_c。 当接收到信号时,两者都会关闭系统(停止调度器并移除所有任务)。

由于这利用了Tokio的信号处理,因此仅在Unix系统上可用。

编写测试

在进行tokio::test时,记得在多线程上下文中运行,否则测试将在scheduler.add()上挂起。

例如:


#[cfg(test)]
mod test {
    use tokio_cron_scheduler::{Job, JobScheduler};
    use tracing::{info, Level};
    use tracing_subscriber::FmtSubscriber;

    // 需要多线程来测试,否则会在scheduler.add()上挂起
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    // #[tokio::test]
    async fn test_schedule() {
        let subscriber = FmtSubscriber::builder()
            .with_max_level(Level::TRACE)
            .finish();
        tracing::subscriber::set_global_default(subscriber)
            .expect("设置默认订阅者失败");

        info!("创建调度器");
        let scheduler = JobScheduler::new().await.unwrap();
        info!("添加任务");
        scheduler
            .add(
                Job::new_async("*/1  * * * * *", |_, _| {
                    Box::pin(async {
                        info!("每秒运行");
                    })
                })
                    .unwrap(),
            )
            .await
            .expect("应该能够添加任务");

        scheduler.start().await.unwrap();

        tokio::time::sleep(core::time::Duration::from_secs(20)).await;
    }
}

示例

simple

运行基于内存哈希映射的存储

 cargo run --example simple --features="tracing-subscriber"

postgres

首先需要一个运行中的PostgreSQL实例:

docker run --rm -it -p 5432:5432 -e POSTGRES_USER="postgres" -e POSTGRES_PASSWORD="" -e POSTGRES_HOST_AUTH_METHOD="trust" postgres:14.1

然后运行示例:

POSTGRES_INIT_METADATA=true POSTGRES_INIT_NOTIFICATIONS=true cargo run --example postgres --features="postgres_storage tracing-subscriber"

nats

首先需要一个启用了Jetstream的运行中的Nats实例:

docker run --rm -it -p 4222:4222 -p 6222:6222 -p 7222:7222 -p 8222:8222 nats -js -DV

然后运行示例:

cargo run --example nats --features="nats_storage tracing-subscriber"

设计

任务活动

任务活动

创建任务

创建任务

创建通知

创建通知

删除任务

删除任务

删除通知

删除通知

项目侧边栏1项目侧边栏2
推荐项目
Project Cover

豆包MarsCode

豆包 MarsCode 是一款革命性的编程助手,通过AI技术提供代码补全、单测生成、代码解释和智能问答等功能,支持100+编程语言,与主流编辑器无缝集成,显著提升开发效率和代码质量。

Project Cover

AI写歌

Suno AI是一个革命性的AI音乐创作平台,能在短短30秒内帮助用户创作出一首完整的歌曲。无论是寻找创作灵感还是需要快速制作音乐,Suno AI都是音乐爱好者和专业人士的理想选择。

Project Cover

白日梦AI

白日梦AI提供专注于AI视频生成的多样化功能,包括文生视频、动态画面和形象生成等,帮助用户快速上手,创造专业级内容。

Project Cover

有言AI

有言平台提供一站式AIGC视频创作解决方案,通过智能技术简化视频制作流程。无论是企业宣传还是个人分享,有言都能帮助用户快速、轻松地制作出专业级别的视频内容。

Project Cover

Kimi

Kimi AI助手提供多语言对话支持,能够阅读和理解用户上传的文件内容,解析网页信息,并结合搜索结果为用户提供详尽的答案。无论是日常咨询还是专业问题,Kimi都能以友好、专业的方式提供帮助。

Project Cover

讯飞绘镜

讯飞绘镜是一个支持从创意到完整视频创作的智能平台,用户可以快速生成视频素材并创作独特的音乐视频和故事。平台提供多样化的主题和精选作品,帮助用户探索创意灵感。

Project Cover

讯飞文书

讯飞文书依托讯飞星火大模型,为文书写作者提供从素材筹备到稿件撰写及审稿的全程支持。通过录音智记和以稿写稿等功能,满足事务性工作的高频需求,帮助撰稿人节省精力,提高效率,优化工作与生活。

Project Cover

阿里绘蛙

绘蛙是阿里巴巴集团推出的革命性AI电商营销平台。利用尖端人工智能技术,为商家提供一键生成商品图和营销文案的服务,显著提升内容创作效率和营销效果。适用于淘宝、天猫等电商平台,让商品第一时间被种草。

Project Cover

AIWritePaper论文写作

AIWritePaper论文写作是一站式AI论文写作辅助工具,简化了选题、文献检索至论文撰写的整个过程。通过简单设定,平台可快速生成高质量论文大纲和全文,配合图表、参考文献等一应俱全,同时提供开题报告和答辩PPT等增值服务,保障数据安全,有效提升写作效率和论文质量。

投诉举报邮箱: service@vectorlightyear.com
@2024 懂AI·鲁ICP备2024100362号-6·鲁公网安备37021002001498号