转写系统优化:流式语音转写实现

之前的转写系统是离线模式——录完音再整段转写。这次升级加入了流式转写,音频一边采集一边出文字,体验提升非常大。

流式ASR vs 离线ASR

离线ASR:收集完整音频 → 一次性送入模型 → 返回完整结果。延迟高但准确率高,因为模型能看到全局上下文。适合录音转写、字幕生成等对实时性要求不高的场景。

流式ASR:音频实时分段送入 → 每段返回部分结果 → 不断修正。延迟低(通常1-3秒),但可能有中间结果被修正的情况。适合实时字幕、语音输入、会议记录等需要即时反馈的场景。

流式的核心挑战:

  1. 如何分段——切太短缺少上下文,切太长延迟高
  2. 如何处理分段边界的词被切断的问题
  3. 如何在保持低延迟的同时不牺牲太多准确率

WebSocket实时音频流传输

客户端和服务端之间用WebSocket传输音频流。HTTP不适合这种持续双向的数据流。

服务端(Python + FastAPI):

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
import asyncio
import json

app = FastAPI()

@app.websocket("/ws/transcribe")
async def websocket_transcribe(websocket: WebSocket):
    await websocket.accept()
    
    # 初始化转写引擎
    transcriber = StreamingTranscriber(
        model_size="large-v2",
        language="zh",
    )
    
    try:
        while True:
            # 接收音频数据(二进制)
            data = await websocket.receive_bytes()
            
            if len(data) == 0:
                # 空数据表示结束信号
                final_result = transcriber.finalize()
                await websocket.send_json({
                    "type": "final",
                    "text": final_result,
                })
                break
            
            # 送入转写引擎
            results = transcriber.feed(data)
            
            for result in results:
                await websocket.send_json({
                    "type": result["type"],  # "partial" 或 "stable"
                    "text": result["text"],
                    "start": result["start"],
                    "end": result["end"],
                })
    
    except WebSocketDisconnect:
        transcriber.cleanup()

客户端(Python):

import asyncio
import websockets
import pyaudio
import json

async def stream_microphone():
    """从麦克风采集音频并实时转写"""
    uri = "ws://localhost:8000/ws/transcribe"
    
    # 音频参数
    RATE = 16000
    CHUNK = 4096  # 每次发送的采样数(约256ms)
    FORMAT = pyaudio.paInt16
    CHANNELS = 1
    
    audio = pyaudio.PyAudio()
    stream = audio.open(
        format=FORMAT,
        channels=CHANNELS,
        rate=RATE,
        input=True,
        frames_per_buffer=CHUNK,
    )
    
    async with websockets.connect(uri) as ws:
        # 启动接收协程
        async def receiver():
            async for message in ws:
                result = json.loads(message)
                if result["type"] == "partial":
                    # 部分结果,可能会被修正
                    print(f"\r[部分] {result['text']}", end="", flush=True)
                elif result["type"] == "stable":
                    # 稳定结果,不会再变
                    print(f"\n[确认] {result['text']}")
                elif result["type"] == "final":
                    print(f"\n[最终] {result['text']}")
        
        recv_task = asyncio.create_task(receiver())
        
        # 采集并发送音频
        try:
            print("开始录音,按Ctrl+C停止...")
            while True:
                data = stream.read(CHUNK, exception_on_overflow=False)
                await ws.send(data)
                await asyncio.sleep(0.01)  # 让出控制权给接收协程
        except KeyboardInterrupt:
            # 发送结束信号
            await ws.send(b"")
            await asyncio.sleep(1)  # 等待最终结果
        finally:
            recv_task.cancel()
            stream.stop_stream()
            stream.close()
            audio.terminate()

asyncio.run(stream_microphone())

VAD分段策略

Voice Activity Detection(语音活动检测)是流式转写的关键组件。它决定了何时将音频片段送入ASR模型。

为什么需要VAD:不能简单地按固定时长切分音频。固定切分可能把一个词切成两半,导致识别错误。VAD检测语音和静音的边界,在自然停顿处切分。

我使用的方案——Silero VAD

import torch
import numpy as np

