Go语言简单、可靠、高效的分布式任务队列
Asynq是一个Go语言库,用于队列任务并通过工作者异步处理。它基于Redis,设计上既可扩展又易于上手。
Asynq的工作原理概述:
- 客户端将任务放入队列
- 服务器从队列中提取任务,并为每个任务启动一个工作者goroutine
- 多个工作者并发处理任务
任务队列用作在多台机器间分配工作的机制。系统可以包含多个工作者服务器和代理,从而实现高可用性和水平扩展。
用例示例
特性
- 保证任务至少执行一次
- 任务调度
- 失败任务的重试
- 工作者崩溃时自动恢复任务
- 加权优先队列
- 严格优先队列
- 由于Redis写入速度快,添加任务的延迟低
- 使用唯一选项实现任务去重
- 允许每个任务设置超时和截止时间
- 允许聚合任务组以批处理多个连续操作
- 灵活的处理程序接口,支持中间件
- 能够暂停队列以停止处理队列中的任务
- 周期性任务
- 支持Redis集群实现自动分片和高可用性
- 支持Redis哨兵实现高可用性
- 与Prometheus集成以收集和可视化队列指标
- Web UI用于检查和远程控制队列和任务
- 命令行工具用于检查和远程控制队列和任务
稳定性和兼容性
状态:该库目前正在大力开发中,API经常发生重大变化。
☝️ 重要提示:当前主版本为零(
v0.x.x
),以适应快速开发和快速迭代,同时获取用户的早期反馈(欢迎对API提供反馈!)。在v1.0.0
发布之前,公共API可能会在没有主要版本更新的情况下发生变化。
赞助
如果您在生产环境中使用此软件包,请考虑赞助该项目以表示您的支持!
快速入门
确保您已安装Go(下载)。支持最新的两个Go版本(参见https://go.dev/dl)。
通过创建一个文件夹并在文件夹内运行go mod init github.com/your/repo
(了解更多)来初始化您的项目。然后使用go get
命令安装Asynq库:
go get -u github.com/hibiken/asynq
确保您在本地或从Docker容器中运行Redis服务器。需要4.0
或更高版本。
接下来,编写一个封装任务创建和任务处理的包。
package tasks
import (
"context"
"encoding/json"
"fmt"
"log"
"time"
"github.com/hibiken/asynq"
)
// 任务类型列表
const (
TypeEmailDelivery = "email:deliver"
TypeImageResize = "image:resize"
)
type EmailDeliveryPayload struct {
UserID int
TemplateID string
}
type ImageResizePayload struct {
SourceURL string
}
//----------------------------------------------
// 编写一个NewXXXTask函数来创建任务。
// 任务由类型和负载组成。
//----------------------------------------------
func NewEmailDeliveryTask(userID int, tmplID string) (*asynq.Task, error) {
payload, err := json.Marshal(EmailDeliveryPayload{UserID: userID, TemplateID: tmplID})
if err != nil {
return nil, err
}
return asynq.NewTask(TypeEmailDelivery, payload), nil
}
func NewImageResizeTask(src string) (*asynq.Task, error) {
payload, err := json.Marshal(ImageResizePayload{SourceURL: src})
if err != nil {
return nil, err
}
// 可以向NewTask传递任务选项,这些选项可以在入队时被覆盖。
return asynq.NewTask(TypeImageResize, payload, asynq.MaxRetry(5), asynq.Timeout(20 * time.Minute)), nil
}
//---------------------------------------------------------------
// 编写一个HandleXXXTask函数来处理输入任务。
// 请注意,它满足asynq.HandlerFunc接口。
//
// 处理程序不必是函数。您可以定义一个满足asynq.Handler接口的类型。
// 请参见下面的示例。
//---------------------------------------------------------------
func HandleEmailDeliveryTask(ctx context.Context, t *asynq.Task) error {
var p EmailDeliveryPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return fmt.Errorf("json.Unmarshal失败: %v: %w", err, asynq.SkipRetry)
}
log.Printf("正在发送电子邮件给用户: user_id=%d, template_id=%s", p.UserID, p.TemplateID)
// 电子邮件发送代码 ...
return nil
}
// ImageProcessor实现asynq.Handler接口。
type ImageProcessor struct {
// ... 结构体的字段
}
func (processor *ImageProcessor) ProcessTask(ctx context.Context, t *asynq.Task) error {
var p ImageResizePayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return fmt.Errorf("json.Unmarshal失败: %v: %w", err, asynq.SkipRetry)
}
log.Printf("正在调整图像大小: src=%s", p.SourceURL)
// 图像调整大小代码 ...
return nil
}
func NewImageProcessor() *ImageProcessor {
return &ImageProcessor{}
}
在您的应用程序代码中,导入上述包并使用Client
将任务放入队列。
package main
import (
"log"
"time"
"github.com/hibiken/asynq"
"your/app/package/tasks"
)
const redisAddr = "127.0.0.1:6379"
func main() {
client := asynq.NewClient(asynq.RedisClientOpt{Addr: redisAddr})
defer client.Close()
// ------------------------------------------------------
// 示例1:将任务入队以立即处理。
// 使用(*Client).Enqueue方法。
// ------------------------------------------------------
task, err := tasks.NewEmailDeliveryTask(42, "some:template:id")
if err != nil {
log.Fatalf("无法创建任务: %v", err)
}
info, err := client.Enqueue(task)
if err != nil {
log.Fatalf("无法将任务入队: %v", err)
}
log.Printf("已入队任务: id=%s queue=%s", info.ID, info.Queue)
// ------------------------------------------------------------
// 示例2:安排任务在将来处理。
// 使用ProcessIn或ProcessAt选项。
// ------------------------------------------------------------
info, err = client.Enqueue(task, asynq.ProcessIn(24*time.Hour))
if err != nil {
log.Fatalf("无法安排任务: %v", err)
}
log.Printf("已入队任务: id=%s queue=%s", info.ID, info.Queue)
// ----------------------------------------------------------------------------
// 示例3:设置其他选项以调整任务处理行为。
// 选项包括MaxRetry、Queue、Timeout、Deadline、Unique等。
// ----------------------------------------------------------------------------
task, err = tasks.NewImageResizeTask("https://example.com/myassets/image.jpg") 如果err不为nil { log.Fatalf("无法创建任务: %v", err) } info, err = client.Enqueue(task, asynq.MaxRetry(10), asynq.Timeout(3 * time.Minute)) 如果err不为nil { log.Fatalf("无法将任务加入队列: %v", err) } log.Printf("已将任务加入队列: id=%s queue=%s", info.ID, info.Queue) }
接下来,启动一个工作服务器在后台处理这些任务。要启动后台工作者,请使用[`Server`](https://pkg.go.dev/github.com/hibiken/asynq?tab=doc#Server)并提供您的[`Handler`](https://pkg.go.dev/github.com/hibiken/asynq?tab=doc#Handler)来处理任务。
您可以选择使用[`ServeMux`](https://pkg.go.dev/github.com/hibiken/asynq?tab=doc#ServeMux)创建处理程序,就像使用[`net/http`](https://golang.org/pkg/net/http/)处理程序一样。
```go
package main
import (
"log"
"github.com/hibiken/asynq"
"your/app/package/tasks"
)
const redisAddr = "127.0.0.1:6379"
func main() {
srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: redisAddr},
asynq.Config{
// 指定要使用的并发工作者数量
Concurrency: 10,
// 可选择指定具有不同优先级的多个队列
Queues: map[string]int{
"critical": 6,
"default": 3,
"low": 1,
},
// 查看godoc了解其他配置选项
},
)
// mux将类型映射到处理程序
mux := asynq.NewServeMux()
mux.HandleFunc(tasks.TypeEmailDelivery, tasks.HandleEmailDeliveryTask)
mux.Handle(tasks.TypeImageResize, tasks.NewImageProcessor())
// ...注册其他处理程序...
if err := srv.Run(mux); err != nil {
log.Fatalf("无法运行服务器: %v", err)
}
}
有关该库更详细的演练,请参阅我们的入门指南。
要了解更多关于asynq
功能和API的信息,请参阅包的godoc。
Web界面
Asynqmon是一个基于Web的工具,用于监控和管理Asynq队列和任务。
以下是Web界面的几个截图:
队列视图
任务视图
指标视图
设置和自适应暗模式
有关如何使用该工具的详细信息,请参阅工具的README。
命令行工具
Asynq附带一个命令行工具,用于检查队列和任务的状态。
要安装CLI工具,请运行以下命令:
go install github.com/hibiken/asynq/tools/asynq@latest
以下是运行asynq dash
命令的示例:
有关如何使用该工具的详细信息,请参阅工具的README。
贡献
我们对社区做出的任何贡献(GitHub问题/PR、Gitter频道上的反馈等)都持开放态度并表示感谢。
在贡献之前,请参阅贡献指南。
许可证
版权所有 (c) 2019-现在 Ken Hibino和贡献者。Asynq
是根据MIT许可证授权的免费开源软件。官方logo由Vic Shóstak创建,并根据知识共享许可证(CC0 1.0通用)分发。