语音转文字服务化:搭建转写API

之前在本地跑Whisper做语音转写,效果不错但只能自己用。这篇把Whisper包装成一个HTTP API服务,支持音频上传、异步转写、WebSocket实时推送结果,做成一个可以给其他系统调用的转写微服务。

整体架构

客户端 → FastAPI (上传音频) → Celery Worker (转写任务) → Redis (结果存储)
  ↑                                                          ↓
  └──────── WebSocket / 轮询 ←──────────────────────────────────┘

核心组件:

  • FastAPI:接收音频上传,创建转写任务
  • Celery + Redis:异步任务队列,Whisper推理放在Worker里
  • Redis:存储任务状态和转写结果
  • WebSocket:可选的实时推送通道

API设计

先定义接口:

POST /api/v1/transcribe
  - 上传音频文件
  - 返回 task_id

GET /api/v1/tasks/{task_id}
  - 查询任务状态和结果
  - 状态:pending / processing / completed / failed

WebSocket /ws/tasks/{task_id}
  - 实时推送转写进度

FastAPI服务搭建

# app/main.py
from fastapi import FastAPI, UploadFile, File, HTTPException
from fastapi.websockets import WebSocket
import uuid
import aiofiles
import os

from app.tasks import transcribe_audio
from app.redis_client import get_task_status, get_task_result

app = FastAPI(title="Whisper Transcription API")

UPLOAD_DIR = "/tmp/whisper_uploads"
os.makedirs(UPLOAD_DIR, exist_ok=True)

ALLOWED_TYPES = {
    "audio/wav", "audio/mpeg", "audio/mp4", "audio/ogg",
    "audio/webm", "audio/flac", "video/mp4", "video/webm"
}
MAX_FILE_SIZE = 100 * 1024 * 1024  # 100MB

@app.post("/api/v1/transcribe")
async def create_transcription(
    file: UploadFile = File(...),
    language: str = None,
    model_size: str = "base"
):
    # 校验文件类型
    if file.content_type not in ALLOWED_TYPES:
        raise HTTPException(400, f"Unsupported file type: {file.content_type}")

    # 生成任务ID,保存文件
    task_id = str(uuid.uuid4())
    ext = os.path.splitext(file.filename)[1] or ".wav"
    file_path = os.path.join(UPLOAD_DIR, f"{task_id}{ext}")

    async with aiofiles.open(file_path, "wb") as f:
        content = await file.read()
        if len(content) > MAX_FILE_SIZE:
            raise HTTPException(413, "File too large")
        await f.write(content)

    # 投递异步任务
    transcribe_audio.delay(task_id, file_path, language, model_size)

    return {"task_id": task_id, "status": "pending"}

@app.get("/api/v1/tasks/{task_id}")
async def get_task(task_id: str):
    status = get_task_status(task_id)
    if status is None:
        raise HTTPException(404, "Task not found")

    result = {"task_id": task_id, "status": status}
    if status == "completed":
        result["result"] = get_task_result(task_id)
    elif status == "failed":
        result["error"] = get_task_result(task_id)
    return result

音频格式处理

用户上传的音频格式五花八门,需要统一转换成Whisper能处理的格式。用ffmpeg做转码:

# app/audio.py
import subprocess
import os

def convert_to_wav(input_path: str) -> str:
    \"\"\"将任意音频/视频转换为16kHz单声道WAV\"\"\"
    output_path = input_path.rsplit(".", 1)[0] + "_converted.wav"

    cmd = [
        "ffmpeg", "-i", input_path,
        "-ar", "16000",       # 16kHz采样率
        "-ac", "1",           # 单声道
        "-c:a", "pcm_s16le",  # 16bit PCM
        "-y",                 # 覆盖输出
        output_path
    ]

    result = subprocess.run(cmd, capture_output=True, text=True, timeout=120)
    if result.returncode != 0:
        raise RuntimeError(f"ffmpeg failed: {result.stderr}")

    return output_path

def get_audio_duration(file_path: str) -> float:
    \"\"\"获取音频时长(秒)\"\"\"
    cmd = [
        "ffprobe", "-v", "error",
        "-show_entries", "format=duration",
        "-of", "default=noprint_wrappers=1:nokey=1",
        file_path
    ]
    result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
    return float(result.stdout.strip())

ffmpeg 是音频处理的瑞士军刀,基本上能遇到的格式它都能转。16kHz 单声道是 Whisper 的标准输入格式。

Celery异步任务

转写是耗时操作,必须异步处理。用Celery + Redis做任务队列:

# app/tasks.py
from celery import Celery
import whisper
import os
import json

from app.audio import convert_to_wav, get_audio_duration
from app.redis_client import set_task_status, set_task_result

celery_app = Celery("whisper_worker", broker="redis://localhost:6379/0")
celery_app.conf.update(
    task_serializer="json",
    result_serializer="json",
    task_time_limit=600,      # 单任务最长10分钟
    worker_max_tasks_per_child=50,  # 防内存泄漏
    worker_prefetch_multiplier=1,   # GPU任务不要预取
)

# 预加载模型(Worker启动时加载一次)
_models = {}

