做量化策略绕不开高频数据。日线级别的回测能验证大方向,但一旦涉及执行层面——滑点估算、VWAP/TWAP 拆单、微观结构分析——就必须把颗粒度压到 Tick 级别。本文整理了我在处理 Level2 行情时踩过的坑和一些实用技巧。
Level2 行情数据结构
国内 A 股 Level2 行情主要包含三类数据:
| 数据类型 | 频率 | 关键字段 |
|---|---|---|
| 逐笔成交 | 每笔 | SecurityID, TradePrice, TradeQty, ExecType(主买/主卖), TradeTime |
| 逐笔委托 | 每笔 | SecurityID, Price, OrderQty, Side(B/S), OrderType(限价/市价/撤单) |
| 快照(Snapshot) | 3s | SecurityID, LastPx, BidPrice[10], AskPrice[10], BidOrderQty[10], AskOrderQty[10], TotalVolumeTrade, Timestamp |
数据量级:单只股票一天的逐笔成交大约几万到几十万条,全市场一天约 2-3 亿条。直接用 CSV 存储是不现实的,通常落盘格式选择 Parquet(列式、压缩比高、pandas 读取快)。
Tick 数据清洗
原始 Tick 数据存在不少脏数据,常见问题:
1. 集合竞价与连续竞价混杂
import pandas as pd
def filter_continuous_auction(df: pd.DataFrame) -> pd.DataFrame:
'''只保留连续竞价时段 (09:30-11:30, 13:00-15:00)'''
t = df["TradeTime"]
morning = (t >= "09:30:00") & (t <= "11:30:00")
afternoon = (t >= "13:00:00") & (t <= "15:00:00")
return df[morning | afternoon].copy()
2. 零价格 / 异常价格
def remove_bad_price(df: pd.DataFrame, col: str = "TradePrice") -> pd.DataFrame:
'''移除价格为0或偏离均值过远的记录'''
df = df[df[col] > 0]
median = df[col].median()
# 偏离中位数 20% 以上视为异常
df = df[df[col].between(median * 0.8, median * 1.2)]
return df
3. 重复时间戳去重
交易所推送可能出现同一时间戳多条记录(尤其是逐笔委托),需要根据 SeqNo 去重:
df.drop_duplicates(subset=["SecurityID", "SeqNo"], keep="first", inplace=True)
VWAP 与 TWAP 计算
**VWAP(成交量加权平均价)**是算法交易最核心的基准价格:
def calc_vwap(df: pd.DataFrame) -> float:
'''
VWAP = sum(Price_i * Volume_i) / sum(Volume_i)
'''
turnover = (df["TradePrice"] * df["TradeQty"]).sum()
total_vol = df["TradeQty"].sum()
return turnover / total_vol if total_vol > 0 else 0.0
def calc_vwap_series(df: pd.DataFrame, freq: str = "1min") -> pd.Series:
'''分时段 VWAP,用于和实际成交对比滑点'''
df = df.set_index(pd.to_datetime(df["TradeTime"]))
grouped = df.resample(freq)
vwap = grouped.apply(lambda g: calc_vwap(g) if len(g) > 0 else float("nan"))
return vwap
**TWAP(时间加权平均价)**则是等时间间隔的算术均价,计算更简单但实际参考价值不如 VWAP:
def calc_twap(df: pd.DataFrame, freq: str = "1min") -> float:
'''TWAP = mean(各时间切片的 mid price)'''
df = df.set_index(pd.to_datetime(df["TradeTime"]))
mid_prices = df.resample(freq)["TradePrice"].mean()
return mid_prices.mean()
订单簿快照分析
订单簿(Order Book)分析可以辅助判断短期价格压力方向。常用指标:
买卖压力比:
def bid_ask_imbalance(snap: dict) -> float:
'''
正值偏买方 -> 价格上行压力
负值偏卖方 -> 价格下行压力
'''
bid_vol = sum(snap["BidOrderQty"][:5])
ask_vol = sum(snap["AskOrderQty"][:5])
total = bid_vol + ask_vol
if total == 0:
return 0.0
return (bid_vol - ask_vol) / total
加权中间价(Weighted Mid Price):
def weighted_mid_price(snap: dict) -> float:
'''
用挂单量加权的中间价,比简单 (bid1 + ask1)/2 更能反映真实均衡位置
'''
bid1 = snap["BidPrice"][0]
ask1 = snap["AskPrice"][0]
bid_vol1 = snap["BidOrderQty"][0]
ask_vol1 = snap["AskOrderQty"][0]
total = bid_vol1 + ask_vol1
if total == 0:
return (bid1 + ask1) / 2
# 注意:买量越大,价格越靠近 ask(买方推高)
return (bid1 * ask_vol1 + ask1 * bid_vol1) / total
pandas 高性能处理技巧
处理几亿条 Tick 数据,性能是硬约束。几个实测有效的优化:
1. 用 category dtype 压缩字符串列
df["SecurityID"] = df["SecurityID"].astype("category")
df["ExecType"] = df["ExecType"].astype("category")
# 内存占用能降 60%+
2. 尽量避免 apply,用向量化操作
# 慢:逐行 apply
df["turnover"] = df.apply(lambda row: row["TradePrice"] * row["TradeQty"], axis=1)
# 快:向量化
df["turnover"] = df["TradePrice"] * df["TradeQty"]
实测全市场一天逐笔成交数据,向量化比 apply 快 50-100 倍。
3. 读 Parquet 时只选需要的列
df = pd.read_parquet("ticks_20240801.parquet",
columns=["SecurityID", "TradePrice", "TradeQty", "TradeTime"])
4. 用 numba 加速自定义计算
import numba as nb
import numpy as np
@nb.njit
def rolling_vwap(prices: np.ndarray, volumes: np.ndarray, window: int) -> np.ndarray:
n = len(prices)
result = np.empty(n, dtype=np.float64)
for i in range(n):
start = max(0, i - window + 1)
turnover = 0.0
vol = 0.0
for j in range(start, i + 1):
turnover += prices[j] * volumes[j]
vol += volumes[j]
result[i] = turnover / vol if vol > 0 else 0.0
return result
numba JIT 编译后性能接近 C,适合写滑动窗口类的指标。
小结
高频数据处理的核心就是两件事:数据质量和计算效率。清洗环节宁可严格一些、多丢几条脏数据,也不要让异常值污染指标计算。性能方面,Parquet + 向量化 + numba 基本能覆盖大部分场景,实在不够再上 Polars 或 DuckDB。