🚀 快速安装

复制以下命令并运行,立即安装此 Skill:

npx skills add https://github.com/wshobson/agents --skill rust-async-patterns

💡 提示:需要 Node.js 和 NPM

Rust 异步模式

使用 Tokio 运行时进行 Rust 异步编程的生产级模式,包括任务、通道、流和错误处理。

何时使用此技能

  • 构建异步 Rust 应用程序
  • 实现并发网络服务
  • 使用 Tokio 进行异步 I/O
  • 正确处理异步错误
  • 调试异步代码问题
  • 优化异步性能

核心概念

1. 异步执行模型

Future (惰性) → poll() → Ready(值) | Pending
                ↑           ↓
              Waker ← 运行时调度

2. 关键抽象

概念 用途
Future 可能稍后完成的惰性计算
async fn 返回 impl Future 的函数
await 挂起直到 Future 完成
Task 并发运行生成的 Future
Runtime 轮询 Future 的执行器

快速开始

# Cargo.toml
[dependencies]
tokio = { version = "1", features = ["full"] }
futures = "0.3"
async-trait = "0.1"
anyhow = "1.0"
tracing = "0.1"
tracing-subscriber = "0.3"
use tokio::time::{sleep, Duration};
use anyhow::Result;

#[tokio::main]
async fn main() -> Result<()> {
    // 初始化跟踪
    tracing_subscriber::fmt::init();

    // 异步操作
    let result = fetch_data("https://api.example.com").await?;
    println!("获取到: {}", result);

    Ok(())
}

async fn fetch_data(url: &str) -> Result<String> {
    // 模拟异步操作
    sleep(Duration::from_millis(100)).await;
    Ok(format!("来自 {} 的数据", url))
}

模式

模式 1:并发任务执行

use tokio::task::JoinSet;
use anyhow::Result;

// 生成多个并发任务
async fn fetch_all_concurrent(urls: Vec<String>) -> Result<Vec<String>> {
    let mut set = JoinSet::new();

    for url in urls {
        set.spawn(async move {
            fetch_data(&url).await
        });
    }

    let mut results = Vec::new();
    while let Some(res) = set.join_next().await {
        match res {
            Ok(Ok(data)) => results.push(data),
            Ok(Err(e)) => tracing::error!("任务失败:{}", e),
            Err(e) => tracing::error!("任务加入错误:{}", e),
        }
    }

    Ok(results)
}

// 带并发限制
use futures::stream::{self, StreamExt};

async fn fetch_with_limit(urls: Vec<String>, limit: usize) -> Vec<Result<String>> {
    stream::iter(urls)
        .map(|url| async move { fetch_data(&url).await })
        .buffer_unordered(limit) // 最大并发任务数
        .collect()
        .await
}

// 选择最先完成的
use tokio::select;

async fn race_requests(url1: &str, url2: &str) -> Result<String> {
    select! {
        result = fetch_data(url1) => result,
        result = fetch_data(url2) => result,
    }
}

模式 2:通信通道

use tokio::sync::{mpsc, broadcast, oneshot, watch};

// 多生产者,单消费者
async fn mpsc_example() {
    let (tx, mut rx) = mpsc::channel::<String>(100);

    // 生成生产者
    let tx2 = tx.clone();
    tokio::spawn(async move {
        tx2.send("你好".to_string()).await.unwrap();
    });

    // 消费
    while let Some(msg) = rx.recv().await {
        println!("收到:{}", msg);
    }
}

// 广播:多生产者,多消费者
async fn broadcast_example() {
    let (tx, _) = broadcast::channel::<String>(100);

    let mut rx1 = tx.subscribe();
    let mut rx2 = tx.subscribe();

    tx.send("事件".to_string()).unwrap();

    // 两个接收者都会收到消息
    let _ = rx1.recv().await;
    let _ = rx2.recv().await;
}

// 一次性通道:单个值,单次使用
async fn oneshot_example() -> String {
    let (tx, rx) = oneshot::channel::<String>();

    tokio::spawn(async move {
        tx.send("结果".to_string()).unwrap();
    });

    rx.await.unwrap()
}

