之前在本地跑Whisper做语音转写,效果不错但只能自己用。这篇把Whisper包装成一个HTTP API服务,支持音频上传、异步转写、WebSocket实时推送结果,做成一个可以给其他系统调用的转写微服务。
整体架构
客户端 → FastAPI (上传音频) → Celery Worker (转写任务) → Redis (结果存储)
  ↑                                                             ↓
  └──────── WebSocket / 轮询 ←──────────────────────────────────┘
核心组件:
- FastAPI:接收音频上传,创建转写任务
- Celery + Redis:异步任务队列,Whisper推理放在Worker里
- Redis:存储任务状态和转写结果
- WebSocket:可选的实时推送通道
API设计
先定义接口:
POST /api/v1/transcribe
- 上传音频文件
- 返回 task_id
GET /api/v1/tasks/{task_id}
- 查询任务状态和结果
- 状态:pending / processing / completed / failed
WebSocket /ws/tasks/{task_id}
- 实时推送任务状态,任务结束时推送转写结果或错误信息
FastAPI服务搭建
# app/main.py
import os
import uuid

import aiofiles
from fastapi import FastAPI, UploadFile, File, HTTPException
from fastapi.websockets import WebSocket

from app.redis_client import get_task_status, get_task_result, set_task_status
from app.tasks import transcribe_audio
# ASGI application object; the route handlers register themselves on it.
app = FastAPI(title="Whisper Transcription API")

# Directory uploaded files are written to; created at import time if missing.
UPLOAD_DIR = "/tmp/whisper_uploads"
os.makedirs(UPLOAD_DIR, exist_ok=True)

# Content types the upload endpoint accepts.
ALLOWED_TYPES = {
    "audio/wav",
    "audio/mpeg",
    "audio/mp4",
    "audio/ogg",
    "audio/webm",
    "audio/flac",
    "video/mp4",
    "video/webm",
}

# Upload size cap in bytes (100 MiB).
MAX_FILE_SIZE = 100 * 1024 ** 2
@app.post("/api/v1/transcribe")
async def create_transcription(
    file: UploadFile = File(...),
    language: str = None,
    model_size: str = "base"
):
    """Store an uploaded audio/video file and enqueue a transcription task.

    Args:
        file: Multipart upload; its content type must be in ALLOWED_TYPES.
        language: Optional language code forwarded to the worker.
        model_size: Whisper model name forwarded to the worker.

    Returns:
        A dict with the new ``task_id`` and the status ``"pending"``.

    Raises:
        HTTPException: 400 for an unsupported content type, 413 when the
            upload is larger than MAX_FILE_SIZE.
    """
    # Validate the declared content type.
    if file.content_type not in ALLOWED_TYPES:
        raise HTTPException(400, f"Unsupported file type: {file.content_type}")

    # Generate the task id and the on-disk path. The client may omit the
    # filename, in which case the extension falls back to ".wav".
    task_id = str(uuid.uuid4())
    ext = os.path.splitext(file.filename or "")[1] or ".wav"
    file_path = os.path.join(UPLOAD_DIR, f"{task_id}{ext}")

    # Copy the upload to disk chunk by chunk so the size limit is enforced
    # without first holding the whole body in memory.
    chunk_size = 1024 * 1024
    written = 0
    try:
        async with aiofiles.open(file_path, "wb") as f:
            while True:
                chunk = await file.read(chunk_size)
                if not chunk:
                    break
                written += len(chunk)
                if written > MAX_FILE_SIZE:
                    raise HTTPException(413, "File too large")
                await f.write(chunk)
    except Exception:
        # Never leave a partial or oversized upload behind.
        if os.path.exists(file_path):
            os.remove(file_path)
        raise

    # Record "pending" BEFORE enqueueing: otherwise the status endpoint
    # answers 404 until a worker starts the task, and writing it after
    # delay() could overwrite the worker's "processing" status.
    set_task_status(task_id, "pending")
    transcribe_audio.delay(task_id, file_path, language, model_size)
    return {"task_id": task_id, "status": "pending"}
