Rust異步之Future

2{icon} {views}

對異步的學習,我們先從Future開始,學習異步的實現原理。等理解了異步是怎麼實現的后,再學習Rust異步編程涉及的2個庫(futures、tokio)的時候就容易理解多了。

Future

rust中Future的定義如下,一個Future可以理解為一段供將來調度執行的代碼。我們為什麼需要異步呢,異步相比同步高效在哪裡呢?就是異步環境下,當前調用就緒時則執行,沒有就緒時則不等待任務就緒,而是返回一個Future,等待將來任務就緒時再調度執行。當然,這裏返回Future時關鍵的是要聲明事件什麼時候就緒,就緒后怎麼喚醒這個任務到調度器去調度執行。

#[must_use = "futures do nothing unless you `.await` or poll them"]
#[lang = "future_trait"]
pub trait Future {  // A future represents an asynchronous computation.
    type Output;
    /* The core method of future, poll, attempts to resolve the future into a final value. This method does not block if the value is not ready. Instead, the current task is scheduled to be woken up when it's possible to make further progress by polling again. */ 
    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output>;
}

可以看到執行后的返回結果,一個是就緒返回執行結果,另一個是未就緒待定。

#[must_use = "this `Poll` may be a `Pending` variant, which should be handled"]
pub enum Poll<T> {
    Ready(T),
    Pending,
}

可能到這裏你還是雲里霧裡,我們寫一段代碼,幫助你理解。完整代碼見:future_study

use futures;
use std::{future::Future, pin::Pin, sync::{Arc, Mutex}, task::{Context, Poll, Waker}, thread, time::Duration};

fn main() {
    // 我們現在還沒有實現調度器,所以要用一下futues庫里的一個調度器。
    futures::executor::block_on(TimerFuture::new(Duration::new(10, 0)));    
}

struct SharedState {
    completed: bool,
    waker: Option<Waker>,
}

// 我們想要實現一個定時器Future
pub struct TimerFuture {
    share_state: Arc<Mutex<SharedState>>,
}

