文档:https://taskiq-python.github.io/
什么是taskiq?
Taskiq是一个用于Python的异步分布式任务队列。 这个项目受到了像Celery和Dramatiq这样的大型项目的启发。 但taskiq可以发送和运行同步和异步函数,并与流行的异步框架(如FastAPI和AioHTTP)集成。
此外,我们使用PEP-612来提供最好的自动建议。所有代码都有类型提示。
安装
可以使用pip安装此项目:
pip install taskiq
或者直接从git安装:
pip install git+https://github.com/taskiq-python/taskiq
使用方法
首先,你需要创建一个代理。代理是一个可以使用分布式队列与工作进程通信的对象。
我们为不同的队列后端提供了不同的代理。例如,我们有NATS、Redis、RabbitMQ、Kafka等代理。选择适合你的并创建一个实例。
from taskiq_nats import JetStreamBroker
broker = JetStreamBroker("nats://localhost:4222", queue="my_queue")
声明任务就像声明一个函数一样简单。只需在你的函数上添加一个装饰器,就大功告成了。
import asyncio
from taskiq_nats import JetStreamBroker
broker = JetStreamBroker("nats://localhost:4222", queue="my_queue2")
@broker.task
async def my_task(a: int, b: int) -> None:
print("AB", a + b)
async def main():
await broker.startup()
await my_task.kiq(1, 2)
await broker.shutdown()
if __name__ == "__main__":
asyncio.run(main())
消息将被发送到代理,然后发送到工作进程。工作进程将执行该函数。要启动工作进程,只需运行以下命令:
taskiq worker path.to.the.module:broker
其中path.to.the.module
是定义代理的模块路径,broker
是代理变量的名称。
如果你在不同的模块中有任务,可以通过传递--fs-discover
标志来让taskiq自动导入它们:
taskiq worker path.to.the.module:broker --fs-discover
它将导入当前目录及所有子目录中名为tasks.py
的所有模块。
此外,我们支持工作进程的热重载。要启用它,只需传递--reload
标志。当代码发生变化时,它将重新加载工作进程(要使用它,请安装带有reload额外功能的taskiq。例如pip install taskiq[reload]
)。
我们还有与流行的异步框架的酷炫集成。例如,我们有与FastAPI或AioHTTP的集成。你可以使用它在任务中重用Web应用程序的依赖项。
在我们的文档中阅读所有功能:https://taskiq-python.github.io/
本地开发
代码检查
我们使用pre-commit在本地进行代码检查。
克隆此项目后,请安装pre-commit。它有助于在提交更改之前修复文件。
pre-commit install
测试
Pytest可以在没有任何额外操作或选项的情况下运行。
pytest
文档
要在本地运行文档,你需要安装yarn。
首先,你需要安装依赖项。
yarn install
之后,你可以通过运行以下命令来设置文档服务器:
yarn docs:dev