项目添加依赖

# Cargo.toml

[dependencies]
tokio = { version = "1", features = ["full"] }

基本代码框架

#[tokio::main]
async fn main() {
    println!("hello");
}

//上面的代码会编译成👇🏻
fn main() {
    let mut rt = tokio::runtime::Runtime::new().unwrap();
    rt.block_on(async {
        println!("hello");
    })
}

异步函数(Async Func)和异步闭包(Async Closure)

//异步函数
async fn func1() {
    println!("hello");
}
//异步闭包
let func2 = async {
    println!("world");
};
let func3 = async move {
    println!("!");
};

懒执行(Lazy Exec)

#[tokio::main]
async fn main() {
    //这行调用并不会执行
    let op = say_world();

    // 首先会输出"hello"
    println!("hello");

    //调用 `.await` 之后才会输出 `say_world`
    op.await;
}
async fn say_world() {
    println!("world");
}

并发(Spawning)

use tokio::net::TcpListener;

// 原始代码结构
#[tokio::main]
async fn main() {
    let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();
    loop {
        let (socket, _) = listener.accept().await.unwrap();
        //下面这行await逻辑是process完成再进行下一个loop
        process(socket).await;
    }
}

// 优化后代码结构
#[tokio::main]
async fn main() {
    let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();
    loop {
        let (socket, _) = listener.accept().await.unwrap();
        //产生了新的task,不会中断当前的loop
        //socket的所有权被移交给新的task(move关键字)
        tokio::spawn(async move {
            process(socket).await;
        });
    }
}

任务(Task)

  • Tokio的任务Task是一个异步的绿色线程.
  • 通过tokio::spawn和异步代码块可以创建Task.
  • tokio::spawn返回的是一个JoinHandle,这个Handle可以和Task进行交互
  • tokio::spawn的异步代码块内部可能会返回value,通过JoinHandle对象的.await可以获得该value
  • JoinHandle调用.await之后可能会返回Err(Cancelled或者Panic引起的)
  • Tokio的Task是轻量的,只需要64bytes内存
tokio::spawn(async {                   // <-----这个异步代码块就是一个Task
    println!("Here's a vec: {:?}", v); // <-----这个异步代码块就是一个Task
});                                    // <-----这个异步代码块就是一个Task

任务(Task)的生命周期必须是'static

//代码无法编译,因为task内部借用(borrow)了变量v,
//异步task如果存活的比当前main久,那么对于v的借用就会很危险
#[tokio::main]
async fn main() {
    let v = vec![1, 2, 3];
    tokio::spawn(async {
        println!("Here's a vec: {:?}", v);
    });
}

//解决方法1(move)
#[tokio::main]
async fn main() {
    let v = vec![1, 2, 3];
    tokio::spawn(async move{
        println!("Here's a vec: {:?}", v);
    });
}

//解决方法1引发的问题
//变量v的所有权变动,无法在后续代码中使用
#[tokio::main]
async fn main() {
    let v = vec![1, 2, 3];
    tokio::spawn(async move{
        println!("Here's a vec: {:?}", v);
    });
    println!("{:?}", v);// <---这里会报错
}

//解决方法2(Arc)
#[tokio::main]
async fn main() {
    let v = Arc::new(vec![1, 2, 3]);
    let v2 = v.clone();
    tokio::spawn(async move {
        println!("Here's a vec: {:?}", v2);
    });
    println!("{:?}", v);
}

任务(Task)必须实现Send

实现Send的Task,就可以被Tokio保留状态的前提下,在不同的线程中执行

//例子
use tokio::task::yield_now;
use std::rc::Rc;
#[tokio::main]
async fn main() {
    tokio::spawn(async {
        let rc = Rc::new("hello");
        //下面的await被调用触发后,会暂存await后边代码中使用到的变量(比如rc)
        //等到后续await执行成功之后,保存的变量(比如rc)又会被恢复,继续执行代码
        //await前后的代码可能会在不同的线程中执行
        //这就要求所有被暂存的变量必须实现Send
        //由于Rc没有实现Send,所以编译会报错
        yield_now().await;

        println!("{}", rc);
    });
}

//Rc替换成Arc,或者移除Rc直接使用"hello"都可以解决报错

共享状态(Shared State)

//例子:利用std::sync::Mutex(互斥体)
use std::sync::Mutex;

#[tokio::main]
async fn main() {
    let mtx = Mutex::new(0);
    tokio::join!(work(&mtx), work(&mtx));
    println!("{}", *mtx.lock().unwrap());
}
async fn work(mtx: &Mutex<i32>) {
    println!("lock");
    {
        let mut v = mtx.lock().unwrap();
        println!("locked");
        *v += 1;
    }
    println!("unlock")
}
//会出问题的例子:这里的样例代码比上面多了一行'模拟网络请求',使用std的Mutex会引起阻塞
use std::sync::Mutex;
use std::time::Duration;

