商城系统开发(四):库存管理与分布式锁

商城系统里最怕什么?超卖。100件商品卖出去105件,这差额要么自己贴钱要么和用户扯皮,怎么着都是血亏。这篇来聊库存管理的方案——从最简单的数据库扣减到Redis分布式锁,把超卖问题彻底解决。

库存扣减的时机

先回答一个基础问题:什么时候扣库存?

方案一:下单时扣减(先扣库存后创订单)

  • 优点:不会超卖
  • 缺点:用户下单不付款,库存被锁住。大量恶意下单可以把库存"冻结"

方案二:支付时扣减(先创订单后扣库存)

  • 优点:不会被恶意占用库存
  • 缺点:可能超卖。多人同时下单支付,扣减时才发现库存不够

方案三:下单时预扣,支付后确认(两阶段)

  • 下单:available_stock -= 1, locked_stock += 1
  • 支付成功:locked_stock -= 1
  • 支付超时/取消:locked_stock -= 1, available_stock += 1

方案三是主流做法,兼顾了防超卖和防恶意占用。我的商城系统用的就是这个方案。

超卖问题的本质

超卖的本质是并发下的竞态条件。看一个典型场景:

库存=1

线程A: 查询库存 → 得到1 → 判断>0 → 扣减 → 库存=0  ✓
线程B: 查询库存 → 得到1 → 判断>0 → 扣减 → 库存=-1 ✗

A和B几乎同时查到库存为1,都认为可以扣减,结果超卖了。

解决思路:要么加锁保证串行,要么用原子操作保证check-and-set的原子性。

方案一:数据库乐观锁

最简单的方案,给库存表加一个version字段:

-- 库存表
CREATE TABLE product_stock (
    id BIGINT PRIMARY KEY,
    product_id BIGINT NOT NULL,
    sku_id BIGINT NOT NULL,
    available INT NOT NULL DEFAULT 0,
    locked INT NOT NULL DEFAULT 0,
    version INT NOT NULL DEFAULT 0,
    UNIQUE KEY uk_sku (sku_id)
);

-- 扣减库存(乐观锁)
UPDATE product_stock
SET available = available - 1,
    locked = locked + 1,
    version = version + 1
WHERE sku_id = ? AND available >= 1 AND version = ?;

Go代码实现:

func (r *StockRepo) DeductStock(ctx context.Context, skuID int64, quantity int) error {
    // 先查当前库存和version
    stock, err := r.GetStock(ctx, skuID)
    if err != nil {
        return err
    }
    if stock.Available < quantity {
        return ErrInsufficientStock
    }

    // 乐观锁更新
    result := r.db.WithContext(ctx).
        Model(&ProductStock{}).
        Where("sku_id = ? AND available >= ? AND version = ?",
            skuID, quantity, stock.Version).
        Updates(map[string]interface{}{
            "available": gorm.Expr("available - ?", quantity),
            "locked":    gorm.Expr("locked + ?", quantity),
            "version":   gorm.Expr("version + 1"),
        })

    if result.RowsAffected == 0 {
        return ErrStockConflict  // 版本冲突,让调用方重试
    }
    return result.Error
}

乐观锁方案的优缺点:

  • 优点:实现简单,不需要额外中间件
  • 缺点:高并发下冲突频繁,大量重试浪费数据库资源

实际上可以简化——把version去掉,直接用 available >= quantity 作为条件:

UPDATE product_stock
SET available = available - 1, locked = locked + 1
WHERE sku_id = ? AND available >= 1;

因为MySQL的UPDATE本身就是行级锁,available >= 1 已经保证了不会扣成负数。但这种方式在分布式数据库下可能有问题,加version更保险。

方案二:Redis分布式锁

数据库扛不住大并发时,把库存放到Redis里。关键是用分布式锁保证扣减的原子性。

SET NX EX + Lua释放

Redis分布式锁的标准实现:

// lock.go
package lock

import (
    "context"
    "time"
    "github.com/go-redis/nedis/v8"
    "github.com/google/uuid"
)

type RedisLock struct {
    client *redis.Client
    key    string
    value  string  // 唯一标识,防止误释放
    ttl    time.Duration
}

