本文属于我的 Rust 学习笔记 系列,您现在看到的这段话是本系列的固定起始语。
Rust 入门学习笔记以实际例子为主,讲解部分不是从零开始的,所以不建议纯萌新观看,读者最好拥有任意一种面向对象语言的基础,然后自己多多少少看过 Rust 的基本语法,刷过一点 rustlings。
来源:原子之音。当然也包含个人的一些补充。
视频
代码
Rust 进阶学习笔记以及实战的来源则五花八门,将会标注在下一行⬇️。
本节出处:圣经-4.11异步编程
异步比多线程难多了,各种离谱的命名、离谱的设计给我看吐了,纯粹是靠强迫症坚持看下去的。后面会有一节是基于 tokio 的实践,强迫自己看完本节后再去看 tokio 洗洗眼睛能舒服一些。
异步简介
Rust 同时支持多线程和异步编程,并且选择了基于async/await的异步编程。Rust 的异步编程性能很高,可以认为没有额外消耗。
async 底层也是基于线程实现,但它基于线程封装了一个运行时,让多个任务映射到少量线程上,然后线程切换就变成了任务切换,这就变得十分高效。对于 IO 密集型任务,如果使用多线程,那么大多数线程都被阻塞,处于等待状态,一旦唤醒又有昂贵的上下文切换代价,此时使用 async 就十分合适。
一般地,建议这样选择:
- 有大量 IO 任务需要并发运行时,选 async 模型
- 有部分 IO 任务需要并发运行时,选多线程,如果想要降低线程创建和销毁的开销,可以使用线程池
- 有大量 CPU 密集任务需要并行运行时,例如并行计算,选多线程模型,且让线程数等于或者稍大于 CPU 核心数
- 无所谓时,统一选多线程
fn get_two_sites() {
let thread_one = thread::spawn(|| download("https://course.rs"));
let thread_two = thread::spawn(|| download("https://fancy.rs"));
thread_one.join().expect("thread one panicked");
thread_two.join().expect("thread two panicked");
}
async fn get_two_sites_async() {
let future_one = download_async("https://www.foo.com");
let future_two = download_async("https://www.bar.com");
join!(future_one, future_two);
}
事实上,async 和多线程并不是二选一,在同一应用中,可以根据情况两者一起使用。
异步入门
async 的底层实现非常复杂,且会导致编译后文件体积显著增加。因此 Rust 内置的异步特性并不完整。要完整的使用 async 异步编程,你需要依赖以下特性和外部库:
- 必须的特质(例如
Future)、类型和函数,由标准库提供实现
- 关键字
async/await由 Rust 语言提供,并进行了编译器层面的支持
- 众多实用的类型、宏和函数由官方开发的 futures 包提供(不属于 std),它们可以用于任何 async 应用中。
- async 代码的执行、IO 操作、任务创建和调度等等复杂功能由社区的 async 运行时提供,例如 tokio 和 async-std
Rust 为异步增加了很多限制,导致异步编程中很容易遇到在同步中没见过的问题。
作为入门,先来看看最简单的async/await。
第一件事是需要引入 futures 包:
[dependencies]
futures = "0.3"
async
使用async fn语法来创建一个异步函数。异步函数的返回值是一个 Future,若直接调用该函数,不会输出任何结果,因为Future还未被执行,需要使用一个执行器(executor)。
use futures::executor::block_on;
async fn hello_world() {
println!("hello, world!");
}
fn main() {
let future = hello_world(); block_on(future); }
block_on执行器使得这段异步代码看起来就和同步代码一样。
await
如果要在一个async fn函数中去调用另一个async fn并等待其完成后再执行后续的代码,该如何做?
use futures::executor::block_on;
async fn hello_world() {
hello_cat().await;
println!("hello, world!");
}
async fn hello_cat() {
println!("hello, kitty!");
}
fn main() {
let future = hello_world();
block_on(future);
}
在async fn函数中使用await方法可以等待另一个异步调用的完成。await并不会阻塞当前的线程,而是允许我们在同一个线程内并发地运行多个任务,而不是一个一个先后完成,最终实现了并发处理的效果。因此,我们在上面代码中使用同步的代码顺序实现了异步的执行效果,非常简单、高效,而且很好理解,未来也绝对不会有回调地狱的发生。
Future 的执行器和任务调度
Future是异步函数的返回值,是异步函数执行的关键。因此,Future特质是异步编程的核心。
Future 特质
Future是一个是一个能产出值的异步计算。下面是一个简化的特质:
trait SimpleFuture {
type Output;
fn poll(&mut self, wake: fn()) -> Poll<Self::Output>;
}
enum Poll<T> {
Ready(T),
Pending,
}
Future需要被执行器轮询(poll)后才能运行。通过调用poll方法,可以推进Future的执行直到(Future不保证轮询一次就执行完)。如果在当前轮询中Future能够完成,就会返回一个Poll::Ready(result);反之则是Poll::Pending加wake。wake函数的作用是,当未来Future准备好进一步执行时,wake将被调用,然后Future的执行器会再次调用poll方法,让Future继续执行。也就是说,wake能够让Future主动通知执行器,让执行器精确地执行这个Future,而不是不断进行全遍历。
来看一个例子:
pub struct SocketRead<'a> {
socket: &'a Socket,
}
impl SimpleFuture for SocketRead<'_> {
type Output = Vec<u8>;
fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
if self.socket.has_data_to_read() {
Poll::Ready(self.socket.read_buf())
} else {
self.socket.set_readable_callback(wake);
Poll::Pending
}
}
}
这种Future模型允许将多个异步操作组合在一起,同时还无需任何内存分配。不仅仅如此,如果你需要同时运行多个Future或链式调用多个Future,也可以通过无内存分配的状态机实现:
pub struct AndThenFut<FutureA, FutureB> {
first: Option<FutureA>,
second: FutureB,
}
impl<FutureA, FutureB> SimpleFuture for AndThenFut<FutureA, FutureB>
where
FutureA: SimpleFuture<Output = ()>,
FutureB: SimpleFuture<Output = ()>,
{
type Output = ();
fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
if let Some(first) = &mut self.first {
match first.poll(wake) {
Poll::Ready(()) => self.first.take(),
Poll::Pending => return Poll::Pending,
};
}
self.second.poll(wake)
}
}
上面是简化版的Future。真实的Future如下:
trait Future {
type Output;
fn poll(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Self::Output>;
}
唤醒任务
Waker提供了一个wake()方法可以用于告诉执行器:相关的任务可以被唤醒了,此时执行器就可以对相应的Future再次进行poll操作。
想象一个简单场景:实现一个计时装置。当计时器创建时,启动一个线程并让该线程进入睡眠,等睡眠结束后再通知给Future。
use std::{
future::Future,
pin::Pin,
sync::{Arc, Mutex},
task::{Context, Poll, Waker},
thread,
time::Duration,
};
pub struct TimerFuture {
shared_state: Arc<Mutex<SharedState>>,
}
struct SharedState {
completed: bool,
waker: Option<Waker>,
}
impl Future for TimerFuture {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut shared_state = self.shared_state.lock().unwrap();
if shared_state.completed {
Poll::Ready(())
} else {
shared_state.waker = Some(cx.waker().clone());
Poll::Pending
}
}
}
impl TimerFuture {
pub fn new(duration: Duration) -> Self {
let shared_state = Arc::new(Mutex::new(SharedState {
completed: false,
waker: None,
}));
let thread_shared_state = shared_state.clone();
thread::spawn(move || {
thread::sleep(duration);
let mut shared_state = thread_shared_state.lock().unwrap();
shared_state.completed = true;
if let Some(waker) = shared_state.waker.take() {
waker.wake()
}
});
TimerFuture { shared_state }
}
}
此时有了一个定时器Future,但是还没有调用者。自然的,调用者就是一个执行器。
执行器
Future是懒的,不会主动启动。await可以唤醒async函数,但是await本身也是一个async函数的方法,最外层的async函数就需要执行器来调用。
执行器(Executor)管理一批Future,然后通过不停地poll推动它们直到完成。执行器会先poll一次,如果未能成功,后面就会等待Future通过调用wake函数来通知它可以继续,直到Future完成。
执行器需要从一个消息通道(channel)中拉取事件,然后运行它们。当一个任务准备好后(可以继续执行),它会将自己放入消息通道中,然后等待执行器poll。
use {
futures::{
future::{BoxFuture, FutureExt},
task::{waker_ref, ArcWake},
},
std::{
future::Future,
sync::mpsc::{sync_channel, Receiver, SyncSender},
sync::{Arc, Mutex},
task::{Context, Poll},
time::Duration,
},
timer_future::TimerFuture,
};
struct Executor {
ready_queue: Receiver<Arc<Task>>,
}
#[derive(Clone)]
struct Spawner {
task_sender: SyncSender<Arc<Task>>,
}
struct Task {
future: Mutex<Option<BoxFuture<'static, ()>>>,
task_sender: SyncSender<Arc<Task>>,
}
fn new_executor_and_spawner() -> (Executor, Spawner) {
const MAX_QUEUED_TASKS: usize = 10_000;
let (task_sender, ready_queue) = sync_channel(MAX_QUEUED_TASKS);
(Executor { ready_queue }, Spawner { task_sender })
}
impl Spawner {
fn spawn(&self, future: impl Future<Output = ()> + 'static + Send) {
let future = future.boxed();
let task = Arc::new(Task {
future: Mutex::new(Some(future)),
task_sender: self.task_sender.clone(),
});
self.task_sender.send(task).expect("任务队列已满");
}
}
impl ArcWake for Task {
fn wake_by_ref(arc_self: &Arc<Self>) {
let cloned = arc_self.clone();
arc_self
.task_sender
.send(cloned)
.expect("任务队列已满");
}
}
impl Executor {
fn run(&self) {
while let Ok(task) = self.ready_queue.recv() {
let mut future_slot = task.future.lock().unwrap();
if let Some(mut future) = future_slot.take() {
let waker = waker_ref(&task);
let context = &mut Context::from_waker(&*waker);
if future.as_mut().poll(context).is_pending() {
*future_slot = Some(future);
}
}
}
}
}
fn main() {
let (executor, spawner) = new_executor_and_spawner();
spawner.spawn(async {
println!("howdy!");
TimerFuture::new(Duration::new(2, 0)).await;
println!("done!");
});
drop(spawner);
executor.run();
}
执行器和 IO 复用
之前的 SocketRead 的例子中,Future将从 Socket 读取数据,若当前还没有数据,则会让出当前线程的所有权,允许执行器去执行其它的Future。当数据准备好后,会调用wake()函数将该Future的任务放入任务通道中,等待执行器的poll。这个例子中有一个 callback 方法是关键,只有它知道 socket 中的数据已经可以被读取了。那它的原理是什么?
其中一个简单粗暴的方法就是使用一个新线程不停的检查socket中是否有了数据,但 Rust 显然是不会这么干的,因为这样性能太低了。操作系统会提供 IO 多路复用机制,借助 IO 多路复用机制,可以实现一个线程同时阻塞地去等待多个异步 IO 事件,一旦某个事件完成就立即退出阻塞并返回数据。这是一个例子:
struct IoBlocker {
}
struct Event {
id: usize,
signals: Signals,
}
impl IoBlocker {
fn new() -> Self { }
fn add_io_event_interest(
&self,
io_object: &IoObject,
event: Event,
) { }
fn block(&self) -> Event { }
}
let mut io_blocker = IoBlocker::new();
io_blocker.add_io_event_interest(
&socket_1,
Event { id: 1, signals: READABLE },
);
io_blocker.add_io_event_interest(
&socket_2,
Event { id: 2, signals: READABLE | WRITABLE },
);
let event = io_blocker.block();
println!("Socket {:?} is now {:?}", event.id, event.signals);
这样,我们只需要一个执行器线程,它会接收 IO 事件并将其分发到对应的Waker中,接着后者会唤醒相关的任务,最终通过执行器poll后,任务可以顺利地继续执行, 这种 IO 读取流程可以不停的循环,直到 socket 关闭。
Pin
在之前的例子中看到过Pin,它可以防止一个类型在内存中被移动。还有一个UnPin表示类型可以在内存中安全地移动。
吐槽一下,这一小节简直难炸了,实在是恶心……辅助理解,可以看看这个 Rust 的 Pin 与 Unpin 和 Rust Async: Pin概念解析
Why Pin?
这段意义有限,为防止理解出现偏差,先折叠了
一个`async`会创建一个实现了`Future`的匿名类型,并提供了一个`poll`方法:
let fut_one = ; let fut_two = ; async move {
fut_one.await;
fut_two.await;
}
struct AsyncFuture {
fut_one: FutOne,
fut_two: FutTwo,
state: State,
}
enum State {
AwaitingFutOne,
AwaitingFutTwo,
Done,
}
impl Future for AsyncFuture {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
loop {
match self.state {
State::AwaitingFutOne => match self.fut_one.poll(..) {
Poll::Ready(()) => self.state = State::AwaitingFutTwo,
Poll::Pending => return Poll::Pending,
}
State::AwaitingFutTwo => match self.fut_two.poll(..) {
Poll::Ready(()) => self.state = State::Done,
Poll::Pending => return Poll::Pending,
}
State::Done => return Poll::Ready(()),
}
}
}
}
正常情况下,当 poll 第一次被调用时,它会去查询 fut_one 的状态,若 fut_one 无法完成,则 poll 方法会返回。未来对 poll 的调用将从上一次调用结束的地方开始。该过程会一直持续,直到 Future 完成为止。
然而,如果我们的 async 语句块中使用了引用类型
async {
let mut x = [0; 128];
let read_into_buf_fut = read_into_buf(&mut x);
read_into_buf_fut.await;
println!("{:?}", x);
}
这段代码会编译成下面的形式:
struct ReadIntoBuf<'a> {
buf: &'a mut [u8], }
struct AsyncFuture {
x: [u8; 128],
read_into_buf_fut: ReadIntoBuf<'what_lifetime?>,
}
这里,ReadIntoBuf 拥有一个引用字段,指向了结构体的另一个字段 x ,一旦 AsyncFuture 被移动,那 x 的地址也将随之变化,此时对 x 的引用就变成了不合法的,也就是 read_into_buf_fut.buf 会变为不合法的。
若能将 Future 在内存中固定到一个位置,就可以避免这种问题的发生,也就可以安全的创建上面这种引用类型。
Unpin
大多数类型都自动实现了Unpin特质,它表明一个类型可以随意被移动。Unpin是一个标记特质(不定义任何行为),与之相对,Pin是一个结构体:
#[stable(feature = "pin", since = "1.33.0")]
#[lang = "pin"]
#[fundamental]
#[repr(transparent)]
#[derive(Copy, Clone)]
pub struct Pin<P> {
pointer: P,
}
#[stable(feature = "pin", since = "1.33.0")]
impl<P: Deref> Deref for Pin<P> {
type Target = P::Target;
fn deref(&self) -> &P::Target {
Pin::get_ref(Pin::as_ref(self))
}
}
#[stable(feature = "pin", since = "1.33.0")]
impl<P: DerefMut<Target: Unpin>> DerefMut for Pin<P> {
fn deref_mut(&mut self) -> &mut P::Target {
Pin::get_mut(Pin::as_mut(self))
}
}
可以被Pin住的类型会实现一个!Unpin特质,这个特质说明类型没有实现Unpin特质。对于实现了Unpin的类型,还是可以使用Pin,但是没有任何效果。因此,一个类型如果不能被移动,它必须实现!Unpin特质。
理解 Pin
Pin虽然名字叫“钉住”,但他的原理根本不是固定住。Pin的作用就是在实现了!Unpin的情况下阻止调用get_mut。
Pin<&mut T>,Pin<&T>,Pin<Box<T>>这样的数据结构,都能够确保 T 不会被移动。可以看出来,Pin是一个智能指针,包裹一个值,这个指针无法安全地获取到可变引用,那自然就不可移动了。至于为什么获取不到,可以看下上一节中Pin定义里DerefMut方法的签名。
Pin经常用于处理一些自引用的类型。自引用指的是了类型内部的某个成员是另一个成员的引用。来看一个例子:
#[derive(Debug)]
struct Test {
a: String,
b: *const String,
}
impl Test {
fn new(txt: &str) -> Self {
Test {
a: String::from(txt),
b: std::ptr::null(),
}
}
fn init(&mut self) {
let self_ref: *const String = &self.a;
self.b = self_ref;
}
fn a(&self) -> &str {
&self.a
}
fn b(&self) -> &String {
assert!(!self.b.is_null(), "Test::b called without Test::init being called first");
unsafe { &*(self.b) }
}
}
fn main() {
let mut test1 = Test::new("test1");
test1.init();
let mut test2 = Test::new("test2");
test2.init();
println!("a: {}, b: {}", test1.a(), test1.b());
std::mem::swap(&mut test1, &mut test2);
println!("a: {}, b: {}", test2.a(), test2.b());
test1.a = "I've totally changed now!".to_string();
println!("a: {}, b: {}", test2.a(), test2.b());
}
下面来用Pin解决问题。注意,一旦类型实现了!Unpin,那将它的值固定到栈上就是不安全的行为。
use std::pin::Pin;
use std::marker::PhantomPinned;
#[derive(Debug)]
struct Test {
a: String,
b: *const String,
_marker: PhantomPinned,
}
impl Test {
fn new(txt: &str) -> Self {
Test {
a: String::from(txt),
b: std::ptr::null(),
_marker: PhantomPinned,
}
}
fn init(self: Pin<&mut Self>) {
let self_ptr: *const String = &self.a;
let this = unsafe { self.get_unchecked_mut() };
this.b = self_ptr;
}
fn a(self: Pin<&Self>) -> &str {
&self.get_ref().a
}
fn b(self: Pin<&Self>) -> &String {
assert!(!self.b.is_null(), "Test::b called without Test::init being called first");
unsafe { &*(self.b) }
}
}
pub fn main() {
let mut test1 = Test::new("test1");
let mut test1 = unsafe { Pin::new_unchecked(&mut test1) };
Test::init(test1.as_mut());
let mut test2 = Test::new("test2");
let mut test2 = unsafe { Pin::new_unchecked(&mut test2) };
Test::init(test2.as_mut());
println!("a: {}, b: {}", Test::a(test1.as_ref()), Test::b(test1.as_ref()));
std::mem::swap(test1.get_mut(), test2.get_mut());
println!("a: {}, b: {}", Test::a(test2.as_ref()), Test::b(test2.as_ref()));
}
与栈不同,将一个!Unpin类型的值固定到堆上会给予该值一个稳定的内存地址,它指向的堆中的值在Pin后是无法被移动的,并且堆上的值在整个生命周期内都会被稳稳地固定住。
use std::pin::Pin;
use std::marker::PhantomPinned;
#[derive(Debug)]
struct Test {
a: String,
b: *const String,
_marker: PhantomPinned,
}
impl Test {
fn new(txt: &str) -> Pin<Box<Self>> {
let t = Test {
a: String::from(txt),
b: std::ptr::null(),
_marker: PhantomPinned,
};
let mut boxed = Box::pin(t);
let self_ptr: *const String = &boxed.as_ref().a;
unsafe { boxed.as_mut().get_unchecked_mut().b = self_ptr };
boxed
}
fn a(self: Pin<&Self>) -> &str {
&self.get_ref().a
}
fn b(self: Pin<&Self>) -> &String {
unsafe { &*(self.b) }
}
}
pub fn main() {
let test1 = Test::new("test1");
let test2 = Test::new("test2");
println!("a: {}, b: {}", test1.as_ref().a(), test1.as_ref().b());
println!("a: {}, b: {}", test2.as_ref().a(), test2.as_ref().b());
}
解冻 Future
async函数返回的Future默认就是!Unpin的。在实际应用中,一些函数会要求它们处理的Future是Unpin的,此时需要这样:
Box::pin, 创建一个Pin<Box<T>>
pin_utils::pin_mut!, 创建一个Pin<&mut T>
use pin_utils::pin_mut;
fn execute_unpin_future(x: impl Future<Output = ()> + Unpin) { }
let fut = async { };
let fut = async { };
let fut = Box::pin(fut);
execute_unpin_future(fut);
let fut = async { };
pin_mut!(fut);
execute_unpin_future(fut);
固定后获得的Pin<Box<T>>和Pin<&mut T>既可以用于Future,又会自动实现Unpin。
总结
- 若
T: Unpin(Rust 类型的默认实现),那么Pin<'a, T>跟&'a mut T完全相同,也就是Pin将没有任何效果, 该移动还是照常移动
- 绝大多数标准库类型都实现了
Unpin,事实上,对于 Rust 中能遇到的绝大多数类型,该结论依然成立。其中一个例外就是:async/await生成的Future没有实现Unpin
- 可以通过以下方法为自己的类型添加 !Unpin 约束:
- 使用 std::marker::PhantomPinned
- 不稳定版有
impl !Unpin的功能
- 可以将值固定到栈上,也可以固定到堆上
- 将
!Unpin值固定到栈上需要使用unsafe
- 将
!Unpin值固定到堆上无需unsafe,可以通过Box来简单的实现
- 当固定类型
T: !Unpin时,你需要保证数据从被固定到被 drop 这段时期内,其内存不会变得非法或者被重用
异步的所有权和流处理
async/await在遇到阻塞操作时会让出当前线程的所有权而不是阻塞当前线程,这样就允许当前线程继续去执行其它代码,最终实现并发。
生命周期
有两种方式可以使用async:async fn用于声明函数,async { ... }用于声明语句块,它们会返回一个实现Future特质的值。
async fn函数如果拥有引用类型的参数,那它返回的Future的生命周期就会被这些参数的生命周期所限制。也就是说,被引用的数据生命周期至少与引用它的Future一样长。
async fn foo(x: &u8) -> u8 { *x }
fn foo_expanded<'a>(x: &'a u8) -> impl Future<Output = u8> + 'a {
async move { *x }
}
若Future被先存起来或发送到另一个任务或者线程,而不是调用await,就可能存在问题了:
use std::future::Future;
fn bad() -> impl Future<Output = u8> {
let x = 5;
borrow_x(&x) }
async fn borrow_x(x: &u8) -> u8 { *x }
常用的解决方法是使用具有静态生命周期的块:
use std::future::Future;
async fn borrow_x(x: &u8) -> u8 { *x }
fn good() -> impl Future<Output = u8> {
async {
let x = 5;
borrow_x(&x).await
}
}
所有权移动
允许使用async move关键字来将环境中变量的所有权转移到语句块内,就像闭包那样。
async fn blocks() {
let my_string = "foo".to_string();
let future_one = async {
println!("{my_string}");
};
let future_two = async {
println!("{my_string}");
};
let ((), ()) = futures::join!(future_one, future_two);
}
fn move_block() -> impl Future<Output = ()> {
let my_string = "foo".to_string();
async move {
println!("{my_string}");
}
}
多线程执行器
如果执行器是多线程的,Future内部的任何.await都可能导致它被切换到一个新线程上去执行。这就是说,Future可能会在线程间移动,因此async块中的变量必须要能在线程间传递。这会导致Rc、RefCell、没有实现Send的所有权类型、没有实现Sync的引用类型都不安全,无法在.await调用期间使用(不在作用域内还是可以用的)。同样的,普通的锁如Mutex也不可用(线程池会死锁),需要使用futures::lock来替代Mutex完成任务。
流处理
Stream特质类似于Future特质,但是在完成前可以生成多个值,这种行为跟标准库中的Iterator特质颇为相似。
trait Stream {
type Item;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>)
-> Poll<Option<Self::Item>>;
}
消息通道的接收者经常能够用到Stream:
async fn send_recv() {
const BUFFER_SIZE: usize = 10;
let (mut tx, mut rx) = mpsc::channel::<i32>(BUFFER_SIZE);
tx.send(1).await.unwrap();
tx.send(2).await.unwrap();
drop(tx);
assert_eq!(Some(1), rx.next().await);
assert_eq!(Some(2), rx.next().await);
assert_eq!(None, rx.next().await);
}
当然也可以像Iterator那样迭代一个Stream,比如他支持map,filter,fold方法,以及它们对应的try_map,try_filter,try_fold。此外还有next和try_next:
async fn sum_with_next(mut stream: Pin<&mut dyn Stream<Item = i32>>) -> i32 {
use futures::stream::StreamExt; let mut sum = 0;
while let Some(item) = stream.next().await {
sum += item;
}
sum
}
async fn sum_with_try_next(
mut stream: Pin<&mut dyn Stream<Item = Result<i32, io::Error>>>,
) -> Result<i32, io::Error> {
use futures::stream::TryStreamExt; let mut sum = 0;
while let Some(item) = stream.try_next().await? {
sum += item;
}
Ok(sum)
}
但是一次处理一个值还叫什么并发。于是还有并发方法for_each_concurrent和它对应的try_for_each_concurrent:
async fn jump_around(
mut stream: Pin<&mut dyn Stream<Item = Result<u8, io::Error>>>,
) -> Result<(), io::Error> {
use futures::stream::TryStreamExt; const MAX_CONCURRENT_JUMPERS: usize = 100;
stream.try_for_each_concurrent(MAX_CONCURRENT_JUMPERS, |num| async move {
jump_n_times(num).await?;
report_n_jumps(num).await?;
Ok(())
}).await?;
Ok(())
}
多 Future 处理
await只能排队完成Future,如果希望同时运行多个任务,就要考虑其他方法。
join!
在上面的例子中已经见过了 futures 包中的join!。join!宏允许同时等待多个不同Future的完成,并且可以让他们并发运行。
use futures::join;
async fn enjoy_book_and_music() -> (Book, Music) {
let book_fut = enjoy_book();
let music_fut = enjoy_music();
join!(book_fut, music_fut)
}
join!会返回一个元组,里面的值是对应的Future执行结束后输出的值。
如果希望同时运行一个数组里的多个异步任务,可以使用futures::future::join_all方法。
try_join!
有了join!自然就有try_join!。如果希望在某一个Future报错后就立即停止所有Future的执行,就可以使用try_join!,特别是当Future返回Result时:
use futures::try_join;
async fn get_book() -> Result<Book, String> { Ok(Book) }
async fn get_music() -> Result<Music, String> { Ok(Music) }
async fn get_book_and_music() -> Result<(Book, Music), String> {
let book_fut = get_book();
let music_fut = get_music();
try_join!(book_fut, music_fut)
}
注意,所有Future都必须拥有相同的错误类型。如果错误类型不同,可以考虑使用来自futures::future::TryFutureExt模块的map_err和err_info方法将错误转换成相同类型。
use futures::{
future::TryFutureExt,
try_join,
};
async fn get_book() -> Result<Book, ()> { Ok(Book) }
async fn get_music() -> Result<Music, String> { Ok(Music) }
async fn get_book_and_music() -> Result<(Book, Music), String> {
let book_fut = get_book().map_err(|()| "Unable to get book".to_string());
let music_fut = get_music();
try_join!(book_fut, music_fut)
}
select!
join!只有等所有Future结束后,才能集中处理结果,如果想同时等待多个Future,且任何一个Future结束后,都可以立即被处理,可以考虑使用futures::select!。
use futures::{
future::FutureExt, pin_mut,
select,
};
async fn task_one() { }
async fn task_two() { }
async fn race_tasks() {
let t1 = task_one().fuse();
let t2 = task_two().fuse();
pin_mut!(t1, t2);
select! {
() = t1 => println!("任务1率先完成"),
() = t2 => println!("任务2率先完成"),
}
}
select!在“选择”一个分支调用后会立即结束,不会等待其他任务的完成。
select!也支持 complete 和 default 分支:
use futures::future;
use futures::select;
pub fn main() {
let mut a_fut = future::ready(4);
let mut b_fut = future::ready(6);
let mut total = 0;
loop {
select! {
a = a_fut => total += a,
b = b_fut => total += b,
complete => break,
default => panic!(), };
}
assert_eq!(total, 10);
}
FuseFuture
之前的例子中出现过 fuse,这里将其展开。
.fuse方法可以让Future实现FusedFuture特质,而pin_mut!宏会为Future实现Unpin特质,这两个特质恰恰是使用select!所必须的:
- Unpin:由于 select 不会通过拿走所有权的方式使用
Future,而是通过可变引用的方式去使用,这样当 select 结束后,该Future若没有被完成,它的所有权还可以继续被其它代码使用。注意,对于来自表达式的 future 可以放宽Unpin的限制(会自动变换)。
- FusedFuture:
Future一旦完成后,那 select 就不能再对其进行轮询使用。Fuse 意味着熔断,相当于Future一旦完成,再次调用 poll 会直接返回 Poll::Pending。
只有实现了FusedFuture,select 才能配合 loop 一起使用。假如没有实现,就算一个 Future 已经完成了,它依然会被 select 不停的轮询执行。
Stream 稍有不同,它们使用的特质是FusedStream。通过 fuse 实现了该特质的 Stream,对其调用 next 或 try_next 方法可以获取实现了FusedFuture特质的Future:
use futures::{
stream::{Stream, StreamExt, FusedStream},
select,
};
async fn add_two_streams(
mut s1: impl Stream<Item = u8> + FusedStream + Unpin,
mut s2: impl Stream<Item = u8> + FusedStream + Unpin,
) -> u8 {
let mut total = 0;
loop {
let item = select! {
x = s1.next() => x,
x = s2.next() => x,
complete => break,
};
if let Some(next_num) = item {
total += next_num;
}
}
total
}
select 时并发
Fuse::terminated()函数可以构建一个空的Future。
考虑以下场景:在 select 循环中运行一个任务,但是该任务却也是在 select 循环内部创建的。
use futures::{
future::{Fuse, FusedFuture, FutureExt},
stream::{FusedStream, Stream, StreamExt},
pin_mut,
select,
};
async fn get_new_num() -> u8 { 5 }
async fn run_on_new_num(_: u8) { }
async fn run_loop(
mut interval_timer: impl Stream<Item = ()> + FusedStream + Unpin,
starting_num: u8,
) {
let run_on_new_num_fut = run_on_new_num(starting_num).fuse();
let get_new_num_fut = Fuse::terminated();
pin_mut!(run_on_new_num_fut, get_new_num_fut);
loop {
select! {
() = interval_timer.select_next_some() => {
if get_new_num_fut.is_terminated() {
get_new_num_fut.set(get_new_num().fuse());
}
},
new_num = get_new_num_fut => {
run_on_new_num_fut.set(run_on_new_num(new_num).fuse());
},
() = run_on_new_num_fut => {},
complete => panic!("`interval_timer` completed unexpectedly"),
}
}
}
当某个 future 有多个拷贝都需要同时运行时,可以使用FuturesUnordered类型。下面的例子会将run_on_new_num_fut的每一个拷贝都运行到完成,而不是像之前那样一旦创建新的就终止旧的。
use futures::{
future::{Fuse, FusedFuture, FutureExt},
stream::{FusedStream, FuturesUnordered, Stream, StreamExt},
pin_mut,
select,
};
async fn get_new_num() -> u8 { 5 }
async fn run_on_new_num(_: u8) -> u8 { 5 }
async fn run_loop(
mut interval_timer: impl Stream<Item = ()> + FusedStream + Unpin,
starting_num: u8,
) {
let mut run_on_new_num_futs = FuturesUnordered::new();
run_on_new_num_futs.push(run_on_new_num(starting_num));
let get_new_num_fut = Fuse::terminated();
pin_mut!(get_new_num_fut);
loop {
select! {
() = interval_timer.select_next_some() => {
if get_new_num_fut.is_terminated() {
get_new_num_fut.set(get_new_num().fuse());
}
},
new_num = get_new_num_fut => {
run_on_new_num_futs.push(run_on_new_num(new_num));
},
res = run_on_new_num_futs.select_next_some() => {
println!("run_on_new_num_fut returned {:?}", res);
},
complete => panic!("`interval_timer` completed unexpectedly"),
}
}
}
异步疑难杂症
原文还有一节 Rust 异步疑难杂症,里面都是异步编程的限制。按我目前浅薄的见识根本理解不了。俺不中嘞,感兴趣的直接看原文吧。
阅读材料
Asynchronous Programming in Rust