#[tokio::main]
async fn main() {
    let mtx = Mutex::new(0);
    tokio::join!(work(&mtx), work(&mtx));
    println!("{}", *mtx.lock().unwrap());
}
async fn work(mtx: &Mutex<i32>) {
    println!("lock");
    {
        let mut v = mtx.lock().unwrap();
        println!("locked");
        //模拟网络请求,Task挂起
        tokio::time::sleep(Duration::from_millis(500)).await;
        *v += 1;//这行执行的线程可能不是执行await上面代码的线程,所以并没有获取到lock,这行会阻塞
    }
    println!("unlock")
}
//问题解决方案1:调整代码逻辑,使lock的作用域不包含await
use std::sync::Mutex;
use std::time::Duration;

#[tokio::main]
async fn main() {
    let mtx = Mutex::new(0);
    tokio::join!(work(&mtx), work(&mtx));
    println!("{}", *mtx.lock().unwrap());
}
async fn work(mtx: &Mutex<i32>) {
    println!("lock");
    {
        {
            let mut v = mtx.lock().unwrap();
            println!("locked");
            *v += 1;
        }
        tokio::time::sleep(Duration::from_millis(500)).await;
    }
    println!("unlock")
}
//问题解决方案2:利用tokio::sync::Mutex(可以跨await,不会被阻塞)
use std::time::Duration;
use tokio::sync::Mutex;

#[tokio::main]
async fn main() {
    let mtx = Mutex::new(0);
    tokio::join!(work(&mtx), work(&mtx));
    println!("{}", *mtx.lock().await);
}
async fn work(mtx: &Mutex<i32>) {
    println!("lock");
    {
        let mut v = mtx.lock().await;
        println!("locked");
        //模拟网络请求,Task挂起
        tokio::time::sleep(Duration::from_millis(100)).await;
        *v += 1;//不会被阻塞
    }
    println!("unlock")
}

tokio::sync模块

  • broadcast: N生产者,M消费者的广播队列.每个发送的值会被所有的消费者消费.
  • mpsc: N生产者,单消费者的队列
  • oneshot: 一次性的队列,可以实现’请求响应模型'
  • watch: 单生产者,N消费者,只能保留最后的发送值

see https://docs.rs/tokio/1/tokio/sync/index.html

tokio::io模块

  • AsyncRead
  • AsyncWrite
  • AsyncReadExt
  • AsyncWriteExt

see https://docs.rs/tokio/1/tokio/io/index.html

//帮助函数:tokio::io::copy
use tokio::fs::File;
use tokio::io;

#[tokio::main]
async fn main() -> io::Result<()> {
    let mut reader: &[u8] = b"hello";
    let mut file = File::create("foo.txt").await?;

    io::copy(&mut reader, &mut file).await?;
    Ok(())
}
//帮助函数:tokio::io::split
use std::error::Error;
use tokio::io;
use tokio::net::TcpListener;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let tcp_listener = TcpListener::bind("127.0.0.1:8888").await?;
    let (mut stream, _) = tcp_listener.accept().await?;

    let (read, write) = io::split(stream);
    //let (read, write) = stream.split();

    Ok(())
}
//处理EOF
loop {
    match socket.read(&mut buf).await {
        // `Ok(0)`代表远端已经关闭了,无法再继续获取数据,必须return,否则会导致无限循环,CPU100%!
        Ok(0) => return,
        // ...其他处理逻辑
    }
}
//主动Flush
use tokio::io::{self, BufWriter, AsyncWriteExt};
use tokio::fs::File;

#[tokio::main]
async fn main() -> io::Result<()> {
    let f = File::create("foo.txt").await?;
    let mut buffer = BufWriter::new(f);
    buffer.write_all(b"some bytes").await?;
    buffer.flush().await?; //<---- 将所有的缓存数据,输出到另一端
    Ok(())
}

Select概念

tokio::select!宏允许通知执行多个异步计算,当其中一个异步计算完成时,就会终止并返回结果.

语法: <pattern> = <async expression> => <handler>

use tokio::sync::oneshot;
#[tokio::main]
async fn main() {
    let (tx1, rx1) = oneshot::channel();
    let (tx2, rx2) = oneshot::channel();
    tokio::spawn(async {
        let _ = tx1.send("one");
    });
    tokio::spawn(async {
        let _ = tx2.send("two");
    });
    tokio::select! {
        val = rx1 => {
            println!("rx1 completed first with {:?}", val);
        }
        val = rx2 => {
            println!("rx2 completed first with {:?}", val);
        }
    }
    //这里两个rx1代码块会优先执行,如果rx1有返回结果,则rx2代码块不会执行
}
use std::time::Duration;
use tokio::sync::oneshot;