func NewRedisLock(client *redis.Client, key string, ttl time.Duration) *RedisLock {
    return &RedisLock{
        client: client,
        key:    "lock:" + key,
        value:  uuid.New().String(),
        ttl:    ttl,
    }
}

// 加锁
func (l *RedisLock) Lock(ctx context.Context) (bool, error) {
    return l.client.SetNX(ctx, l.key, l.value, l.ttl).Result()
}

// 释放锁(Lua脚本保证原子性)
var unlockScript = redis.NewScript(`
    if redis.call("GET", KEYS[1]) == ARGV[1] then
        return redis.call("DEL", KEYS[1])
    else
        return 0
    end
`)

func (l *RedisLock) Unlock(ctx context.Context) error {
    _, err := unlockScript.Run(ctx, l.client, []string{l.key}, l.value).Result()
    return err
}

为什么释放锁要用Lua脚本?因为"检查value → 删除key"必须是原子操作。如果分成两步,在检查和删除之间锁可能已经被其他人获取了,直接DEL就把别人的锁删了。

用锁保护库存扣减

func (s *StockService) DeductWithLock(ctx context.Context, skuID int64, qty int) error {
    lockKey := fmt.Sprintf("stock:%d", skuID)
    lock := NewRedisLock(s.redis, lockKey, 5*time.Second)

    // 自旋获取锁
    for i := 0; i < 30; i++ {
        ok, err := lock.Lock(ctx)
        if err != nil {
            return err
        }
        if ok {
            defer lock.Unlock(ctx)

            // 检查库存
            stockKey := fmt.Sprintf("stock:available:%d", skuID)
            available, err := s.redis.Get(ctx, stockKey).Int()
            if err != nil {
                return err
            }
            if available < qty {
                return ErrInsufficientStock
            }

            // 扣减
            s.redis.DecrBy(ctx, stockKey, int64(qty))
            return nil
        }
        time.Sleep(100 * time.Millisecond)
    }
    return ErrLockTimeout
}

更好的方式是直接用Lua脚本把check和deduct合成一个原子操作,连锁都不用加:

var deductScript = redis.NewScript(`
    local available = tonumber(redis.call("GET", KEYS[1]))
    if available == nil then
        return -1
    end
    if available < tonumber(ARGV[1]) then
        return 0
    end
    redis.call("DECRBY", KEYS[1], ARGV[1])
    return 1
`)

func (s *StockService) DeductAtomic(ctx context.Context, skuID int64, qty int) error {
    key := fmt.Sprintf("stock:available:%d", skuID)
    result, err := deductScript.Run(ctx, s.redis, []string{key}, qty).Int()
    if err != nil {
        return err
    }
    switch result {
    case -1:
        return ErrStockNotFound
    case 0:
        return ErrInsufficientStock
    default:
        return nil
    }
}

Lua脚本方案性能最好——一次网络往返搞定,没有锁竞争。

库存预热与回滚

预热

大促前需要把库存从MySQL同步到Redis:

func (s *StockService) WarmUp(ctx context.Context, skuIDs []int64) error {
    stocks, err := s.repo.BatchGetStocks(ctx, skuIDs)
    if err != nil {
        return err
    }

    pipe := s.redis.Pipeline()
    for _, stock := range stocks {
        key := fmt.Sprintf("stock:available:%d", stock.SkuID)
        pipe.Set(ctx, key, stock.Available, 24*time.Hour)
    }
    _, err = pipe.Exec(ctx)
    return err
}

回滚

订单取消或支付超时需要回滚库存:

func (s *StockService) Rollback(ctx context.Context, skuID int64, qty int) error {
    // Redis回滚
    key := fmt.Sprintf("stock:available:%d", skuID)
    s.redis.IncrBy(ctx, key, int64(qty))

    // 数据库回滚
    return s.repo.RollbackStock(ctx, skuID, qty)
}

Redis和数据库的一致性通过定时对账任务保证——每隔几分钟比对Redis和MySQL的库存数据,有差异就报警。

选型建议

场景 推荐方案
并发低(QPS < 100) 数据库乐观锁足够
中等并发(QPS 100-1000) Redis Lua脚本
高并发秒杀 Redis Lua + 队列削峰

我的商城系统目前用的是Redis Lua脚本方案,配合数据库乐观锁做兜底。Redis挂了的情况下降级到数据库方案,保证服务不中断。