中国建设银行网站查工资,深圳关键词推广,上海集团平台,荆州百度推广文章目录 Async 编程简介async/.await 简单入门 Future 执行器与任务调度Future 特征使用 Waker 来唤醒任务构建一个定时器执行器 Executor构建执行器 完整代码 Async 编程简介
OS 线程, 它最简单#xff0c;也无需改变任何编程模型(业务/代码逻辑)#xff0c;因此非常适合作… 文章目录 Async 编程简介async/.await 简单入门 Future 执行器与任务调度Future 特征使用 Waker 来唤醒任务构建一个定时器执行器 Executor构建执行器 完整代码 Async 编程简介
OS 线程, 它最简单也无需改变任何编程模型(业务/代码逻辑)因此非常适合作为语言的原生并发模型我们在多线程章节也提到过Rust 就选择了原生支持线程级的并发编程。但是这种模型也有缺点例如线程间的同步将变得更加困难线程间的上下文切换损耗较大。使用线程池在一定程度上可以提升性能但是对于 IO 密集的场景来说线程池还是不够。事件驱动(Event driven),如果说事件驱动常常跟回调( Callback )一起使用相信大家就恍然大悟了。这种模型性能相当的好但最大的问题就是存在回调地狱的风险非线性的控制流和结果处理导致了数据流向和错误传播变得难以掌控还会导致代码可维护性和可读性的大幅降低。协程(Coroutines) Go 语言的协程设计就非常优秀这也是 Go 语言能够迅速火遍全球的杀手锏之一。协程跟线程类似无需改变编程模型同时它也跟 async 类似可以支持大量的任务并发运行。但协程抽象层次过高导致用户无法接触到底层的细节这对于系统编程语言和自定义异步运行时是难以接受的actor 模型是 erlang 的杀手锏之一它将所有并发计算分割成一个一个单元这些单元被称为 actor , 单元之间通过消息传递的方式进行通信和数据传递跟分布式系统的设计理念非常相像。由于 actor 模型跟现实很贴近因此它相对来说更容易实现但是一旦遇到流控制、失败重试等场景时就会变得不太好用async/await 该模型性能高还能支持底层编程同时又像线程和协程那样无需过多的改变编程模型但有得必有失async 模型的问题就是内部实现机制过于复杂对于用户来说理解和使用起来也没有线程和协程简单好在前者的复杂性开发者们已经帮我们封装好而理解和使用起来不够简单。
对于长时间运行的 CPU 密集型任务例如并行计算使用线程将更有优势。 这种密集任务往往会让所在的线程持续运行任何不必要的线程切换都会带来性能损耗因此高并发反而在此时成为了一种多余。同时你所创建的线程数应该等于 CPU 核心数充分利用 CPU 的并行能力甚至还可以将线程绑定到 CPU 核心上进一步减少线程上下文切换。
而高并发更适合 IO 密集型任务例如 web 服务器、数据库连接等等网络服务因为这些任务绝大部分时间都处于等待状态如果使用多线程那线程大量时间会处于无所事事的状态再加上线程上下文切换的高昂代价让多线程做 IO 密集任务变成了一件非常奢侈的事。而使用async既可以有效的降低 CPU 和内存的负担又可以让大量的任务并发的运行一个任务一旦处于IO或者其他等待(阻塞)状态就会被立刻切走并执行另一个任务而这里的任务切换的性能开销要远远低于使用多线程时的线程上下文切换。
async/.await 简单入门
async/.await 是 Rust 内置的语言特性可以让我们用同步的方式去编写异步的代码。 通过 async 标记的语法块会被转换成实现了Future特征的状态机。 与同步调用阻塞当前线程不同当Future执行并遇到阻塞时它会让出当前线程的控制权这样其它的Future就可以在该线程中运行这种方式完全不会导致当前线程的阻塞。 下面我们来通过例子学习 async/.await 关键字该如何使用在开始之前需要先引入 futures 包。编辑 Cargo.toml 文件并添加以下内容:
[dependencies]
futures 0.3// block_on会阻塞当前线程直到指定的Future执行完成这种阻塞当前线程以等待任务完成的方式较为简单、粗暴
// 好在其它运行时的执行器(executor)会提供更加复杂的行为例如将多个future调度到同一个线程上执行。
use futures::executor::block_on;async fn hello_world() {hello_cat().await;//等待异步方法完成println!(hello, world!);
}async fn hello_cat() {println!(hello, kitty!);
}
fn main() {let future hello_world(); // 返回一个Future, 因此不会打印任何输出block_on(future); // 执行Future并等待其运行完成此时hello, world!会被打印输出
}总之在async fn函数中使用.await可以等待另一个异步调用的完成。但是与block_on不同.await并不会阻塞当前的线程而是异步的等待Future A的完成在等待的过程中该线程还可以继续执行其它的Future B最终实现了并发处理的效果。
Future 执行器与任务调度
Future 特征
首先来给出 Future 的定义它是一个能产出值的异步计算(虽然该值可能为空例如 () )。光看这个定义一个简化版的 Future 特征:
trait SimpleFuture {// 设置关联类型type Output;// 输出计算完成的结果或者输出Pending表示本次不能够完成计算fn poll(mut self, wake: fn()) - PollSelf::Output;
}// Future 需要被执行器poll(轮询)后才能运行并不能保证在一次 poll 中就被执行完
enum PollT {Ready(T),Pending,
}若在当前 poll 中 Future 可以被完成则会返回 Poll::Ready(result) 反之则返回 Poll::Pending 并且安排一个 wake 函数当未来 Future 准备好进一步执行时 该函数会被调用然后管理该 Future 的执行器会再次调用 poll 方法此时 Future 就可以继续执行了。
用一个例子来说明下。考虑一个需要从 socket 读取数据的场景如果有数据可以直接读取数据并返回 Poll::Ready(data) 但如果没有数据Future 会被阻塞且不会再继续执行此时它会注册一个 wake 函数当 socket 数据准备好时该函数将被调用以通知执行器我们的 Future 已经准备好了可以继续执行。
pub struct SocketReada {socket: a Socket,
}impl SimpleFuture for SocketRead_ {type Output Vecu8;fn poll(mut self, wake: fn()) - PollSelf::Output {if self.socket.has_data_to_read() {// socket有数据写入buffer中并返回Poll::Ready(self.socket.read_buf())} else {// socket中还没数据// 注册一个wake函数当数据可用时该函数会被调用// 然后当前Future的执行器会再次调用poll方法此时就可以读取到数据self.socket.set_readable_callback(wake);Poll::Pending}}
}
Future 模型允许将多个异步操作组合在一起同时还无需任何内存分配。不仅仅如此如果你需要同时运行多个 Future或链式调用多个 Future 也可以通过无内存分配的状态机实现例如 trait SimpleFuture {// 关联类型是 trait 定义中的类型占位符。定义的时候并不定义它的具体的类型是什么。在 impl 这个 trait 的时候才为这个关联类型赋予确定的类型。type Output;//关联类型fn poll(mut self, wake: fn()) - PollSelf::Output;
}enum PollT {Ready(T),Pending,
}/// 一个SimpleFuture它会并发地运行两个Future直到它们完成
///
/// 之所以可以并发是因为两个Future的轮询可以交替进行一个阻塞另一个就可以立刻执行反之亦然
pub struct JoinFutureA, FutureB {// 结构体的每个字段都包含一个Future可以运行直到完成.// 等到Future完成后字段会被设置为 None. 这样Future完成后就不会再被轮询a: OptionFutureA,b: OptionFutureB,
}implFutureA, FutureB SimpleFuture for JoinFutureA, FutureB
whereFutureA: SimpleFutureOutput (),FutureB: SimpleFutureOutput (),
{type Output ();fn poll(mut self, wake: fn()) - PollSelf::Output {// 尝试去完成一个 Future aif let Some(a) mut self.a {if let Poll::Ready(()) a.poll(wake) {self.a.take();}}// 尝试去完成一个 Future bif let Some(b) mut self.b {if let Poll::Ready(()) b.poll(wake) {self.b.take();}}if self.a.is_none() self.b.is_none() {// 两个 Future都已完成 - 我们可以成功地返回了Poll::Ready(())} else {// 至少还有一个 Future 没有完成任务因此返回 Poll::Pending.// 当该 Future 再次准备好时通过调用wake()函数来继续执行Poll::Pending}}
}上面代码展示了如何同时运行多个 Future 且在此过程中没有任何内存分配让并发编程更加高效。 类似的多个Future也可以一个接一个的连续运行
/// 一个SimpleFuture, 它使用顺序的方式一个接一个地运行两个Future
//
// 而真实的Andthen允许根据第一个Future的输出来创建第二个Future因此复杂的多。
pub struct AndThenFutFutureA, FutureB {first: OptionFutureA,second: FutureB,
}implFutureA, FutureB SimpleFuture for AndThenFutFutureA, FutureB
whereFutureA: SimpleFutureOutput (),FutureB: SimpleFutureOutput (),
{type Output ();fn poll(mut self, wake: fn()) - PollSelf::Output {if let Some(first) mut self.first {match first.poll(wake) {// 我们已经完成了第一个 Future 可以将它移除 然后准备开始运行第二个Poll::Ready(()) self.first.take(),// 第一个 Future 还不能完成Poll::Pending return Poll::Pending,};}// 运行到这里说明第一个Future已经完成尝试去完成第二个self.second.poll(wake)}
}
这些例子展示了在不需要内存对象分配以及深层嵌套回调的情况下该如何使用 Future 特征去表达异步控制流。 在了解了基础的控制流后我们再来看看真实的 Future 特征有何不同之处。
trait Future {type Output;fn poll(// 首先值得注意的地方是self的类型从mut self变成了Pinmut Self:self: Pinmut Self,// 其次将wake: fn() 修改为 cx: mut Context_:cx: mut Context_,) - PollSelf::Output;
}
首先这里多了一个 Pin 现在你只需要知道使用它可以创建一个无法被移动的 Future 因为无法被移动所以它将具有固定的内存地址意味着我们可以存储它的指针(如果内存地址可能会变动那存储指针地址将毫无意义)也意味着可以实现一个自引用数据结构: struct MyFut { a: i32, ptr_to_a: *const i32 }。
其次从 wake: fn() 变成了 mut Context’_ 。意味着 wake 函数可以携带数据了。
使用 Waker 来唤醒任务
对于 Future 来说第一次被 poll 时无法完成任务是很正常的。但它需要确保在未来一旦准备好时可以通知执行器再次对其进行 poll 进而继续往下执行该通知就是通过 Waker 类型完成的。
Waker 提供了一个 wake() 方法可以用于告诉执行器相关的任务可以被唤醒了此时执行器就可以对相应的 Future 再次进行 poll 操作。
构建一个定时器
下面一起来实现一个简单的定时器 Future 。为了让例子尽量简单当计时器创建时我们会启动一个线程接着让该线程进入睡眠等睡眠结束后再通知给 Future 。
注意本例子还会在后面继续使用因此我们重新创建一个工程来演示使用 cargo new --lib timer_future 来创建一个新工程在 lib 包的根路径 src/lib.rs 中添加以下内容
use std::{future::Future,pin::Pin,sync::{Arc, Mutex},task::{Context, Poll, Waker},thread,time::Duration,
};继续来实现 Future 定时器之前提到: 新建线程在睡眠结束后会需要将状态同步给定时器 Future 由于是多线程环境我们需要使用 Arc// 来作为一个共享状态用于在新线程和 Future 定时器间共享。
pub struct TimerFuture {// Arc是一种能够使得数据在线程间安全共享的智能指针.它的工作方式从本质上来讲是对将要共享的数据进行包装并表现为此数据的一个指针。// Arc会追踪这个指针的所有拷贝当最后一份拷贝离开作用域时它就会安全释放内存。shared_state: ArcMutexSharedState,
}/// 在Future和等待的线程间共享状态
struct SharedState {/// 定时(睡眠)是否结束completed: bool,/// 当睡眠结束后线程可以用waker通知TimerFuture来唤醒任务waker: OptionWaker,
}下面给出 Future 的具体实现:
impl Future for TimerFuture {type Output ();// 函数没有返回值那么返回一个 ()// 通过 ; 结尾的表达式返回一个 ()fn poll(self: Pinmut Self, cx: mut Context_) - PollSelf::Output {// 通过检查共享状态来确定定时器是否已经完成let mut shared_state self.shared_state.lock().unwrap();if shared_state.completed {// 计算完成弹出计算数据Poll::Ready(())} else {// 设置waker这样新线程在睡眠(计时)结束后可以唤醒当前的任务接着再次对Future进行poll操作,// 下面的clone每次被poll时都会发生一次实际上应该是只clone一次更加合理。// 选择每次都clone的原因是 TimerFuture可以在执行器的不同任务间移动如果只克隆一次// 那么获取到的waker可能已经被篡改并指向了其它任务最终导致执行器运行了错误的任务shared_state.waker Some(cx.waker().clone());// 设置pending状态Poll::Pending}}
}代码很简单只要新线程设置了 shared_state.completed true 那任务就能顺利结束。如果没有设置会为当前的任务克隆一份 Waker 这样新线程就可以使用它来唤醒当前的任务。
最后再来创建一个 API 用于构建定时器和启动计时线程:
impl TimerFuture {/// 创建一个新的TimerFuture在指定的时间结束后该Future可以完成pub fn new(duration: Duration) - Self {let shared_state Arc::new(Mutex::new(SharedState {completed: false,waker: None,}));// 创建新线程let thread_shared_state shared_state.clone();thread::spawn(move || {// 睡眠指定时间实现计时功能thread::sleep(duration);let mut shared_state thread_shared_state.lock().unwrap();// 通知执行器定时器已经完成可以继续poll对应的Future了shared_state.completed true;if let Some(waker) shared_state.waker.take() {waker.wake()}});TimerFuture { shared_state }}
}
执行器 Executor
Rust 的 Future 是惰性的。其中一个推动它的方式就是在 async 函数中使用 .await 来调用另一个 async 函数但是这个只能解决 async 内部的问题那么这些最外层的 async 函数由执行器 executor控制 。
执行器会管理一批 Future (最外层的 async 函数)然后通过不停地 poll 推动它们直到完成。 最开始执行器会先 poll 一次 Future 后面就不会主动去 poll 了而是等待 Future 通过调用 wake 函数来通知它可以继续它才会继续去 poll 。这种 wake 通知然后 poll 的方式会不断重复直到 Future 完成。
构建执行器
下面我们将实现一个简单的执行器它可以同时并发运行多个 Future 。例子中需要用到 futures 包的 ArcWake 特征它可以提供一个方便的途径去构建一个 Waker 。编辑 Cargo.toml 添加下面依赖:
[dependencies]
futures 0.3在之前的内容中我们在 src/lib.rs 中创建了定时器 Future 现在在 src/main.rs 中来创建程序的主体内容开始之前先引入所需的包 use {futures::{future::{BoxFuture, FutureExt},task::{waker_ref, ArcWake},},std::{future::Future,sync::mpsc::{sync_channel, Receiver, SyncSender},sync::{Arc, Mutex},task::{Context, Poll},time::Duration,},// 引入之前实现的定时器模块timer_future::TimerFuture,
};执行器需要从一个消息通道( channel )中拉取事件然后运行它们。当一个任务准备好后可以继续执行它会将自己放入消息通道中然后等待执行器 poll 。
/// 任务执行器负责从通道中接收任务然后执行
struct Executor {ready_queue: ReceiverArcTask,
}/// Spawner负责创建新的Future然后将它发送到任务通道中
#[derive(Clone)]
struct Spawner {task_sender: SyncSenderArcTask,
}/// 一个Future它可以调度自己(将自己放入任务通道中)然后等待执行器去poll
struct Task {/// 进行中的Future在未来的某个时间点会被完成////// 按理来说Mutex在这里是多余的因为我们只有一个线程来执行任务。但是由于/// Rust并不聪明它无法知道Future只会在一个线程内被修改并不会被跨线程修改。因此/// 我们需要使用Mutex来满足这个笨笨的编译器对线程安全的执着。////// 如果是生产级的执行器实现不会使用Mutex因为会带来性能上的开销取而代之的是使用UnsafeCellfuture: MutexOptionBoxFuturestatic, (),/// 可以将该任务自身放回到任务通道中等待执行器的polltask_sender: SyncSenderArcTask,
}fn new_executor_and_spawner() - (Executor, Spawner) {// 任务通道允许的最大缓冲数(任务队列的最大长度)// 当前的实现仅仅是为了简单在实际的执行中并不会这么使用const MAX_QUEUED_TASKS: usize 10_000;let (task_sender, ready_queue) sync_channel(MAX_QUEUED_TASKS);(Executor { ready_queue }, Spawner { task_sender })
}下面再来添加一个方法用于生成 Future , 然后将它放入任务通道中:
impl Spawner {fn spawn(self, future: impl FutureOutput () static Send) {let future future.boxed();let task Arc::new(Task {future: Mutex::new(Some(future)),task_sender: self.task_sender.clone(),});self.task_sender.send(task).expect(任务队列已满);}
}在执行器 poll 一个 Future 之前首先需要调用 wake 方法进行唤醒然后再由 Waker 负责调度该任务并将其放入任务通道中。创建 Waker 的最简单的方式就是实现 ArcWake 特征先来为我们的任务实现 ArcWake 特征这样它们就能被转变成 Waker 然后被唤醒:
impl ArcWake for Task {fn wake_by_ref(arc_self: ArcSelf) {// 通过发送任务到任务管道的方式来实现wake这样wake后任务就能被执行器polllet cloned arc_self.clone();arc_self.task_sender.send(cloned).expect(任务队列已满);}
}当任务实现了 ArcWake 特征后它就变成了 Waker 在调用 wake() 对其唤醒后会将任务复制一份所有权( Arc )然后将其发送到任务通道中。最后我们的执行器将从通道中获取任务然后进行 poll 执行
impl Executor {fn run(self) {while let Ok(task) self.ready_queue.recv() {// 获取一个future若它还没有完成(仍然是Some不是None)则对它进行一次poll并尝试完成它let mut future_slot task.future.lock().unwrap();if let Some(mut future) future_slot.take() {// 基于任务自身创建一个 LocalWakerlet waker waker_ref(task);let context mut Context::from_waker(*waker);// BoxFutureT是PinBoxdyn FutureOutput T Send static的类型别名// 通过调用as_mut方法可以将上面的类型转换成Pinmut dyn Future Send staticif future.as_mut().poll(context).is_pending() {// Future还没执行完因此将它放回任务中等待下次被poll*future_slot Some(future);}}}}
}恭喜我们终于拥有了自己的执行器下面再来写一段代码使用该执行器去运行之前的定时器 Future
fn main() {let (executor, spawner) new_executor_and_spawner();// 生成一个任务spawner.spawn(async {println!(howdy!);// 创建定时器Future并等待它完成TimerFuture::new(Duration::new(2, 0)).await;println!(done!);});// drop掉任务这样执行器就知道任务已经完成不会再有新的任务进来drop(spawner);// 运行执行器直到任务队列为空// 任务运行后会先打印howdy!, 暂停2秒接着打印 done!executor.run();
}完整代码
main.rs
use {futures::{future::{BoxFuture, FutureExt},task::{waker_ref, ArcWake},},std::{future::Future,sync::mpsc::{sync_channel, Receiver, SyncSender},sync::{Arc, Mutex},task::{Context, Poll},time::Duration,},// 引入之前实现的定时器模块timer_future::TimerFuture,
};/// 任务执行器负责从通道中接收任务然后执行
struct Executor {ready_queue: ReceiverArcTask,
}/// Spawner负责创建新的Future然后将它发送到任务通道中
#[derive(Clone)]
struct Spawner {task_sender: SyncSenderArcTask,
}/// 一个Future它可以调度自己(将自己放入任务通道中)然后等待执行器去poll
struct Task {/// 进行中的Future在未来的某个时间点会被完成////// 按理来说Mutex在这里是多余的因为我们只有一个线程来执行任务。但是由于/// Rust并不聪明它无法知道Future只会在一个线程内被修改并不会被跨线程修改。因此/// 我们需要使用Mutex来满足这个笨笨的编译器对线程安全的执着。////// 如果是生产级的执行器实现不会使用Mutex因为会带来性能上的开销取而代之的是使用UnsafeCellfuture: MutexOptionBoxFuturestatic, (),/// 可以将该任务自身放回到任务通道中等待执行器的polltask_sender: SyncSenderArcTask,
}fn new_executor_and_spawner() - (Executor, Spawner) {// 任务通道允许的最大缓冲数(任务队列的最大长度)// 当前的实现仅仅是为了简单在实际的执行中并不会这么使用const MAX_QUEUED_TASKS: usize 10_000;let (task_sender, ready_queue) sync_channel(MAX_QUEUED_TASKS);//设置缓冲通道(Executor { ready_queue }, Spawner { task_sender })
}
impl Spawner {// 派生任务函数fn spawn(self, future: impl FutureOutput () static Send) {let future future.boxed();let task Arc::new(Task {future: Mutex::new(Some(future)),task_sender: self.task_sender.clone(),});self.task_sender.send(task).expect(任务队列已满);}
}
impl ArcWake for Task {fn wake_by_ref(arc_self: ArcSelf) {// 通过发送任务到任务管道的方式来实现wake这样wake后任务就能被执行器polllet cloned arc_self.clone();arc_self.task_sender.send(cloned).expect(任务队列已满);}
}
impl Executor {fn run(self) {while let Ok(task) self.ready_queue.recv() {// 获取一个future若它还没有完成(仍然是Some不是None)则对它进行一次poll并尝试完成它let mut future_slot task.future.lock().unwrap();if let Some(mut future) future_slot.take() {// 基于任务自身创建一个 LocalWakerlet waker waker_ref(task);let context mut Context::from_waker(*waker);// BoxFutureT是PinBoxdyn FutureOutput T Send static的类型别名// 通过调用as_mut方法可以将上面的类型转换成Pinmut dyn Future Send staticif future.as_mut().poll(context).is_pending() {// Future还没执行完因此将它放回任务中等待下次被poll*future_slot Some(future);}}}}
}fn main() {let (executor, spawner) new_executor_and_spawner();// 生成一个任务spawner.spawn(async {println!(howdy!);// 创建定时器Future并等待它完成TimerFuture::new(Duration::new(2, 0)).await;println!(done!);});// drop掉任务这样执行器就知道任务已经完成不会再有新的任务进来drop(spawner);// 运行执行器直到任务队列为空// 任务运行后会先打印howdy!, 暂停2秒接着打印 done!executor.run();
}lib.rs
use std::{future::Future,pin::Pin,sync::{Arc, Mutex},task::{Context, Poll, Waker},thread,time::Duration,
};
pub struct TimerFuture {// Arc是一种能够使得数据在线程间安全共享的智能指针.它的工作方式从本质上来讲是对将要共享的数据进行包装并表现为此数据的一个指针。// Arc会追踪这个指针的所有拷贝当最后一份拷贝离开作用域时它就会安全释放内存。shared_state: ArcMutexSharedState,
}/// 在Future和等待的线程间共享状态
struct SharedState {/// 定时(睡眠)是否结束completed: bool,/// 当睡眠结束后线程可以用waker通知TimerFuture来唤醒任务waker: OptionWaker,
}
impl Future for TimerFuture {type Output ();// 函数没有返回值那么返回一个 ()// 通过 ; 结尾的表达式返回一个 ()fn poll(self: Pinmut Self, cx: mut Context_) - PollSelf::Output {// 通过检查共享状态来确定定时器是否已经完成let mut shared_state self.shared_state.lock().unwrap();if shared_state.completed {// 计算完成弹出计算数据Poll::Ready(())} else {// 设置waker这样新线程在睡眠(计时)结束后可以唤醒当前的任务接着再次对Future进行poll操作,// 下面的clone每次被poll时都会发生一次实际上应该是只clone一次更加合理。// 选择每次都clone的原因是 TimerFuture可以在执行器的不同任务间移动如果只克隆一次// 那么获取到的waker可能已经被篡改并指向了其它任务最终导致执行器运行了错误的任务shared_state.waker Some(cx.waker().clone());// 设置pending状态Poll::Pending}}
}impl TimerFuture {/// 创建一个新的TimerFuture在指定的时间结束后该Future可以完成pub fn new(duration: Duration) - Self {// 设置状态和唤醒函数let shared_state Arc::new(Mutex::new(SharedState {completed: false,waker: None,}));// 创建新线程let thread_shared_state shared_state.clone();// thread::spawn 函数的返回值类型是 JoinHandle 通过 JoinHandle 来等待所有线程完成就可以解决上面执行不完的问题。thread::spawn(move || { //move 闭包通常和 thread::spawn 函数一起使用它允许你使用其它线程的数据。创建线程时把值的所有权从一个线程转移到另一个线程。// 睡眠指定时间实现计时功能thread::sleep(duration);// 线程加锁let mut shared_state thread_shared_state.lock().unwrap();// 通知执行器定时器已经完成可以继续poll对应的Future了shared_state.completed true;if let Some(waker) shared_state.waker.take() {waker.wake()}});// 返回创建的对象TimerFuture { shared_state }}
}