async fn some_operation() -> String {
    "one".to_string()
}

#[tokio::main]
async fn main() {
    let (mut tx1, rx1) = oneshot::channel();
    let (tx2, rx2) = oneshot::channel();

    tokio::spawn(async {
        tokio::select! {
            val = some_operation() => {
                let _ = tx1.send(val);
            }
            _ = tx1.closed() => {
                println("tx1 closed");
            }
        }
    });

    tokio::spawn(async {
        let _ = tx2.send("two");
    });

    //当本代码块结束时,对应的tx会被drop
    tokio::select! {
        val = rx1 => {
            println!("rx1 completed first with {:?}", val);
        }
        val = rx2 => {
            println!("rx2 completed first with {:?}", val);
        }
    }
}
async fn computation1() -> String {
    // .. computation
}

async fn computation2() -> String {
    // .. computation
}

#[tokio::main]
async fn main() {
    //tokio::select!的结果可作为值绑定给变量
    let out = tokio::select! {
        res1 = computation1() => res1,
        res2 = computation2() => res2,
    };

    println!("Got = {}", out);
}
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (mut tx1, mut rx1) = mpsc::channel(128);
    let (mut tx2, mut rx2) = mpsc::channel(128);

    tokio::spawn(async move {
        // drop(tx1);
        // drop(tx2);
    });

    tokio::select! {
        Some(v) = rx1.recv() => {
            println!("Got {:?} from rx1", v);
        }
        Some(v) = rx2.recv() => {
            println!("Got {:?} from rx2", v);
        }
        else => {
            //当tx1和tx2都没发送消息,且被Drop掉时,会触发else分支
            println!("Both channels closed");
        }
    }
}
//tokio::select!代码块和tokio::spawn不同
//tokio::select!遵循Rust借用规则
//多个不可变借用可以存在多个异步表达式中
//可变借用只能存在于一个异步表达式中
use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    let (tx1, rx1) = oneshot::channel();
    let (tx2, rx2) = oneshot::channel();
    let mut out = String::new();
    tokio::spawn(async move {
        // Send values on `tx1` and `tx2`.
    });
    tokio::select! {
        _ = rx1 => {
            out.push_str("rx1 completed");
        }
        _ = rx2 => {
            out.push_str("rx2 completed");
        }
    }

    println!("{}", out);
}
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx1, mut rx1) = mpsc::channel(128);
    let (tx2, mut rx2) = mpsc::channel(128);
    let (tx3, mut rx3) = mpsc::channel(128);

    //当涉及到循环时,select只会以rx1,rx2,rx3的顺序执行,不会乱序
    //当每次循环时,rx1都有值的情况,后面的rx2,rx3都不会执行,保留执行前的状态
    loop {
        let msg = tokio::select! {
            Some(msg) = rx1.recv() => msg,
            Some(msg) = rx2.recv() => msg,
            Some(msg) = rx3.recv() => msg,
            else => { break }
        };

        println!("Got {:?}", msg);
    }

    println!("All channels have been closed.");
}
//恢复一个已存在的的Future
//步骤1:  tokio::pin!(operation)
//步骤2:  &mut operation作为异步表达式
async fn action() {
    // Some asynchronous logic
}
#[tokio::main]
async fn main() {
    let (mut tx, mut rx) = tokio::sync::mpsc::channel(128);    
    
    let operation = action();
    tokio::pin!(operation);
    
    loop {
        tokio::select! {
            _ = &mut operation => break,
            Some(v) = rx.recv() => {
                if v % 2 == 0 {
                    break;
                }
            }
        }
    }
}
//利用set命令来动态修改Pin之后的变量的值!

async fn action(input: Option<i32>) -> Option<String> {
    // If the input is `None`, return `None`.
    // This could also be written as `let i = input?;`
    let i = match input {
        Some(input) => input,
        None => return None,
    };
    // async logic here
}

// `.set` is a method on `Pin`.
operation.set(action(Some(v)));

Select(Per-task concurrency 单任务并发)

Both tokio::spawn and select! enable running concurrent asynchronous operations. However, the strategy used to run concurrent operations differs. The tokio::spawn function takes an asynchronous operation and spawns a new task to run it. A task is the object that the Tokio runtime schedules. Two different tasks are scheduled independently by Tokio. They may run simultaneously on different operating system threads. Because of this, a spawned task has the same restriction as a spawned thread: no borrowing(不允许借用).

The select! macro runs all branches concurrently on the same task. Because all branches of the select! macro are executed on the same task, they will never run simultaneously. The select! macro multiplexes asynchronous operations on a single task.