之前的转写系统是离线模式——录完音再整段转写。这次升级加入了流式转写,音频一边采集一边出文字,体验提升非常大。
流式ASR vs 离线ASR
离线ASR:收集完整音频 → 一次性送入模型 → 返回完整结果。延迟高但准确率高,因为模型能看到全局上下文。适合录音转写、字幕生成等对实时性要求不高的场景。
流式ASR:音频实时分段送入 → 每段返回部分结果 → 不断修正。延迟低(通常1-3秒),但可能有中间结果被修正的情况。适合实时字幕、语音输入、会议记录等需要即时反馈的场景。
流式的核心挑战:
- 如何分段——切太短缺少上下文,切太长延迟高
- 如何处理分段边界的词被切断的问题
- 如何在保持低延迟的同时不牺牲太多准确率
WebSocket实时音频流传输
客户端和服务端之间用WebSocket传输音频流。HTTP不适合这种持续双向的数据流。
服务端(Python + FastAPI):
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
import asyncio
import json
app = FastAPI()
@app.websocket("/ws/transcribe")
async def websocket_transcribe(websocket: WebSocket):
await websocket.accept()
# 初始化转写引擎
transcriber = StreamingTranscriber(
model_size="large-v2",
language="zh",
)
try:
while True:
# 接收音频数据(二进制)
data = await websocket.receive_bytes()
if len(data) == 0:
# 空数据表示结束信号
final_result = transcriber.finalize()
await websocket.send_json({
"type": "final",
"text": final_result,
})
break
# 送入转写引擎
results = transcriber.feed(data)
for result in results:
await websocket.send_json({
"type": result["type"], # "partial" 或 "stable"
"text": result["text"],
"start": result["start"],
"end": result["end"],
})
except WebSocketDisconnect:
transcriber.cleanup()
客户端(Python):
import asyncio
import websockets
import pyaudio
import json
async def stream_microphone():
"""从麦克风采集音频并实时转写"""
uri = "ws://localhost:8000/ws/transcribe"
# 音频参数
RATE = 16000
CHUNK = 4096 # 每次发送的采样数(约256ms)
FORMAT = pyaudio.paInt16
CHANNELS = 1
audio = pyaudio.PyAudio()
stream = audio.open(
format=FORMAT,
channels=CHANNELS,
rate=RATE,
input=True,
frames_per_buffer=CHUNK,
)
async with websockets.connect(uri) as ws:
# 启动接收协程
async def receiver():
async for message in ws:
result = json.loads(message)
if result["type"] == "partial":
# 部分结果,可能会被修正
print(f"\r[部分] {result['text']}", end="", flush=True)
elif result["type"] == "stable":
# 稳定结果,不会再变
print(f"\n[确认] {result['text']}")
elif result["type"] == "final":
print(f"\n[最终] {result['text']}")
recv_task = asyncio.create_task(receiver())
# 采集并发送音频
try:
print("开始录音,按Ctrl+C停止...")
while True:
data = stream.read(CHUNK, exception_on_overflow=False)
await ws.send(data)
await asyncio.sleep(0.01) # 让出控制权给接收协程
except KeyboardInterrupt:
# 发送结束信号
await ws.send(b"")
await asyncio.sleep(1) # 等待最终结果
finally:
recv_task.cancel()
stream.stop_stream()
stream.close()
audio.terminate()
asyncio.run(stream_microphone())
VAD分段策略
Voice Activity Detection(语音活动检测)是流式转写的关键组件。它决定了何时将音频片段送入ASR模型。
为什么需要VAD:不能简单地按固定时长切分音频。固定切分可能把一个词切成两半,导致识别错误。VAD检测语音和静音的边界,在自然停顿处切分。
我使用的方案——Silero VAD:
import torch
import numpy as np
class VADSegmenter:
"""基于Silero VAD的音频分段器"""
def __init__(self, sample_rate=16000):
self.model, self.utils = torch.hub.load(
'snakers4/silero-vad', 'silero_vad', trust_repo=True
)
self.sample_rate = sample_rate
self.buffer = np.array([], dtype=np.float32)
self.speech_buffer = np.array([], dtype=np.float32)
self.is_speaking = False
# 参数
self.min_speech_duration = 0.5 # 最短语音段(秒)
self.max_speech_duration = 15.0 # 最长语音段(秒)
self.silence_threshold = 0.6 # 静音判定阈值(秒)
self.window_size = 512 # VAD检测窗口(采样数)
self.silence_count = 0
def feed(self, audio_chunk: np.ndarray):
"""
输入音频数据,返回完整的语音段列表
audio_chunk: float32 numpy array, [-1, 1]
"""
self.buffer = np.concatenate([self.buffer, audio_chunk])
segments = []
while len(self.buffer) >= self.window_size:
window = self.buffer[:self.window_size]
self.buffer = self.buffer[self.window_size:]
# VAD检测
tensor = torch.from_numpy(window)
speech_prob = self.model(tensor, self.sample_rate).item()
if speech_prob > 0.5:
# 检测到语音
self.is_speaking = True
self.silence_count = 0
self.speech_buffer = np.concatenate([
self.speech_buffer, window
])
else:
if self.is_speaking:
self.silence_count += 1
self.speech_buffer = np.concatenate([
self.speech_buffer, window
])
silence_duration = (self.silence_count * self.window_size
/ self.sample_rate)
speech_duration = (len(self.speech_buffer)
/ self.sample_rate)
# 静音超过阈值,或语音段超过最大长度
if (silence_duration >= self.silence_threshold or
speech_duration >= self.max_speech_duration):
if speech_duration >= self.min_speech_duration:
segments.append(self.speech_buffer.copy())
self.speech_buffer = np.array([], dtype=np.float32)
self.is_speaking = False
self.silence_count = 0
return segments
def flush(self):
"""结束时获取剩余的语音段"""
if len(self.speech_buffer) > 0:
duration = len(self.speech_buffer) / self.sample_rate
if duration >= self.min_speech_duration:
segment = self.speech_buffer.copy()
self.speech_buffer = np.array([], dtype=np.float32)
return [segment]
return []
分段策略的关键参数:
- silence_threshold:多长的静音算"说完了"。太短会在正常停顿时切断,太长延迟高。0.5-0.8秒是比较好的范围。
- max_speech_duration:防止有人一直说不停导致片段太长。15秒是个不错的上限。
- min_speech_duration:过滤掉太短的噪声片段。0.3-0.5秒。
FasterWhisper推理优化
模型推理用FasterWhisper,它是Whisper的CTranslate2优化版本,速度比原版快4倍:
from faster_whisper import WhisperModel
class StreamingTranscriber:
"""流式转写引擎"""
def __init__(self, model_size="large-v2", language="zh",
device="cuda", compute_type="float16"):
self.model = WhisperModel(
model_size,
device=device,
compute_type=compute_type, # GPU用float16,CPU用int8
)
self.language = language
self.vad = VADSegmenter()
# 结果管理
self.stable_segments = [] # 已确认的结果
self.pending_audio = None # 待确认的音频片段
self.total_duration = 0.0 # 已处理的总时长
def feed(self, raw_bytes: bytes):
"""
输入原始PCM音频数据
返回转写结果列表
"""
# bytes → float32 numpy array
audio = np.frombuffer(raw_bytes, dtype=np.int16).astype(np.float32)
audio = audio / 32768.0 # 归一化到[-1, 1]
# VAD分段
segments = self.vad.feed(audio)
results = []
for segment in segments:
# 转写这个语音段
text = self._transcribe(segment)
if text.strip():
duration = len(segment) / 16000
result = {
"type": "stable",
"text": text,
"start": self.total_duration,
"end": self.total_duration + duration,
}
results.append(result)
self.stable_segments.append(result)
self.total_duration += duration
return results
def _transcribe(self, audio: np.ndarray) -> str:
"""对一个音频段执行转写"""
segments, info = self.model.transcribe(
audio,
language=self.language,
beam_size=5,
best_of=5,
vad_filter=False, # 我们自己做了VAD
word_timestamps=False,
condition_on_previous_text=True,
initial_prompt="以下是一段中文语音转写。",
)
text = "".join(seg.text for seg in segments)
return text.strip()
def finalize(self):
"""结束转写,处理剩余音频"""
remaining = self.vad.flush()
for segment in remaining:
text = self._transcribe(segment)
if text.strip():
self.stable_segments.append({
"type": "final",
"text": text,
})
full_text = "".join(s["text"] for s in self.stable_segments)
return full_text
def cleanup(self):
"""清理资源"""
self.stable_segments.clear()
self.total_duration = 0.0
FasterWhisper的关键优化参数:
- compute_type:GPU用
float16,CPU用int8。int8量化在CPU上速度提升明显,精度损失很小。 - beam_size:束搜索宽度。5是默认值,减到1可以加速但质量下降。
- condition_on_previous_text:利用上一段的文字作为上下文,提高连贯性。但有时会导致幻觉(重复上一段内容),需要做后处理过滤。
- initial_prompt:给模型一个初始提示,帮助它确定语言和风格。中文场景加上中文提示效果更好。
部分结果实时返回
上面的实现是等到VAD切出完整段才返回结果。为了更低的延迟,可以加入"部分结果"机制——在语音段还没结束时就先转写已有的部分,返回一个可能被修正的中间结果:
class StreamingTranscriberV2(StreamingTranscriber):
"""支持部分结果的流式转写引擎"""
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.partial_interval = 1.0 # 每隔1秒输出一次部分结果
self.last_partial_time = 0
self.accumulated_audio = np.array([], dtype=np.float32)
def feed(self, raw_bytes: bytes):
audio = np.frombuffer(raw_bytes, dtype=np.int16).astype(np.float32)
audio = audio / 32768.0
self.accumulated_audio = np.concatenate([
self.accumulated_audio, audio
])
segments = self.vad.feed(audio)
results = []
if segments:
# 有完整段了,返回stable结果
for segment in segments:
text = self._transcribe(segment)
if text.strip():
duration = len(segment) / 16000
results.append({
"type": "stable",
"text": text,
"start": self.total_duration,
"end": self.total_duration + duration,
})
self.total_duration += duration
self.accumulated_audio = np.array([], dtype=np.float32)
self.last_partial_time = 0
elif self.vad.is_speaking:
# 正在说话但还没切段,检查是否该输出部分结果
current_duration = len(self.vad.speech_buffer) / 16000
if (current_duration - self.last_partial_time
>= self.partial_interval
and current_duration >= 0.5):
partial_text = self._transcribe(self.vad.speech_buffer)
if partial_text.strip():
results.append({
"type": "partial",
"text": partial_text,
"start": self.total_duration,
"end": self.total_duration + current_duration,
})
self.last_partial_time = current_duration
return results
部分结果的体验类似手机语音输入——文字先出来,可能会跳动修正,最终稳定。客户端收到partial类型的结果时覆盖显示,收到stable类型时追加。
性能参考
使用 FasterWhisper large-v2(float16)在配备独立 GPU 的环境中:
- 单段转写延迟:0.3-0.8秒(取决于音频长度)
- 端到端延迟(从说完到看到结果):1-2秒
- 实时率(RTF):约0.1(处理10秒音频只需1秒)
- GPU显存占用:约2.5GB
CPU 模式(int8 量化)在现代多核处理器上:
- RTF 约 0.5-0.8
- 基本能做到实时,但余量不大
总结
流式转写的核心是三个组件的配合:WebSocket传输、VAD智能分段、FasterWhisper高效推理。VAD的分段策略对最终效果影响最大——切分点选得好,转写质量和离线模式差距很小。部分结果机制虽然增加了复杂度,但对用户体验的提升很明显。