项目添加依赖
# Cargo.toml
[dependencies]
tokio = { version = "1", features = ["full"] }
基本代码框架
#[tokio::main]
async fn main() {
println!("hello");
}
//上面的代码会编译成👇🏻
fn main() {
let mut rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
println!("hello");
})
}
异步函数(Async Func)和异步闭包(Async Closure)
//异步函数
async fn func1() {
println!("hello");
}
//异步闭包
let func2 = async {
println!("world");
};
let func3 = async move {
println!("!");
};
懒执行(Lazy Exec)
#[tokio::main]
async fn main() {
//这行调用并不会执行
let op = say_world();
// 首先会输出"hello"
println!("hello");
//调用 `.await` 之后才会输出 `say_world`
op.await;
}
async fn say_world() {
println!("world");
}
并发(Spawning)
use tokio::net::TcpListener;
// 原始代码结构
#[tokio::main]
async fn main() {
let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();
loop {
let (socket, _) = listener.accept().await.unwrap();
//下面这行await逻辑是process完成再进行下一个loop
process(socket).await;
}
}
// 优化后代码结构
#[tokio::main]
async fn main() {
let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();
loop {
let (socket, _) = listener.accept().await.unwrap();
//产生了新的task,不会中断当前的loop
//socket的所有权被移交给新的task(move关键字)
tokio::spawn(async move {
process(socket).await;
});
}
}
任务(Task)
- Tokio的任务Task是一个异步的绿色线程.
- 通过tokio::spawn和
异步代码块
可以创建Task. - tokio::spawn返回的是一个
JoinHandle
,这个Handle可以和Task进行交互 - tokio::spawn的
异步代码块
内部可能会返回value
,通过JoinHandle对象的.await
可以获得该value
JoinHandle
调用.await
之后可能会返回Err(Cancelled或者Panic引起的)- Tokio的Task是轻量的,只需要64bytes内存
tokio::spawn(async { // <-----这个异步代码块就是一个Task
println!("Here's a vec: {:?}", v); // <-----这个异步代码块就是一个Task
}); // <-----这个异步代码块就是一个Task
任务(Task)的生命周期必须是'static
//代码无法编译,因为task内部借用(borrow)了变量v,
//异步task如果存活的比当前main久,那么对于v的借用就会很危险
#[tokio::main]
async fn main() {
let v = vec![1, 2, 3];
tokio::spawn(async {
println!("Here's a vec: {:?}", v);
});
}
//解决方法1(move)
#[tokio::main]
async fn main() {
let v = vec![1, 2, 3];
tokio::spawn(async move{
println!("Here's a vec: {:?}", v);
});
}
//解决方法1引发的问题
//变量v的所有权变动,无法在后续代码中使用
#[tokio::main]
async fn main() {
let v = vec![1, 2, 3];
tokio::spawn(async move{
println!("Here's a vec: {:?}", v);
});
println!("{:?}", v);// <---这里会报错
}
//解决方法2(Arc)
#[tokio::main]
async fn main() {
let v = Arc::new(vec![1, 2, 3]);
let v2 = v.clone();
tokio::spawn(async move {
println!("Here's a vec: {:?}", v2);
});
println!("{:?}", v);
}
任务(Task)必须实现Send
实现Send的Task,就可以被Tokio保留状态的前提下,在不同的线程中执行
//例子
use tokio::task::yield_now;
use std::rc::Rc;
#[tokio::main]
async fn main() {
tokio::spawn(async {
let rc = Rc::new("hello");
//下面的await被调用触发后,会暂存await后边代码中使用到的变量(比如rc)
//等到后续await执行成功之后,保存的变量(比如rc)又会被恢复,继续执行代码
//await前后的代码可能会在不同的线程中执行
//这就要求所有被暂存的变量必须实现Send
//由于Rc没有实现Send,所以编译会报错
yield_now().await;
println!("{}", rc);
});
}
//Rc替换成Arc,或者移除Rc直接使用"hello"都可以解决报错
共享状态(Shared State)
//例子:利用std::sync::Mutex(互斥体)
use std::sync::Mutex;
#[tokio::main]
async fn main() {
let mtx = Mutex::new(0);
tokio::join!(work(&mtx), work(&mtx));
println!("{}", *mtx.lock().unwrap());
}
async fn work(mtx: &Mutex<i32>) {
println!("lock");
{
let mut v = mtx.lock().unwrap();
println!("locked");
*v += 1;
}
println!("unlock")
}
//会出问题的例子:这里的样例代码比上面多了一行'模拟网络请求',使用std的Mutex会引起阻塞
use std::sync::Mutex;
use std::time::Duration;
#[tokio::main]
async fn main() {
let mtx = Mutex::new(0);
tokio::join!(work(&mtx), work(&mtx));
println!("{}", *mtx.lock().unwrap());
}
async fn work(mtx: &Mutex<i32>) {
println!("lock");
{
let mut v = mtx.lock().unwrap();
println!("locked");
//模拟网络请求,Task挂起
tokio::time::sleep(Duration::from_millis(500)).await;
*v += 1;//这行执行的线程可能不是执行await上面代码的线程,所以并没有获取到lock,这行会阻塞
}
println!("unlock")
}
//问题解决方案1:调整代码逻辑,使lock的作用域不包含await
use std::sync::Mutex;
use std::time::Duration;
#[tokio::main]
async fn main() {
let mtx = Mutex::new(0);
tokio::join!(work(&mtx), work(&mtx));
println!("{}", *mtx.lock().unwrap());
}
async fn work(mtx: &Mutex<i32>) {
println!("lock");
{
{
let mut v = mtx.lock().unwrap();
println!("locked");
*v += 1;
}
tokio::time::sleep(Duration::from_millis(500)).await;
}
println!("unlock")
}
//问题解决方案2:利用tokio::sync::Mutex(可以跨await,不会被阻塞)
use std::time::Duration;
use tokio::sync::Mutex;
#[tokio::main]
async fn main() {
let mtx = Mutex::new(0);
tokio::join!(work(&mtx), work(&mtx));
println!("{}", *mtx.lock().await);
}
async fn work(mtx: &Mutex<i32>) {
println!("lock");
{
let mut v = mtx.lock().await;
println!("locked");
//模拟网络请求,Task挂起
tokio::time::sleep(Duration::from_millis(100)).await;
*v += 1;//不会被阻塞
}
println!("unlock")
}
tokio::sync模块
- broadcast: N生产者,M消费者的广播队列.每个发送的值会被所有的消费者消费.
- mpsc: N生产者,单消费者的队列
- oneshot: 一次性的队列,可以实现’请求响应模型'
- watch: 单生产者,N消费者,只能保留最后的发送值
tokio::io模块
- AsyncRead
- AsyncWrite
- AsyncReadExt
- AsyncWriteExt
//帮助函数:tokio::io::copy
use tokio::fs::File;
use tokio::io;
#[tokio::main]
async fn main() -> io::Result<()> {
let mut reader: &[u8] = b"hello";
let mut file = File::create("foo.txt").await?;
io::copy(&mut reader, &mut file).await?;
Ok(())
}
//帮助函数:tokio::io::split
use std::error::Error;
use tokio::io;
use tokio::net::TcpListener;
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let tcp_listener = TcpListener::bind("127.0.0.1:8888").await?;
let (mut stream, _) = tcp_listener.accept().await?;
let (read, write) = io::split(stream);
//let (read, write) = stream.split();
Ok(())
}
//处理EOF
loop {
match socket.read(&mut buf).await {
// `Ok(0)`代表远端已经关闭了,无法再继续获取数据,必须return,否则会导致无限循环,CPU100%!
Ok(0) => return,
// ...其他处理逻辑
}
}
//主动Flush
use tokio::io::{self, BufWriter, AsyncWriteExt};
use tokio::fs::File;
#[tokio::main]
async fn main() -> io::Result<()> {
let f = File::create("foo.txt").await?;
let mut buffer = BufWriter::new(f);
buffer.write_all(b"some bytes").await?;
buffer.flush().await?; //<---- 将所有的缓存数据,输出到另一端
Ok(())
}
Select概念
tokio::select!宏允许通知执行多个异步计算,当其中一个异步计算完成时,就会终止并返回结果.
语法:
<pattern> = <async expression> => <handler>
use tokio::sync::oneshot;
#[tokio::main]
async fn main() {
let (tx1, rx1) = oneshot::channel();
let (tx2, rx2) = oneshot::channel();
tokio::spawn(async {
let _ = tx1.send("one");
});
tokio::spawn(async {
let _ = tx2.send("two");
});
tokio::select! {
val = rx1 => {
println!("rx1 completed first with {:?}", val);
}
val = rx2 => {
println!("rx2 completed first with {:?}", val);
}
}
//这里两个rx1代码块会优先执行,如果rx1有返回结果,则rx2代码块不会执行
}
use std::time::Duration;
use tokio::sync::oneshot;
async fn some_operation() -> String {
"one".to_string()
}
#[tokio::main]
async fn main() {
let (mut tx1, rx1) = oneshot::channel();
let (tx2, rx2) = oneshot::channel();
tokio::spawn(async {
tokio::select! {
val = some_operation() => {
let _ = tx1.send(val);
}
_ = tx1.closed() => {
println("tx1 closed");
}
}
});
tokio::spawn(async {
let _ = tx2.send("two");
});
//当本代码块结束时,对应的tx会被drop
tokio::select! {
val = rx1 => {
println!("rx1 completed first with {:?}", val);
}
val = rx2 => {
println!("rx2 completed first with {:?}", val);
}
}
}
async fn computation1() -> String {
// .. computation
}
async fn computation2() -> String {
// .. computation
}
#[tokio::main]
async fn main() {
//tokio::select!的结果可作为值绑定给变量
let out = tokio::select! {
res1 = computation1() => res1,
res2 = computation2() => res2,
};
println!("Got = {}", out);
}
use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
let (mut tx1, mut rx1) = mpsc::channel(128);
let (mut tx2, mut rx2) = mpsc::channel(128);
tokio::spawn(async move {
// drop(tx1);
// drop(tx2);
});
tokio::select! {
Some(v) = rx1.recv() => {
println!("Got {:?} from rx1", v);
}
Some(v) = rx2.recv() => {
println!("Got {:?} from rx2", v);
}
else => {
//当tx1和tx2都没发送消息,且被Drop掉时,会触发else分支
println!("Both channels closed");
}
}
}
//tokio::select!代码块和tokio::spawn不同
//tokio::select!遵循Rust借用规则
//多个不可变借用可以存在多个异步表达式中
//可变借用只能存在于一个异步表达式中
use tokio::sync::oneshot;
#[tokio::main]
async fn main() {
let (tx1, rx1) = oneshot::channel();
let (tx2, rx2) = oneshot::channel();
let mut out = String::new();
tokio::spawn(async move {
// Send values on `tx1` and `tx2`.
});
tokio::select! {
_ = rx1 => {
out.push_str("rx1 completed");
}
_ = rx2 => {
out.push_str("rx2 completed");
}
}
println!("{}", out);
}
use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
let (tx1, mut rx1) = mpsc::channel(128);
let (tx2, mut rx2) = mpsc::channel(128);
let (tx3, mut rx3) = mpsc::channel(128);
//当涉及到循环时,select只会以rx1,rx2,rx3的顺序执行,不会乱序
//当每次循环时,rx1都有值的情况,后面的rx2,rx3都不会执行,保留执行前的状态
loop {
let msg = tokio::select! {
Some(msg) = rx1.recv() => msg,
Some(msg) = rx2.recv() => msg,
Some(msg) = rx3.recv() => msg,
else => { break }
};
println!("Got {:?}", msg);
}
println!("All channels have been closed.");
}
//恢复一个已存在的的Future
//步骤1: tokio::pin!(operation)
//步骤2: &mut operation作为异步表达式
async fn action() {
// Some asynchronous logic
}
#[tokio::main]
async fn main() {
let (mut tx, mut rx) = tokio::sync::mpsc::channel(128);
let operation = action();
tokio::pin!(operation);
loop {
tokio::select! {
_ = &mut operation => break,
Some(v) = rx.recv() => {
if v % 2 == 0 {
break;
}
}
}
}
}
//利用set命令来动态修改Pin之后的变量的值!
async fn action(input: Option<i32>) -> Option<String> {
// If the input is `None`, return `None`.
// This could also be written as `let i = input?;`
let i = match input {
Some(input) => input,
None => return None,
};
// async logic here
}
// `.set` is a method on `Pin`.
operation.set(action(Some(v)));
Select(Per-task concurrency 单任务并发)
Both tokio::spawn and select! enable running concurrent asynchronous operations. However, the strategy used to run concurrent operations differs. The tokio::spawn
function takes an asynchronous operation and spawns a new task to run it. A task is the object that the Tokio runtime schedules. Two different tasks are scheduled independently by Tokio. They may run simultaneously on different operating system threads.
Because of this, a spawned task has the same restriction as a spawned thread: no borrowing(不允许借用)
.
The select!
macro runs all branches concurrently on the same task
. Because all branches of the select! macro are executed on the same task
, they will never run simultaneously
. The select! macro multiplexes asynchronous operations on a single task
.