🚀 快速安装
复制以下命令并运行,立即安装此 Skill:
npx skills add https://github.com/wshobson/agents --skill rust-async-patterns
💡 提示:需要 Node.js 和 NPM
Rust 异步模式
使用 Tokio 运行时进行 Rust 异步编程的生产级模式,包括任务、通道、流和错误处理。
何时使用此技能
- 构建异步 Rust 应用程序
- 实现并发网络服务
- 使用 Tokio 进行异步 I/O
- 正确处理异步错误
- 调试异步代码问题
- 优化异步性能
核心概念
1. 异步执行模型
Future (惰性) → poll() → Ready(值) | Pending
↑ ↓
Waker ← 运行时调度
2. 关键抽象
| 概念 | 用途 |
|---|---|
Future |
可能稍后完成的惰性计算 |
async fn |
返回 impl Future 的函数 |
await |
挂起直到 Future 完成 |
Task |
并发运行生成的 Future |
Runtime |
轮询 Future 的执行器 |
快速开始
# Cargo.toml
[dependencies]
tokio = { version = "1", features = ["full"] }
futures = "0.3"
async-trait = "0.1"
anyhow = "1.0"
tracing = "0.1"
tracing-subscriber = "0.3"
use tokio::time::{sleep, Duration};
use anyhow::Result;
#[tokio::main]
async fn main() -> Result<()> {
// 初始化跟踪
tracing_subscriber::fmt::init();
// 异步操作
let result = fetch_data("https://api.example.com").await?;
println!("获取到: {}", result);
Ok(())
}
async fn fetch_data(url: &str) -> Result<String> {
// 模拟异步操作
sleep(Duration::from_millis(100)).await;
Ok(format!("来自 {} 的数据", url))
}
模式
模式 1:并发任务执行
use tokio::task::JoinSet;
use anyhow::Result;
// 生成多个并发任务
async fn fetch_all_concurrent(urls: Vec<String>) -> Result<Vec<String>> {
let mut set = JoinSet::new();
for url in urls {
set.spawn(async move {
fetch_data(&url).await
});
}
let mut results = Vec::new();
while let Some(res) = set.join_next().await {
match res {
Ok(Ok(data)) => results.push(data),
Ok(Err(e)) => tracing::error!("任务失败:{}", e),
Err(e) => tracing::error!("任务加入错误:{}", e),
}
}
Ok(results)
}
// 带并发限制
use futures::stream::{self, StreamExt};
async fn fetch_with_limit(urls: Vec<String>, limit: usize) -> Vec<Result<String>> {
stream::iter(urls)
.map(|url| async move { fetch_data(&url).await })
.buffer_unordered(limit) // 最大并发任务数
.collect()
.await
}
// 选择最先完成的
use tokio::select;
async fn race_requests(url1: &str, url2: &str) -> Result<String> {
select! {
result = fetch_data(url1) => result,
result = fetch_data(url2) => result,
}
}
模式 2:通信通道
use tokio::sync::{mpsc, broadcast, oneshot, watch};
// 多生产者,单消费者
async fn mpsc_example() {
let (tx, mut rx) = mpsc::channel::<String>(100);
// 生成生产者
let tx2 = tx.clone();
tokio::spawn(async move {
tx2.send("你好".to_string()).await.unwrap();
});
// 消费
while let Some(msg) = rx.recv().await {
println!("收到:{}", msg);
}
}
// 广播:多生产者,多消费者
async fn broadcast_example() {
let (tx, _) = broadcast::channel::<String>(100);
let mut rx1 = tx.subscribe();
let mut rx2 = tx.subscribe();
tx.send("事件".to_string()).unwrap();
// 两个接收者都会收到消息
let _ = rx1.recv().await;
let _ = rx2.recv().await;
}
// 一次性通道:单个值,单次使用
async fn oneshot_example() -> String {
let (tx, rx) = oneshot::channel::<String>();
tokio::spawn(async move {
tx.send("结果".to_string()).unwrap();
});
rx.await.unwrap()
}
// 观察者:单个生产者,多消费者,保持最新值
async fn watch_example() {
let (tx, mut rx) = watch::channel("初始值".to_string());
tokio::spawn(async move {
loop {
// 等待变化
rx.changed().await.unwrap();
println!("新值:{}", *rx.borrow());
}
});
tx.send("已更新".to_string()).unwrap();
}
模式 3:异步错误处理
use anyhow::{Context, Result, bail};
use thiserror::Error;
#[derive(Error, Debug)]
pub enum ServiceError {
#[error("网络错误:{0}")]
Network(#[from] reqwest::Error),
#[error("数据库错误:{0}")]
Database(#[from] sqlx::Error),
#[error("未找到:{0}")]
NotFound(String),
#[error("超时({:?})")]
Timeout(std::time::Duration),
}
// 使用 anyhow 处理应用程序错误
async fn process_request(id: &str) -> Result<Response> {
let data = fetch_data(id)
.await
.context("获取数据失败")?;
let parsed = parse_response(&data)
.context("解析响应失败")?;
Ok(parsed)
}
// 使用自定义错误处理库代码
async fn get_user(id: &str) -> Result<User, ServiceError> {
let result = db.query(id).await?;
match result {
Some(user) => Ok(user),
None => Err(ServiceError::NotFound(id.to_string())),
}
}
// 超时包装器
use tokio::time::timeout;
async fn with_timeout<T, F>(duration: Duration, future: F) -> Result<T, ServiceError>
where
F: std::future::Future<Output = Result<T, ServiceError>>,
{
timeout(duration, future)
.await
.map_err(|_| ServiceError::Timeout(duration))?
}
模式 4:优雅关闭
use tokio::signal;
use tokio::sync::broadcast;
use tokio_util::sync::CancellationToken;
async fn run_server() -> Result<()> {
// 方法 1:CancellationToken
let token = CancellationToken::new();
let token_clone = token.clone();
// 生成尊重取消的任务
tokio::spawn(async move {
loop {
tokio::select! {
_ = token_clone.cancelled() => {
tracing::info!("任务正在关闭");
break;
}
_ = do_work() => {}
}
}
});
// 等待关闭信号
signal::ctrl_c().await?;
tracing::info!("收到关闭信号");
// 取消所有任务
token.cancel();
// 给任务时间清理
tokio::time::sleep(Duration::from_secs(5)).await;
Ok(())
}
// 方法 2:使用广播通道进行关闭
async fn run_with_broadcast() -> Result<()> {
let (shutdown_tx, _) = broadcast::channel::<()>(1);
let mut rx = shutdown_tx.subscribe();
tokio::spawn(async move {
tokio::select! {
_ = rx.recv() => {
tracing::info!("收到关闭信号");
}
_ = async { loop { do_work().await } } => {}
}
});
signal::ctrl_c().await?;
let _ = shutdown_tx.send(());
Ok(())
}
模式 5:异步 trait
use async_trait::async_trait;
#[async_trait]
pub trait Repository {
async fn get(&self, id: &str) -> Result<Entity>;
async fn save(&self, entity: &Entity) -> Result<()>;
async fn delete(&self, id: &str) -> Result<()>;
}
pub struct PostgresRepository {
pool: sqlx::PgPool,
}
#[async_trait]
impl Repository for PostgresRepository {
async fn get(&self, id: &str) -> Result<Entity> {
sqlx::query_as!(Entity, "SELECT * FROM entities WHERE id = $1", id)
.fetch_one(&self.pool)
.await
.map_err(Into::into)
}
async fn save(&self, entity: &Entity) -> Result<()> {
sqlx::query!(
"INSERT INTO entities (id, data) VALUES ($1, $2)
ON CONFLICT (id) DO UPDATE SET data = $2",
entity.id,
entity.data
)
.execute(&self.pool)
.await?;
Ok(())
}
async fn delete(&self, id: &str) -> Result<()> {
sqlx::query!("DELETE FROM entities WHERE id = $1", id)
.execute(&self.pool)
.await?;
Ok(())
}
}
// trait 对象使用
async fn process(repo: &dyn Repository, id: &str) -> Result<()> {
let entity = repo.get(id).await?;
// 处理...
repo.save(&entity).await
}
模式 6:流和异步迭代
use futures::stream::{self, Stream, StreamExt};
use async_stream::stream;
// 从异步迭代器创建流
fn numbers_stream() -> impl Stream<Item = i32> {
stream! {
for i in 0..10 {
tokio::time::sleep(Duration::from_millis(100)).await;
yield i;
}
}
}
// 处理流
async fn process_stream() {
let stream = numbers_stream();
// 映射和过滤
let processed: Vec<_> = stream
.filter(|n| futures::future::ready(*n % 2 == 0))
.map(|n| n * 2)
.collect()
.await;
println!("{:?}", processed);
}
// 分块处理
async fn process_in_chunks() {
let stream = numbers_stream();
let mut chunks = stream.chunks(3);
while let Some(chunk) = chunks.next().await {
println!("处理块:{:?}", chunk);
}
}
// 合并多个流
async fn merge_streams() {
let stream1 = numbers_stream();
let stream2 = numbers_stream();
let merged = stream::select(stream1, stream2);
merged
.for_each(|n| async move {
println!("收到:{}", n);
})
.await;
}
模式 7:资源管理
use std::sync::Arc;
use tokio::sync::{Mutex, RwLock, Semaphore};
// 使用 RwLock 的共享状态(读多写少时首选)
struct Cache {
data: RwLock<HashMap<String, String>>,
}
impl Cache {
async fn get(&self, key: &str) -> Option<String> {
self.data.read().await.get(key).cloned()
}
async fn set(&self, key: String, value: String) {
self.data.write().await.insert(key, value);
}
}
// 使用信号量的连接池
struct Pool {
semaphore: Semaphore,
connections: Mutex<Vec<Connection>>,
}
impl Pool {
fn new(size: usize) -> Self {
Self {
semaphore: Semaphore::new(size),
connections: Mutex::new((0..size).map(|_| Connection::new()).collect()),
}
}
async fn acquire(&self) -> PooledConnection<'_> {
let permit = self.semaphore.acquire().await.unwrap();
let conn = self.connections.lock().await.pop().unwrap();
PooledConnection { pool: self, conn: Some(conn), _permit: permit }
}
}
struct PooledConnection<'a> {
pool: &'a Pool,
conn: Option<Connection>,
_permit: tokio::sync::SemaphorePermit<'a>,
}
impl Drop for PooledConnection<'_> {
fn drop(&mut self) {
if let Some(conn) = self.conn.take() {
let pool = self.pool;
tokio::spawn(async move {
pool.connections.lock().await.push(conn);
});
}
}
}
调试技巧
// 启用 tokio-console 进行运行时调试
// Cargo.toml: tokio = { features = ["tracing"] }
// 运行:RUSTFLAGS="--cfg tokio_unstable" cargo run
// 然后:tokio-console
// 检测异步函数
use tracing::instrument;
#[instrument(skip(pool))]
async fn fetch_user(pool: &PgPool, id: &str) -> Result<User> {
tracing::debug!("正在获取用户");
// ...
}
// 跟踪任务生成
let span = tracing::info_span!("worker", id = %worker_id);
tokio::spawn(async move {
// 当被轮询时进入 span
}.instrument(span));
最佳实践
应该做的
- 使用
tokio::select!– 用于竞争多个 future - 优先使用通道 – 尽可能使用通道而非共享状态
- 使用
JoinSet– 用于管理多个任务 - 使用 tracing 进行检测 – 用于调试异步代码
- 处理取消 – 检查
CancellationToken
避免做的
- 不要阻塞 – 绝不在异步代码中使用
std::thread::sleep - 不要在 await 时持有锁 – 会导致死锁
- 不要无限制地生成任务 – 使用信号量进行限制
- 不要忽略错误 – 使用
?传播或记录日志 - 不要忘记 Send 边界 – 对于生成的 future 很重要
📄 原始文档
完整文档(英文):
https://skills.sh/wshobson/agents/rust-async-patterns
💡 提示:点击上方链接查看 skills.sh 原始英文文档,方便对照翻译。
声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。

评论(0)