class VADSegmenter:
    """基于Silero VAD的音频分段器"""
    
    def __init__(self, sample_rate=16000):
        self.model, self.utils = torch.hub.load(
            'snakers4/silero-vad', 'silero_vad', trust_repo=True
        )
        self.sample_rate = sample_rate
        self.buffer = np.array([], dtype=np.float32)
        self.speech_buffer = np.array([], dtype=np.float32)
        self.is_speaking = False
        
        # 参数
        self.min_speech_duration = 0.5   # 最短语音段(秒)
        self.max_speech_duration = 15.0  # 最长语音段(秒)
        self.silence_threshold = 0.6     # 静音判定阈值(秒)
        self.window_size = 512           # VAD检测窗口(采样数)
        
        self.silence_count = 0
    
    def feed(self, audio_chunk: np.ndarray):
        """
        输入音频数据,返回完整的语音段列表
        audio_chunk: float32 numpy array, [-1, 1]
        """
        self.buffer = np.concatenate([self.buffer, audio_chunk])
        segments = []
        
        while len(self.buffer) >= self.window_size:
            window = self.buffer[:self.window_size]
            self.buffer = self.buffer[self.window_size:]
            
            # VAD检测
            tensor = torch.from_numpy(window)
            speech_prob = self.model(tensor, self.sample_rate).item()
            
            if speech_prob > 0.5:
                # 检测到语音
                self.is_speaking = True
                self.silence_count = 0
                self.speech_buffer = np.concatenate([
                    self.speech_buffer, window
                ])
            else:
                if self.is_speaking:
                    self.silence_count += 1
                    self.speech_buffer = np.concatenate([
                        self.speech_buffer, window
                    ])
                    
                    silence_duration = (self.silence_count * self.window_size 
                                       / self.sample_rate)
                    speech_duration = (len(self.speech_buffer) 
                                      / self.sample_rate)
                    
                    # 静音超过阈值,或语音段超过最大长度
                    if (silence_duration >= self.silence_threshold or 
                        speech_duration >= self.max_speech_duration):
                        
                        if speech_duration >= self.min_speech_duration:
                            segments.append(self.speech_buffer.copy())
                        
                        self.speech_buffer = np.array([], dtype=np.float32)
                        self.is_speaking = False
                        self.silence_count = 0
        
        return segments
    
    def flush(self):
        """结束时获取剩余的语音段"""
        if len(self.speech_buffer) > 0:
            duration = len(self.speech_buffer) / self.sample_rate
            if duration >= self.min_speech_duration:
                segment = self.speech_buffer.copy()
                self.speech_buffer = np.array([], dtype=np.float32)
                return [segment]
        return []

分段策略的关键参数:

  • silence_threshold:多长的静音算"说完了"。太短会在正常停顿时切断,太长延迟高。0.5-0.8秒是比较好的范围。
  • max_speech_duration:防止有人一直说不停导致片段太长。15秒是个不错的上限。
  • min_speech_duration:过滤掉太短的噪声片段。0.3-0.5秒。

FasterWhisper推理优化

模型推理用FasterWhisper,它是Whisper的CTranslate2优化版本,速度比原版快4倍:

from faster_whisper import WhisperModel

class StreamingTranscriber:
    """流式转写引擎"""
    
    def __init__(self, model_size="large-v2", language="zh",
                 device="cuda", compute_type="float16"):
        self.model = WhisperModel(
            model_size, 
            device=device,
            compute_type=compute_type,  # GPU用float16,CPU用int8
        )
        self.language = language
        self.vad = VADSegmenter()
        
        # 结果管理
        self.stable_segments = []    # 已确认的结果
        self.pending_audio = None    # 待确认的音频片段
        self.total_duration = 0.0    # 已处理的总时长
    
    def feed(self, raw_bytes: bytes):
        """
        输入原始PCM音频数据
        返回转写结果列表
        """
        # bytes → float32 numpy array
        audio = np.frombuffer(raw_bytes, dtype=np.int16).astype(np.float32)
        audio = audio / 32768.0  # 归一化到[-1, 1]
        
        # VAD分段
        segments = self.vad.feed(audio)
        results = []
        
        for segment in segments:
            # 转写这个语音段
            text = self._transcribe(segment)
            if text.strip():
                duration = len(segment) / 16000
                result = {
                    "type": "stable",
                    "text": text,
                    "start": self.total_duration,
                    "end": self.total_duration + duration,
                }
                results.append(result)
                self.stable_segments.append(result)
                self.total_duration += duration
        
        return results
    
    def _transcribe(self, audio: np.ndarray) -> str:
        """对一个音频段执行转写"""
        segments, info = self.model.transcribe(
            audio,
            language=self.language,
            beam_size=5,
            best_of=5,
            vad_filter=False,  # 我们自己做了VAD
            word_timestamps=False,
            condition_on_previous_text=True,
            initial_prompt="以下是一段中文语音转写。",
        )
        
        text = "".join(seg.text for seg in segments)
        return text.strip()
    
    def finalize(self):
        """结束转写,处理剩余音频"""
        remaining = self.vad.flush()
        for segment in remaining:
            text = self._transcribe(segment)
            if text.strip():
                self.stable_segments.append({
                    "type": "final",
                    "text": text,
                })
        
        full_text = "".join(s["text"] for s in self.stable_segments)
        return full_text
    
    def cleanup(self):
        """清理资源"""
        self.stable_segments.clear()
        self.total_duration = 0.0

