Go语言:分布式任务队列Asynq

Go生态里做异步任务队列,之前我一直是自己写goroutine + channel来处理。能跑但不优雅——重试、持久化、监控全得自己搞。后来发现了Asynq这个库,基于Redis的分布式任务队列,API设计很Go风格,用起来很舒服。

Asynq简介

Asynq是一个Go语言的异步任务处理库,核心特性:

  • 基于Redis,部署简单
  • 支持任务重试、延迟任务、定时任务
  • 优先级队列
  • 任务去重
  • 自带Web监控面板(Asynqmon)
  • API设计简洁

和Python的Celery定位类似,但更轻量。

基本使用

安装:

go get github.com/hibiken/asynq

定义任务

// tasks/email.go
package tasks

import (
    "context"
    "encoding/json"
    "fmt"
    "github.com/hibiken/asynq"
)

// 任务类型常量
const (
    TypeEmailDelivery = "email:deliver"
    TypeReportGenerate = "report:generate"
)

// 任务Payload
type EmailDeliveryPayload struct {
    UserID     int    `json:"user_id"`
    TemplateID string `json:"template_id"`
    To         string `json:"to"`
    Subject    string `json:"subject"`
}

// 创建任务
func NewEmailDeliveryTask(payload EmailDeliveryPayload) (*asynq.Task, error) {
    data, err := json.Marshal(payload)
    if err != nil {
        return nil, fmt.Errorf("marshal payload: %w", err)
    }
    return asynq.NewTask(
        TypeEmailDelivery,
        data,
        asynq.MaxRetry(3),              // 最多重试3次
        asynq.Timeout(30*time.Second),   // 单次执行超时30s
        asynq.Queue("default"),          // 指定队列
    ), nil
}

// 处理任务
func HandleEmailDelivery(ctx context.Context, t *asynq.Task) error {
    var payload EmailDeliveryPayload
    if err := json.Unmarshal(t.Payload(), &payload); err != nil {
        return fmt.Errorf("unmarshal: %w", err)
    }

    // 实际发送邮件的逻辑
    fmt.Printf("Sending email to %s, template: %s
", payload.To, payload.TemplateID)

    // 返回error会触发重试
    return nil
}

客户端发送任务

// cmd/client/main.go
package main

import (
    "log"
    "github.com/hibiken/asynq"
    "myapp/tasks"
)

func main() {
    client := asynq.NewClient(asynq.RedisClientOpt{Addr: "localhost:6379"})
    defer client.Close()

    // 立即执行
    task, _ := tasks.NewEmailDeliveryTask(tasks.EmailDeliveryPayload{
        UserID:     42,
        TemplateID: "welcome",
        To:         "user@example.com",
        Subject:    "Welcome!",
    })
    info, err := client.Enqueue(task)
    if err != nil {
        log.Fatal(err)
    }
    log.Printf("Task enqueued: id=%s queue=%s", info.ID, info.Queue)

    // 延迟执行(5分钟后)
    info, _ = client.Enqueue(task, asynq.ProcessIn(5*time.Minute))

    // 指定执行时间
    info, _ = client.Enqueue(task, asynq.ProcessAt(time.Now().Add(24*time.Hour)))
}

Worker处理任务

// cmd/worker/main.go
package main

import (
    "log"
    "github.com/hibiken/asynq"
    "myapp/tasks"
)

func main() {
    srv := asynq.NewServer(
        asynq.RedisClientOpt{Addr: "localhost:6379"},
        asynq.Config{
            Concurrency: 10,  // 并发Worker数
            Queues: map[string]int{
                "critical": 6,   // 权重6
                "default":  3,   // 权重3
                "low":      1,   // 权重1
            },
            RetryDelayFunc: func(n int, e error, t *asynq.Task) time.Duration {
                // 指数退避: 10s, 20s, 40s...
                return time.Duration(math.Pow(2, float64(n))) * 10 * time.Second
            },
        },
    )

    mux := asynq.NewServeMux()
    mux.HandleFunc(tasks.TypeEmailDelivery, tasks.HandleEmailDelivery)
    mux.HandleFunc(tasks.TypeReportGenerate, tasks.HandleReportGenerate)

    if err := srv.Run(mux); err != nil {
        log.Fatal(err)
    }
}

定时任务

Asynq自带Scheduler,支持cron表达式:

// cmd/scheduler/main.go
func main() {
    scheduler := asynq.NewScheduler(
        asynq.RedisClientOpt{Addr: "localhost:6379"},
        &asynq.SchedulerOpts{Location: time.Local},
    )

    // 每天早上8点生成日报
    task, _ := tasks.NewReportGenerateTask("daily")
    scheduler.Register("0 8 * * *", task)

    // 每小时清理过期数据
    cleanTask, _ := tasks.NewCleanupTask()
    scheduler.Register("@every 1h", cleanTask)

    if err := scheduler.Run(); err != nil {
        log.Fatal(err)
    }
}

重试策略

Asynq的重试很灵活:

// 任务级别设置
task := asynq.NewTask("email:deliver", payload,
    asynq.MaxRetry(5),           // 最多重试5次
    asynq.Timeout(30*time.Second),
)

// 全局重试延迟策略
asynq.Config{
    RetryDelayFunc: func(n int, e error, t *asynq.Task) time.Duration {
        // n是第几次重试
        return time.Duration(n) * time.Minute
    },
}

任务处理函数中,如果返回的error被包装为 asynq.SkipRetry,则不再重试直接进入失败队列。这在遇到不可恢复错误时很有用:

func HandleTask(ctx context.Context, t *asynq.Task) error {
    // ...
    if isUnrecoverableError(err) {
        return fmt.Errorf("permanent failure: %w", asynq.SkipRetry)
    }
    return err  // 正常重试
}

监控Dashboard

Asynq自带一个Web监控面板 Asynqmon:

go install github.com/hibiken/asynqmon/cmd/asynqmon@latest
asynqmon --port 8080 --redis-addr localhost:6379

Dashboard上可以看到:

  • 各队列的任务数量
  • 活跃/等待/完成/失败任务统计
  • 任务详情和Payload
  • 手动重试失败任务

与Celery的对比

特性 Asynq Celery
语言 Go Python
依赖 只需Redis Redis/RabbitMQ
部署 单二进制 pip + 配置
并发模型 goroutine prefork/gevent
监控 Asynqmon Flower
生态 较新 成熟

对于Go项目来说,Asynq是目前最好的选择。API设计符合Go的风格,不会有不自然的感觉。