// 观察者:单个生产者,多消费者,保持最新值
async fn watch_example() {
    let (tx, mut rx) = watch::channel("初始值".to_string());

    tokio::spawn(async move {
        loop {
            // 等待变化
            rx.changed().await.unwrap();
            println!("新值:{}", *rx.borrow());
        }
    });

    tx.send("已更新".to_string()).unwrap();
}

模式 3:异步错误处理

use anyhow::{Context, Result, bail};
use thiserror::Error;

#[derive(Error, Debug)]
pub enum ServiceError {
    #[error("网络错误:{0}")]
    Network(#[from] reqwest::Error),

    #[error("数据库错误:{0}")]
    Database(#[from] sqlx::Error),

    #[error("未找到:{0}")]
    NotFound(String),

    #[error("超时({:?})")]
    Timeout(std::time::Duration),
}

// 使用 anyhow 处理应用程序错误
async fn process_request(id: &str) -> Result<Response> {
    let data = fetch_data(id)
        .await
        .context("获取数据失败")?;

    let parsed = parse_response(&data)
        .context("解析响应失败")?;

    Ok(parsed)
}

// 使用自定义错误处理库代码
async fn get_user(id: &str) -> Result<User, ServiceError> {
    let result = db.query(id).await?;

    match result {
        Some(user) => Ok(user),
        None => Err(ServiceError::NotFound(id.to_string())),
    }
}

// 超时包装器
use tokio::time::timeout;

async fn with_timeout<T, F>(duration: Duration, future: F) -> Result<T, ServiceError>
where
    F: std::future::Future<Output = Result<T, ServiceError>>,
{
    timeout(duration, future)
        .await
        .map_err(|_| ServiceError::Timeout(duration))?
}

模式 4:优雅关闭

use tokio::signal;
use tokio::sync::broadcast;
use tokio_util::sync::CancellationToken;

async fn run_server() -> Result<()> {
    // 方法 1:CancellationToken
    let token = CancellationToken::new();
    let token_clone = token.clone();

    // 生成尊重取消的任务
    tokio::spawn(async move {
        loop {
            tokio::select! {
                _ = token_clone.cancelled() => {
                    tracing::info!("任务正在关闭");
                    break;
                }
                _ = do_work() => {}
            }
        }
    });

    // 等待关闭信号
    signal::ctrl_c().await?;
    tracing::info!("收到关闭信号");

    // 取消所有任务
    token.cancel();

    // 给任务时间清理
    tokio::time::sleep(Duration::from_secs(5)).await;

    Ok(())
}

// 方法 2:使用广播通道进行关闭
async fn run_with_broadcast() -> Result<()> {
    let (shutdown_tx, _) = broadcast::channel::<()>(1);

    let mut rx = shutdown_tx.subscribe();
    tokio::spawn(async move {
        tokio::select! {
            _ = rx.recv() => {
                tracing::info!("收到关闭信号");
            }
            _ = async { loop { do_work().await } } => {}
        }
    });

    signal::ctrl_c().await?;
    let _ = shutdown_tx.send(());

    Ok(())
}

模式 5:异步 trait

use async_trait::async_trait;

#[async_trait]
pub trait Repository {
    async fn get(&self, id: &str) -> Result<Entity>;
    async fn save(&self, entity: &Entity) -> Result<()>;
    async fn delete(&self, id: &str) -> Result<()>;
}

pub struct PostgresRepository {
    pool: sqlx::PgPool,
}

#[async_trait]
impl Repository for PostgresRepository {
    async fn get(&self, id: &str) -> Result<Entity> {
        sqlx::query_as!(Entity, "SELECT * FROM entities WHERE id = $1", id)
            .fetch_one(&self.pool)
            .await
            .map_err(Into::into)
    }

    async fn save(&self, entity: &Entity) -> Result<()> {
        sqlx::query!(
            "INSERT INTO entities (id, data) VALUES ($1, $2)
             ON CONFLICT (id) DO UPDATE SET data = $2",
            entity.id,
            entity.data
        )
        .execute(&self.pool)
        .await?;
        Ok(())
    }

    async fn delete(&self, id: &str) -> Result<()> {
        sqlx::query!("DELETE FROM entities WHERE id = $1", id)
            .execute(&self.pool)
            .await?;
        Ok(())
    }
}

// trait 对象使用
async fn process(repo: &dyn Repository, id: &str) -> Result<()> {
    let entity = repo.get(id).await?;
    // 处理...
    repo.save(&entity).await
}

模式 6:流和异步迭代

use futures::stream::{self, Stream, StreamExt};
use async_stream::stream;

// 从异步迭代器创建流
fn numbers_stream() -> impl Stream<Item = i32> {
    stream! {
        for i in 0..10 {
            tokio::time::sleep(Duration::from_millis(100)).await;
            yield i;
        }
    }
}

// 处理流
async fn process_stream() {
    let stream = numbers_stream();

    // 映射和过滤
    let processed: Vec<_> = stream
        .filter(|n| futures::future::ready(*n % 2 == 0))
        .map(|n| n * 2)
        .collect()
        .await;

    println!("{:?}", processed);
}

// 分块处理
async fn process_in_chunks() {
    let stream = numbers_stream();

    let mut chunks = stream.chunks(3);

    while let Some(chunk) = chunks.next().await {
        println!("处理块:{:?}", chunk);
    }
}

// 合并多个流
async fn merge_streams() {
    let stream1 = numbers_stream();
    let stream2 = numbers_stream();

    let merged = stream::select(stream1, stream2);

    merged
        .for_each(|n| async move {
            println!("收到:{}", n);
        })
        .await;
}

模式 7:资源管理

use std::sync::Arc;
use tokio::sync::{Mutex, RwLock, Semaphore};

// 使用 RwLock 的共享状态(读多写少时首选)
struct Cache {
    data: RwLock<HashMap<String, String>>,
}

impl Cache {
    async fn get(&self, key: &str) -> Option<String> {
        self.data.read().await.get(key).cloned()
    }

    async fn set(&self, key: String, value: String) {
        self.data.write().await.insert(key, value);
    }
}

// 使用信号量的连接池
struct Pool {
    semaphore: Semaphore,
    connections: Mutex<Vec<Connection>>,
}

impl Pool {
    fn new(size: usize) -> Self {
        Self {
            semaphore: Semaphore::new(size),
            connections: Mutex::new((0..size).map(|_| Connection::new()).collect()),
        }
    }

    async fn acquire(&self) -> PooledConnection<'_> {
        let permit = self.semaphore.acquire().await.unwrap();
        let conn = self.connections.lock().await.pop().unwrap();
        PooledConnection { pool: self, conn: Some(conn), _permit: permit }
    }
}

struct PooledConnection<'a> {
    pool: &'a Pool,
    conn: Option<Connection>,
    _permit: tokio::sync::SemaphorePermit<'a>,
}