FasterWhisper的关键优化参数:

  • compute_type:GPU用float16,CPU用int8。int8量化在CPU上速度提升明显,精度损失很小。
  • beam_size:束搜索宽度。5是默认值,减到1可以加速但质量下降。
  • condition_on_previous_text:利用上一段的文字作为上下文,提高连贯性。但有时会导致幻觉(重复上一段内容),需要做后处理过滤。
  • initial_prompt:给模型一个初始提示,帮助它确定语言和风格。中文场景加上中文提示效果更好。

部分结果实时返回

上面的实现是等到VAD切出完整段才返回结果。为了更低的延迟,可以加入"部分结果"机制——在语音段还没结束时就先转写已有的部分,返回一个可能被修正的中间结果:

class StreamingTranscriberV2(StreamingTranscriber):
    """支持部分结果的流式转写引擎"""
    
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.partial_interval = 1.0  # 每隔1秒输出一次部分结果
        self.last_partial_time = 0
        self.accumulated_audio = np.array([], dtype=np.float32)
    
    def feed(self, raw_bytes: bytes):
        audio = np.frombuffer(raw_bytes, dtype=np.int16).astype(np.float32)
        audio = audio / 32768.0
        
        self.accumulated_audio = np.concatenate([
            self.accumulated_audio, audio
        ])
        
        segments = self.vad.feed(audio)
        results = []
        
        if segments:
            # 有完整段了,返回stable结果
            for segment in segments:
                text = self._transcribe(segment)
                if text.strip():
                    duration = len(segment) / 16000
                    results.append({
                        "type": "stable",
                        "text": text,
                        "start": self.total_duration,
                        "end": self.total_duration + duration,
                    })
                    self.total_duration += duration
            self.accumulated_audio = np.array([], dtype=np.float32)
            self.last_partial_time = 0
        
        elif self.vad.is_speaking:
            # 正在说话但还没切段,检查是否该输出部分结果
            current_duration = len(self.vad.speech_buffer) / 16000
            if (current_duration - self.last_partial_time 
                    >= self.partial_interval 
                    and current_duration >= 0.5):
                
                partial_text = self._transcribe(self.vad.speech_buffer)
                if partial_text.strip():
                    results.append({
                        "type": "partial",
                        "text": partial_text,
                        "start": self.total_duration,
                        "end": self.total_duration + current_duration,
                    })
                self.last_partial_time = current_duration
        
        return results

部分结果的体验类似手机语音输入——文字先出来,可能会跳动修正,最终稳定。客户端收到partial类型的结果时覆盖显示,收到stable类型时追加。

性能参考

使用 FasterWhisper large-v2(float16)在配备独立 GPU 的环境中:

  • 单段转写延迟:0.3-0.8秒(取决于音频长度)
  • 端到端延迟(从说完到看到结果):1-2秒
  • 实时率(RTF):约0.1(处理10秒音频只需1秒)
  • GPU显存占用:约2.5GB

CPU 模式(int8 量化)在现代多核处理器上:

  • RTF 约 0.5-0.8
  • 基本能做到实时,但余量不大

总结

流式转写的核心是三个组件的配合:WebSocket传输、VAD智能分段、FasterWhisper高效推理。VAD的分段策略对最终效果影响最大——切分点选得好,转写质量和离线模式差距很小。部分结果机制虽然增加了复杂度,但对用户体验的提升很明显。