Executor와 시스템 IO
앞의 Future Trait 섹션에서, 우리는 소켓에서 비동기 읽기를 수행 한 future에 대해 논의한 적이 있습니다 :
pub struct SocketRead<'a> {
socket: &'a Socket,
}
impl SimpleFuture for SocketRead<'_> {
type Output = Vec<u8>;
fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
if self.socket.has_data_to_read() {
// The socket has data-- read it into a buffer and return it.
Poll::Ready(self.socket.read_buf())
} else {
// The socket does not yet have data.
//
// Arrange for `wake` to be called once data is available.
// When data becomes available, `wake` will be called, and the
// user of this `Future` will know to call `poll` again and
// receive data.
self.socket.set_readable_callback(wake);
Poll::Pending
}
}
}
이 future는 소켓에서 사용 가능한 데이터를 읽을 수 있으며 사용 가능한 데이터가 없으면
executor에게 제어를 내어주어 소켓이 다시 읽기가 가능해질 때 작업을 깨울 것을 요청합니다.
그러나 이 예제에서 어떻게 Socket
type이 구현되었는지 명확하지 않습니다.
특히 set_readable_callback
함수가 어떻게 작동하는지 명확하지 않습니다.
소켓을 읽을 수 있게 되면 wake()
를 어떻게 호출 할 수 있습니까?
한 가지 옵션은
Socket
이 읽기 가능한지 계속 확인 하는 스레드를 만들어서 적절한 경우 wake()
를 호출하도록 하는 것입니다.
그러나 이 것은 매우 비효율적이며
block된 각 IO future에 대한 각각 별도의 스레드 를 필요로 합니다.
이것은 우리의 비동기 코드의 효율성을 크게 줄일 것입니다.
실제로 이 문제는 IO-aware 시스템 blocking 프리미티브 와의 통합을 통해 해결됩니다.
Linux의 epoll
, FreeBSD의 kqueue
와 같은 시스템 blocking 프리미티브,
Mac OS, Windows의 IOCP 및 Fuchsia의 포트 (모두 크로스 플랫폼 Rust crate mio를 통해 제공됨
)와 같은 것이 그것 입니다. 이 프리미티브는 모두
여러 비동기 IO 이벤트를 block하는 스레드로, 이벤트가 하나라도 완료돠면 리턴합니다.
실제로 이러한 API는 일반적으로 다음과 같습니다 :
struct IoBlocker {
/* ... */
}
struct Event {
// An ID uniquely identifying the event that occurred and was listened for.
id: usize,
// A set of signals to wait for, or which occurred.
signals: Signals,
}
impl IoBlocker {
/// Create a new collection of asynchronous IO events to block on.
fn new() -> Self { /* ... */ }
/// Express an interest in a particular IO event.
fn add_io_event_interest(
&self,
/// The object on which the event will occur
io_object: &IoObject,
/// A set of signals that may appear on the `io_object` for
/// which an event should be triggered, paired with
/// an ID to give to events that result from this interest.
event: Event,
) { /* ... */ }
/// Block until one of the events occurs.
fn block(&self) -> Event { /* ... */ }
}
let mut io_blocker = IoBlocker::new();
io_blocker.add_io_event_interest(
&socket_1,
Event { id: 1, signals: READABLE },
);
io_blocker.add_io_event_interest(
&socket_2,
Event { id: 2, signals: READABLE | WRITABLE },
);
let event = io_blocker.block();
// prints e.g. "Socket 1 is now READABLE" if socket one became readable.
println!("Socket {:?} is now {:?}", event.id, event.signals);
Futures executor는 이러한 프리미티브를 사용하여 비동기 IO 오브젝트를 제공 할 수 있습니다.
이 오브젝트는 소켓과 같이 특정 IO 이벤트가 발생할 때 콜백이 실행되도록 구성 할 수 있습니다.
위의 SocketRead
예제의 경우
Socket::set_readable_callback
함수는 다음의 의사 코드 처럼 보일 수 있습니다.
impl Socket {
fn set_readable_callback(&self, waker: Waker) {
// `local_executor` is a reference to the local executor.
// this could be provided at creation of the socket, but in practice
// many executor implementations pass it down through thread local
// storage for convenience.
let local_executor = self.local_executor;
// Unique ID for this IO object.
let id = self.id;
// Store the local waker in the executor's map so that it can be called
// once the IO event arrives.
local_executor.event_map.insert(id, waker);
local_executor.add_io_event_interest(
&self.socket_file_descriptor,
Event { id, signals: READABLE },
);
}
}
이제 적절한 Waker
에 IO 이벤트를 분배하는 하나의 executor 스레드만 가질 수 있습니다.
이 스레드는 해당 타스크를 깨우고, executor가 리턴하기 전에
더 많은 타스크를 완료 할 수 있도록 해주기 위하여 더 IO 이벤트가 있는지 체크합니다
(그리고 이 사이클은 계속됩니다 ...).