日历事件提醒是整个系统中与用户感知最直接相关的模块。这篇记录我在提醒触发机制选型、延迟任务实现和多渠道推送上的完整方案。
提醒模型设计
一个事件可以关联多条提醒规则,用户可配置提前 5 分钟、15 分钟、1 小时等:
type Reminder struct {
ID int64 `json:"id"`
EventID int64 `json:"event_id"`
UserID int64 `json:"user_id"`
Method string `json:"method"` // email / push / websocket
MinBefore int `json:"min_before"` // 提前分钟数
Fired bool `json:"fired"`
FireAt time.Time `json:"fire_at"` // 预计触发时间
}
fire_at 在事件创建或修改时预计算:fire_at = event.start_time - min_before * minute。事件改时间时需要级联更新所有关联提醒的 fire_at。
触发机制选型:定时扫描 vs 延迟队列
最初我用了最简单的方案——每分钟跑一次定时任务,查询 fire_at <= now AND fired = false:
// 简单但有延迟
func (s *ReminderService) ScanLoop(ctx context.Context) {
ticker := time.NewTicker(1 * time.Minute)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
reminders, _ := s.repo.FindPending(ctx, time.Now())
for _, r := range reminders {
s.fire(ctx, r)
}
}
}
}
问题是平均有 30 秒延迟,对于「提前 5 分钟提醒」这种场景,30 秒的误差还行,但体验不够好。最终换成了 Redis 延迟队列。
Redis + Lua 延迟任务
利用 Redis Sorted Set,以 fire_at 的 Unix 时间戳作为 score:
func (s *ReminderService) Schedule(ctx context.Context, r *Reminder) error {
payload, _ := json.Marshal(r)
return s.rdb.ZAdd(ctx, "reminder:pending", &redis.Z{
Score: float64(r.FireAt.Unix()),
Member: string(payload),
}).Err()
}
消费端用 Lua 脚本原子地取出到期任务:
-- fetch_due_reminders.lua
local key = KEYS[1]
local now = tonumber(ARGV[1])
local batch = tonumber(ARGV[2]) or 100
local items = redis.call('ZRANGEBYSCORE', key, '-inf', now, 'LIMIT', 0, batch)
if #items > 0 then
redis.call('ZREM', key, unpack(items))
end
return items
Go 端每秒调用一次这个脚本:
func (s *ReminderService) ConsumeLoop(ctx context.Context) {
script := redis.NewScript(luaScript)
for {
select {
case <-ctx.Done():
return
default:
now := time.Now().Unix()
results, err := script.Run(ctx, s.rdb, []string{"reminder:pending"}, now, 100).StringSlice()
if err != nil && err != redis.Nil {
log.Error("consume error", "err", err)
time.Sleep(time.Second)
continue
}
for _, raw := range results {
var r Reminder
json.Unmarshal([]byte(raw), &r)
go s.fire(ctx, r)
}
time.Sleep(time.Second)
}
}
}
改成每秒轮询后,提醒延迟降到 1 秒以内,且 Lua 脚本保证了多实例部署时不会重复消费。
多渠道推送
fire 方法根据 Method 分发到不同渠道:
func (s *ReminderService) fire(ctx context.Context, r Reminder) {
switch r.Method {
case "email":
s.emailSender.Send(ctx, r.UserID, buildEmailBody(r))
case "push":
s.pushClient.Send(ctx, r.UserID, buildPushPayload(r))
case "websocket":
s.wsHub.SendToUser(r.UserID, buildWSMessage(r))
}
// 标记已触发
s.repo.MarkFired(ctx, r.ID)
}
三个渠道的实现:
- 邮件:用公司内部 SMTP 网关,异步发送,失败重试 3 次
- APP 推送:对接 Firebase Cloud Messaging,需要客户端注册 device token
- WebSocket:服务端维护
map[userID][]*websocket.Conn,实时推到浏览器端
WebSocket 的 Hub 实现:
type WSHub struct {
mu sync.RWMutex
conns map[int64][]*websocket.Conn
}
func (h *WSHub) SendToUser(userID int64, msg []byte) {
h.mu.RLock()
defer h.mu.RUnlock()
for _, conn := range h.conns[userID] {
conn.WriteMessage(websocket.TextMessage, msg)
}
}
重复事件的提醒处理
重复事件比较特殊——不能一次性把未来所有实例的提醒都塞进队列。推荐的做法是只预调度未来 7 天的提醒,然后每天凌晨跑一次「滚动补充」任务:
func (s *ReminderService) RefillRecurring(ctx context.Context) {
horizon := time.Now().Add(7 * 24 * time.Hour)
events, _ := s.eventRepo.FindRecurringWithReminders(ctx)
for _, e := range events {
instances := e.ExpandBetween(time.Now(), horizon)
for _, inst := range instances {
for _, rule := range e.Reminders {
fireAt := inst.Start.Add(-time.Duration(rule.MinBefore) * time.Minute)
if fireAt.After(time.Now()) {
s.Schedule(ctx, &Reminder{
EventID: e.ID,
UserID: e.UserID,
Method: rule.Method,
MinBefore: rule.MinBefore,
FireAt: fireAt,
})
}
}
}
}
}
小结
提醒系统看起来不复杂,但细节很多:时区处理(用户设置的提醒基于事件所在时区)、重复事件的滚动调度、多实例下的消费幂等。Redis 延迟队列方案在我们的规模下表现很好,如果将来量级上去了,可以考虑换成 RocketMQ 的延迟消息。