Rust 进阶学习笔记(八):异步编程
目录
aysnc、await和future,异步编程调度和执行器,Pin和UnPin,join和select
本文属于我的 Rust 学习笔记 系列。
Rust 入门学习笔记以实际例子为主,讲解部分不是从零开始的,所以不建议纯萌新观看,读者最好拥有任意一种面向对象语言的基础,然后自己多多少少看过 Rust 的基本语法,刷过一点 rustlings。
Rust 进阶学习笔记以及实战的来源则五花八门,将会标注在下一行⬇️。
本节出处:圣经-4.11异步编程
异步比多线程难多了,各种离谱的命名、离谱的设计给我看吐了,纯粹是靠强迫症坚持看下去的。后面会有一节是基于 tokio 的实践,强迫自己看完本节后再去看 tokio 洗洗眼睛能舒服一些。
异步简介
Rust 同时支持多线程和异步编程,并且选择了基于async/await
的异步编程1。Rust 的异步编程性能很高,可以认为没有额外消耗。
async 底层也是基于线程实现,但它基于线程封装了一个运行时,让多个任务映射到少量线程上,然后线程切换就变成了任务切换,这就变得十分高效。对于 IO 密集型任务,如果使用多线程,那么大多数线程都被阻塞,处于等待状态,一旦唤醒又有昂贵的上下文切换代价,此时使用 async 就十分合适。
一般地,建议这样选择:
- 有大量 IO 任务需要并发运行时,选 async 模型
- 有部分 IO 任务需要并发运行时,选多线程,如果想要降低线程创建和销毁的开销,可以使用线程池
- 有大量 CPU 密集任务需要并行运行时,例如并行计算,选多线程模型,且让线程数等于或者稍大于 CPU 核心数
- 无所谓时,统一选多线程
// 来看一组对比
// 一旦下载文件的并发请求多起来,那一个下载任务占用一个线程的模式就太重了,会很容易成为程序的瓶颈。
async
事实上,async 和多线程并不是二选一,在同一应用中,可以根据情况两者一起使用。
异步入门
async 的底层实现非常复杂,且会导致编译后文件体积显著增加。因此 Rust 内置的异步特性并不完整。要完整的使用 async 异步编程,你需要依赖以下特性和外部库:
- 必须的特质(例如
Future
)、类型和函数,由标准库提供实现 - 关键字
async/await
由 Rust 语言提供,并进行了编译器层面的支持 - 众多实用的类型、宏和函数由官方开发的 futures 包提供(不属于 std),它们可以用于任何 async 应用中。
- async 代码的执行、IO 操作、任务创建和调度等等复杂功能由社区的 async 运行时提供,例如 tokio 和 async-std
Rust 为异步增加了很多限制,导致异步编程中很容易遇到在同步中没见过的问题。
作为入门,先来看看最简单的async/await
。
第一件事是需要引入 futures 包:
[]
= "0.3"
async
使用async fn
语法来创建一个异步函数。异步函数的返回值是一个 Future
,若直接调用该函数,不会输出任何结果,因为Future
还未被执行,需要使用一个执行器(executor)。
// `block_on`会阻塞当前线程直到指定的`Future`执行完成,这种阻塞当前线程以等待任务完成的方式较为简单、粗暴,
// 好在其它运行时的执行器会提供更加复杂的行为,例如将多个`future`调度到同一个线程上执行。
use block_on;
async
block_on
执行器使得这段异步代码看起来就和同步代码一样。
await
如果要在一个async fn
函数中去调用另一个async fn
并等待其完成后再执行后续的代码,该如何做?
use block_on;
async
async
在async fn
函数中使用await
方法可以等待另一个异步调用的完成。await
并不会阻塞当前的线程,而是允许我们在同一个线程内并发地运行多个任务,而不是一个一个先后完成,最终实现了并发处理的效果。因此,我们在上面代码中使用同步的代码顺序实现了异步的执行效果,非常简单、高效,而且很好理解,未来也绝对不会有回调地狱的发生。
Future 的执行器和任务调度
Future
是异步函数的返回值,是异步函数执行的关键。因此,Future
特质是异步编程的核心。
Future 特质
Future
是一个是一个能产出值的异步计算。下面是一个简化的特质:
Future
需要被执行器轮询(poll)后才能运行。通过调用poll
方法,可以推进Future
的执行直到(Future
不保证轮询一次就执行完)。如果在当前轮询中Future
能够完成,就会返回一个Poll::Ready(result)
;反之则是Poll::Pending
加wake
。wake
函数的作用是,当未来Future
准备好进一步执行时,wake
将被调用,然后Future
的执行器会再次调用poll
方法,让Future
继续执行。也就是说,wake
能够让Future
主动通知执行器,让执行器精确地执行这个Future
,而不是不断进行全遍历。
来看一个例子:
// 为 SocketRead 结构体实现一个 Future
这种Future
模型允许将多个异步操作组合在一起,同时还无需任何内存分配。不仅仅如此,如果你需要同时运行多个Future
或链式调用多个Future
,也可以通过无内存分配的状态机实现:
/// 一个 SimpleFuture, 它使用顺序的方式,一个接一个地运行两个 Future
//
// 注意: 由于本例子用于演示,因此功能简单,`AndThenFut` 会假设两个 Future 在创建时就可用了.
// 而真实的`Andthen`允许根据第一个`Future`的输出来创建第二个`Future`,因此复杂的多。
上面是简化版的Future
。真实的Future
如下:
唤醒任务
Waker
提供了一个wake()
方法可以用于告诉执行器:相关的任务可以被唤醒了,此时执行器就可以对相应的Future
再次进行poll
操作。
想象一个简单场景:实现一个计时装置。当计时器创建时,启动一个线程并让该线程进入睡眠,等睡眠结束后再通知给Future
。
use ;
/// 在Future和等待的线程间共享状态
// 构建定时器和启动计时线程的 new 方法
此时有了一个定时器Future
,但是还没有调用者。自然的,调用者就是一个执行器。
执行器
Future
是懒的,不会主动启动。await
可以唤醒async
函数,但是await
本身也是一个async
函数的方法,最外层的async
函数就需要执行器来调用。
执行器(Executor)管理一批Future
,然后通过不停地poll
推动它们直到完成。执行器会先poll
一次,如果未能成功,后面就会等待Future
通过调用wake
函数来通知它可以继续,直到Future
完成。
执行器需要从一个消息通道(channel)中拉取事件,然后运行它们。当一个任务准备好后(可以继续执行),它会将自己放入消息通道中,然后等待执行器poll
。
use ;
/// 任务执行器,负责从通道中接收任务然后执行
/// `Spawner`负责创建新的`Future`然后将它发送到任务通道中
/// 一个Future,它可以调度自己(将自己放入任务通道中),然后等待执行器去`poll`
// spawn 方法生成 Future , 然后将它放入任务通道中:
// 为任务实现 ArcWake 特质,这样它们就能被转变成 Waker 然后被唤醒:
// 执行器将从通道中获取任务,然后进行 poll 执行
// 使用执行器运行定时器
执行器和 IO 复用
之前的 SocketRead 的例子中,Future
将从 Socket 读取数据,若当前还没有数据,则会让出当前线程的所有权,允许执行器去执行其它的Future
。当数据准备好后,会调用wake()
函数将该Future
的任务放入任务通道中,等待执行器的poll
。这个例子中有一个 callback 方法是关键,只有它知道 socket 中的数据已经可以被读取了。那它的原理是什么?
其中一个简单粗暴的方法就是使用一个新线程不停的检查socket
中是否有了数据,但 Rust 显然是不会这么干的,因为这样性能太低了。操作系统会提供 IO 多路复用机制,借助 IO 多路复用机制,可以实现一个线程同时阻塞地去等待多个异步 IO 事件,一旦某个事件完成就立即退出阻塞并返回数据。这是一个例子:
// rust 有一个 mio 包
let mut io_blocker = new;
io_blocker.add_io_event_interest;
io_blocker.add_io_event_interest;
let event = io_blocker.block;
// 当socket的数据可以读取时,打印 "Socket 1 is now READABLE"
println!;
这样,我们只需要一个执行器线程,它会接收 IO 事件并将其分发到对应的Waker
中,接着后者会唤醒相关的任务,最终通过执行器poll
后,任务可以顺利地继续执行, 这种 IO 读取流程可以不停的循环,直到 socket 关闭。
Pin
在之前的例子中看到过Pin
,它可以防止一个类型在内存中被移动。还有一个UnPin
表示类型可以在内存中安全地移动。
吐槽一下,这一小节简直难炸了,实在是恶心……辅助理解,可以看看这个 Rust 的 Pin 与 Unpin 和 Rust Async: Pin概念解析
Why Pin?
这段意义有限,为防止理解出现偏差,先折叠了
一个`async`会创建一个实现了`Future`的匿名类型,并提供了一个`poll`方法:let fut_one = /* ... */; // Future 1
let fut_two = /* ... */; // Future 2
async move
// `async { ... }`语句块创建的 `Future` 类型
// `async` 语句块可能处于的状态
正常情况下,当 poll 第一次被调用时,它会去查询 fut_one 的状态,若 fut_one 无法完成,则 poll 方法会返回。未来对 poll 的调用将从上一次调用结束的地方开始。该过程会一直持续,直到 Future 完成为止。
然而,如果我们的 async 语句块中使用了引用类型
async
这段代码会编译成下面的形式:
这里,ReadIntoBuf 拥有一个引用字段,指向了结构体的另一个字段 x ,一旦 AsyncFuture 被移动,那 x 的地址也将随之变化,此时对 x 的引用就变成了不合法的,也就是 read_into_buf_fut.buf 会变为不合法的。
若能将 Future 在内存中固定到一个位置,就可以避免这种问题的发生,也就可以安全的创建上面这种引用类型。
Unpin
大多数类型都自动实现了Unpin
特质,它表明一个类型可以随意被移动。Unpin
是一个标记特质(不定义任何行为),与之相对,Pin
是一个结构体:
// 包裹一个指针,并且能确保该指针指向的数据不会被移动
// !Unpin 不符合 Target: Unpin,故无法获取到可变引用
可以被Pin
住的类型会实现一个!Unpin
特质,这个特质说明类型没有实现Unpin
特质。对于实现了Unpin
的类型,还是可以使用Pin
,但是没有任何效果。因此,一个类型如果不能被移动,它必须实现!Unpin
特质。
理解 Pin
Pin
虽然名字叫“钉住”,但他的原理根本不是固定住。Pin
的作用就是在实现了!Unpin
的情况下阻止调用get_mut
。
Pin<&mut T>
,Pin<&T>
,Pin<Box<T>>
这样的数据结构,都能够确保 T 不会被移动。可以看出来,Pin
是一个智能指针,包裹一个值,这个指针无法安全地获取到可变引用,那自然就不可移动了。至于为什么获取不到,可以看下上一节中Pin
定义里DerefMut
方法的签名。
Pin
经常用于处理一些自引用的类型。自引用指的是了类型内部的某个成员是另一个成员的引用。来看一个例子:
// 自引用结构体,b 是 a 的一个引用
// Test 提供方法用于获取字段 a 和 b 的值的引用。
下面来用Pin
解决问题。注意,一旦类型实现了!Unpin
,那将它的值固定到栈上就是不安全的行为。
use Pin;
use PhantomPinned;
// fn main() {
// let mut test1 = Test::new("test1");
// // 如果忘记遮蔽,就可以 drop 掉 Pin,然后在生命周期结束后继续移动数据
// let mut test1_pin = unsafe { Pin::new_unchecked(&mut test1) };
// Test::init(test1_pin.as_mut());
// drop(test1_pin);
// println!(r#"test1.b points to "test1": {:?}..."#, test1.b);
// let mut test2 = Test::new("test2");
// mem::swap(&mut test1, &mut test2);
// println!("... and now it points nowhere: {:?}", test1.b);
// }
与栈不同,将一个!Unpin
类型的值固定到堆上会给予该值一个稳定的内存地址,它指向的堆中的值在Pin
后是无法被移动的,并且堆上的值在整个生命周期内都会被稳稳地固定住。
use Pin;
use PhantomPinned;
解冻 Future
async
函数返回的Future
默认就是!Unpin
的。在实际应用中,一些函数会要求它们处理的Future
是Unpin
的,此时需要这样:
Box::pin
, 创建一个Pin<Box<T>>
pin_utils::pin_mut!
, 创建一个Pin<&mut T>
use pin_mut; // `pin_utils` 可以在crates.io中找到
// 函数的参数是一个`Future`,但是要求该`Future`实现`Unpin`
let fut = async ;
// 下面代码报错: 默认情况下,`fut` 实现的是`!Unpin`,并没有实现`Unpin`
// execute_unpin_future(fut);
// 使用`Box`进行固定
let fut = async ;
let fut = Box pin;
execute_unpin_future; // OK
// 使用`pin_mut!`进行固定
let fut = async ;
pin_mut!;
execute_unpin_future; // OK
固定后获得的Pin<Box<T>>
和Pin<&mut T>
既可以用于Future
,又会自动实现Unpin
。
总结
- 若
T: Unpin
(Rust 类型的默认实现),那么Pin<'a, T>
跟&'a mut T
完全相同,也就是Pin
将没有任何效果, 该移动还是照常移动 - 绝大多数标准库类型都实现了
Unpin
,事实上,对于 Rust 中能遇到的绝大多数类型,该结论依然成立。其中一个例外就是:async/await
生成的Future
没有实现Unpin
- 可以通过以下方法为自己的类型添加 !Unpin 约束:
- 使用 std::marker::PhantomPinned
- 不稳定版有
impl !Unpin
的功能
- 可以将值固定到栈上,也可以固定到堆上
- 将
!Unpin
值固定到栈上需要使用unsafe
- 将
!Unpin
值固定到堆上无需unsafe
,可以通过Box
来简单的实现
- 将
- 当固定类型
T: !Unpin
时,你需要保证数据从被固定到被 drop 这段时期内,其内存不会变得非法或者被重用
异步的所有权和流处理
async/await
在遇到阻塞操作时会让出当前线程的所有权而不是阻塞当前线程,这样就允许当前线程继续去执行其它代码,最终实现并发。
生命周期
有两种方式可以使用async
:async fn
用于声明函数,async { ... }
用于声明语句块,它们会返回一个实现Future
特质的值。
async fn
函数如果拥有引用类型的参数,那它返回的Future
的生命周期就会被这些参数的生命周期所限制。也就是说,被引用的数据生命周期至少与引用它的Future一样长。
// 当 x 依然有效时, 该 Future 就必须继续等待
// 也就是说 x 必须比 Future 活得更久
async
// 上面的函数跟下面的函数是等价的:
+ 'a
若Future
被先存起来或发送到另一个任务或者线程,而不是调用await
,就可能存在问题了:
use Future;
async
常用的解决方法是使用具有静态生命周期的块:
use Future;
async
所有权移动
允许使用async move
关键字来将环境中变量的所有权转移到语句块内,就像闭包那样。
// 多个不同的 `async` 语句块可以访问同一个本地变量,只要它们在该变量的作用域内执行
async
// 由于 `async move` 会捕获环境中的变量,因此只有一个 `async move` 语句块可以访问该变量,无法跟其它代码实现对变量的共享
// 但是它也有非常明显的好处:变量可以转移到返回的 Future 中,不再受借用生命周期的限制
多线程执行器
如果执行器是多线程的,Future
内部的任何.await
都可能导致它被切换到一个新线程上去执行。这就是说,Future
可能会在线程间移动,因此async
块中的变量必须要能在线程间传递。这会导致Rc
、RefCell
、没有实现Send
的所有权类型、没有实现Sync
的引用类型都不安全,无法在.await
调用期间使用(不在作用域内还是可以用的)。同样的,普通的锁如Mutex
也不可用(线程池会死锁),需要使用futures::lock
来替代Mutex
完成任务。
流处理
Stream
特质类似于Future
特质,但是在完成前可以生成多个值,这种行为跟标准库中的Iterator
特质颇为相似。
消息通道的接收者经常能够用到Stream
:
async
当然也可以像Iterator
那样迭代一个Stream
,比如他支持map
,filter
,fold
方法,以及它们对应的try_map
,try_filter
,try_fold
。此外还有next
和try_next
:
async
async
但是一次处理一个值还叫什么并发。于是还有并发方法for_each_concurrent
和它对应的try_for_each_concurrent
:
async
多 Future 处理
await
只能排队完成Future
,如果希望同时运行多个任务,就要考虑其他方法。
join!
在上面的例子中已经见过了 futures 包中的join!
。join!
宏允许同时等待多个不同Future
的完成,并且可以让他们并发运行。
// Rust 中的 Future 是惰性的,直到调用 .await 时,才会开始运行
// 而两个 await 由于在代码中有先后顺序,因此是顺序运行的。
// async fn enjoy_book_and_music() -> (Book, Music) {
// let book_future = enjoy_book();
// let music_future = enjoy_music();
// (book_future.await, music_future.await)
// }
use join;
async
join!
会返回一个元组,里面的值是对应的Future
执行结束后输出的值。
如果希望同时运行一个数组里的多个异步任务,可以使用futures::future::join_all
方法。
try_join!
有了join!
自然就有try_join!
。如果希望在某一个Future
报错后就立即停止所有Future
的执行,就可以使用try_join!
,特别是当Future
返回Result
时:
use try_join;
async
async
async
注意,所有Future
都必须拥有相同的错误类型。如果错误类型不同,可以考虑使用来自futures::future::TryFutureExt
模块的map_err
和err_info
方法将错误转换成相同类型。
use ;
async
async
async
select!
join!
只有等所有Future
结束后,才能集中处理结果,如果想同时等待多个Future
,且任何一个Future
结束后,都可以立即被处理,可以考虑使用futures::select!
。
use ;
async
async
async
select!
在“选择”一个分支调用后会立即结束,不会等待其他任务的完成。
select!
也支持 complete 和 default 分支:
use future;
use select;
FuseFuture
之前的例子中出现过 fuse,这里将其展开。
.fuse
方法可以让Future
实现FusedFuture
特质,而pin_mut!
宏会为Future
实现Unpin
特质,这两个特质恰恰是使用select!
所必须的:
- Unpin:由于 select 不会通过拿走所有权的方式使用
Future
,而是通过可变引用的方式去使用,这样当 select 结束后,该Future
若没有被完成,它的所有权还可以继续被其它代码使用。注意,对于来自表达式的 future 可以放宽Unpin
的限制(会自动变换)。 - FusedFuture:
Future
一旦完成后,那 select 就不能再对其进行轮询使用。Fuse 意味着熔断,相当于Future
一旦完成,再次调用 poll 会直接返回 Poll::Pending。
只有实现了FusedFuture
,select 才能配合 loop 一起使用。假如没有实现,就算一个 Future 已经完成了,它依然会被 select 不停的轮询执行。
Stream
稍有不同,它们使用的特质是FusedStream
。通过 fuse 实现了该特质的 Stream,对其调用 next 或 try_next 方法可以获取实现了FusedFuture
特质的Future
:
use ;
async
select 时并发
Fuse::terminated()
函数可以构建一个空的Future
。
考虑以下场景:在 select 循环中运行一个任务,但是该任务却也是在 select 循环内部创建的。
use ;
async
async
async
当某个 future 有多个拷贝都需要同时运行时,可以使用FuturesUnordered
类型。下面的例子会将run_on_new_num_fut
的每一个拷贝都运行到完成,而不是像之前那样一旦创建新的就终止旧的。
use ;
async
async
// 使用从 `get_new_num` 获取的最新数字 来运行 `run_on_new_num`
//
// 每当计时器结束后,`get_new_num` 就会运行一次,它会立即取消当前正在运行的`run_on_new_num` ,
// 并且使用新返回的值来替换
async
异步疑难杂症
原文还有一节 Rust 异步疑难杂症,里面都是异步编程的限制。按我目前浅薄的见识根本理解不了。俺不中嘞,感兴趣的直接看原文吧。
阅读材料
Asynchronous Programming in Rust
-
异步编程的模型有一大堆,包括线程、协程、actor模型等等,具体可以看原文 ↩
📝 系列导航
- 上一篇: Rust 进阶学习笔记(七):多线程
- 下一篇: Rust 进阶学习笔记(九):强类型特性
- 合集列表