def get_model(model_size: str):
    if model_size not in _models:
        _models[model_size] = whisper.load_model(model_size)
    return _models[model_size]

@celery_app.task(bind=True, max_retries=2)
def transcribe_audio(self, task_id: str, file_path: str,
                     language: str = None, model_size: str = "base"):
    try:
        set_task_status(task_id, "processing")

        # 转换音频格式
        wav_path = convert_to_wav(file_path)
        duration = get_audio_duration(wav_path)

        # 加载模型并转写
        model = get_model(model_size)
        options = {}
        if language:
            options["language"] = language

        result = model.transcribe(wav_path, **options)

        # 构建结果
        output = {
            "text": result["text"],
            "segments": [
                {
                    "start": seg["start"],
                    "end": seg["end"],
                    "text": seg["text"].strip()
                }
                for seg in result["segments"]
            ],
            "language": result.get("language", "unknown"),
            "duration": duration
        }

        set_task_status(task_id, "completed")
        set_task_result(task_id, output)

    except Exception as e:
        set_task_status(task_id, "failed")
        set_task_result(task_id, str(e))
        raise

    finally:
        # 清理临时文件
        for f in [file_path, file_path.rsplit(".", 1)[0] + "_converted.wav"]:
            if os.path.exists(f):
                os.remove(f)

关键点:模型在Worker进程启动时预加载到GPU,避免每次任务都重新加载(Whisper base模型加载要几秒,large模型更久)。worker_prefetch_multiplier=1 很重要——GPU任务不能预取,否则第二个任务会因为显存不足报错。

WebSocket实时推送

对于长音频,客户端可能不想一直轮询。提供WebSocket接口,任务完成时主动推送:

# app/main.py (续)
import asyncio
from starlette.websockets import WebSocketDisconnect

@app.websocket("/ws/tasks/{task_id}")
async def task_websocket(websocket: WebSocket, task_id: str):
    await websocket.accept()

    try:
        while True:
            status = get_task_status(task_id)
            if status is None:
                await websocket.send_json({"error": "Task not found"})
                break

            if status == "completed":
                result = get_task_result(task_id)
                await websocket.send_json({
                    "status": "completed",
                    "result": result
                })
                break
            elif status == "failed":
                error = get_task_result(task_id)
                await websocket.send_json({
                    "status": "failed",
                    "error": error
                })
                break
            else:
                await websocket.send_json({"status": status})

            await asyncio.sleep(1)  # 每秒检查一次

    except WebSocketDisconnect:
        pass
    finally:
        await websocket.close()

Redis状态管理

# app/nedis_client.py
import redis
import json

r = redis.Redis(host="localhost", port=6379, db=1, decode_responses=True)
TTL = 3600 * 2  # 结果保留2小时

def set_task_status(task_id: str, status: str):
    r.set(f"task:{task_id}:status", status, ex=TTL)

def get_task_status(task_id: str) -> str:
    return r.get(f"task:{task_id}:status")

def set_task_result(task_id: str, result):
    r.set(f"task:{task_id}:result", json.dumps(result, ensure_ascii=False), ex=TTL)

def get_task_result(task_id: str):
    data = r.get(f"task:{task_id}:result")
    return json.loads(data) if data else None

部署

用Docker Compose把所有组件编排起来:

# docker-compose.yml
version: "3.8"
services:
  api:
    build: .
    command: uvicorn app.main:app --host 0.0.0.0 --port 8000
    ports:
      - "8000:8000"
    depends_on:
      - redis
    volumes:
      - upload_tmp:/tmp/whisper_uploads

  worker:
    build: .
    command: celery -A app.tasks worker --concurrency=1 --pool=solo -Q transcribe
    deploy:
      resources:
        reservations:
          devices:
            - driver: nvidia
              count: 1
              capabilities: [gpu]
    depends_on:
      - redis
    volumes:
      - upload_tmp:/tmp/whisper_uploads

  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"

volumes:
  upload_tmp:

Worker的concurrency设为1,因为一张GPU同时只能跑一个推理任务。如果有多张卡,可以起多个Worker实例。

调用示例

import requests
import time

# 上传音频
with open("meeting.mp3", "rb") as f:
    resp = requests.post(
        "http://localhost:8000/api/v1/transcribe",
        files={"file": ("meeting.mp3", f, "audio/mpeg")},
        data={"model_size": "base", "language": "zh"}
    )
task_id = resp.json()["task_id"]

# 轮询等待结果
while True:
    resp = requests.get(f"http://localhost:8000/api/v1/tasks/{task_id}")
    data = resp.json()
    if data["status"] == "completed":
        print(data["result"]["text"])
        break
    elif data["status"] == "failed":
        print("Failed:", data["error"])
        break
    time.sleep(2)

后续可以改进的点

  • 大文件分片上传,避免一次性加载到内存
  • 支持长音频自动分段,多Worker并行转写再合并
  • 加上用户认证和API Key管理
  • 结果持久化到数据库,支持历史查询
  • 接入faster-whisper替换原版,推理速度快4倍
  • 加上说话人分离(Speaker Diarization)

服务化之后,其他系统只需要调HTTP接口就能用上语音转写能力,比直接集成Whisper库方便很多。