Go生态里做异步任务队列,之前我一直是自己写goroutine + channel来处理。能跑但不优雅——重试、持久化、监控全得自己搞。后来发现了Asynq这个库,基于Redis的分布式任务队列,API设计很Go风格,用起来很舒服。
Asynq简介
Asynq是一个Go语言的异步任务处理库,核心特性:
- 基于Redis,部署简单
- 支持任务重试、延迟任务、定时任务
- 优先级队列
- 任务去重
- 自带Web监控面板(Asynqmon)
- API设计简洁
和Python的Celery定位类似,但更轻量。
基本使用
安装:
go get github.com/hibiken/asynq
定义任务
// tasks/email.go
package tasks
import (
"context"
"encoding/json"
"fmt"
"github.com/hibiken/asynq"
)
// 任务类型常量
const (
TypeEmailDelivery = "email:deliver"
TypeReportGenerate = "report:generate"
)
// 任务Payload
type EmailDeliveryPayload struct {
UserID int `json:"user_id"`
TemplateID string `json:"template_id"`
To string `json:"to"`
Subject string `json:"subject"`
}
// 创建任务
func NewEmailDeliveryTask(payload EmailDeliveryPayload) (*asynq.Task, error) {
data, err := json.Marshal(payload)
if err != nil {
return nil, fmt.Errorf("marshal payload: %w", err)
}
return asynq.NewTask(
TypeEmailDelivery,
data,
asynq.MaxRetry(3), // 最多重试3次
asynq.Timeout(30*time.Second), // 单次执行超时30s
asynq.Queue("default"), // 指定队列
), nil
}
// 处理任务
func HandleEmailDelivery(ctx context.Context, t *asynq.Task) error {
var payload EmailDeliveryPayload
if err := json.Unmarshal(t.Payload(), &payload); err != nil {
return fmt.Errorf("unmarshal: %w", err)
}
// 实际发送邮件的逻辑
fmt.Printf("Sending email to %s, template: %s
", payload.To, payload.TemplateID)
// 返回error会触发重试
return nil
}
客户端发送任务
// cmd/client/main.go
package main
import (
"log"
"github.com/hibiken/asynq"
"myapp/tasks"
)
func main() {
client := asynq.NewClient(asynq.RedisClientOpt{Addr: "localhost:6379"})
defer client.Close()
// 立即执行
task, _ := tasks.NewEmailDeliveryTask(tasks.EmailDeliveryPayload{
UserID: 42,
TemplateID: "welcome",
To: "user@example.com",
Subject: "Welcome!",
})
info, err := client.Enqueue(task)
if err != nil {
log.Fatal(err)
}
log.Printf("Task enqueued: id=%s queue=%s", info.ID, info.Queue)
// 延迟执行(5分钟后)
info, _ = client.Enqueue(task, asynq.ProcessIn(5*time.Minute))
// 指定执行时间
info, _ = client.Enqueue(task, asynq.ProcessAt(time.Now().Add(24*time.Hour)))
}
Worker处理任务
// cmd/worker/main.go
package main
import (
"log"
"github.com/hibiken/asynq"
"myapp/tasks"
)
func main() {
srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: "localhost:6379"},
asynq.Config{
Concurrency: 10, // 并发Worker数
Queues: map[string]int{
"critical": 6, // 权重6
"default": 3, // 权重3
"low": 1, // 权重1
},
RetryDelayFunc: func(n int, e error, t *asynq.Task) time.Duration {
// 指数退避: 10s, 20s, 40s...
return time.Duration(math.Pow(2, float64(n))) * 10 * time.Second
},
},
)
mux := asynq.NewServeMux()
mux.HandleFunc(tasks.TypeEmailDelivery, tasks.HandleEmailDelivery)
mux.HandleFunc(tasks.TypeReportGenerate, tasks.HandleReportGenerate)
if err := srv.Run(mux); err != nil {
log.Fatal(err)
}
}
定时任务
Asynq自带Scheduler,支持cron表达式:
// cmd/scheduler/main.go
func main() {
scheduler := asynq.NewScheduler(
asynq.RedisClientOpt{Addr: "localhost:6379"},
&asynq.SchedulerOpts{Location: time.Local},
)
// 每天早上8点生成日报
task, _ := tasks.NewReportGenerateTask("daily")
scheduler.Register("0 8 * * *", task)
// 每小时清理过期数据
cleanTask, _ := tasks.NewCleanupTask()
scheduler.Register("@every 1h", cleanTask)
if err := scheduler.Run(); err != nil {
log.Fatal(err)
}
}
重试策略
Asynq的重试很灵活:
// 任务级别设置
task := asynq.NewTask("email:deliver", payload,
asynq.MaxRetry(5), // 最多重试5次
asynq.Timeout(30*time.Second),
)
// 全局重试延迟策略
asynq.Config{
RetryDelayFunc: func(n int, e error, t *asynq.Task) time.Duration {
// n是第几次重试
return time.Duration(n) * time.Minute
},
}
任务处理函数中,如果返回的error被包装为 asynq.SkipRetry,则不再重试直接进入失败队列。这在遇到不可恢复错误时很有用:
func HandleTask(ctx context.Context, t *asynq.Task) error {
// ...
if isUnrecoverableError(err) {
return fmt.Errorf("permanent failure: %w", asynq.SkipRetry)
}
return err // 正常重试
}
监控Dashboard
Asynq自带一个Web监控面板 Asynqmon:
go install github.com/hibiken/asynqmon/cmd/asynqmon@latest
asynqmon --port 8080 --redis-addr localhost:6379
Dashboard上可以看到:
- 各队列的任务数量
- 活跃/等待/完成/失败任务统计
- 任务详情和Payload
- 手动重试失败任务
与Celery的对比
| 特性 | Asynq | Celery |
|---|---|---|
| 语言 | Go | Python |
| 依赖 | 只需Redis | Redis/RabbitMQ |
| 部署 | 单二进制 | pip + 配置 |
| 并发模型 | goroutine | prefork/gevent |
| 监控 | Asynqmon | Flower |
| 生态 | 较新 | 成熟 |
对于Go项目来说,Asynq是目前最好的选择。API设计符合Go的风格,不会有不自然的感觉。