Executor와 시스템 IO

앞의 Future Trait 섹션에서, 우리는 소켓에서 비동기 읽기를 수행 한 future에 대해 논의한 적이 있습니다 :

pub struct SocketRead<'a> {
    socket: &'a Socket,
}

impl SimpleFuture for SocketRead<'_> {
    type Output = Vec<u8>;

    fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
        if self.socket.has_data_to_read() {
            // The socket has data-- read it into a buffer and return it.
            Poll::Ready(self.socket.read_buf())
        } else {
            // The socket does not yet have data.
            //
            // Arrange for `wake` to be called once data is available.
            // When data becomes available, `wake` will be called, and the
            // user of this `Future` will know to call `poll` again and
            // receive data.
            self.socket.set_readable_callback(wake);
            Poll::Pending
        }
    }
}

이 future는 소켓에서 사용 가능한 데이터를 읽을 수 있으며 사용 가능한 데이터가 없으면 executor에게 제어를 내어주어 소켓이 다시 읽기가 가능해질 때 작업을 깨울 것을 요청합니다. 그러나 이 예제에서 어떻게 Socket type이 구현되었는지 명확하지 않습니다. 특히 set_readable_callback 함수가 어떻게 작동하는지 명확하지 않습니다. 소켓을 읽을 수 있게 되면 wake()를 어떻게 호출 할 수 있습니까? 한 가지 옵션은 Socket이 읽기 가능한지 계속 확인 하는 스레드를 만들어서 적절한 경우 wake()를 호출하도록 하는 것입니다. 그러나 이 것은 매우 비효율적이며 block된 각 IO future에 대한 각각 별도의 스레드 를 필요로 합니다. 이것은 우리의 비동기 코드의 효율성을 크게 줄일 것입니다.

실제로 이 문제는 IO-aware 시스템 blocking 프리미티브 와의 통합을 통해 해결됩니다. Linux의 epoll, FreeBSD의 kqueue와 같은 시스템 blocking 프리미티브, Mac OS, Windows의 IOCP 및 Fuchsia의 포트 (모두 크로스 플랫폼 Rust crate mio를 통해 제공됨 )와 같은 것이 그것 입니다. 이 프리미티브는 모두 여러 비동기 IO 이벤트를 block하는 스레드로, 이벤트가 하나라도 완료돠면 리턴합니다. 실제로 이러한 API는 일반적으로 다음과 같습니다 :

struct IoBlocker {
    /* ... */
}

struct Event {
    // An ID uniquely identifying the event that occurred and was listened for.
    id: usize,

    // A set of signals to wait for, or which occurred.
    signals: Signals,
}

impl IoBlocker {
    /// Create a new collection of asynchronous IO events to block on.
    fn new() -> Self { /* ... */ }

    /// Express an interest in a particular IO event.
    fn add_io_event_interest(
        &self,

        /// The object on which the event will occur
        io_object: &IoObject,

        /// A set of signals that may appear on the `io_object` for
        /// which an event should be triggered, paired with
        /// an ID to give to events that result from this interest.
        event: Event,
    ) { /* ... */ }

    /// Block until one of the events occurs.
    fn block(&self) -> Event { /* ... */ }
}

let mut io_blocker = IoBlocker::new();
io_blocker.add_io_event_interest(
    &socket_1,
    Event { id: 1, signals: READABLE },
);
io_blocker.add_io_event_interest(
    &socket_2,
    Event { id: 2, signals: READABLE | WRITABLE },
);
let event = io_blocker.block();

// prints e.g. "Socket 1 is now READABLE" if socket one became readable.
println!("Socket {:?} is now {:?}", event.id, event.signals);

Futures executor는 이러한 프리미티브를 사용하여 비동기 IO 오브젝트를 제공 할 수 있습니다. 이 오브젝트는 소켓과 같이 특정 IO 이벤트가 발생할 때 콜백이 실행되도록 구성 할 수 있습니다. 위의 SocketRead 예제의 경우 Socket::set_readable_callback 함수는 다음의 의사 코드 처럼 보일 수 있습니다.

impl Socket {
    fn set_readable_callback(&self, waker: Waker) {
        // `local_executor` is a reference to the local executor.
        // this could be provided at creation of the socket, but in practice
        // many executor implementations pass it down through thread local
        // storage for convenience.
        let local_executor = self.local_executor;

        // Unique ID for this IO object.
        let id = self.id;

        // Store the local waker in the executor's map so that it can be called
        // once the IO event arrives.
        local_executor.event_map.insert(id, waker);
        local_executor.add_io_event_interest(
            &self.socket_file_descriptor,
            Event { id, signals: READABLE },
        );
    }
}

이제 적절한 Waker에 IO 이벤트를 분배하는 하나의 executor 스레드만 가질 수 있습니다. 이 스레드는 해당 타스크를 깨우고, executor가 리턴하기 전에 더 많은 타스크를 완료 할 수 있도록 해주기 위하여 더 IO 이벤트가 있는지 체크합니다 (그리고 이 사이클은 계속됩니다 ...).