impl Drop for PooledConnection<'_> {
    fn drop(&mut self) {
        if let Some(conn) = self.conn.take() {
            let pool = self.pool;
            tokio::spawn(async move {
                pool.connections.lock().await.push(conn);
            });
        }
    }
}

调试技巧

// 启用 tokio-console 进行运行时调试
// Cargo.toml: tokio = { features = ["tracing"] }
// 运行:RUSTFLAGS="--cfg tokio_unstable" cargo run
// 然后:tokio-console

// 检测异步函数
use tracing::instrument;

#[instrument(skip(pool))]
async fn fetch_user(pool: &PgPool, id: &str) -> Result<User> {
    tracing::debug!("正在获取用户");
    // ...
}

// 跟踪任务生成
let span = tracing::info_span!("worker", id = %worker_id);
tokio::spawn(async move {
    // 当被轮询时进入 span
}.instrument(span));

最佳实践

应该做的

  • 使用 tokio::select! – 用于竞争多个 future
  • 优先使用通道 – 尽可能使用通道而非共享状态
  • 使用 JoinSet – 用于管理多个任务
  • 使用 tracing 进行检测 – 用于调试异步代码
  • 处理取消 – 检查 CancellationToken

避免做的

  • 不要阻塞 – 绝不在异步代码中使用 std::thread::sleep
  • 不要在 await 时持有锁 – 会导致死锁
  • 不要无限制地生成任务 – 使用信号量进行限制
  • 不要忽略错误 – 使用 ? 传播或记录日志
  • 不要忘记 Send 边界 – 对于生成的 future 很重要

📄 原始文档

完整文档(英文):

https://skills.sh/wshobson/agents/rust-async-patterns

💡 提示:点击上方链接查看 skills.sh 原始英文文档,方便对照翻译。

声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。