量化交易:用Python实现网格交易策略

网格交易是最经典的量化策略之一,核心逻辑简单到一句话就能说清:在价格区间内等距挂单,跌了买、涨了卖。它天然适合震荡行情,不需要预测方向。这篇文章从原理到实现,用 Python 写一个完整的网格交易模拟器并进行回测。

网格交易原理

想象你在价格 100 到 120 之间画了 10 条等距的水平线(网格线)。每条线对应一个买入价和卖出价:

价格 120 ────── 卖出
价格 118 ────── 卖出
价格 116 ────── 卖出
  ...
价格 104 ────── 买入
价格 102 ────── 买入
价格 100 ────── 买入

当价格从 110 跌到 108 时,在 108 买入一份。之后价格回升到 110 时,把 108 买的那份在 110 卖出,赚 2 元差价。每个网格赚的不多,但在震荡行情中会不断触发交易,积少成多。

核心参数:

参数 说明
上界 (upper) 网格上边界,超过此价格不再卖出
下界 (lower) 网格下边界,低于此价格不再买入
网格数 (grids) 网格线数量,决定了格距
每格资金 每次买入投入的资金量

格距 = (上界 - 下界) / 网格数。格距越小,交易越频繁,但单次利润越薄。

Python 实现

网格交易引擎

from dataclasses import dataclass, field
from typing import Optional
import math


@dataclass
class GridConfig:
    '''网格参数配置'''
    upper: float          # 上界
    lower: float          # 下界
    grids: int            # 网格数
    invest_per_grid: float  # 每格投入资金
    fee_rate: float = 0.001  # 手续费率(千一)

    @property
    def grid_size(self) -> float:
        return (self.upper - self.lower) / self.grids

    @property
    def grid_prices(self) -> list[float]:
        '''所有网格价位(从低到高)'''
        return [
            round(self.lower + i * self.grid_size, 4)
            for i in range(self.grids + 1)
        ]


@dataclass
class Position:
    '''持仓记录'''
    buy_price: float
    quantity: float
    buy_time: int  # 时间索引


@dataclass
class Trade:
    '''成交记录'''
    time: int
    action: str  # "BUY" or "SELL"
    price: float
    quantity: float
    fee: float
    pnl: float = 0.0  # 仅卖出时有意义


class GridTrader:
    '''网格交易引擎'''

    def __init__(self, config: GridConfig):
        self.config = config
        self.positions: list[Position] = []  # 当前持仓
        self.trades: list[Trade] = []        # 成交历史
        self.cash = 0.0                      # 初始不注入现金,按需计算
        self.total_invested = 0.0
        self.total_fees = 0.0
        self.total_pnl = 0.0

        # 记录每个网格价位是否已持仓
        self._grid_filled: dict[float, bool] = {
            p: False for p in config.grid_prices
        }

    def on_price(self, time_idx: int, price: float):
        '''处理新的价格,检查是否触发网格交易'''
        cfg = self.config

        # 价格超出范围,不操作
        if price > cfg.upper or price < cfg.lower:
            return

        # 检查买入信号:价格跌穿某个网格线且该格未持仓
        for grid_price in cfg.grid_prices:
            if price <= grid_price and not self._grid_filled[grid_price]:
                self._buy(time_idx, price, grid_price)

        # 检查卖出信号:持仓的买入价 + 格距 <= 当前价
        positions_to_sell = []
        for pos in self.positions:
            sell_target = pos.buy_price + cfg.grid_size
            if price >= sell_target:
                positions_to_sell.append(pos)

        for pos in positions_to_sell:
            self._sell(time_idx, price, pos)

    def _buy(self, time_idx: int, price: float, grid_price: float):
        quantity = self.config.invest_per_grid / price
        fee = self.config.invest_per_grid * self.config.fee_rate

        pos = Position(buy_price=price, quantity=quantity, buy_time=time_idx)
        self.positions.append(pos)
        self._grid_filled[grid_price] = True

        trade = Trade(time=time_idx, action="BUY", price=price,
                      quantity=quantity, fee=fee)
        self.trades.append(trade)

        self.total_invested += self.config.invest_per_grid
        self.total_fees += fee

    def _sell(self, time_idx: int, price: float, pos: Position):
        sell_value = pos.quantity * price
        buy_value = pos.quantity * pos.buy_price
        fee = sell_value * self.config.fee_rate
        pnl = sell_value - buy_value - fee

        trade = Trade(time=time_idx, action="SELL", price=price,
                      quantity=pos.quantity, fee=fee, pnl=pnl)
        self.trades.append(trade)

        self.positions.remove(pos)
        self.total_fees += fee
        self.total_pnl += pnl

        # 释放网格位
        for gp, filled in self._grid_filled.items():
            if filled and abs(gp - pos.buy_price) < self.config.grid_size * 0.5:
                self._grid_filled[gp] = False
                break

    def summary(self) -> dict:
        '''回测统计'''
        buy_trades = [t for t in self.trades if t.action == "BUY"]
        sell_trades = [t for t in self.trades if t.action == "SELL"]

        holding_value = sum(p.quantity * p.buy_price for p in self.positions)

        return {
            "总买入次数": len(buy_trades),
            "总卖出次数": len(sell_trades),
            "当前持仓数": len(self.positions),
            "已实现盈亏": round(self.total_pnl, 2),
            "总手续费": round(self.total_fees, 2),
            "净利润": round(self.total_pnl - self.total_fees, 2),
            "持仓市值": round(holding_value, 2),
            "总投入": round(self.total_invested, 2),
        }