@app.get("/api/v1/tasks/{task_id}")
async def get_task(task_id: str):
    """Return the status of a task, plus its result or error once finished.

    Raises:
        HTTPException: 404 when no status is stored for ``task_id``.
    """
    state = get_task_status(task_id)
    if state is None:
        raise HTTPException(404, "Task not found")

    payload = {"task_id": task_id, "status": state}
    # Finished tasks carry the stored value: under "result" when completed,
    # under "error" when failed. Other states carry only the status.
    extra_key = {"completed": "result", "failed": "error"}.get(state)
    if extra_key is not None:
        payload[extra_key] = get_task_result(task_id)
    return payload
音频格式处理
用户上传的音频格式五花八门,需要统一转换成Whisper能处理的格式。用ffmpeg做转码:
# app/audio.py
import subprocess
import os
def convert_to_wav(input_path: str) -> str:
    """Convert an audio/video file to 16 kHz mono 16-bit PCM WAV with ffmpeg.

    Args:
        input_path: Path of the file to convert.

    Returns:
        Path of the converted file: ``input_path`` with everything after its
        last ``.`` replaced by ``_converted.wav``.

    Raises:
        RuntimeError: ffmpeg exited with a non-zero status.
        subprocess.TimeoutExpired: ffmpeg ran longer than 120 seconds.
    """
    # The transcription task recomputes this exact path when it cleans up,
    # so the naming rule must stay in sync with it.
    output_path = input_path.rsplit(".", 1)[0] + "_converted.wav"
    cmd = [
        "ffmpeg",
        "-nostdin",            # never read from the worker's stdin
        "-i", input_path,
        "-ar", "16000",        # 16 kHz sample rate
        "-ac", "1",            # mono
        "-c:a", "pcm_s16le",   # 16-bit PCM
        "-y",                  # overwrite an existing output file
        output_path,
    ]
    result = subprocess.run(cmd, capture_output=True, text=True, timeout=120)
    if result.returncode != 0:
        raise RuntimeError(f"ffmpeg failed: {result.stderr}")
    return output_path
def get_audio_duration(file_path: str) -> float:
    """Return the duration of a media file in seconds, as reported by ffprobe.

    Args:
        file_path: Path of the media file to inspect.

    Raises:
        RuntimeError: ffprobe exited with a non-zero status.
        ValueError: ffprobe succeeded but its output is not a number.
        subprocess.TimeoutExpired: ffprobe ran longer than 30 seconds.
    """
    cmd = [
        "ffprobe", "-v", "error",
        "-show_entries", "format=duration",
        "-of", "default=noprint_wrappers=1:nokey=1",  # print the bare value only
        file_path,
    ]
    result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
    # Surface ffprobe's own error text instead of failing later on float('').
    if result.returncode != 0:
        raise RuntimeError(f"ffprobe failed: {result.stderr}")
    return float(result.stdout.strip())
ffmpeg 是音频处理的瑞士军刀,基本上能遇到的格式它都能转。16kHz 单声道是 Whisper 的标准输入格式。
Celery异步任务
转写是耗时操作,必须异步处理。用Celery + Redis做任务队列:
# app/tasks.py
from celery import Celery
import whisper
import os
import json
from app.audio import convert_to_wav, get_audio_duration
from app.redis_client import set_task_status, set_task_result
# Celery application; Redis database 0 on localhost is the message broker.
celery_app = Celery("whisper_worker", broker="redis://localhost:6379/0")
celery_app.conf.update(
    task_serializer="json",
    result_serializer="json",
    # The worker is started with `-Q transcribe`, so tasks must be published
    # to that queue. Celery's default queue is named "celery", which such a
    # worker never reads.
    task_default_queue="transcribe",
    task_time_limit=600,             # at most 10 minutes per task
    worker_max_tasks_per_child=50,   # recycle worker processes against memory leaks
    worker_prefetch_multiplier=1,    # reserve one task at a time for long-running jobs
    # NOTE(review): the worker runs with --pool=solo; confirm that the time
    # limit and max-tasks-per-child settings are enforced by that pool.
)
# Per-process cache of loaded Whisper models, keyed by model name. A model is
# loaded the first time it is requested and reused by later tasks.
_models = {}


