Python异步编程:asyncio入门

asyncio是Python标准库的异步I/O框架,适合处理大量网络请求、文件I/O等等待密集型任务,用一个线程就能实现高并发。

为什么需要异步

假设你要请求100个URL。同步方式一个接一个地等待,大部分时间都在等网络响应。多线程可以解决,但线程切换有开销,且受GIL限制。

asyncio用事件循环(Event Loop)调度协程(coroutine)——当一个协程在等I/O时,事件循环切换到另一个协程执行,没有线程切换开销。

基本语法

import asyncio

async def say_hello(name: str, delay: float):
    # async def 定义协程函数
    await asyncio.sleep(delay)  # await 暂停当前协程,交出控制权
    print(f"Hello, {name}!")

async def main():
    # 顺序执行
    await say_hello("Alice", 1)
    await say_hello("Bob", 1)
    # 总耗时约2秒

asyncio.run(main())  # Python 3.7+ 启动事件循环

async def定义协程函数,调用它返回一个协程对象(不会立即执行)。await暂停当前协程直到等待的操作完成。

asyncio.gather并发执行

import asyncio
import time

async def fetch_data(name: str, delay: float) -> str:
    print(f"[{name}] Start fetching...")
    await asyncio.sleep(delay)  # 模拟网络请求
    print(f"[{name}] Done.")
    return f"{name}: data"

async def main():
    start = time.time()
    
    # gather并发执行多个协程
    results = await asyncio.gather(
        fetch_data("API-1", 2),
        fetch_data("API-2", 1),
        fetch_data("API-3", 3),
    )
    
    elapsed = time.time() - start
    print(f"Results: {results}")
    print(f"Total time: {elapsed:.1f}s")  # 约3秒,而非2+1+3=6秒

asyncio.run(main())

asyncio.gather同时启动多个协程,等待所有完成后返回结果列表。总耗时等于最慢的那个。

aiohttp:异步HTTP请求

requests库是同步的,异步场景用aiohttp:

import asyncio
import aiohttp

async def fetch_url(session: aiohttp.ClientSession, url: str) -> int:
    async with session.get(url) as response:
        text = await response.text()
        return response.status

async def main():
    urls = [
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/2",
        "https://httpbin.org/get",
        "https://httpbin.org/ip",
    ]
    
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        
        for url, status in zip(urls, results):
            print(f"{url} -> {status}")

asyncio.run(main())

关键点:

  • aiohttp.ClientSession要用async with管理生命周期
  • 复用一个session发起多个请求,而不是每次创建新的
  • session内部会维护连接池

异步迭代与上下文管理

import asyncio

async def async_range(n):
    # 异步生成器
    for i in range(n):
        await asyncio.sleep(0.1)
        yield i

async def main():
    # async for 遍历异步迭代器
    async for num in async_range(5):
        print(num)

asyncio.run(main())

控制并发量

不加限制地并发可能压垮目标服务器,用Semaphore控制:

import asyncio
import aiohttp

async def fetch_with_limit(
    sem: asyncio.Semaphore,
    session: aiohttp.ClientSession,
    url: str
) -> str:
    async with sem:  # 同时最多N个协程进入
        async with session.get(url) as resp:
            return await resp.text()

async def main():
    sem = asyncio.Semaphore(10)  # 最多10个并发
    urls = [f"https://httpbin.org/get?id={i}" for i in range(100)]
    
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_with_limit(sem, session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        print(f"Fetched {len(results)} pages")

asyncio.run(main())

异步 vs 多线程

维度 asyncio 多线程
适用场景 I/O密集型 I/O密集型 / 少量CPU
线程数 单线程 多线程
切换开销 极低(协程切换) 较高(线程上下文切换)
竞态条件 不存在(单线程) 需要锁
代码风格 async/await threading + 锁
调试难度 中(异步堆栈) 高(竞态、死锁)

一般建议:纯I/O等待场景优先用asyncio,需要并行计算用multiprocessing,两者之间的灰色地带用threading。

常见陷阱

  1. 在协程里调用同步阻塞函数(如time.sleeprequests.get)会阻塞整个事件循环。解决方案是用asyncio.to_thread()loop.run_in_executor()包装。
import asyncio
import time

async def bad():
    time.sleep(5)  # 阻塞事件循环!

async def good():
    await asyncio.to_thread(time.sleep, 5)  # Python 3.9+
  1. 忘记await:调用协程函数不加await只会得到一个协程对象,不会执行。
  2. gather异常处理:默认一个任务抛异常,其他任务仍在运行。设置return_exceptions=True可以让异常作为结果返回而不是抛出。

asyncio的学习曲线比同步代码陡一些,但对于网络密集型应用,性能提升是值得的。