Rust异步编程:tokio入门

Rust 的异步生态围绕 async/await 语法和 Future trait 构建,而 tokio 是最主流的异步运行时。这篇从基础用法到实际案例,记录 tokio 的核心概念。

async/await 基础

Rust 的 async fn 返回一个实现了 Future trait 的值,但 Future 本身不会自动执行——需要一个运行时(runtime)来驱动它:

async fn hello() -> String {
    "hello, async world".to_string()
}

// 这只是创建了一个 Future,并没有执行
let future = hello();

// 需要 .await 或交给运行时来执行

tokio::main 宏

最简单的方式是用 #[tokio::main] 宏:

use tokio;

#[tokio::main]
async fn main() {
    let result = hello().await;
    println!("{}", result);
}

async fn hello() -> String {
    // 模拟异步操作
    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
    "hello from tokio".to_string()
}

#[tokio::main] 展开后大致等价于:

fn main() {
    tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .unwrap()
        .block_on(async {
            // your async main body
        })
}

spawn 创建并发任务

tokio::spawn 将一个 Future 提交给运行时,返回一个 JoinHandle

use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    let task1 = tokio::spawn(async {
        sleep(Duration::from_secs(2)).await;
        println!("task 1 done");
        42
    });

    let task2 = tokio::spawn(async {
        sleep(Duration::from_secs(1)).await;
        println!("task 2 done");
        "hello"
    });

    // 等待两个任务完成
    let result1 = task1.await.unwrap();
    let result2 = task2.await.unwrap();
    println!("results: {}, {}", result1, result2);
}

spawn 出去的任务会并发执行。注意 spawn 的 Future 必须是 'static 的——不能借用局部变量。如果需要共享数据,用 Arc

tokio::select!

select! 同时等待多个异步操作,第一个完成的分支会被执行,其余的被取消:

use tokio::sync::mpsc;
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<String>(32);

    // 模拟一个生产者
    tokio::spawn(async move {
        sleep(Duration::from_millis(500)).await;
        tx.send("data arrived".to_string()).await.unwrap();
    });

    // select! 等待多个分支
    tokio::select! {
        msg = rx.recv() => {
            println!("received: {:?}", msg);
        }
        _ = sleep(Duration::from_secs(5)) => {
            println!("timeout!");
        }
    }
}

select! 是写超时逻辑、多路复用、优雅关闭的利器。

异步 TCP Server 示例

一个简单的 echo server:

use tokio::net::TcpListener;
use tokio::io::{AsyncReadExt, AsyncWriteExt};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let listener = TcpListener::bind("127.0.0.1:8080").await?;
    println!("listening on 127.0.0.1:8080");

    loop {
        let (mut socket, addr) = listener.accept().await?;
        println!("new connection from {}", addr);

        tokio::spawn(async move {
            let mut buf = [0u8; 1024];
            loop {
                let n = match socket.read(&mut buf).await {
                    Ok(0) => {
                        println!("{} disconnected", addr);
                        return;
                    }
                    Ok(n) => n,
                    Err(e) => {
                        eprintln!("read error from {}: {}", addr, e);
                        return;
                    }
                };

                if let Err(e) = socket.write_all(&buf[..n]).await {
                    eprintln!("write error to {}: {}", addr, e);
                    return;
                }
            }
        });
    }
}

每个连接都 spawn 一个独立的任务处理,tokio 的调度器会高效地在线程池上调度这些任务。相比 OS 线程,tokio 的异步任务开销极小——可以轻松创建数万个并发连接。

tokio::sync 同步原语

异步代码中不能直接用 std::sync::Mutex(它会阻塞整个线程),需要用 tokio 提供的异步版本:

Mutex

use std::sync::Arc;
use tokio::sync::Mutex;

#[tokio::main]
async fn main() {
    let counter = Arc::new(Mutex::new(0u64));

    let mut handles = vec![];
    for _ in 0..10 {
        let counter = Arc::clone(&counter);
        handles.push(tokio::spawn(async move {
            for _ in 0..1000 {
                let mut lock = counter.lock().await;
                *lock += 1;
            }
        }));
    }

    for h in handles {
        h.await.unwrap();
    }

    println!("counter = {}", *counter.lock().await);
}

注意:如果锁的持有时间很短且不跨 await,其实用 std::sync::Mutex 反而更好(性能更高)。tokio::sync::Mutex 只在需要跨 await 持有锁时才使用。

mpsc Channel

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<String>(100);

    // 多个发送者
    for i in 0..5 {
        let tx = tx.clone();
        tokio::spawn(async move {
            tx.send(format!("msg from task {}", i)).await.unwrap();
        });
    }

    // 必须 drop 原始 tx,否则 rx 永远不会收到 None
    drop(tx);

    while let Some(msg) = rx.recv().await {
        println!("{}", msg);
    }
}

mpsc(multi-producer, single-consumer)是最常用的异步通道。如果需要多消费者,可以用 tokio::sync::broadcast

Cargo.toml 配置

[dependencies]
tokio = { version = "1", features = ["full"] }

features = ["full"] 启用所有功能。生产环境中可以按需选择:rt-multi-threadnetio-utilsynctimemacros 等。

小结

tokio 的核心就是:async fn 定义异步函数,spawn 并发执行,select! 多路选择,加上异步版本的 IO 和同步原语。理解了 Future 是惰性的(需要被 poll)、运行时负责调度这两点,就抓住了 Rust 异步编程的关键。