日历系统开发(八):提醒与通知系统实现

日历事件提醒是整个系统中与用户感知最直接相关的模块。这篇记录我在提醒触发机制选型、延迟任务实现和多渠道推送上的完整方案。

提醒模型设计

一个事件可以关联多条提醒规则,用户可配置提前 5 分钟、15 分钟、1 小时等:

type Reminder struct {
    ID        int64     `json:"id"`
    EventID   int64     `json:"event_id"`
    UserID    int64     `json:"user_id"`
    Method    string    `json:"method"`    // email / push / websocket
    MinBefore int       `json:"min_before"` // 提前分钟数
    Fired     bool      `json:"fired"`
    FireAt    time.Time `json:"fire_at"`    // 预计触发时间
}

fire_at 在事件创建或修改时预计算:fire_at = event.start_time - min_before * minute。事件改时间时需要级联更新所有关联提醒的 fire_at

触发机制选型:定时扫描 vs 延迟队列

最初我用了最简单的方案——每分钟跑一次定时任务,查询 fire_at <= now AND fired = false

// 简单但有延迟
func (s *ReminderService) ScanLoop(ctx context.Context) {
    ticker := time.NewTicker(1 * time.Minute)
    defer ticker.Stop()
    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            reminders, _ := s.repo.FindPending(ctx, time.Now())
            for _, r := range reminders {
                s.fire(ctx, r)
            }
        }
    }
}

问题是平均有 30 秒延迟,对于「提前 5 分钟提醒」这种场景,30 秒的误差还行,但体验不够好。最终换成了 Redis 延迟队列。

Redis + Lua 延迟任务

利用 Redis Sorted Set,以 fire_at 的 Unix 时间戳作为 score:

func (s *ReminderService) Schedule(ctx context.Context, r *Reminder) error {
    payload, _ := json.Marshal(r)
    return s.rdb.ZAdd(ctx, "reminder:pending", &redis.Z{
        Score:  float64(r.FireAt.Unix()),
        Member: string(payload),
    }).Err()
}

消费端用 Lua 脚本原子地取出到期任务:

-- fetch_due_reminders.lua
local key = KEYS[1]
local now = tonumber(ARGV[1])
local batch = tonumber(ARGV[2]) or 100

local items = redis.call('ZRANGEBYSCORE', key, '-inf', now, 'LIMIT', 0, batch)
if #items > 0 then
    redis.call('ZREM', key, unpack(items))
end
return items

Go 端每秒调用一次这个脚本:

func (s *ReminderService) ConsumeLoop(ctx context.Context) {
    script := redis.NewScript(luaScript)
    for {
        select {
        case <-ctx.Done():
            return
        default:
            now := time.Now().Unix()
            results, err := script.Run(ctx, s.rdb, []string{"reminder:pending"}, now, 100).StringSlice()
            if err != nil && err != redis.Nil {
                log.Error("consume error", "err", err)
                time.Sleep(time.Second)
                continue
            }
            for _, raw := range results {
                var r Reminder
                json.Unmarshal([]byte(raw), &r)
                go s.fire(ctx, r)
            }
            time.Sleep(time.Second)
        }
    }
}

改成每秒轮询后,提醒延迟降到 1 秒以内,且 Lua 脚本保证了多实例部署时不会重复消费。

多渠道推送

fire 方法根据 Method 分发到不同渠道:

func (s *ReminderService) fire(ctx context.Context, r Reminder) {
    switch r.Method {
    case "email":
        s.emailSender.Send(ctx, r.UserID, buildEmailBody(r))
    case "push":
        s.pushClient.Send(ctx, r.UserID, buildPushPayload(r))
    case "websocket":
        s.wsHub.SendToUser(r.UserID, buildWSMessage(r))
    }
    // 标记已触发
    s.repo.MarkFired(ctx, r.ID)
}

三个渠道的实现:

  • 邮件:用公司内部 SMTP 网关,异步发送,失败重试 3 次
  • APP 推送:对接 Firebase Cloud Messaging,需要客户端注册 device token
  • WebSocket:服务端维护 map[userID][]*websocket.Conn,实时推到浏览器端

WebSocket 的 Hub 实现:

type WSHub struct {
    mu    sync.RWMutex
    conns map[int64][]*websocket.Conn
}

func (h *WSHub) SendToUser(userID int64, msg []byte) {
    h.mu.RLock()
    defer h.mu.RUnlock()
    for _, conn := range h.conns[userID] {
        conn.WriteMessage(websocket.TextMessage, msg)
    }
}

重复事件的提醒处理

重复事件比较特殊——不能一次性把未来所有实例的提醒都塞进队列。推荐的做法是只预调度未来 7 天的提醒,然后每天凌晨跑一次「滚动补充」任务:

func (s *ReminderService) RefillRecurring(ctx context.Context) {
    horizon := time.Now().Add(7 * 24 * time.Hour)
    events, _ := s.eventRepo.FindRecurringWithReminders(ctx)
    for _, e := range events {
        instances := e.ExpandBetween(time.Now(), horizon)
        for _, inst := range instances {
            for _, rule := range e.Reminders {
                fireAt := inst.Start.Add(-time.Duration(rule.MinBefore) * time.Minute)
                if fireAt.After(time.Now()) {
                    s.Schedule(ctx, &Reminder{
                        EventID:   e.ID,
                        UserID:    e.UserID,
                        Method:    rule.Method,
                        MinBefore: rule.MinBefore,
                        FireAt:    fireAt,
                    })
                }
            }
        }
    }
}

小结

提醒系统看起来不复杂,但细节很多:时区处理(用户设置的提醒基于事件所在时区)、重复事件的滚动调度、多实例下的消费幂等。Redis 延迟队列方案在我们的规模下表现很好,如果将来量级上去了,可以考虑换成 RocketMQ 的延迟消息。