// impl Future trait for TimerFuture.
impl Future for TimerFuture {
    type Output = ();
    // executor will run this poll ,and Context is to tell future how to wakeup the task.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut share_state = self.share_state.lock().unwrap();
        if share_state.completed {
            println!("future ready. execute poll to return.");
            Poll::Ready(())
        } else {
            println!("future not ready, tell the future task how to wakeup to executor");
            // 你要告訴future,當事件就緒后怎麼喚醒任務去調度執行,而這個waker根具體的調度器有關
            // 調度器執行的時候會將上下文信息傳進來,裏面最重要的一項就是Waker
            share_state.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}

impl TimerFuture {
    pub fn new(duration: Duration) -> Self {
        let share_state = Arc::new(Mutex::new(SharedState{completed:false, waker:None}));
        let thread_shared_state = share_state.clone();
        thread::spawn(move || {
            thread::sleep(duration);
            let mut share_state = thread_shared_state.lock().unwrap();
            share_state.completed = true;
            if let Some(waker) = share_state.waker.take() {
                println!("detect future is ready, wakeup the future task to executor.");
                waker.wake()    // wakeup the future task to executor.
            }
        });

        TimerFuture {share_state}
    }
}

執行結果如下:

future not ready, tell the future task how to wakeup to executor
detect future is ready, wakeup the future task to executor.
future ready. execute poll to return.

可以看到,剛開始的時候,定時10s事件還未完成,處在Pending狀態,這時要告訴這個任務後面就緒后怎麼喚醒去調度執行。等10s后,定時事件完成了,通過前面的設置的Waker,喚醒這個Future任務去調度執行。這裏,我們看一下ContextWaker是怎麼定義的:

/// The `Context` of an asynchronous task.
///
/// Currently, `Context` only serves to provide access to a `&Waker`
/// which can be used to wake the current task.
#[stable(feature = "futures_api", since = "1.36.0")]
pub struct Context<'a> {
    waker: &'a Waker,
    // Ensure we future-proof against variance changes by forcing
    // the lifetime to be invariant (argument-position lifetimes
    // are contravariant while return-position lifetimes are
    // covariant).
    _marker: PhantomData<fn(&'a ()) -> &'a ()>,
}

// A Waker is a handle for waking up a task by notifying its executor that it is ready to be run.
#[repr(transparent)]
#[stable(feature = "futures_api", since = "1.36.0")]
pub struct Waker {
    waker: RawWaker,
}

現在你應該對Future有新的理解了,上面的代碼,我們並沒有實現調度器,而是使用的futures庫中提供的一個調度器去執行,下面自己實現一個調度器,看一下它的原理。而在Rust中,真正要用的話,還是要學習tokio庫,這裏我們只是為了講述一下實現原理,以便於理解異步是怎麼一回事。完整代碼見:future_study, 關鍵代碼如下:

use std::{future::Future, pin::Pin, sync::{Arc, Mutex}, task::{Context, Poll, Waker}, thread, time::Duration};
use std::sync::mpsc::{sync_channel, SyncSender, Receiver};
use futures::{future::{FutureExt, BoxFuture}, task::{ArcWake, waker_ref}};

use super::timefuture::*;

pub fn run_executor() {
    let (executor, spawner) = new_executor_and_spawner();
    // 將Future封裝成一個任務,分發到調度器去執行
    spawner.spawn( async {
        let v = TimerFuture::new(Duration::new(10, 0)).await;
        println!("return value: {}", v);
        v
    });

    drop(spawner);
    executor.run();
}

fn new_executor_and_spawner() -> (Executor, Spawner) {
    const MAX_QUEUE_TASKS: usize = 10_000;
    let (task_sender, ready_queue) = sync_channel(MAX_QUEUE_TASKS);
    (Executor{ready_queue}, Spawner{task_sender})
}

// executor , received ready task to execute.
struct Executor {
    ready_queue: Receiver<Arc<Task>>,
}

impl Executor {
    // 實際運行具體的Future任務,不斷的接收Future task執行。
    fn run(&self) {
        let mut count = 0;
        while let Ok(task) = self.ready_queue.recv() {
            count = count + 1;
            println!("received task. {}", count);
            let mut future_slot = task.future.lock().unwrap();
            if let Some(mut future) = future_slot.take() {
                let waker = waker_ref(&task);
                let context = &mut Context::from_waker(&*waker);
                if let Poll::Pending = future.as_mut().poll(context) {
                    *future_slot = Some(future);
                    println!("executor run the future task, but is not ready, create a future again.");
                } else {
                    println!("executor run the future task, is ready. the future task is done.");
                }
            }
        }
    }
}

// 負責將一個Future封裝成一個Task,分發到調度器去執行。
#[derive(Clone)]
struct Spawner {
    task_sender: SyncSender<Arc<Task>>,
}

impl Spawner {
    // encapsul a future object to task , wakeup to executor.
    fn spawn(&self, future: impl Future<Output = String> + 'static + Send) {
        let future = future.boxed();
        let task = Arc::new(Task {
            future: Mutex::new(Some(future)),
            task_sender: self.task_sender.clone(),
        });
        println!("first dispatch the future task to executor.");
        self.task_sender.send(task).expect("too many tasks queued.");
    }
}

// 等待調度執行的Future任務,這個任務必須要實現ArcWake,表明怎麼去喚醒任務去調度執行。
struct Task {
    future: Mutex<Option<BoxFuture<'static, String>>>,
    task_sender: SyncSender<Arc<Task>>,
}

impl ArcWake for Task {
    // A way of waking up a specific task.
    fn wake_by_ref(arc_self: &Arc<Self>) {
        let clone = arc_self.clone();
        arc_self.task_sender.send(clone).expect("too many tasks queued");
    }
}

運行結果如下:

first dispatch the future task to executor.     
received task. 1                                
future not ready, tell the future task how to wakeup to executor
executor run the future task, but is not ready, create a future again.
detect future is ready, wakeup the future task to executor.     
received task. 2
future ready. execute poll to return.
return value: timer done.
executor run the future task, is ready. the future task is done.

第一次調度的時候,因為還沒有就緒,在Pending狀態,告訴這個任務,後面就緒是怎麼喚醒該任務。然後當事件就緒的時候,因為前面告訴了如何喚醒,按方法喚醒了該任務去調度執行。其實,在實際應用場景中,難的地方還在於,你怎麼知道什麼時候事件就緒,去喚醒任務,我們很容易聯想到Linux系統的epoll,tokio等底層,也是基於epoll實現的。通過epoll,我們就能方便的知道事件什麼時候就緒了。

參考資料

主要學習資料如下:

  • Asynchronous Programming in Rust
  • Futures Explained in 200 Lines of Rust
  • 200行代碼講透RUST FUTURES

上面的文章主要是學習異步的實現原理,理解異步是怎麼實現的,而進行Rust異步編程時的具體實現,則主要依賴下面2個庫:

  • future —— 主要完成了對異步的抽象
  • tokio —— 異步Future運行時

學習這兩個庫的時候,一定要注意版本問題,這兩個庫最近變化的比較快,一定要學最新的。

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

網頁設計最專業,超強功能平台可客製化

※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

※回頭車貨運收費標準

※推薦評價好的iphone維修中心

※教你寫出一流的銷售文案?

台中搬家公司教你幾個打包小技巧,輕鬆整理裝箱!

台中搬家公司費用怎麼算?