asyncio是Python标准库的异步I/O框架,适合处理大量网络请求、文件I/O等等待密集型任务,用一个线程就能实现高并发。
为什么需要异步
假设你要请求100个URL。同步方式一个接一个地等待,大部分时间都在等网络响应。多线程可以解决,但线程切换有开销,且受GIL限制。
asyncio用事件循环(Event Loop)调度协程(coroutine)——当一个协程在等I/O时,事件循环切换到另一个协程执行,没有线程切换开销。
基本语法
import asyncio
async def say_hello(name: str, delay: float):
# async def 定义协程函数
await asyncio.sleep(delay) # await 暂停当前协程,交出控制权
print(f"Hello, {name}!")
async def main():
# 顺序执行
await say_hello("Alice", 1)
await say_hello("Bob", 1)
# 总耗时约2秒
asyncio.run(main()) # Python 3.7+ 启动事件循环
async def定义协程函数,调用它返回一个协程对象(不会立即执行)。await暂停当前协程直到等待的操作完成。
asyncio.gather并发执行
import asyncio
import time
async def fetch_data(name: str, delay: float) -> str:
print(f"[{name}] Start fetching...")
await asyncio.sleep(delay) # 模拟网络请求
print(f"[{name}] Done.")
return f"{name}: data"
async def main():
start = time.time()
# gather并发执行多个协程
results = await asyncio.gather(
fetch_data("API-1", 2),
fetch_data("API-2", 1),
fetch_data("API-3", 3),
)
elapsed = time.time() - start
print(f"Results: {results}")
print(f"Total time: {elapsed:.1f}s") # 约3秒,而非2+1+3=6秒
asyncio.run(main())
asyncio.gather同时启动多个协程,等待所有完成后返回结果列表。总耗时等于最慢的那个。
aiohttp:异步HTTP请求
requests库是同步的,异步场景用aiohttp:
import asyncio
import aiohttp
async def fetch_url(session: aiohttp.ClientSession, url: str) -> int:
async with session.get(url) as response:
text = await response.text()
return response.status
async def main():
urls = [
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/2",
"https://httpbin.org/get",
"https://httpbin.org/ip",
]
async with aiohttp.ClientSession() as session:
tasks = [fetch_url(session, url) for url in urls]
results = await asyncio.gather(*tasks)
for url, status in zip(urls, results):
print(f"{url} -> {status}")
asyncio.run(main())
关键点:
aiohttp.ClientSession要用async with管理生命周期- 复用一个session发起多个请求,而不是每次创建新的
- session内部会维护连接池
异步迭代与上下文管理
import asyncio
async def async_range(n):
# 异步生成器
for i in range(n):
await asyncio.sleep(0.1)
yield i
async def main():
# async for 遍历异步迭代器
async for num in async_range(5):
print(num)
asyncio.run(main())
控制并发量
不加限制地并发可能压垮目标服务器,用Semaphore控制:
import asyncio
import aiohttp
async def fetch_with_limit(
sem: asyncio.Semaphore,
session: aiohttp.ClientSession,
url: str
) -> str:
async with sem: # 同时最多N个协程进入
async with session.get(url) as resp:
return await resp.text()
async def main():
sem = asyncio.Semaphore(10) # 最多10个并发
urls = [f"https://httpbin.org/get?id={i}" for i in range(100)]
async with aiohttp.ClientSession() as session:
tasks = [fetch_with_limit(sem, session, url) for url in urls]
results = await asyncio.gather(*tasks)
print(f"Fetched {len(results)} pages")
asyncio.run(main())
异步 vs 多线程
| 维度 | asyncio | 多线程 |
|---|---|---|
| 适用场景 | I/O密集型 | I/O密集型 / 少量CPU |
| 线程数 | 单线程 | 多线程 |
| 切换开销 | 极低(协程切换) | 较高(线程上下文切换) |
| 竞态条件 | 不存在(单线程) | 需要锁 |
| 代码风格 | async/await | threading + 锁 |
| 调试难度 | 中(异步堆栈) | 高(竞态、死锁) |
一般建议:纯I/O等待场景优先用asyncio,需要并行计算用multiprocessing,两者之间的灰色地带用threading。
常见陷阱
- 在协程里调用同步阻塞函数(如
time.sleep、requests.get)会阻塞整个事件循环。解决方案是用asyncio.to_thread()或loop.run_in_executor()包装。
import asyncio
import time
async def bad():
time.sleep(5) # 阻塞事件循环!
async def good():
await asyncio.to_thread(time.sleep, 5) # Python 3.9+
- 忘记await:调用协程函数不加await只会得到一个协程对象,不会执行。
- gather异常处理:默认一个任务抛异常,其他任务仍在运行。设置
return_exceptions=True可以让异常作为结果返回而不是抛出。
asyncio的学习曲线比同步代码陡一些,但对于网络密集型应用,性能提升是值得的。