def get_model(model_size: str):
    """Return the Whisper model named ``model_size``, loading it on first use.

    Args:
        model_size: One of the names listed by ``whisper.available_models()``.

    Raises:
        ValueError: ``model_size`` is not an official Whisper model name.
    """
    if model_size not in _models:
        # model_size comes from the HTTP request. whisper.load_model also
        # accepts a path to a checkpoint file, so restrict the value to the
        # official model names before loading anything.
        if model_size not in whisper.available_models():
            raise ValueError(f"Unknown Whisper model: {model_size!r}")
        _models[model_size] = whisper.load_model(model_size)
    return _models[model_size]
@celery_app.task(bind=True, max_retries=2)
def transcribe_audio(self, task_id: str, file_path: str,
                     language: str = None, model_size: str = "base"):
    """Transcribe an uploaded file and store the outcome for ``task_id``.

    The file is converted to WAV, transcribed with the requested Whisper
    model, and the text, segments, detected language and duration are stored
    as the task result. On any error the error message is stored instead and
    the exception is re-raised; no retry is scheduled here. The uploaded file
    and its converted copy are deleted in every case.

    Args:
        task_id: Identifier the status and result are stored under.
        file_path: Path of the uploaded file.
        language: Optional language passed to Whisper; omitted when falsy.
        model_size: Name of the Whisper model to use.
    """
    try:
        set_task_status(task_id, "processing")

        # Normalize the input format.
        wav_path = convert_to_wav(file_path)
        duration = get_audio_duration(wav_path)

        # Load (or reuse) the model and transcribe.
        model = get_model(model_size)
        options = {}
        if language:
            options["language"] = language
        result = model.transcribe(wav_path, **options)

        # Build the stored result.
        output = {
            "text": result["text"],
            "segments": [
                {
                    "start": seg["start"],
                    "end": seg["end"],
                    "text": seg["text"].strip(),
                }
                for seg in result["segments"]
            ],
            "language": result.get("language", "unknown"),
            "duration": duration,
        }
        # Store the result BEFORE flipping the status: readers fetch the
        # result as soon as they observe "completed".
        set_task_result(task_id, output)
        set_task_status(task_id, "completed")
    except Exception as e:
        # Same ordering for failures: error message first, then the status.
        set_task_result(task_id, str(e))
        set_task_status(task_id, "failed")
        raise
    finally:
        # Remove the upload and the converted copy; either may be absent.
        converted_path = file_path.rsplit(".", 1)[0] + "_converted.wav"
        for path in (file_path, converted_path):
            try:
                os.remove(path)
            except FileNotFoundError:
                pass
关键点:模型在第一次被用到时才加载,之后缓存在Worker进程内供后续任务复用,避免每次任务都重新加载(Whisper base模型加载要几秒,large模型更久)。worker_prefetch_multiplier=1 很重要——转写任务耗时长,关闭多余的预取后每个Worker一次只领取一个任务,排队中的任务不会被正在忙的Worker提前占住,可以被其他空闲的Worker取走。
WebSocket实时推送
对于长音频,客户端可能不想一直轮询。提供WebSocket接口,任务完成时主动推送:
# app/main.py (续)
import asyncio
from starlette.websockets import WebSocketDisconnect
@app.websocket("/ws/tasks/{task_id}")
async def task_websocket(websocket: WebSocket, task_id: str):
    """Push the status of a task once per second until it finishes.

    Sends ``{"status": ...}`` while the task is unfinished, then a final
    message carrying the result (completed) or the error (failed) and closes
    the connection. An unknown task id gets ``{"error": "Task not found"}``.
    """
    await websocket.accept()
    try:
        while True:
            status = get_task_status(task_id)
            if status is None:
                await websocket.send_json({"error": "Task not found"})
                break

            if status in ("completed", "failed"):
                payload_key = "result" if status == "completed" else "error"
                await websocket.send_json({
                    "status": status,
                    payload_key: get_task_result(task_id),
                })
                break

            await websocket.send_json({"status": status})
            await asyncio.sleep(1)  # poll once per second
    except WebSocketDisconnect:
        # The client already went away; sending a close frame on a
        # disconnected socket may raise, so there is nothing left to do.
        return
    # Normal end of the stream: close from the server side.
    await websocket.close()
