tokio-cron-scheduler
在异步 tokio 环境中使用类似 cron 的调度。 还可以立即调度任务或以固定时间间隔重复执行任务。 任务数据可以选择使用 PostgreSQL 或 Nats 进行持久化。
灵感来自 https://github.com/lholden/job_scheduler
使用方法
更多详细信息请参阅文档。
请确保在 Cargo.toml
中添加 job_scheduler crate:
[dependencies]
tokio-cron-scheduler = "*"
使用 cron 库的 Schedule
类型的 FromStr
实现来为作业创建调度。
调度格式如下:
秒 分 时 日 月 星期 年
* * * * * * *
时间以 UTC
指定,而非您的本地时区。请注意,年份可以省略。如果您想使用您的时区,请在作业创建调用后附加 _tz
(例如 Job::new_async 对比 Job::new_async_tz)。
以逗号分隔的值(如 5,8,10
)表示多个时间值。例如,调度 0 2,14,26 * * * *
将在每小时的第 2、14 和 26 分钟执行。
范围可以用破折号指定。调度 0 0 * 5-10 * *
将每小时执行一次,但仅在每月的 5 日至 10 日执行。
星期几可以用缩写或全名指定。调度 0 0 6 * * Sun,Sat
将在周日和周六的早上 6 点执行。
对于每个作业,您可以在作业开始、停止和移除时收到通知。由于这些通知是使用 tokio::spawn 调度的,如果任务快速完成,则无法保证这些通知的顺序。
一个简单的使用示例:
use std::time::Duration;
use tokio_cron_scheduler::{Job, JobScheduler, JobSchedulerError};
#[tokio::main]
async fn main() -> Result<(), JobSchedulerError> {
let mut sched = JobScheduler::new().await?;
// 添加基本 cron 作业
sched.add(
Job::new("1/10 * * * * *", |_uuid, _l| {
println!("我每 10 秒运行一次");
})?
).await?;
// 添加异步作业
sched.add(
Job::new_async("1/7 * * * * *", |uuid, mut l| {
Box::pin(async move {
println!("我每 7 秒异步运行一次");
// 查询此作业的下一次执行时间
let next_tick = l.next_tick_for_job(uuid).await;
match next_tick {
Ok(Some(ts)) => println!("7 秒作业的下一次运行时间是 {:?}", ts),
_ => println!("无法获取 7 秒作业的下一次运行时间"),
}
})
})?
).await?;
// 添加指定持续时间的一次性作业
sched.add(
Job::new_one_shot(Duration::from_secs(18), |_uuid, _l| {
println!("我只运行一次");
})?
).await?;
// 创建指定持续时间的重复作业,使其可变以便之后编辑
let mut jj = Job::new_repeated(Duration::from_secs(8), |_uuid, _l| {
println!("我每 8 秒重复运行一次");
})?;
// 添加在作业开始/停止等时执行的操作
jj.on_start_notification_add(&sched, Box::new(|job_id, notification_id, type_of_notification| {
Box::pin(async move {
println!("作业 {:?} 已启动,通知 {:?} 已运行({:?})", job_id, notification_id, type_of_notification);
})
})).await?;
jj.on_stop_notification_add(&sched, Box::new(|job_id, notification_id, type_of_notification| {
Box::pin(async move {
println!("作业 {:?} 已完成,通知 {:?} 已运行({:?})", job_id, notification_id, type_of_notification);
})
})).await?;
jj.on_removed_notification_add(&sched, Box::new(|job_id, notification_id, type_of_notification| {
Box::pin(async move {
println!("作业 {:?} 已移除,通知 {:?} 已运行({:?})", job_id, notification_id, type_of_notification);
})
})).await?;
sched.add(jj).await?;
// 必须启用 'signal' 功能
sched.shutdown_on_ctrl_c();
// 添加在关闭期间/之后运行的代码
sched.set_shutdown_handler(Box::new(|| {
Box::pin(async move {
println!("关闭完成");
})
}));
// 启动调度器
sched.start().await?;
// 等待作业运行
tokio::time::sleep(Duration::from_secs(100)).await;
Ok(())
}
时区变更
您可以使用 JobBuilder
API 创建使用特定时区的作业。
chrono-tz 不包含在依赖项中,因此如果您想轻松创建 Timezone
结构,需要将其添加到您的 Cargo.toml 中。
let job = JobBuilder::new()
.with_timezone(chrono_tz::Africa::Johannesburg)
.with_cron_job_type()
.with_schedule("*/2 * * * * *")
.unwrap()
.with_run_async(Box::new( | uuid, mut l| {
Box::pin(async move {
info ! ("JHB 每 2 秒异步运行一次,id {:?}", uuid);
let next_tick = l.next_tick_for_job(uuid).await;
match next_tick {
Ok(Some(ts)) => info !("JHB 2 秒作业的下一次运行时间是 {:?}", ts),
_ => warn !("无法获取 2 秒作业的下一次运行时间"),
}
})
}))
.build()
.unwrap();
类似的库
- job_scheduler 启发这个 crate 的项目
- cron 我们使用的 cron 表达式解析器
- schedule-rs 是一个类似的 Rust 库,实现了自己的 cron 表达式解析器
许可证
TokioCronScheduler 使用以下任一许可证
- Apache许可证2.0版(LICENSE-APACHE或 http://www.apache.org/licenses/LICENSE-2.0)
- MIT许可证(LICENSE-MIT或 http://opensource.org/licenses/MIT)
自定义存储
MetadataStore和NotificationStore特质可以被实现并在JobScheduler中使用。
默认提供了基于易失性哈希映射的SimpleMetadataStore和SimpleNotificationStore版本。使用Nats的持久版本由NatsMetadataStore和NatsNotificationStore提供。
贡献
除非您另有明确声明,否则您有意提交以包含在作品中的任何贡献,按照Apache-2.0许可证的定义,均应按上述方式双重许可,无任何附加条款或条件。
更多信息请参阅CONTRIBUTING文件。
特性
has_bytes
自0.7版本起
使Prost生成的数据结构可被需要获取数据结构字节的存储使用。Nats和Postgres存储依赖于启用此特性。
postgres_storage
自0.6版本起
添加Postgres元数据存储和通知存储(PostgresMetadataStore, PostgresNotificationStore)。使用Postgres数据库存储元数据和通知数据。
postgres_native_tls
自0.6版本起
使用postgres-native-tls crate作为PostgreSQL连接的TLS提供程序。
postgres_openssl
自0.6版本起
使用postgres-openssl crate作为PostgreSQL连接的TLS提供程序。
nats_storage
自0.6版本起
添加Nats元数据存储和通知存储(NatsMetadataStore, NatsNotificationStore)。使用Nats系统作为存储元数据和通知的方式。
参见Nats文档
signal
自0.5版本起
为调度器添加shutdown_on_signal
和shutdown_on_ctrl_c
。
当接收到信号时,两者都会关闭系统(停止调度器并移除所有任务)。
由于这利用了Tokio的信号处理,因此仅在Unix系统上可用。
编写测试
在进行tokio::test时,记得在多线程上下文中运行,否则测试将在scheduler.add()
上挂起。
例如:
#[cfg(test)]
mod test {
use tokio_cron_scheduler::{Job, JobScheduler};
use tracing::{info, Level};
use tracing_subscriber::FmtSubscriber;
// 需要多线程来测试,否则会在scheduler.add()上挂起
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
// #[tokio::test]
async fn test_schedule() {
let subscriber = FmtSubscriber::builder()
.with_max_level(Level::TRACE)
.finish();
tracing::subscriber::set_global_default(subscriber)
.expect("设置默认订阅者失败");
info!("创建调度器");
let scheduler = JobScheduler::new().await.unwrap();
info!("添加任务");
scheduler
.add(
Job::new_async("*/1 * * * * *", |_, _| {
Box::pin(async {
info!("每秒运行");
})
})
.unwrap(),
)
.await
.expect("应该能够添加任务");
scheduler.start().await.unwrap();
tokio::time::sleep(core::time::Duration::from_secs(20)).await;
}
}
示例
simple
运行基于内存哈希映射的存储
cargo run --example simple --features="tracing-subscriber"
postgres
首先需要一个运行中的PostgreSQL实例:
docker run --rm -it -p 5432:5432 -e POSTGRES_USER="postgres" -e POSTGRES_PASSWORD="" -e POSTGRES_HOST_AUTH_METHOD="trust" postgres:14.1
然后运行示例:
POSTGRES_INIT_METADATA=true POSTGRES_INIT_NOTIFICATIONS=true cargo run --example postgres --features="postgres_storage tracing-subscriber"
nats
首先需要一个启用了Jetstream的运行中的Nats实例:
docker run --rm -it -p 4222:4222 -p 6222:6222 -p 7222:7222 -p 8222:8222 nats -js -DV
然后运行示例:
cargo run --example nats --features="nats_storage tracing-subscriber"