Waker로 타스크 깨우기

futures가 처음에 poll 되어서 일을 완성 할 수 없는 경우가 일반적 입니다. 이 경우 futures는 더 진전 될 준비가 되면 polling 되도록 해야 합니다. 이것은 Waker type으로 이루어집니다.

futures가 polling 될 때마다 "task"의 일부로 polling됩니다. Task는 executor에게 제출 된 최상위 future 입니다.

Waker는 executor에게 해당 task가 깨어나야 한다고 말해주는 wake()메소드를 제공 합니다. wake()가 호출되면 executor는 Waker와 관련된 작업이 진행될 준비가 되었음을 알고 future는 다시 폴링 됩니다.

Wakerclone()도 구현하여 복사하고 저장할 수 있습니다.

Waker를 사용하여 간단한 타이머 future를 구현해 봅시다.

응용 : 타이머 만들기

예제를 단순회하기 위해 타이머가 시작될 때 새 스레드를 실행 시킵니다. 필요한 시간 동안 sleep 모드로 전환 한 다음, 시간 구간이 경과했을 때 타이머에 신호를 보냅니다.

시작하면서 해야 할 imports는 다음과 같습니다.


#![allow(unused)]
fn main() {
use {
    std::{
        future::Future,
        pin::Pin,
        sync::{Arc, Mutex},
        task::{Context, Poll, Waker},
        thread,
        time::Duration,
    },
};
}

future type 자체를 정의하며 시작하겠습니다. 우리의 future는 타이머가 경과하고 future가 완료 되어야 함을 스레드에게 알려주어야 할 방법이 있어야 합니다. 공유 된 Arc<Mutex<.. >> 값을 사용하여 스레드와 future 간의 통신을 하도록 하겠습니다.

pub struct TimerFuture {
    shared_state: Arc<Mutex<SharedState>>,
}

/// Shared state between the future and the waiting thread
struct SharedState {
    /// Whether or not the sleep time has elapsed
    completed: bool,

    /// The waker for the task that `TimerFuture` is running on.
    /// The thread can use this after setting `completed = true` to tell
    /// `TimerFuture`'s task to wake up, see that `completed = true`, and
    /// move forward.
    waker: Option<Waker>,
}

이제, 실제로 Future를 구현하도록 하겠습니다!

impl Future for TimerFuture {
    type Output = ();
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Look at the shared state to see if the timer has already completed.
        let mut shared_state = self.shared_state.lock().unwrap();
        if shared_state.completed {
            Poll::Ready(())
        } else {
            // Set waker so that the thread can wake up the current task
            // when the timer has completed, ensuring that the future is polled
            // again and sees that `completed = true`.
            //
            // It's tempting to do this once rather than repeatedly cloning
            // the waker each time. However, the `TimerFuture` can move between
            // tasks on the executor, which could cause a stale waker pointing
            // to the wrong task, preventing `TimerFuture` from waking up
            // correctly.
            //
            // N.B. it's possible to check for this using the `Waker::will_wake`
            // function, but we omit that here to keep things simple.
            shared_state.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}

아주 간단 하죠? 스레드가 shared_state.completed = true를 설정 한 경우 우리는 다한 것 입니다! 그렇지 않으면 현재 작업에 대한 Waker를 복제하여 스레드가 작업을 다시 시작할 수 있도록 shared_state.waker에게 넘겨주어야 합니다.

중요한 것은 future가 poll 될 때마다 Waker를 업데이트 해야 한다는 것입니다. future는 다른 Waker를 가지고 다른 작업으로 이동 했을 수 있기 때문 입니다. 이것은 poll된 이후의 작업들 사이에 futures가 전달 될 때 발생합니다.

마지막으로 실제로 타이머를 구성하고 스레드를 시작하려면 관련 API가 필요합니다.

impl TimerFuture {
    /// Create a new `TimerFuture` which will complete after the provided
    /// timeout.
    pub fn new(duration: Duration) -> Self {
        let shared_state = Arc::new(Mutex::new(SharedState {
            completed: false,
            waker: None,
        }));

        // Spawn the new thread
        let thread_shared_state = shared_state.clone();
        thread::spawn(move || {
            thread::sleep(duration);
            let mut shared_state = thread_shared_state.lock().unwrap();
            // Signal that the timer has completed and wake up the last
            // task on which the future was polled, if one exists.
            shared_state.completed = true;
            if let Some(waker) = shared_state.waker.take() {
                waker.wake()
            }
        });

        TimerFuture { shared_state }
    }
}

우와! 이것이 간단한 타이머 future를 만드는 데 필요한 전부입니다. 만약 우리가 future를 실행할 executor를 가지고 있다면 말이죠...