Rust 的异步生态围绕 async/await 语法和 Future trait 构建,而 tokio 是最主流的异步运行时。这篇从基础用法到实际案例,记录 tokio 的核心概念。
async/await 基础
Rust 的 async fn 返回一个实现了 Future trait 的值,但 Future 本身不会自动执行——需要一个运行时(runtime)来驱动它:
async fn hello() -> String {
"hello, async world".to_string()
}
// 这只是创建了一个 Future,并没有执行
let future = hello();
// 需要 .await 或交给运行时来执行
tokio::main 宏
最简单的方式是用 #[tokio::main] 宏:
use tokio;
#[tokio::main]
async fn main() {
let result = hello().await;
println!("{}", result);
}
async fn hello() -> String {
// 模拟异步操作
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
"hello from tokio".to_string()
}
#[tokio::main] 展开后大致等价于:
fn main() {
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap()
.block_on(async {
// your async main body
})
}
spawn 创建并发任务
tokio::spawn 将一个 Future 提交给运行时,返回一个 JoinHandle:
use tokio::time::{sleep, Duration};
#[tokio::main]
async fn main() {
let task1 = tokio::spawn(async {
sleep(Duration::from_secs(2)).await;
println!("task 1 done");
42
});
let task2 = tokio::spawn(async {
sleep(Duration::from_secs(1)).await;
println!("task 2 done");
"hello"
});
// 等待两个任务完成
let result1 = task1.await.unwrap();
let result2 = task2.await.unwrap();
println!("results: {}, {}", result1, result2);
}
spawn 出去的任务会并发执行。注意 spawn 的 Future 必须是 'static 的——不能借用局部变量。如果需要共享数据,用 Arc。
tokio::select!
select! 同时等待多个异步操作,第一个完成的分支会被执行,其余的被取消:
use tokio::sync::mpsc;
use tokio::time::{sleep, Duration};
#[tokio::main]
async fn main() {
let (tx, mut rx) = mpsc::channel::<String>(32);
// 模拟一个生产者
tokio::spawn(async move {
sleep(Duration::from_millis(500)).await;
tx.send("data arrived".to_string()).await.unwrap();
});
// select! 等待多个分支
tokio::select! {
msg = rx.recv() => {
println!("received: {:?}", msg);
}
_ = sleep(Duration::from_secs(5)) => {
println!("timeout!");
}
}
}
select! 是写超时逻辑、多路复用、优雅关闭的利器。
异步 TCP Server 示例
一个简单的 echo server:
use tokio::net::TcpListener;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let listener = TcpListener::bind("127.0.0.1:8080").await?;
println!("listening on 127.0.0.1:8080");
loop {
let (mut socket, addr) = listener.accept().await?;
println!("new connection from {}", addr);
tokio::spawn(async move {
let mut buf = [0u8; 1024];
loop {
let n = match socket.read(&mut buf).await {
Ok(0) => {
println!("{} disconnected", addr);
return;
}
Ok(n) => n,
Err(e) => {
eprintln!("read error from {}: {}", addr, e);
return;
}
};
if let Err(e) = socket.write_all(&buf[..n]).await {
eprintln!("write error to {}: {}", addr, e);
return;
}
}
});
}
}
每个连接都 spawn 一个独立的任务处理,tokio 的调度器会高效地在线程池上调度这些任务。相比 OS 线程,tokio 的异步任务开销极小——可以轻松创建数万个并发连接。
tokio::sync 同步原语
异步代码中不能直接用 std::sync::Mutex(它会阻塞整个线程),需要用 tokio 提供的异步版本:
Mutex
use std::sync::Arc;
use tokio::sync::Mutex;
#[tokio::main]
async fn main() {
let counter = Arc::new(Mutex::new(0u64));
let mut handles = vec![];
for _ in 0..10 {
let counter = Arc::clone(&counter);
handles.push(tokio::spawn(async move {
for _ in 0..1000 {
let mut lock = counter.lock().await;
*lock += 1;
}
}));
}
for h in handles {
h.await.unwrap();
}
println!("counter = {}", *counter.lock().await);
}
注意:如果锁的持有时间很短且不跨 await,其实用 std::sync::Mutex 反而更好(性能更高)。tokio::sync::Mutex 只在需要跨 await 持有锁时才使用。
mpsc Channel
use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
let (tx, mut rx) = mpsc::channel::<String>(100);
// 多个发送者
for i in 0..5 {
let tx = tx.clone();
tokio::spawn(async move {
tx.send(format!("msg from task {}", i)).await.unwrap();
});
}
// 必须 drop 原始 tx,否则 rx 永远不会收到 None
drop(tx);
while let Some(msg) = rx.recv().await {
println!("{}", msg);
}
}
mpsc(multi-producer, single-consumer)是最常用的异步通道。如果需要多消费者,可以用 tokio::sync::broadcast。
Cargo.toml 配置
[dependencies]
tokio = { version = "1", features = ["full"] }
features = ["full"] 启用所有功能。生产环境中可以按需选择:rt-multi-thread、net、io-util、sync、time、macros 等。
小结
tokio 的核心就是:async fn 定义异步函数,spawn 并发执行,select! 多路选择,加上异步版本的 IO 和同步原语。理解了 Future 是惰性的(需要被 poll)、运行时负责调度这两点,就抓住了 Rust 异步编程的关键。