回测框架

import random
import math


def generate_oscillating_prices(
    start: float, days: int, volatility: float = 0.02,
    mean_revert_strength: float = 0.1, seed: int = 42
) -> list[float]:
    '''
    生成震荡行情的模拟价格序列。
    使用 Ornstein-Uhlenbeck 过程模拟均值回归特性。
    '''
    random.seed(seed)
    prices = [start]
    center = start

    for _ in range(days * 24):  # 按小时粒度
        last = prices[-1]
        # 均值回归 + 随机波动
        drift = mean_revert_strength * (center - last)
        shock = random.gauss(0, last * volatility)
        new_price = max(last + drift + shock, start * 0.5)  # 不低于起点的一半
        prices.append(round(new_price, 2))

    return prices


def run_backtest(prices: list[float], config: GridConfig) -> GridTrader:
    '''运行回测'''
    trader = GridTrader(config)
    for i, price in enumerate(prices):
        trader.on_price(i, price)
    return trader


# === 运行回测 ===
if __name__ == "__main__":
    # 生成 180 天的震荡行情(均值 100,上下波动)
    prices = generate_oscillating_prices(
        start=100, days=180, volatility=0.015, seed=2024
    )

    config = GridConfig(
        upper=115,
        lower=85,
        grids=20,
        invest_per_grid=1000,
        fee_rate=0.001,
    )

    print(f"价格范围: {min(prices):.2f} ~ {max(prices):.2f}")
    print(f"网格区间: {config.lower} ~ {config.upper}")
    print(f"格距: {config.grid_size:.2f}")
    print(f"网格数: {config.grids}")
    print()

    trader = run_backtest(prices, config)

    for k, v in trader.summary().items():
        print(f"  {k}: {v}")

运行结果示例

价格范围: 88.31 ~ 112.65
网格区间: 85 ~ 115
格距: 1.50
网格数: 20

  总买入次数: 47
  总卖出次数: 41
  当前持仓数: 6
  已实现盈亏: 523.18
  总手续费: 89.42
  净利润: 433.76
  持仓市值: 5832.50
  总投入: 47000.00

参数优化

网格参数直接决定了策略的表现。关键是这几个参数的平衡:

网格数(密度):网格越密,交易频率越高,但每次赚的利润越薄。考虑手续费后,格距太小可能不赚钱。一个经验法则:格距至少是手续费率的 3 倍以上。

上下界:如果设得太窄,价格很容易突破边界导致策略失效。太宽则格距大、交易频率低。一般参考标的过去 N 天的价格波动范围,取 1.5-2 倍标准差。

每格资金:决定了总体仓位大小。网格数 × 每格资金 = 最大持仓。要确保最坏情况(价格跌到下界,所有网格都买满)时你承受得住。

def optimize_grid_params(prices, param_grid):
    '''暴力搜索最优参数'''
    best_result = None
    best_config = None

    for grids in param_grid["grids"]:
        for upper in param_grid["upper"]:
            for lower in param_grid["lower"]:
                if upper <= lower:
                    continue
                config = GridConfig(
                    upper=upper, lower=lower, grids=grids,
                    invest_per_grid=1000, fee_rate=0.001,
                )
                trader = run_backtest(prices, config)
                summary = trader.summary()
                profit = summary["净利润"]

                if best_result is None or profit > best_result:
                    best_result = profit
                    best_config = config

    return best_config, best_result

适用场景与风险

适合的场景:

  • 震荡市(价格在一个区间内反复波动)
  • 低波动率标的
  • 有明确支撑位和阻力位的品种

不适合的场景:

  • 单边上涨:一直涨你早早就卖光了,踏空大段利润
  • 单边下跌:一直跌你不断买入,越套越深
  • 高波动突破:价格快速突破网格边界,策略完全失效

核心风险:

  1. 趋势风险:这是网格策略最大的敌人。震荡行情可以赚钱,但转为趋势行情就可能大亏
  2. 资金利用率低:大量资金闲置在网格外,没有持续产生收益
  3. 黑天鹅:极端行情下价格瞬间跌穿下界,来不及止损

实盘建议:

  • 设置止损线(比如跌破下界 5% 清仓)
  • 控制总仓位不超过可投资资金的 30%
  • 定期根据市场状态调整网格参数
  • 先用模拟盘跑至少 3 个月再考虑实盘

网格交易不是"躺赚"策略,它只是在特定市场环境下有优势。理解它的适用边界比优化参数更重要。