Redis状态管理
# app/redis_client.py
import redis
import json
# Shared Redis connection for task state (database 1). With
# decode_responses=True, reads return str instead of bytes.
# NOTE(review): the host is hardcoded to localhost; inside the docker-compose
# network the Redis service is reachable as "redis" — confirm before deploying.
r = redis.Redis(
    host="localhost",
    port=6379,
    db=1,
    decode_responses=True,
)

# Lifetime of status/result keys in seconds (results are kept for 2 hours).
TTL = 2 * 3600
def set_task_status(task_id: str, status: str):
    """Store ``status`` under the task's status key, expiring after TTL seconds."""
    status_key = f"task:{task_id}:status"
    r.set(status_key, status, ex=TTL)
def get_task_status(task_id: str) -> "str | None":
    """Return the stored status of a task, or ``None`` if no status is stored.

    Callers use the ``None`` return to detect unknown (or expired) tasks.
    """
    return r.get(f"task:{task_id}:status")
def set_task_result(task_id: str, result):
    """Store ``result`` as JSON under the task's result key, expiring after TTL.

    Non-ASCII characters are written as-is rather than escaped.
    """
    serialized = json.dumps(result, ensure_ascii=False)
    r.set(f"task:{task_id}:result", serialized, ex=TTL)
def get_task_result(task_id: str):
    """Return the decoded JSON result of a task, or ``None`` if nothing is stored."""
    raw = r.get(f"task:{task_id}:result")
    if not raw:
        return None
    return json.loads(raw)
部署
用Docker Compose把所有组件编排起来:
# docker-compose.yml
version: "3.8"

# NOTE(review): the application code connects to Redis at "localhost"; inside
# this compose network the Redis service is reachable under the hostname
# "redis" — confirm the connection settings before deploying.
services:
  # HTTP API: receives uploads and serves task status.
  api:
    build: .
    command: uvicorn app.main:app --host 0.0.0.0 --port 8000
    ports:
      - "8000:8000"
    depends_on:
      - redis
    volumes:
      # Shared with the worker so it can read the files the API writes.
      - upload_tmp:/tmp/whisper_uploads

  # Transcription worker: a single solo-pool process with one GPU reserved.
  worker:
    build: .
    # Consumes only the "transcribe" queue, so tasks must be published to
    # that queue to be picked up.
    command: celery -A app.tasks worker --concurrency=1 --pool=solo -Q transcribe
    deploy:
      resources:
        reservations:
          devices:
            - driver: nvidia
              count: 1
              capabilities: [gpu]
    depends_on:
      - redis
    volumes:
      - upload_tmp:/tmp/whisper_uploads

  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"

volumes:
  upload_tmp:
Worker的concurrency设为1,因为一张GPU同时只能跑一个推理任务。如果有多张卡,可以起多个Worker实例。
调用示例
import requests
import time

# Upload the audio. The endpoint declares language/model_size as plain
# function parameters, which FastAPI reads from the query string, so they are
# sent with params=; as form fields (data=) they would be silently ignored.
with open("meeting.mp3", "rb") as f:
    resp = requests.post(
        "http://localhost:8000/api/v1/transcribe",
        files={"file": ("meeting.mp3", f, "audio/mpeg")},
        params={"model_size": "base", "language": "zh"},
    )
# Fail with the HTTP error rather than a KeyError on the JSON body.
resp.raise_for_status()
task_id = resp.json()["task_id"]

# Poll until the task finishes.
while True:
    resp = requests.get(f"http://localhost:8000/api/v1/tasks/{task_id}")
    resp.raise_for_status()
    data = resp.json()
    if data["status"] == "completed":
        print(data["result"]["text"])
        break
    elif data["status"] == "failed":
        print("Failed:", data["error"])
        break
    time.sleep(2)
后续可以改进的点
- 大文件分片上传,避免一次性加载到内存
- 支持长音频自动分段,多Worker并行转写再合并
- 加上用户认证和API Key管理
- 结果持久化到数据库,支持历史查询
- 接入faster-whisper替换原版,推理速度快4倍
- 加上说话人分离(Speaker Diarization)
服务化之后,其他系统只需要调HTTP接口就能用上语音转写能力,比直接集成Whisper库方便很多。