商城系统里最怕什么?超卖。100件商品卖出去105件,这差额要么自己贴钱要么和用户扯皮,怎么着都是血亏。这篇来聊库存管理的方案——从最简单的数据库扣减到Redis分布式锁,把超卖问题彻底解决。
库存扣减的时机
先回答一个基础问题:什么时候扣库存?
方案一:下单时扣减(先扣库存后创订单)
- 优点:不会超卖
- 缺点:用户下单不付款,库存被锁住。大量恶意下单可以把库存"冻结"
方案二:支付时扣减(先创订单后扣库存)
- 优点:不会被恶意占用库存
- 缺点:可能超卖。多人同时下单支付,扣减时才发现库存不够
方案三:下单时预扣,支付后确认(两阶段)
- 下单:
available_stock -= 1, locked_stock += 1 - 支付成功:
locked_stock -= 1 - 支付超时/取消:
locked_stock -= 1, available_stock += 1
方案三是主流做法,兼顾了防超卖和防恶意占用。我的商城系统用的就是这个方案。
超卖问题的本质
超卖的本质是并发下的竞态条件。看一个典型场景:
库存=1
线程A: 查询库存 → 得到1 → 判断>0 → 扣减 → 库存=0 ✓
线程B: 查询库存 → 得到1 → 判断>0 → 扣减 → 库存=-1 ✗
A和B几乎同时查到库存为1,都认为可以扣减,结果超卖了。
解决思路:要么加锁保证串行,要么用原子操作保证check-and-set的原子性。
方案一:数据库乐观锁
最简单的方案,给库存表加一个version字段:
-- 库存表
CREATE TABLE product_stock (
id BIGINT PRIMARY KEY,
product_id BIGINT NOT NULL,
sku_id BIGINT NOT NULL,
available INT NOT NULL DEFAULT 0,
locked INT NOT NULL DEFAULT 0,
version INT NOT NULL DEFAULT 0,
UNIQUE KEY uk_sku (sku_id)
);
-- 扣减库存(乐观锁)
UPDATE product_stock
SET available = available - 1,
locked = locked + 1,
version = version + 1
WHERE sku_id = ? AND available >= 1 AND version = ?;
Go代码实现:
func (r *StockRepo) DeductStock(ctx context.Context, skuID int64, quantity int) error {
// 先查当前库存和version
stock, err := r.GetStock(ctx, skuID)
if err != nil {
return err
}
if stock.Available < quantity {
return ErrInsufficientStock
}
// 乐观锁更新
result := r.db.WithContext(ctx).
Model(&ProductStock{}).
Where("sku_id = ? AND available >= ? AND version = ?",
skuID, quantity, stock.Version).
Updates(map[string]interface{}{
"available": gorm.Expr("available - ?", quantity),
"locked": gorm.Expr("locked + ?", quantity),
"version": gorm.Expr("version + 1"),
})
if result.RowsAffected == 0 {
return ErrStockConflict // 版本冲突,让调用方重试
}
return result.Error
}
乐观锁方案的优缺点:
- 优点:实现简单,不需要额外中间件
- 缺点:高并发下冲突频繁,大量重试浪费数据库资源
实际上可以简化——把version去掉,直接用 available >= quantity 作为条件:
UPDATE product_stock
SET available = available - 1, locked = locked + 1
WHERE sku_id = ? AND available >= 1;
因为MySQL的UPDATE本身就是行级锁,available >= 1 已经保证了不会扣成负数。但这种方式在分布式数据库下可能有问题,加version更保险。
方案二:Redis分布式锁
数据库扛不住大并发时,把库存放到Redis里。关键是用分布式锁保证扣减的原子性。
SET NX EX + Lua释放
Redis分布式锁的标准实现:
// lock.go
package lock
import (
"context"
"time"
"github.com/go-redis/nedis/v8"
"github.com/google/uuid"
)
type RedisLock struct {
client *redis.Client
key string
value string // 唯一标识,防止误释放
ttl time.Duration
}
func NewRedisLock(client *redis.Client, key string, ttl time.Duration) *RedisLock {
return &RedisLock{
client: client,
key: "lock:" + key,
value: uuid.New().String(),
ttl: ttl,
}
}
// 加锁
func (l *RedisLock) Lock(ctx context.Context) (bool, error) {
return l.client.SetNX(ctx, l.key, l.value, l.ttl).Result()
}
// 释放锁(Lua脚本保证原子性)
var unlockScript = redis.NewScript(`
if redis.call("GET", KEYS[1]) == ARGV[1] then
return redis.call("DEL", KEYS[1])
else
return 0
end
`)
func (l *RedisLock) Unlock(ctx context.Context) error {
_, err := unlockScript.Run(ctx, l.client, []string{l.key}, l.value).Result()
return err
}
为什么释放锁要用Lua脚本?因为"检查value → 删除key"必须是原子操作。如果分成两步,在检查和删除之间锁可能已经被其他人获取了,直接DEL就把别人的锁删了。
用锁保护库存扣减
func (s *StockService) DeductWithLock(ctx context.Context, skuID int64, qty int) error {
lockKey := fmt.Sprintf("stock:%d", skuID)
lock := NewRedisLock(s.redis, lockKey, 5*time.Second)
// 自旋获取锁
for i := 0; i < 30; i++ {
ok, err := lock.Lock(ctx)
if err != nil {
return err
}
if ok {
defer lock.Unlock(ctx)
// 检查库存
stockKey := fmt.Sprintf("stock:available:%d", skuID)
available, err := s.redis.Get(ctx, stockKey).Int()
if err != nil {
return err
}
if available < qty {
return ErrInsufficientStock
}
// 扣减
s.redis.DecrBy(ctx, stockKey, int64(qty))
return nil
}
time.Sleep(100 * time.Millisecond)
}
return ErrLockTimeout
}
更好的方式是直接用Lua脚本把check和deduct合成一个原子操作,连锁都不用加:
var deductScript = redis.NewScript(`
local available = tonumber(redis.call("GET", KEYS[1]))
if available == nil then
return -1
end
if available < tonumber(ARGV[1]) then
return 0
end
redis.call("DECRBY", KEYS[1], ARGV[1])
return 1
`)
func (s *StockService) DeductAtomic(ctx context.Context, skuID int64, qty int) error {
key := fmt.Sprintf("stock:available:%d", skuID)
result, err := deductScript.Run(ctx, s.redis, []string{key}, qty).Int()
if err != nil {
return err
}
switch result {
case -1:
return ErrStockNotFound
case 0:
return ErrInsufficientStock
default:
return nil
}
}
Lua脚本方案性能最好——一次网络往返搞定,没有锁竞争。
库存预热与回滚
预热
大促前需要把库存从MySQL同步到Redis:
func (s *StockService) WarmUp(ctx context.Context, skuIDs []int64) error {
stocks, err := s.repo.BatchGetStocks(ctx, skuIDs)
if err != nil {
return err
}
pipe := s.redis.Pipeline()
for _, stock := range stocks {
key := fmt.Sprintf("stock:available:%d", stock.SkuID)
pipe.Set(ctx, key, stock.Available, 24*time.Hour)
}
_, err = pipe.Exec(ctx)
return err
}
回滚
订单取消或支付超时需要回滚库存:
func (s *StockService) Rollback(ctx context.Context, skuID int64, qty int) error {
// Redis回滚
key := fmt.Sprintf("stock:available:%d", skuID)
s.redis.IncrBy(ctx, key, int64(qty))
// 数据库回滚
return s.repo.RollbackStock(ctx, skuID, qty)
}
Redis和数据库的一致性通过定时对账任务保证——每隔几分钟比对Redis和MySQL的库存数据,有差异就报警。
选型建议
| 场景 | 推荐方案 |
|---|---|
| 并发低(QPS < 100) | 数据库乐观锁足够 |
| 中等并发(QPS 100-1000) | Redis Lua脚本 |
| 高并发秒杀 | Redis Lua + 队列削峰 |
我的商城系统目前用的是Redis Lua脚本方案,配合数据库乐观锁做兜底。Redis挂了的情况下降级到数据库方案,保证服务不中断。