시작하면서...

Rust의 비동기 프로그래밍에 오신 것을 환영합니다! 비동기 Rust 코드를 작성하기 시작하려는 경우, 당신은 올바른 장소에 왔습니다. 당신이 웹 서버, 데이터베이스 또는 운영 체제 등 무엇을 만들려고 하는 경우 이 책은 Rust의 비동기 프로그래밍 도구를 사용하여 하드웨어를 최대한 활용하는 방법을 보여줄 것입니다.

이 책이 다루는 범위

이 책은 Rust의 비동기 사용에 대한 포괄적인 최신 안내서를 목표로합니다. 언어 기능 및 라이브러리에 대한 설명은 초보자와 고급 경험자 모두에게 적합한 내용이 될 것입니다.

  • 처음 부분에서는 일반적으로 비동기 프로그래밍에 대해 소개합니다. 그리고 Rust의 특별한 특성에 대해 다룹니다.

  • 중간 장에서는 사용할 수있는 주요 유틸리티 및 제어 흐름 도구에 대해 설명합니다. 비동기 코드 작성 및 라이브러리 구조화를 위한 모범 사례를 설명하며 성능 및 재사용성을 극대화하는 응용 프로그램의 작성에 대해 논합니다.

  • 이 책의 마지막 부분은 더 넓은 비동기 생태계를 다루고 있으며 일반적인 작업을 수행하는 방법에 대한 많은 예를 다룹니다.

이제, 비동기 Rust 프로그래밍의 흥미 진진한 세계를 탐험 해 봅시다.

왜 비동기인가?

우리 모두 Rust가 빠르고 안전한 소프트웨어를 작성하는 방법을 좋아합니다. 하지만 비동기 코드를 왜 쓰지?

비동기 코드를 사용하면 동일한 OS 스레드에서 여러 작업을 동시에 실행할 수 있습니다. 일반적인 스레드 응용 프로그램에서 두 개의 웹 페이지를 동시에 다운로드하려는 경우 당신은 작업을 아래의 코드처럼 두 개의 스레드에 분산시켜야 합니다 :

fn get_two_sites() {
    // Spawn two threads to do work.
    let thread_one = thread::spawn(|| download("https://www.foo.com"));
    let thread_two = thread::spawn(|| download("https://www.bar.com"));

    // Wait for both threads to complete.
    thread_one.join().expect("thread one panicked");
    thread_two.join().expect("thread two panicked");
}

이것은 많은 응용 프로그램에서 잘 작동합니다. 결국 단지 다음과 같은 목적을 가지고 스레드가 설계되었습니다 : 한 번에 여러 다른 작업을 실행하고 싶다. 그러나 그들은 또한 몇 가지 제한이 있습니다. 서로 다른 스레드간에 전환하고 서로간에 데이터를 공유하는 과정에서 많은 오버 헤드가 발생합니다. 그냥 앉아서 아무것도하지 않는 스레드 조차도 귀중한 시스템 자원을 소모합니다. 이것은 비동기 코드가 설계되는 이유 입니다. Rust의 async/.await표기법을 사용하여 위의 함수를 다시 작성할 수 있습니다. 이렇게 함으로써 여러 스레드를 만들지 않고 한 번에 여러 작업을 실행할 수 있습니다 :

async fn get_two_sites_async() {
    // Create two different "futures" which, when run to completion,
    // will asynchronously download the webpages.
    let future_one = download_async("https://www.foo.com");
    let future_two = download_async("https://www.bar.com");

    // Run both futures to completion at the same time.
    join!(future_one, future_two);
}

전체적으로 비동기 애플리케이션은 훨씬 더 빠르며 동일한 기능의 스레드 구현보다 적은 리소스를 사용합니다. 하지만, 비용이 있습니다. 스레드는 운영 체제에서 기본적으로 지원합니다. 그것들을 사용하기 위해서는 특별한 프로그래밍 모델이 필요하지 않습니다. 스레드를 만들 수 있으며 스레드를 사용하는 함수를 호출하는 것은 일반적으로 일반 함수를 호출하는 것만큼 쉽습니다. 그러나 비동기 함수는 언어 또는 라이브러리의 특별한 지원이 필요합니다. Rust에서 async fnFuture를 반환하는 비동기 함수를 만듭니다. 함수 본문을 실행하려면 반환 된 Future를 실행해서 동작을 완성해야 합니다.

기존의 스레드 응용 프로그램 방식은 상당히 효과적일 수 있음을 기억해야합니다. 그리고 Rust의 작은 메모리 공간과 예측 가능성은 비동기를 사용하지 않고도 멀리 갈 수 있습니다. 비동기 프로그래밍 모델의 복잡성의 증가는 항상 가치가 있는 것은 아니며 더 간단한 스레드 모델 방법으로 응용 프로그램을 더 잘 제공 할 수 있는지 고려하십시오.

비동기 Rust의 상황

비동기식 Rust 생태계는 시간이 지남에 따라 많은 진화를 겪었습니다. 따라서 사용할 도구, 투자 할 라이브러리, 또는 읽을 문서를 알아내기란 어려운 일 입니다. 그러나 Rust 표준 라이브러리 내에서 Future trait 와 async/await 언어 기능이 최근에 안정화 되었습니다. 전체 생태계는 따라서 새롭게 안정화 된 API를 향한 이주의 한가운데에 있습니다.

그러나 현재 생태계는 여전히 급속한 발전을 겪고 있고 비동기식 Rust 경험은 충분히 연마되지 않았습니다. 여전히 대부분의 라이브러리는 Future crate의 0.1 버젼를 사용합니다. 개발자는 종종 버젼 0.3 Future crate의 compat 기능에 도달해야 합니다. async /await 언어 기능은 여전히 ​​새로운 기능입니다. 특성 메소드의 async fn 구문과 같은 중요한 확장은 여전히 구현되지 않았으며 현재의 컴파일러 오류 메시지는 이해하기 어려울 수 있습니다.

러스트는 가장 뛰어난 성능과 비동기 프로그래밍을 위한 인간 친화적 지원을 발휘할 수 있는 길을 가고 있습니다. 삐걱 거리는 소리가 두렵지 않다면, Rust 비동기 프로그래밍 세계로의 다이빙을 즐기십시오!

async/.await 입문

async/.await는 비동기 함수를 동기 코드처럼 보이도록 해주기 위한 Rust의 내장 툴입니다. async는 코드 블록을 Future라는 trait을 구현하는 상태 머신으로 변환해 줍니다. 동기 함수에서 blocking함수의 호출은 전체 스레드를 blocking하는 반면에 blocking된 Future는 스레드의 제어를 내어 놓아 다른 타스크의 실행을 허용합니다. 이는 다른 Future의 실행을 허용합니다.

비동기 함수를 만들려면 async fn 구문을 사용할 수 있습니다 :


#![allow(unused)]
fn main() {
async fn do_something() { /* ... */ }
}

async fn에 의해 반환되는 값은 Future입니다. 무슨 일이 일어나려면 Future는 executor 프로그램에서 실행 되어야 합니다.

// `block_on` blocks the current thread until the provided future has run to
// completion. Other executors provide more complex behavior, like scheduling
// multiple futures onto the same thread.
use futures::executor::block_on;

async fn hello_world() {
    println!("hello, world!");
}

fn main() {
    let future = hello_world(); // Nothing is printed
    block_on(future); // `future` is run and "hello, world!" is printed
}

async fn 안에서 .await를 사용하여 Future trait을 구현하는 다른 type (예 : 또 다른async fn의 반환 값)의 실행이 완성되기를 기다릴 수 있습니다. block_on과 달리.await는 현재의 스레드를 block하지 않습니다. 대신 Future가 완료 될 때 까지 비동기 적으로 기다립니다. 이는 만약 현재의 future가 진행될 수 없는 경우 다른 타스크가 실행되도록 해줍니다.

예를 들어, 우리가 3개의 async fn: learn_song, sing_song, 그리고 dance 를 가지고 있다고 해봅시다.

async fn learn_song() -> Song { /* ... */ }
async fn sing_song(song: Song) { /* ... */ }
async fn dance() { /* ... */ }

배우고, 노래하고, 춤을 추는 한 가지 방법은 각각을 개별적으로 block하는 것입니다:

fn main() {
    let song = block_on(learn_song());
    block_on(sing_song(song));
    block_on(dance());
}

그러나 우리는 이런 식으로 최고의 성능을 제공하지 않습니다. 한 번에 한 가지 작업만 수행하고 있습니다! 분명히 우리는 노래를 부르기 전에 노래를 배워야 합니다. 하지만 우리는 노래를 배우거나 부르는 동시에 춤을 출 수 있습니다. 이를 위해 동시에 실행할 수 있는 두 개의 async fn을 만들 수 있습니다:

async fn learn_and_sing() {
    // Wait until the song has been learned before singing it.
    // We use `.await` here rather than `block_on` to prevent blocking the
    // thread, which makes it possible to `dance` at the same time.
    let song = learn_song().await;
    sing_song(song).await;
}

async fn async_main() {
    let f1 = learn_and_sing();
    let f2 = dance();

    // `join!` is like `.await` but can wait for multiple futures concurrently.
    // If we're temporarily blocked in the `learn_and_sing` future, the `dance`
    // future will take over the current thread. If `dance` becomes blocked,
    // `learn_and_sing` can take back over. If both futures are blocked, then
    // `async_main` is blocked and will yield to the executor.
    futures::join!(f1, f2);
}

fn main() {
    block_on(async_main());
}

이 예에서 노래를 부르기 전에 노래를 배우는 것이 필요하지만 노래 학습과 노래 부르기는 춤과 동시에 일어날 수 있습니다. 우리가 learn_and_singlearn_song().await 대신 block_on(learn_song())을 사용했다면, learn_song이 실행되는 동안 스레드는 다른 작업을 수행 할 수 없습니다. 이렇게 하면 노래와 동시에 춤을 출 수 없게됩니다. learn_song future를 .await~ing 함으로써, 우리는 learn_song이 block 된 경우 다른 작업이 현재 스레드를 인계받을 수 있도록 할 수 있습니다. 이를 통해 여러 타스크를 동일한 스레드에서 동시에 운영 할 수 있습니다. 이제 async/await의 기본 사항을 배웠으므로 예제를 살펴봅시다.

응용: 간단한 HTTP 서버

async/.await 를 사용하여 에코 서버를 만들어 보겠습니다.

본 예제를 시작하려면 rustup update stable을 실행하여 Rust stable 1.39 이상을 유지하십시오. 일단 완료하면 cargo new async-await-echo를 실행하여 새로운 프로젝트를 만들고 여십시오. 실행의 결과는 async-await-echo 폴더에 만들어 집니다.

Cargo.toml 파일에 몇몇 의존성을 추가해 봅시다 :

[dependencies]
# Hyper is an asynchronous HTTP library. We'll use it to power our HTTP
# server and to make HTTP requests.
hyper = "0.13.0"
# To setup some sort of runtime needed by Hyper, we will use the Tokio runtime.
tokio = { version = "0.2", features = ["full"] }

# (only for testing)
failure = "0.1.6"
reqwest = "0.9.24"

이제 의존성이 해결 되었으므로 이제 코딩을 시작하겠습니다. 다음의 imports를 추가합니다 :

use {
    hyper::{
        // Following functions are used by Hyper to handle a `Request`
        // and returning a `Response` in an asynchronous manner by using a Future
        service::{make_service_fn, service_fn},
        // Miscellaneous types from Hyper for working with HTTP.
        Body,
        Client,
        Request,
        Response,
        Server,
        Uri,
    },
    std::net::SocketAddr,
};

이제 의존성이 해결 되었으므로 모든 것을 모아서 리퀘스트를 처리하도록 하는 기본 토대를 만드는 작업을 시작하겠습니다 :

async fn serve_req(_req: Request<Body>) -> Result<Response<Body>, hyper::Error> {
    // Always return successfully with a response containing a body with
    // a friendly greeting ;)
    Ok(Response::new(Body::from("hello, world!")))
}

async fn run_server(addr: SocketAddr) {
    println!("Listening on http://{}", addr);

    // Create a server bound on the provided address
    let serve_future = Server::bind(&addr)
        // Serve requests using our `async serve_req` function.
        // `serve` takes a closure which returns a type implementing the
        // `Service` trait. `service_fn` returns a value implementing the
        // `Service` trait, and accepts a closure which goes from request
        // to a future of the response.
        .serve(make_service_fn(|_| {
            async {
                {
                    Ok::<_, hyper::Error>(service_fn(serve_req))
                }
            }
        }));

    // Wait for the server to complete serving or exit with an error.
    // If an error occurred, print it to stderr.
    if let Err(e) = serve_future.await {
        eprintln!("server error: {}", e);
    }
}

#[tokio::main]
async fn main() {
  // Set the address to run our socket on.
  let addr = SocketAddr::from(([127, 0, 0, 1], 3000));

  // Call our `run_server` function, which returns a future.
  // As with every `async fn`, for `run_server` to do anything,
  // the returned future needs to be run using `await`;
  run_server(addr).await;
}

지금 cargo run하면, "Listening on http://127.0.0.1:3000" 이라는 메시지가 터미널에 나타납니다. 여러분이 선택한 브라우저에서 위 주소를 열면 "hello, world!"가 브라우저에 나타납니다. 축하합니다! 방금 Rust에서 첫 번째 비동기 웹 서버를 작성했습니다.

다음과 같은 정보(request URI, HTTP version, headers, and other metadata)가 포함 된 요청을 검사 할 수도 있습니다. 예를 들어, 다음과 같이 요청의 URI를 인쇄 할 수 있습니다.

println!("Got request at {:?}", req.uri());

우리가 요청을 처리 할 때 비동기적인 것을 아무 것도 아직 하고 있지 않은 것을 눈치 채셨나요? -우리는 단지 즉시 응답합니다. 따라서 우리는 async fn이 제공하는 유연성을 활용하지 않습니다. 정적 메시지를 반환하는 대신 Hyper의 HTTP 클라이언트를 사용하여 다른 웹 사이트에 사용자의 요청을 프록시 하도록 시도해 봅시다.

요청하려는 URL을 파싱하여 시작합니다.

        let url_str = "http://www.rust-lang.org/en-US/";
        let url = url_str.parse::<Uri>().expect("failed to parse URL");

그런 다음 새로운 hyper::Client를 작성하고 이를 사용하여 GET 요청을 할 수 있습니다. 이는 응답을 사용자에게 반환합니다 :

        let res = Client::new().get(url).await?;
        // Return the result of the request directly to the user
        println!("request finished-- returning response");
        Ok(res)

Client::getFuture<Output = Result<Response<Body>>>을 구현하는 hyper::client::ResponseFuture를 반환합니다. (또는 futures 0.1 버젼에서 Future<Item = Response<Body>, Error = Error>). 우리가 그 future를 .await 하면 현재 작업 중인 HTTP 요청이 전송됩니다. 현재의 타스크는 일시 중지되고 큐에 들아가서 대기하다가 응답이 오면 계속 실행 됩니다.

이제 cargo run을 하고 브라우저에서 http://127.0.0.1:3000/foo를 연다면, Rust 홈페이지와 다음 문자열이 터미널에 나타납니다.

Listening on http://127.0.0.1:3000
Got request at /foo
making request to http://www.rust-lang.org/en-US/
request finished-- returning response

축하합니닥! 여러분은 HTTP 요청을 프록시 했습니다.

내부의 작동: Future와 Task의 실행

이 섹션에서는 Future와 비동기 작업이 어떻게 스케쥴링 되는지 그 하부 구조에 대해 다룹니다. 기존의 Future type을 사용하는 상위 레벨 코드를 작성하는 방법의 학습에만 관심이 있다면 Future type의 작동 방식에 대한 자세한 내용은 건너 뛰고 async/await 장으로 갈 수 있습니다. 그러나 여기에서 논의 된 몇 가지 주제는 async/await 코드의 작동 방식을 이해하는 데 유용합니다. async/await 코드의 런타임 및 성능 속성 이해와 새로운 비동기 프리미티브 구축에도 또한 유용합니다. 이 섹션을 건너 뛰기로 결정한 경우 이제 나중에 다시 방문하기 위해 북마크를 지정할 수 있습니다.

이제 다른 것 보다 Future trait에 대해 이야기합시다.

Future Trait

Future trait은 Rust의 비동기 프로그래밍의 중심에 있습니다. Future는 값을 생성 할 수있는 비동기 연산입니다. (이 값은 비어있을 수 있습니다. 예 :()). simplified 버전의 future trait은 다음과 같습니다.


#![allow(unused)]
fn main() {
trait SimpleFuture {
    type Output;
    fn poll(&mut self, wake: fn()) -> Poll<Self::Output>;
}

enum Poll<T> {
    Ready(T),
    Pending,
}
}

poll 함수를 호출하여 Future을 실행할 수 있습니다. 이러한 호출은 future를 가능한 한 완수를 목표로 합니다. Future가 완료되면 Poll::Ready(result)를 반환합니다. Future가 아직 완료되지 않았으면 Poll::Pending을 반환하고 Future가 더 진행 할 준비가 되었을 때wake()함수가 호출되도록 해 주어야 합니다. wake()가 호출되면 Future를 실행하는 executor는 Future를 다시 호출하여 Future가 중단되었던 지점에서 실행을 계속 하도록 합니다.

wake()가 없으면 executor는 특정 future가 언제 다시 실행 가능한 지를 알 방법이 없으므로, 항상 모든 future를 polling 해야 합니다. wake()를 사용하면, executor는 어떤 future가 실행 가능한지를 polling 해야 하는지 정확히 알 수 있습니다.

예를 들어, 사용 가능한 데이터가 있을 수도, 없을 수도 있는 소켓을 읽는 경우를 고려하십시오. 데이터가 있으면 읽을 수 있고 Poll::Ready(data)를 반환하지만 준비된 데이터가 없으면 future는 block되며 더 이상 진행할 수 없습니다. 사용 가능한 데이터가 없으면 소켓에서 데이터가 준비되면 호출 될 wake를 등록해야 합니다. 그것은 우리의 future가 재실행 할 준비가 되었다고 ezecutor에게 알려줄 것입니다. 간단한 SocketRead future는 다음과 같습니다.

pub struct SocketRead<'a> {
    socket: &'a Socket,
}

impl SimpleFuture for SocketRead<'_> {
    type Output = Vec<u8>;

    fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
        if self.socket.has_data_to_read() {
            // The socket has data-- read it into a buffer and return it.
            Poll::Ready(self.socket.read_buf())
        } else {
            // The socket does not yet have data.
            //
            // Arrange for `wake` to be called once data is available.
            // When data becomes available, `wake` will be called, and the
            // user of this `Future` will know to call `poll` again and
            // receive data.
            self.socket.set_readable_callback(wake);
            Poll::Pending
        }
    }
}

Future 모델은 중간 할당이 필요없는 여러 개의 비동기 동작을 하는 여러 future를 구성 할 수 있습니다. 한 번에 여러 개의 futures, 또는 chained futures를 allocation-free state machines을 사용하여 실행되는 코드를 구현할 수 있습니다.

/// A SimpleFuture that runs two other futures to completion concurrently.
///
/// Concurrency is achieved via the fact that calls to `poll` each future
/// may be interleaved, allowing each future to advance itself at its own pace.
pub struct Join<FutureA, FutureB> {
    // Each field may contain a future that should be run to completion.
    // If the future has already completed, the field is set to `None`.
    // This prevents us from polling a future after it has completed, which
    // would violate the contract of the `Future` trait.
    a: Option<FutureA>,
    b: Option<FutureB>,
}

impl<FutureA, FutureB> SimpleFuture for Join<FutureA, FutureB>
where
    FutureA: SimpleFuture<Output = ()>,
    FutureB: SimpleFuture<Output = ()>,
{
    type Output = ();
    fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
        // Attempt to complete future `a`.
        if let Some(a) = &mut self.a {
            if let Poll::Ready(()) = a.poll(wake) {
                self.a.take();
            }
        }

        // Attempt to complete future `b`.
        if let Some(b) = &mut self.b {
            if let Poll::Ready(()) = b.poll(wake) {
                self.b.take();
            }
        }

        if self.a.is_none() && self.b.is_none() {
            // Both futures have completed-- we can return successfully
            Poll::Ready(())
        } else {
            // One or both futures returned `Poll::Pending` and still have
            // work to do. They will call `wake()` when progress can be made.
            Poll::Pending
        }
    }
}

이것은 여러 futures를 독립적인 각각의 할당 없이 동시에 실행할 수 있는 방법을 보여 줍니다. 이러한 기능은 보다 효율적인 비동기식 프로그램을 허용합니다. 마찬가지로 다음과 같이 여러 순차 futures을 차례로 실행할 수도 있습니다.

/// A SimpleFuture that runs two futures to completion, one after another.
//
// Note: for the purposes of this simple example, `AndThenFut` assumes both
// the first and second futures are available at creation-time. The real
// `AndThen` combinator allows creating the second future based on the output
// of the first future, like `get_breakfast.and_then(|food| eat(food))`.
pub struct AndThenFut<FutureA, FutureB> {
    first: Option<FutureA>,
    second: FutureB,
}

impl<FutureA, FutureB> SimpleFuture for AndThenFut<FutureA, FutureB>
where
    FutureA: SimpleFuture<Output = ()>,
    FutureB: SimpleFuture<Output = ()>,
{
    type Output = ();
    fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
        if let Some(first) = &mut self.first {
            match first.poll(wake) {
                // We've completed the first future-- remove it and start on
                // the second!
                Poll::Ready(()) => self.first.take(),
                // We couldn't yet complete the first future.
                Poll::Pending => return Poll::Pending,
            };
        }
        // Now that the first future is done, attempt to complete the second.
        self.second.poll(wake)
    }
}

이 예는 Future trait을 사용하여 여러 개의 할당된 객체와 깊이 중첩된 콜백 없이 비동기 제어를 표현하는 방법을 보여줍니다. 기본적인 제어 흐름에 대해 설명하였으니. 실제 Future trait과 그것의 차별성을 살펴보겠습니다.

trait Future {
    type Output;
    fn poll(
        // Note the change from `&mut self` to `Pin<&mut Self>`:
        self: Pin<&mut Self>,
        // and the change from `wake: fn()` to `cx: &mut Context<'_>`:
        cx: &mut Context<'_>,
    ) -> Poll<Self::Output>;
}

가장 먼저 눈에 띄는 변화는 self type이 더 이상 &mut self가 아니라는 것입니다. 이 것은 Pin<&mut Self>로 변경 되었습니다. 나중에 Pin에 대해 자세히 설명하겠습니다pinning. 지금은 우리가 만드는 future가 재배치 될 수 없다는 것 정도로 이해하고 넘어 갑시다. 재배치 될 수 없는 객체는 필드 사이에 포인터를 저장할 수 있습니다. 예 : struct MyFut { a: i32, ptr_to_a: * const i32 }. Pinning은 async/await를 활성화 하는데 필요합니다.

둘째, wake: fn()&mut Context<'_>로 변경되었습니다. SimpleFuture에서 우리는 함수 포인터 (fn())을 호출하여 future의 executor에게 해당 future는 poll되어야 한다고 알려 주었습니다. 그러나 fn()은 함수 포인터일 뿐이고 이를 사용하면 future에 대한 호출과 관련된 데이터를 저장할 수 없습니다.

실제 시나리오에서, 웹 서버와 같은 복잡한 응용 프로그램은 수천 개의 서로 다른 커넥션과 관련된 웨이크 업이 모두 별도로 관리 되어야 됩니다. Context type은 특정 작업을 깨우는 데 사용할 수있는 Waker type의 값에 접근하도록 해 주어서 이 문제를 해결 합니다.

Waker로 타스크 깨우기

futures가 처음에 poll 되어서 일을 완성 할 수 없는 경우가 일반적 입니다. 이 경우 futures는 더 진전 될 준비가 되면 polling 되도록 해야 합니다. 이것은 Waker type으로 이루어집니다.

futures가 polling 될 때마다 "task"의 일부로 polling됩니다. Task는 executor에게 제출 된 최상위 future 입니다.

Waker는 executor에게 해당 task가 깨어나야 한다고 말해주는 wake()메소드를 제공 합니다. wake()가 호출되면 executor는 Waker와 관련된 작업이 진행될 준비가 되었음을 알고 future는 다시 폴링 됩니다.

Wakerclone()도 구현하여 복사하고 저장할 수 있습니다.

Waker를 사용하여 간단한 타이머 future를 구현해 봅시다.

응용 : 타이머 만들기

예제를 단순회하기 위해 타이머가 시작될 때 새 스레드를 실행 시킵니다. 필요한 시간 동안 sleep 모드로 전환 한 다음, 시간 구간이 경과했을 때 타이머에 신호를 보냅니다.

시작하면서 해야 할 imports는 다음과 같습니다.


#![allow(unused)]
fn main() {
use {
    std::{
        future::Future,
        pin::Pin,
        sync::{Arc, Mutex},
        task::{Context, Poll, Waker},
        thread,
        time::Duration,
    },
};
}

future type 자체를 정의하며 시작하겠습니다. 우리의 future는 타이머가 경과하고 future가 완료 되어야 함을 스레드에게 알려주어야 할 방법이 있어야 합니다. 공유 된 Arc<Mutex<.. >> 값을 사용하여 스레드와 future 간의 통신을 하도록 하겠습니다.

pub struct TimerFuture {
    shared_state: Arc<Mutex<SharedState>>,
}

/// Shared state between the future and the waiting thread
struct SharedState {
    /// Whether or not the sleep time has elapsed
    completed: bool,

    /// The waker for the task that `TimerFuture` is running on.
    /// The thread can use this after setting `completed = true` to tell
    /// `TimerFuture`'s task to wake up, see that `completed = true`, and
    /// move forward.
    waker: Option<Waker>,
}

이제, 실제로 Future를 구현하도록 하겠습니다!

impl Future for TimerFuture {
    type Output = ();
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Look at the shared state to see if the timer has already completed.
        let mut shared_state = self.shared_state.lock().unwrap();
        if shared_state.completed {
            Poll::Ready(())
        } else {
            // Set waker so that the thread can wake up the current task
            // when the timer has completed, ensuring that the future is polled
            // again and sees that `completed = true`.
            //
            // It's tempting to do this once rather than repeatedly cloning
            // the waker each time. However, the `TimerFuture` can move between
            // tasks on the executor, which could cause a stale waker pointing
            // to the wrong task, preventing `TimerFuture` from waking up
            // correctly.
            //
            // N.B. it's possible to check for this using the `Waker::will_wake`
            // function, but we omit that here to keep things simple.
            shared_state.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}

아주 간단 하죠? 스레드가 shared_state.completed = true를 설정 한 경우 우리는 다한 것 입니다! 그렇지 않으면 현재 작업에 대한 Waker를 복제하여 스레드가 작업을 다시 시작할 수 있도록 shared_state.waker에게 넘겨주어야 합니다.

중요한 것은 future가 poll 될 때마다 Waker를 업데이트 해야 한다는 것입니다. future는 다른 Waker를 가지고 다른 작업으로 이동 했을 수 있기 때문 입니다. 이것은 poll된 이후의 작업들 사이에 futures가 전달 될 때 발생합니다.

마지막으로 실제로 타이머를 구성하고 스레드를 시작하려면 관련 API가 필요합니다.

impl TimerFuture {
    /// Create a new `TimerFuture` which will complete after the provided
    /// timeout.
    pub fn new(duration: Duration) -> Self {
        let shared_state = Arc::new(Mutex::new(SharedState {
            completed: false,
            waker: None,
        }));

        // Spawn the new thread
        let thread_shared_state = shared_state.clone();
        thread::spawn(move || {
            thread::sleep(duration);
            let mut shared_state = thread_shared_state.lock().unwrap();
            // Signal that the timer has completed and wake up the last
            // task on which the future was polled, if one exists.
            shared_state.completed = true;
            if let Some(waker) = shared_state.waker.take() {
                waker.wake()
            }
        });

        TimerFuture { shared_state }
    }
}

우와! 이것이 간단한 타이머 future를 만드는 데 필요한 전부입니다. 만약 우리가 future를 실행할 executor를 가지고 있다면 말이죠...

응용: Executor 만들기

Rust의 Future는 게으르다 : 그들은 적극적으로 추진하지 않으면 아무 것도 하지 않을 것이다. future를 완료시키는 한 가지 방법은 async 함수 안에서 .await를 사용 하는 것 입니다. 이지만 문제를 한 단계 위로 올릴 뿐입니다. 최상위 async 함수에서 반환 된 future을 누가 실행 합니까? 정답은 우리는Future executor가 필요합니다.

Future executor 들은 최상위 Future 세트를 가져 와서 완성까지 Future가 진행될 수 있을 때 마다 poll을 호출하여 타스크를 진행 시킵니다. 일반적으로 executor는 future에 먼저 한 번 polling을 시작합니다. Futurewake()를 호출하여 진행할 준비가 되었음을 표시하면, 그들은 큐에 다시 배치되고 poll이 다시 호출되어 Future가 완료될 때까지 이 과정이 반복됩니다.

이 섹션에서는 대규모로 future를 실행할 수 있는 간단한 executor 프로그램을 작성합니다.

이 예에서는 ArcWake trait을 구현한 futures crate에 의존합니다. 이 것은 Waker를 구성하는 쉬운 방법을 제공합니다.

[package]
name = "xyz"
version = "0.1.0"
authors = ["XYZ Author"]
edition = "2018"

[dependencies]
futures = "0.3"

다음으로, 우리는 다음의 의존성을 src/main.rs의 맨 위에 추가합니다 :

use {
    futures::{
        future::{FutureExt, BoxFuture},
        task::{ArcWake, waker_ref},
    },
    std::{
        future::Future,
        sync::{Arc, Mutex},
        sync::mpsc::{sync_channel, SyncSender, Receiver},
        task::{Context, Poll},
        time::Duration,
    },
    // The timer we wrote in the previous section:
    timer_future::TimerFuture,
};

우리의 executor는 채널을 통해 실행할 작업을 보내서 작동합니다. Executor는 채널에서 이벤트를 가져 와서 실행합니다. 작업이 재 실행 될 준비가 되면(awoken) 그것은 자신을 재 스케즆링 하여 다시 폴링되도록 예약 할 수 있습니다. 재 스케쥴링은 자신을 채널로 다시 넘겨 줌으로써 이루어 집니다.

이 디자인에서 executor 프로그램 자체는 타스크 채널의 수신 end-point만 필요합니다. 사용자는 새로운 futures을 spawn할 수 있도록 송신 end-point을 얻습니다. 타스크 자체는 자신을 재 스케쥴링이 가능한 future 일 뿐이므로 작업을 다시 큐에 보내는 데 사용할 수 있는 sender와 pairing 된 future로 저장 됩니다.

/// Task executor that receives tasks off of a channel and runs them.
struct Executor {
    ready_queue: Receiver<Arc<Task>>,
}

/// `Spawner` spawns new futures onto the task channel.
#[derive(Clone)]
struct Spawner {
    task_sender: SyncSender<Arc<Task>>,
}

/// A future that can reschedule itself to be polled by an `Executor`.
struct Task {
    /// In-progress future that should be pushed to completion.
    ///
    /// The `Mutex` is not necessary for correctness, since we only have
    /// one thread executing tasks at once. However, Rust isn't smart
    /// enough to know that `future` is only mutated from one thread,
    /// so we need use the `Mutex` to prove thread-safety. A production
    /// executor would not need this, and could use `UnsafeCell` instead.
    future: Mutex<Option<BoxFuture<'static, ()>>>,

    /// Handle to place the task itself back onto the task queue.
    task_sender: SyncSender<Arc<Task>>,
}

fn new_executor_and_spawner() -> (Executor, Spawner) {
    // Maximum number of tasks to allow queueing in the channel at once.
    // This is just to make `sync_channel` happy, and wouldn't be present in
    // a real executor.
    const MAX_QUEUED_TASKS: usize = 10_000;
    let (task_sender, ready_queue) = sync_channel(MAX_QUEUED_TASKS);
    (Executor { ready_queue }, Spawner { task_sender })
}

새로운 futures을 쉽게 spawn 할 수 있는 method을 spawner에 추가합시다. 이 방법은 future의 type을 취하여 box에 넣고, 다음과 같이 새로운 Arc<Task>를 만듭니다. 이것은 executor의 큐에 넣을 수 있는 객체가 됩니다.

impl Spawner {
    fn spawn(&self, future: impl Future<Output = ()> + 'static + Send) {
        let future = future.boxed();
        let task = Arc::new(Task {
            future: Mutex::new(Some(future)),
            task_sender: self.task_sender.clone(),
        });
        self.task_sender.send(task).expect("too many tasks queued");
    }
}

future을 폴링하려면 Waker를 만들어야 합니다. Task 깨우기 섹션에서 논의한 바와 같이, Wakerwake가 호출되면 작업이 다시 폴링되도록 예약하는 책임이 있습니다. Waker는 executor에게 어떤 작업이 준비 되었는지 정확하게 알려 준다는 점을 기억하세요. 그들은 전진 할 준비가 되어 있는 future에 대해서만 polling을 합니다. 새로운 Waker를 만들려면 가장 쉬운 방법이 ArcWake trait을 구현 한 다음 waker_ref 또는 .into_waker() 함수를 사용하여 Arc<impl ArcWake>Waker로 만드는 것입니다. 우리의 타스크를 위해 ArcWake를 구현해 봅시다.

impl ArcWake for Task {
    fn wake_by_ref(arc_self: &Arc<Self>) {
        // Implement `wake` by sending this task back onto the task channel
        // so that it will be polled again by the executor.
        let cloned = arc_self.clone();
        arc_self.task_sender.send(cloned).expect("too many tasks queued");
    }
}

Arc<Task>에서 Waker가 생성되면 wake()를 호출하는 행위는 Arc의 복사본을 만들게 되고, 이 복사본은 타스크 채널로 전송 되도록 합니다. 우리의 executor는 그 다음 타스크를 선택하고 폴링 해야 합니다. 그것을 구현해 봅시다 :

impl Executor {
    fn run(&self) {
        while let Ok(task) = self.ready_queue.recv() {
            // Take the future, and if it has not yet completed (is still Some),
            // poll it in an attempt to complete it.
            let mut future_slot = task.future.lock().unwrap();
            if let Some(mut future) = future_slot.take() {
                // Create a `LocalWaker` from the task itself
                let waker = waker_ref(&task);
                let context = &mut Context::from_waker(&*waker);
                // `BoxFuture<T>` is a type alias for
                // `Pin<Box<dyn Future<Output = T> + Send + 'static>>`.
                // We can get a `Pin<&mut dyn Future + Send + 'static>`
                // from it by calling the `Pin::as_mut` method.
                if let Poll::Pending = future.as_mut().poll(context) {
                    // We're not done processing the future, so put it
                    // back in its task to be run again in the future.
                    *future_slot = Some(future);
                }
            }
        }
    }
}

축하합니다! 우리는 이제 futures executor를 갖고 있습니다. 우리는 그것을 Async/.await코드와 우리가 이미 작성 해본 TimerFuture와 같은 커스텀 future을 실행하기 위해 사용할 수 있습니다 :

fn main() {
    let (executor, spawner) = new_executor_and_spawner();

    // Spawn a task to print before and after waiting on a timer.
    spawner.spawn(async {
        println!("howdy!");
        // Wait for our timer future to complete after two seconds.
        TimerFuture::new(Duration::new(2, 0)).await;
        println!("done!");
    });

    // Drop the spawner so that our executor knows it is finished and won't
    // receive more incoming tasks to run.
    drop(spawner);

    // Run the executor until the task queue is empty.
    // This will print "howdy!", pause, and then print "done!".
    executor.run();
}

Executor와 시스템 IO

앞의 Future Trait 섹션에서, 우리는 소켓에서 비동기 읽기를 수행 한 future에 대해 논의한 적이 있습니다 :

pub struct SocketRead<'a> {
    socket: &'a Socket,
}

impl SimpleFuture for SocketRead<'_> {
    type Output = Vec<u8>;

    fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
        if self.socket.has_data_to_read() {
            // The socket has data-- read it into a buffer and return it.
            Poll::Ready(self.socket.read_buf())
        } else {
            // The socket does not yet have data.
            //
            // Arrange for `wake` to be called once data is available.
            // When data becomes available, `wake` will be called, and the
            // user of this `Future` will know to call `poll` again and
            // receive data.
            self.socket.set_readable_callback(wake);
            Poll::Pending
        }
    }
}

이 future는 소켓에서 사용 가능한 데이터를 읽을 수 있으며 사용 가능한 데이터가 없으면 executor에게 제어를 내어주어 소켓이 다시 읽기가 가능해질 때 작업을 깨울 것을 요청합니다. 그러나 이 예제에서 어떻게 Socket type이 구현되었는지 명확하지 않습니다. 특히 set_readable_callback 함수가 어떻게 작동하는지 명확하지 않습니다. 소켓을 읽을 수 있게 되면 wake()를 어떻게 호출 할 수 있습니까? 한 가지 옵션은 Socket이 읽기 가능한지 계속 확인 하는 스레드를 만들어서 적절한 경우 wake()를 호출하도록 하는 것입니다. 그러나 이 것은 매우 비효율적이며 block된 각 IO future에 대한 각각 별도의 스레드 를 필요로 합니다. 이것은 우리의 비동기 코드의 효율성을 크게 줄일 것입니다.

실제로 이 문제는 IO-aware 시스템 blocking 프리미티브 와의 통합을 통해 해결됩니다. Linux의 epoll, FreeBSD의 kqueue와 같은 시스템 blocking 프리미티브, Mac OS, Windows의 IOCP 및 Fuchsia의 포트 (모두 크로스 플랫폼 Rust crate mio를 통해 제공됨 )와 같은 것이 그것 입니다. 이 프리미티브는 모두 여러 비동기 IO 이벤트를 block하는 스레드로, 이벤트가 하나라도 완료돠면 리턴합니다. 실제로 이러한 API는 일반적으로 다음과 같습니다 :

struct IoBlocker {
    /* ... */
}

struct Event {
    // An ID uniquely identifying the event that occurred and was listened for.
    id: usize,

    // A set of signals to wait for, or which occurred.
    signals: Signals,
}

impl IoBlocker {
    /// Create a new collection of asynchronous IO events to block on.
    fn new() -> Self { /* ... */ }

    /// Express an interest in a particular IO event.
    fn add_io_event_interest(
        &self,

        /// The object on which the event will occur
        io_object: &IoObject,

        /// A set of signals that may appear on the `io_object` for
        /// which an event should be triggered, paired with
        /// an ID to give to events that result from this interest.
        event: Event,
    ) { /* ... */ }

    /// Block until one of the events occurs.
    fn block(&self) -> Event { /* ... */ }
}

let mut io_blocker = IoBlocker::new();
io_blocker.add_io_event_interest(
    &socket_1,
    Event { id: 1, signals: READABLE },
);
io_blocker.add_io_event_interest(
    &socket_2,
    Event { id: 2, signals: READABLE | WRITABLE },
);
let event = io_blocker.block();

// prints e.g. "Socket 1 is now READABLE" if socket one became readable.
println!("Socket {:?} is now {:?}", event.id, event.signals);

Futures executor는 이러한 프리미티브를 사용하여 비동기 IO 오브젝트를 제공 할 수 있습니다. 이 오브젝트는 소켓과 같이 특정 IO 이벤트가 발생할 때 콜백이 실행되도록 구성 할 수 있습니다. 위의 SocketRead 예제의 경우 Socket::set_readable_callback 함수는 다음의 의사 코드 처럼 보일 수 있습니다.

impl Socket {
    fn set_readable_callback(&self, waker: Waker) {
        // `local_executor` is a reference to the local executor.
        // this could be provided at creation of the socket, but in practice
        // many executor implementations pass it down through thread local
        // storage for convenience.
        let local_executor = self.local_executor;

        // Unique ID for this IO object.
        let id = self.id;

        // Store the local waker in the executor's map so that it can be called
        // once the IO event arrives.
        local_executor.event_map.insert(id, waker);
        local_executor.add_io_event_interest(
            &self.socket_file_descriptor,
            Event { id, signals: READABLE },
        );
    }
}

이제 적절한 Waker에 IO 이벤트를 분배하는 하나의 executor 스레드만 가질 수 있습니다. 이 스레드는 해당 타스크를 깨우고, executor가 리턴하기 전에 더 많은 타스크를 완료 할 수 있도록 해주기 위하여 더 IO 이벤트가 있는지 체크합니다 (그리고 이 사이클은 계속됩니다 ...).

async/.await

1 장에서 우리는 async/.await에 대해 간단히 살펴 보았고 간단한 서버를 구축하는데 사용했습니다. 이 장에서는 async/.await에 대해 일반론 적인 설명과 작동 방식 및 비동기 코드와 전통적인 Rust 프로그램 과의 차이점에 대해 자세히 설명합니다.

async/.await는 Rust 구문의 특별한 부분으로 block하지 않고 현재 스레드의 제어를 내어 놓아서 작업이 완료되기를 기다리는 동안 다른 코드가 진행될 수 있게 해줍니다.

async를 사용하는 두 가지 주요 방법이 있습니다 : async fnasync 블록. 각각은 Future trait을 구현하는 값을 반환합니다 :


// `foo()` returns a type that implements `Future<Output = u8>`.
// `foo().await` will result in a value of type `u8`.
async fn foo() -> u8 { 5 }

fn bar() -> impl Future<Output = u8> {
    // This `async` block results in a type that implements
    // `Future<Output = u8>`.
    async {
        let x: u8 = foo().await;
        x + 5
    }
}

첫 장에서 보았듯이 async 본문과 다른 futures는 게으릅니다. 그들은 execute 될 때까지 아무 것도 하지 않습니다. Future를 실행하는 가장 일반적인 방법은 .await 입니다. .awaitFuture 에서 호출되면 완료까지 실행을 시도합니다. Future가 block되면 제어권을 내어 놓아 다른 future가 제어를 얻을 수 있습니다. 더 많은 진전이 가능할 때 해당 Future가 executor에 의해 선택됩니다. Executor에 의해 실행을 재개하여 .await가 완결 되도록 합니다.

async 생명주기

전통적인 함수와 달리, reference 또는 non-static인 인수를 취하는 async fn는 생명주기가 인수의 생명주기에 묶여 있는 Future를 반환합니다 :

// This function:
async fn foo(x: &u8) -> u8 { *x }

// Is equivalent to this function:
fn foo_expanded<'a>(x: &'a u8) -> impl Future<Output = u8> + 'a {
    async move { *x }
}

이는 async fn에서 반환 된 future가 정적이 아닌 인수가 여전히 유효한 동안에 .await 되여야 함을 의미합니다. 함수를 호출 한 직후에 future를 기다리는 .await의 경우 (foo(&x).await 와 같이) 이것은 문제가 되지 않습니다. 그러나 future를 저장하거나 다른 작업이나 스레드로 전송하면 문제가 될 수 있습니다.

reference를 인수로 갖는 async fnstatic future로 설정하는 일반적인 해결 방법은 async 블록 안에서 async fn 을 호출하면서 인수를 함께 엮어 주는 것입니다 :

fn bad() -> impl Future<Output = u8> {
    let x = 5;
    borrow_x(&x) // ERROR: `x` does not live long enough
}

fn good() -> impl Future<Output = u8> {
    async {
        let x = 5;
        borrow_x(&x).await
    }
}

인수를 async 블록으로 옮기면 수명이 연장되어 good에 대한 호출에서 리턴된 Future의 수명과 일치 되었습니다.

async move

async 블록과 클로저는 보통의 클로져와 마찬가지로 move 키워드를 허용합니다. async move 블록은 변수가 참조하는 데이터의 소유권을 갖고 , 현재 scope보다 오래 생염이 유지되도록 허용해 주지만 해당 변수를 다른 코드와 공유하는 기능을 포기합니다.

/// `async` block:
///
/// Multiple different `async` blocks can access the same local variable
/// so long as they're executed within the variable's scope
async fn blocks() {
    let my_string = "foo".to_string();

    let future_one = async {
        // ...
        println!("{}", my_string);
    };

    let future_two = async {
        // ...
        println!("{}", my_string);
    };

    // Run both futures to completion, printing "foo" twice:
    let ((), ()) = futures::join!(future_one, future_two);
}

/// `async move` block:
///
/// Only one `async move` block can access the same captured variable, since
/// captures are moved into the `Future` generated by the `async move` block.
/// However, this allows the `Future` to outlive the original scope of the
/// variable:
fn move_block() -> impl Future<Output = ()> {
    let my_string = "foo".to_string();
    async move {
        // ...
        println!("{}", my_string);
    }
}

멀티 스레드 Executor 에서의 .await

멀티 스레드 Future executor를 사용할 때 Future가 스레드 간에 이동 할 수 있습니다. 따라서 비동기 바디에 사용 된 모든 변수는 스레드 사이에 이동 가능 해야 합니다. .await는 잠재적으로 새로운 스레드로 전환 되는 결과를 가질 수 있습니다.

이것은 Rc, &RefCell 또는 다른 Send trait을 구현하지 않는 type을 사용하는 것이 안전하지 않다는 것을 의미합니다. 이 것은 Sync를 구현하지 않은 type을 사용하는 것에도 해당됩니다.

(주의: 이 type이 .await를 호출하는 동안의 scope 내에 있지 않는한 이 type 들을 사용할 수 있습니다.)

마찬가지로, future를 인식하지 못하는 전통적인 lock을 .await를 가로 질러 유지하는 것은 좋지 않습니다. 왜냐하면 스레드 풀이 잠길 수 있기 때문입니다 : 하나의 작업이 .await lock을 갖고 있다가 executor에 제어를 양보하여 다른 작업을 수행 할 수 있는데 그 다른 작업이 lock하려고 하면 교착 상태를 유발합니다. 이것을 피하려면 std::sync에 있는 것이 아니라 futures::lock에 있는 Mutex를 사용하십시오.

Pinning

futures을 폴링하려면 Pin<T> 라는 특수 type을 사용하여 고정 해야 합니다. 이전 섹션 Future와 타스크의 실행에서 Future trait에 대한 설명을 읽으면 Future::poll 메소드 정의의 self: Pin<&mut Self>에서 Pin을 볼 수 있습니다. 그러나 그것은 무엇을 의미하며, 왜 우리는 그것을 필요로 합니까?

Pinning하는 이유

피닝을 사용하면 객체가 절대 움직이지 않을 수 있습니다. 이것이 왜 필요한지 이해하려면 async/.await 작동하는 방법을 기억해야합니다. 다음 코드를 고려하십시오.

let fut_one = /* ... */;
let fut_two = /* ... */;
async move {
    fut_one.await;
    fut_two.await;
}

내부적으로 그것은 Future를 구현하는 익명 type을 만들며 다음과 같은 poll 메소드를 제공합니다.

// The `Future` type generated by our `async { ... }` block
struct AsyncFuture {
    fut_one: FutOne,
    fut_two: FutTwo,
    state: State,
}

// List of states our `async` block can be in
enum State {
    AwaitingFutOne,
    AwaitingFutTwo,
    Done,
}

impl Future for AsyncFuture {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        loop {
            match self.state {
                State::AwaitingFutOne => match self.fut_one.poll(..) {
                    Poll::Ready(()) => self.state = State::AwaitingFutTwo,
                    Poll::Pending => return Poll::Pending,
                }
                State::AwaitingFutTwo => match self.fut_two.poll(..) {
                    Poll::Ready(()) => self.state = State::Done,
                    Poll::Pending => return Poll::Pending,
                }
                State::Done => return Poll::Ready(()),
            }
        }
    }
}

poll이 처음 호출되면 fut_one을 폴링합니다. fut_one이 완료할 수 없다면 AsyncFuture::poll이 반환됩니다. poll에 대한 Future의 호출은 이전 항목이 중단 된 위치로 선택됩니다. 이 과정은 future가 성공적으로 완료 할 때까지 계속됩니다.

그러나 reference를 사용하는 async 블록이 있으면 어떻게 됩니까? 예를 들면 다음과 같습니다.

async {
    let mut x = [0; 128];
    let read_into_buf_fut = read_into_buf(&mut x);
    read_into_buf_fut.await;
    println!("{:?}", x);
}

이것은 어떤 구조체로 컴파일 됩니까?

struct ReadIntoBuf<'a> {
    buf: &'a mut [u8], // points to `x` below
}

struct AsyncFuture {
    x: [u8; 128],
    read_into_buf_fut: ReadIntoBuf<'what_lifetime?>,
}

여기서 ReadIntoBuf future는 우리 구조체의 다른 분야, x에 대한 참조를 보유 합니다. 그러나 AsyncFuture가 이동하면 x의 위치 또한 이동하여 read_into_buf_fut.buf에 저장된 포인터를 무효화 합니다.

메모리의 특정 지점에 future을 고정하면 이 문제를 방지 할 수 있습니다. 위치를 고정하는 방식으로 async 블록 내부의 값에 대한 참조를 생성하는 것이 안전합니다.

Pinning 사용법

Pin type은 포인터 유형을 감싸서 내부에 있는 값을 이동하지 않는다고 보장합니다. 예를 들어 Pin<&mut T>, Pin<&T>, Pin<Box<T>>T가 움직이지 않을 것을 보장합니다.

대부분의 type은 이동하는 데 문제가 없습니다. 이러한 type은 Unpin이라고 하는 trait을 자동으로 구현합니다. Unpin 타입에 대한 포인터는 자유롭게 삽입하거나 Pin으로 부터 가져갈 수 있습니다. 예를 들어, u8Unpin이므로 Pin<&mut u8>은 다음과 같이 정상적인 &mut u8처럼 동작합니다.

어떤 함수는 futures를 Unpin해야 합니다. Unpin이 아닌 Future 또는 StreamUnpin type을 필요로 하는 함수와 함께 사용하려면 Box::pin(Pin<Box<T>>생성) 또는 pin_utils::pin_mut! 매크로 (Pin<&mutT>를 만들기 위해)를 사용하여 값을 고정해야 합니다. Pin<Box<Fut>>Pin<&mut Fut>은 모두 futures로 사용되며 둘 다 Unpin을 구현합니다.

예를 들면 다음과 같습니다.

use pin_utils::pin_mut; // `pin_utils` is a handy crate available on crates.io

// A function which takes a `Future` that implements `Unpin`.
fn execute_unpin_future(x: impl Future<Output = ()> + Unpin) { /* ... */ }

let fut = async { /* ... */ };
execute_unpin_future(fut); // Error: `fut` does not implement `Unpin` trait

// Pinning with `Box`:
let fut = async { /* ... */ };
let fut = Box::pin(fut);
execute_unpin_future(fut); // OK

// Pinning with `pin_mut!`:
let fut = async { /* ... */ };
pin_mut!(fut);
execute_unpin_future(fut); // OK

Stream Trait

Stream trait은 Future와 유사하지만 종료하기 이전에 여러 개의 값을 생성 할 수 있습니다. 이느 표준 라이브러리의 Iterator trait과 유사합니다 :

trait Stream {
    /// The type of the value yielded by the stream.
    type Item;

    /// Attempt to resolve the next item in the stream.
    /// Returns `Poll::Pending` if not ready, `Poll::Ready(Some(x))` if a value
    /// is ready, and `Poll::Ready(None)` if the stream has completed.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>)
        -> Poll<Option<Self::Item>>;
}

Stream의 일반적인 예로는 futures crate의 channel type의 Receiver가 있습니다. Sender-end 에서 값이 전송 될 때마다 receiver에서는 Some(val)이 나옵니다. 그리고 Sender가 삭제되고 보류 중인 모든 메시지가 수신되면 None을 산출 할 것입니다.

async fn send_recv() {
    const BUFFER_SIZE: usize = 10;
    let (mut tx, mut rx) = mpsc::channel::<i32>(BUFFER_SIZE);

    tx.send(1).await.unwrap();
    tx.send(2).await.unwrap();
    drop(tx);

    // `StreamExt::next` is similar to `Iterator::next`, but returns a
    // type that implements `Future<Output = Option<T>>`.
    assert_eq!(Some(1), rx.next().await);
    assert_eq!(Some(2), rx.next().await);
    assert_eq!(None, rx.next().await);
}

반복자와 동시성

동기 Iterator와 유사하게 Stream에서 값을 처리하고 반복하는 방법에는 여러 가지가 있습니다. 콤비네이터 스타일의 map, filterfold와 early-exit-on-error 류의 try_map, try_filtertry_fold 메소드가 있습니다.

불행히도 for 루프는 Stream과 함께 사용할 수 없지만 imperative-style 코드, whilenext/try_next 함수는 사용될 수 있습니다 :

async fn sum_with_next(mut stream: Pin<&mut dyn Stream<Item = i32>>) -> i32 {
    use futures::stream::StreamExt; // for `next`
    let mut sum = 0;
    while let Some(item) = stream.next().await {
        sum += item;
    }
    sum
}

async fn sum_with_try_next(
    mut stream: Pin<&mut dyn Stream<Item = Result<i32, io::Error>>>,
) -> Result<i32, io::Error> {
    use futures::stream::TryStreamExt; // for `try_next`
    let mut sum = 0;
    while let Some(item) = stream.try_next().await? {
        sum += item;
    }
    Ok(sum)
}

그러나 한 번에 하나의 요소만 처리하면 잠재적으로 동시성에 대한 기회를 버리게 되고 결국 처음으로 돌아가서 우리가 왜 비동기 코드를 작성하는지를 묻게 됩니다. 스트림에서 여러 항목을 동시에 처리하려면 for_each_concurrenttry_for_each_concurrent를 사용하십시오 :

async fn jump_around(
    mut stream: Pin<&mut dyn Stream<Item = Result<u8, io::Error>>>,
) -> Result<(), io::Error> {
    use futures::stream::TryStreamExt; // for `try_for_each_concurrent`
    const MAX_CONCURRENT_JUMPERS: usize = 100;

    stream.try_for_each_concurrent(MAX_CONCURRENT_JUMPERS, |num| async move {
        jump_n_times(num).await?;
        report_n_jumps(num).await?;
        Ok(())
    }).await?;

    Ok(())
}

동시에 다양한 Future들을 실행하는 것

지금까지, 우리는 .await을 사용해서 하나의 특정 Future가 완료할때까지 현재의 Task를 막는 future들을 보통 실행해왔습니다. 그러나, 실제 비동기적인 어플리케이션들은 동시에 다른 작업들을 자주 실행해야합니다.

이번 챕터에서, 우리는 같은 시간에 여러 비동기 작업들을 실행하는 방법을 다룰것입니다.

  • join!: futures이 모두 완료될때까지 기다립니다.
  • select!: future들 중 하나가 완료될때까지 기다립니다.
  • Spawning: 하나의 future가 완료될때까지 주변에서 실행되는 top-level 작업을 만듭니다.
  • FuturesUnordered: 각 subfuture의 결과를 yield하는 future들의 그룹.

join!

futures::join매크로는 동시에 future 모두가 실행되는 동안, 다른 여러 future들이 완료 될 때까지 기다릴수 있게 합니다.

join!

여러 비동기 작업들이 수행되어지고 있을때, 단순히 일련의 .await 작업들을 하려는 경향이 있습니다:

async fn get_book_and_music() -> (Book, Music) {
    let book = get_book().await;
    let music = get_music().await;
    (book, music)
}

그렇지만, get_book이 완료된 이후까지도 get_music이 시작하지 않을 것이기 때문에 이것은 필요 이상으로 느립니다. 몇몇 다른 언어들에서, future들은 주변에서 완료됩니다. 그래서 두 작업들은 future들을 시작하기 위해 각 async fn 우선 부름으로써 동시에 실행될 수 있습니다, 그리고 둘다 작업을 기다립니다:

// WRONG -- don't do this
async fn get_book_and_music() -> (Book, Music) {
    let book_future = get_book();
    let music_future = get_music();
    (book_future.await, music_future.await)
}

그렇지만, 러스트 future들은 적극적으로 .await되어질때까지 작동하지 않을것입니다. 이것은 위의 두 코드 snippet들이 동시에 실행 되는것보다 차례대로 book_futuremusic_future이 둘 다 실행될것입니다. 두 future들을 동시에 정확하게 실행하기 위해서, futures::join!을 사용해보세요:

use futures::join;

async fn get_book_and_music() -> (Book, Music) {
    let book_fut = get_book();
    let music_fut = get_music();
    join!(book_fut, music_fut)
}

join!으로부터 리턴되는 값은 통과된 각 Future의 출력을 포함하는 하나의 튜플입니다.

try_join!

Result를 반환하는 future들은, join!보다 try_join!을 사용하는 것을 생각해봅시다.

join!은 모든 subfuture들이 완료하면 완료만 하기 때문에, subfuture들 중 하나가 Err를 리턴한 후에도 다른 future들을 처리하는것을 계속할것입니다.

join!과 달리, try_join!은 subfuture들 중 하나가 에러를 리턴하면 즉시 완료할 것입니다.

use futures::try_join;

async fn get_book() -> Result<Book, String> { /* ... */ Ok(Book) }
async fn get_music() -> Result<Music, String> { /* ... */ Ok(Music) }

async fn get_book_and_music() -> Result<(Book, Music), String> {
    let book_fut = get_book();
    let music_fut = get_music();
    try_join!(book_fut, music_fut)
}

try_join!을 통과한 future들은 모두 같은 에러 타입을 가져야 된다는것을 알아야합니다. futures::future::TryFutureExt에서 에러 타입들을 합치기 위해서 .map_err(|e| ...).err_into()함수들을 사용하는 것을 생각해보세요.

use futures::{
    future::TryFutureExt,
    try_join,
};

async fn get_book() -> Result<Book, ()> { /* ... */ Ok(Book) }
async fn get_music() -> Result<Music, String> { /* ... */ Ok(Music) }

async fn get_book_and_music() -> Result<(Book, Music), String> {
    let book_fut = get_book().map_err(|()| "Unable to get book".to_string());
    let music_fut = get_music();
    try_join!(book_fut, music_fut)
}

select!

futures::select 매크로는 어떤 future가 완료하자마자 사용자가 응답하는 것을 허락하는 다양한 future들을 실행합니다.


#![allow(unused)]
fn main() {
use futures::{
    future::FutureExt, // for `.fuse()`
    pin_mut,
    select,
};

async fn task_one() { /* ... */ }
async fn task_two() { /* ... */ }

async fn race_tasks() {
    let t1 = task_one().fuse();
    let t2 = task_two().fuse();

    pin_mut!(t1, t2);

    select! {
        () = t1 => println!("task one completed first"),
        () = t2 => println!("task two completed first"),
    }
}
}

위의 함수는 동시에 t1t2를 실행할것입니다.

t1t2가 끝날때, 해당하는 핸들러는 println!을 호출할것입니다.

그리고 함수는 남아있는 작업을 완료하는것 없이 끝낼것입니다.

select의 기본 구문은 당신이 선택하고 싶은만큼의 많은 future들로 반복 되어지는
<pattern> = <expression> => <code>,입니다.

default => ... and complete => ...

select 또한 defalutcomplete분기를 지원합니다.

default분기는 select된 future들이 아무것도 완료되지 않으면 실행할것입니다. default는 준비된 다른 future들이 없다면 실행되므로 default분기와 함께 select은 항상 즉시 반환할것입니다.

complete분기들은 select가 된 모든 future들이 완료되어 더 이상 진행되지 않는 경우에 처리를 하는데 사용될 수 있습니다. 이것은 select!에 대해서 looping 할 때 꽤 편합니다.


#![allow(unused)]
fn main() {
use futures::{future, select};

async fn count() {
    let mut a_fut = future::ready(4);
    let mut b_fut = future::ready(6);
    let mut total = 0;

    loop {
        select! {
            a = a_fut => total += a,
            b = b_fut => total += b,
            complete => break,
            default => unreachable!(), // never runs (futures are ready, then complete)
        };
    }
    assert_eq!(total, 10);
}
}

UnpinFusedFuture와의 상호작용

위의 첫번째 예제에서 당신이 알렸을지도 모르는 것은 우리가 pin_mut로 future들을 고정할뿐만 아니라 두개의 async fn로부터 반환되는 future들에서 .fuse()를 불렀어야 했던것입니다. 이 두 호출은 select에서 사용되는 future들이 Unpin트레잇 과 FusedFutre 트레잇 둘 다 구현해야 하기때문에 필연적입니다.

Unpinselect로부터 사용되는 future들이 값으로 가지는것이 아니라 mutable reference로 되는 것이기 때문에 필수적입니다. future의 소유권을 가지지 않음으로써, 완료되지 않은 future들은 select를 부른 이 후에 사용 될 수 있습니다.

마찬가지로, select가 완료한 후의 future를 폴링해서는 안되기 때문에 FusedFuture트레잇은 필요합니다. FusedFuture은 완료 여부를 추적하는 future들에 의해 구현됩니다. 이것은 아직 완료되지 않은 future들만 폴링하는 loop에서 select를 사용할 수 있게 합니다. 이것은 loop를 통해서 a_fut 또는 b_fut이 두번째 완료되는 곳을 위의 예제에서 볼 수 있습니다. future::ready로부터 리턴되는 future는 FusedFuture를 구현하기 때문에, 다시 poll하지 않는 select를 말할 수 있습니다.

스트림들은 상응하는 FusedStream을 가지는것을 알아야 합니다. 이 트레잇을 구현하거나 .fuse()를 사용하여 감싸진 스트림들은 스트림들의 .next()/ .try_next() 결합자들로부터 FusedFuture를 yiled할 것입니다.


#![allow(unused)]
fn main() {
use futures::{
    stream::{Stream, StreamExt, FusedStream},
    select,
};

async fn add_two_streams(
    mut s1: impl Stream<Item = u8> + FusedStream + Unpin,
    mut s2: impl Stream<Item = u8> + FusedStream + Unpin,
) -> u8 {
    let mut total = 0;

    loop {
        let item = select! {
            x = s1.next() => x,
            x = s2.next() => x,
            complete => break,
        };
        if let Some(next_num) = item {
            total += next_num;
        }
    }

    total
}
}

Concurrent tasks in a select loop with Fuse and FuturesUnordered

FuseFuturesUnordered 와 함께 select 루프에서 동시발생 작업들

다소 발견하기 어렵지만 편리한 함수는 Fuse::terminated()입니다, 이 함수는 이미 종료된 텅빈 future를 생성하는것을 허락하며, 나중에 실행해야 하는 future로 채워질 수 있습니다.

이것은 select loop 내부에 스스로 만들어지고 select loop 동안 실행되야할 작업이 있을때, 편리할 수 있습니다.

.select_next_some()의 사용을 알아야합니다. 이것은 None들을 무시하는 스트림에서 반환되는 Some(_)값들의 분기만을 실행하기 위해 select와 함께 사용될 수 있습니다.


#![allow(unused)]
fn main() {
use futures::{
    future::{Fuse, FusedFuture, FutureExt},
    stream::{FusedStream, Stream, StreamExt},
    pin_mut,
    select,
};

async fn get_new_num() -> u8 { /* ... */ 5 }

async fn run_on_new_num(_: u8) { /* ... */ }

async fn run_loop(
    mut interval_timer: impl Stream<Item = ()> + FusedStream + Unpin,
    starting_num: u8,
) {
    let run_on_new_num_fut = run_on_new_num(starting_num).fuse();
    let get_new_num_fut = Fuse::terminated();
    pin_mut!(run_on_new_num_fut, get_new_num_fut);
    loop {
        select! {
            () = interval_timer.select_next_some() => {
                // The timer has elapsed. Start a new `get_new_num_fut`
                // if one was not already running.
                if get_new_num_fut.is_terminated() {
                    get_new_num_fut.set(get_new_num().fuse());
                }
            },
            new_num = get_new_num_fut => {
                // A new number has arrived-- start a new `run_on_new_num_fut`,
                // dropping the old one.
                run_on_new_num_fut.set(run_on_new_num(new_num).fuse());
            },
            // Run the `run_on_new_num_fut`
            () = run_on_new_num_fut => {},
            // panic if everything completed, since the `interval_timer` should
            // keep yielding values indefinitely.
            complete => panic!("`interval_timer` completed unexpectedly"),
        }
    }
}
}

같은 future들의 많은 복사본들은 동시에 실행될때, FutureUnordered 타입을 사용하세요.

아래의 예제는 위에것과 비슷하지만, 새로운 것이 만들어질때 중단하는것보다 `run_on_new_num_fut`의 각 복사본을 완료할것입니다. 또한 `run_on_new_num_fut`으로 리턴된 값을 출력할 것입니다.

#![allow(unused)]
fn main() {
use futures::{
    future::{Fuse, FusedFuture, FutureExt},
    stream::{FusedStream, FuturesUnordered, Stream, StreamExt},
    pin_mut,
    select,
};

async fn get_new_num() -> u8 { /* ... */ 5 }

async fn run_on_new_num(_: u8) -> u8 { /* ... */ 5 }

// Runs `run_on_new_num` with the latest number
// retrieved from `get_new_num`.
//
// `get_new_num` is re-run every time a timer elapses,
// immediately cancelling the currently running
// `run_on_new_num` and replacing it with the newly
// returned value.
async fn run_loop(
    mut interval_timer: impl Stream<Item = ()> + FusedStream + Unpin,
    starting_num: u8,
) {
    let mut run_on_new_num_futs = FuturesUnordered::new();
    run_on_new_num_futs.push(run_on_new_num(starting_num));
    let get_new_num_fut = Fuse::terminated();
    pin_mut!(get_new_num_fut);
    loop {
        select! {
            () = interval_timer.select_next_some() => {
                // The timer has elapsed. Start a new `get_new_num_fut`
                // if one was not already running.
                if get_new_num_fut.is_terminated() {
                    get_new_num_fut.set(get_new_num().fuse());
                }
            },
            new_num = get_new_num_fut => {
                // A new number has arrived-- start a new `run_on_new_num_fut`.
                run_on_new_num_futs.push(run_on_new_num(new_num));
            },
            // Run the `run_on_new_num_futs` and check if any have completed
            res = run_on_new_num_futs.select_next_some() => {
                println!("run_on_new_num_fut returned {:?}", res);
            },
            // panic if everything completed, since the `interval_timer` should
            // keep yielding values indefinitely.
            complete => panic!("`interval_timer` completed unexpectedly"),
        }
    }
}

}

알고 사랑해야 할 방법들

Rust의 비동기 지원은 여전히 새로운 기술이며, 일부 하위 진단 기능과 함께 여전히 활발히 개발중인 많은 기능이 요청되고 있스빈다. 이 장에서는 몇 가지 일반적인 고통을 주는 포인트에 대해 논의하며 그 문제를 해결하는 방법을 설명할 것입니다.

타입 에러의 반환

일반적인 Rust 함수에서 잘못된 타입의 값을 반환하면 다음과 같은 오류가 발생했습니다.


#![allow(unused)]
fn main() {
error[E0308]: mismatched types
 --> src/main.rs:2:12
  |
1 | fn foo() {
  |           - expected `()` because of default return type
2 |     return "foo"
  |            ^^^^^ expected (), found reference
  |
  = note: expected type `()`
             found type `&'static str`
}

그러나 현재 async fn지원은 리턴 타입을 "신뢰" 할 수 없습니다. 함수 시그니쳐와 타입이 일치하지 않거나, 심지어 리턴 값으로부터 추정되는 함수 시그니쳐 오류도 이러한 문제에 포함됩니다. 예를 들어, 함수 async fn foo () { "foo" }는 다음의 오류를 발생시킵니다 :


#![allow(unused)]
fn main() {
error[E0271]: type mismatch resolving `<impl std::future::Future as std::future::Future>::Output == ()`
 --> src/lib.rs:1:16
  |
1 | async fn foo() {
  |                ^ expected &str, found ()
  |
  = note: expected type `&str`
             found type `()`
  = note: the return type of a function must have a statically known size
}

에러 메시지는 &str을 예상하고 ()를 찾았다고 말합니다. 실제로 원하는 것과 정반대 입니다. 그 이유는 컴파일러가 함수 본문을 잘못 신뢰하여 본분이 올바른 타입을 리턴 한다고 생각하기 때문입니다.

이 문제의 해결 방법은 "SomeType을 예상하고OtherType을 찾았습니다"라는 메시지가있는 함수 시그니쳐를 가리키는 오류는 일반적으로 하나 이상의 리턴 포인트가 잘못 되었음을 나타낸다고 알아차리면 됩니다.

이 이슈는 현재 이 트랙에서 논의되고 있습니다. this bug.

? in async 블록

async fn 에서와 마찬가지로 async 블록 안에서 ? 를 사용하는 것이 일반적입니다. 그러나 async 블록의 반환 타입은 명시 적으로 언급되지 않았습니다. 이로 인해 컴파일러가 async 블록의 오류 타입을 유추하지 못할 수 있습니다.

예를 들자면 다음의 코드 입니다 :


#![allow(unused)]
fn main() {
struct MyError;
async fn foo() -> Result<(), MyError> { Ok(()) }
async fn bar() -> Result<(), MyError> { Ok(()) }
let fut = async {
    foo().await?;
    bar().await?;
    Ok(())
};
}

위 코드는 다음의 오류를 만들어 냅니다 :


#![allow(unused)]
fn main() {
error[E0282]: type annotations needed
 --> src/main.rs:5:9
  |
4 |     let fut = async {
  |         --- consider giving `fut` a type
5 |         foo().await?;
  |         ^^^^^^^^^^^^ cannot infer type
}

불행히도, 현재는 "타입을 fut에 부여" 하는 방법이 없습니다. 또한 async 블록의 반환 타입을 명시적으로 지정할 방법도 없습니다. 이 문제를 해결 하려면 "turbofish" 연산자를 사용하여 async 블록의 성공 및 오류 타입을 제공하여야 합니다 :


#![allow(unused)]
fn main() {
struct MyError;
async fn foo() -> Result<(), MyError> { Ok(()) }
async fn bar() -> Result<(), MyError> { Ok(()) }
let fut = async {
    foo().await?;
    bar().await?;
    Ok::<(), MyError>(()) // <- 여기에 명시적인 타입을 준 것을 눈여겨 보세요.
};
}

Send 추정

일부 async fn 상태 시스템은 스레드를 통해 전송되는 것이 안전하지만 다른 것들은 그렇지 않습니다. async fn FutureSend 인지 여부가 결정되는 것은 non-Send 타입이 .await 지점에서 유지되는지 여부에 의합니다. 컴파일러는 객체가 .await에 걸쳐있을 때 Send에 근사하기 위해 최선을 다합니다. 그러나 이 분석은 오늘날 여러 곳에서 너무 보수적입니다.

예를 들어, 단순한 non-Send타입, 아마도 Rc가 들어 있는 타입을 생각해 보세요 :


#![allow(unused)]
fn main() {
use std::rc::Rc;

#[derive(Default)]
struct NotSend(Rc<()>);
}

NotSend 타입의 변수는 async fn 들에서 async fn에 의해 리턴 된 결과 Future타입 값이 Send 여야 하는 경우에도 일시적으로 나타날 수 있습니다 :

use std::rc::Rc;
#[derive(Default)]
struct NotSend(Rc<()>);
async fn bar() {}
async fn foo() {
    NotSend::default();
    bar().await;
}

fn require_send(_: impl Send) {}

fn main() {
    require_send(foo());
}

하지만 만약 우리가 fooNotSend를 변수에 담도록 수정하면 이 예제는 더 이상 컴파일 되지 않습니다 :

use std::rc::Rc;
#[derive(Default)]
struct NotSend(Rc<()>);
async fn bar() {}
async fn foo() {
    let x = NotSend::default();
    bar().await;
}
fn require_send(_: impl Send) {}
fn main() {
   require_send(foo());
}

#![allow(unused)]
fn main() {
error[E0277]: `std::rc::Rc<()>` cannot be sent between threads safely
  --> src/main.rs:15:5
   |
15 |     require_send(foo());
   |     ^^^^^^^^^^^^ `std::rc::Rc<()>` cannot be sent between threads safely
   |
   = help: within `impl std::future::Future`, the trait `std::marker::Send` is not implemented for `std::rc::Rc<()>`
   = note: required because it appears within the type `NotSend`
   = note: required because it appears within the type `{NotSend, impl std::future::Future, ()}`
   = note: required because it appears within the type `[static generator@src/main.rs:7:16: 10:2 {NotSend, impl std::future::Future, ()}]`
   = note: required because it appears within the type `std::future::GenFuture<[static generator@src/main.rs:7:16: 10:2 {NotSend, impl std::future::Future, ()}]>`
   = note: required because it appears within the type `impl std::future::Future`
   = note: required because it appears within the type `impl std::future::Future`
note: required by `require_send`
  --> src/main.rs:12:1
   |
12 | fn require_send(_: impl Send) {}
   | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

error: aborting due to previous error

For more information about this error, try `rustc --explain E0277`.
}

이 오류는 정확 합니다. x를 변수에 저장하면 .await 이후까지 삭제되지 않습니다. 그 시점에서 async fn은 다른 스레드에서 실행될 수 있습니다. RcSend가 아니기 때문에 스레드를 가로 질러 이동하면 매우 안 좋은 것이죠. 이것에 대한 간단한 해결책은 .await 이전에 Rcdrop 하는 것입니다. 하지만 불행히도 이 방법은 오늘날에는 작동하지 않습니다.

이 문제를 성공적으로 해결하려면 non-Send의 변수를 캡슐화하는 블록 스코프를 도입해야 할 수도 있습니다. 이것은 컴파일러에게는 이러한 변수가 .await 지점을 가로질러 살아 있지 않다고 말해주므로 더 쉬워집니다 :

use std::rc::Rc;
#[derive(Default)]
struct NotSend(Rc<()>);
async fn bar() {}
async fn foo() {
    {
        let x = NotSend::default();
    }
    bar().await;
}
fn require_send(_: impl Send) {}
fn main() {
   require_send(foo());
}

재귀

내부적으로 async fn.await하고 있는 sub-Future들 각각을 포함하는 상태 머신 타입을 만듭니다. 이것은 재귀적인 async fn을 조금 tricky하게 만드는데, 그 이유는 결과 상태 머신 타입은 자신를 포함해야하기 때문입니다.


#![allow(unused)]
fn main() {
async fn step_one() { /* ... */ }
async fn step_two() { /* ... */ }
struct StepOne;
struct StepTwo;
// This function:
async fn foo() {
    step_one().await;
    step_two().await;
}
// generates a type like this:
enum Foo {
    First(StepOne),
    Second(StepTwo),
}

// So this function:
async fn recursive() {
    recursive().await;
    recursive().await;
}

// generates a type like this:
enum Recursive {
    First(Recursive),
    Second(Recursive),
}
}

이것은 정상적으로 동작하지 않을 것입니다 - 우리는 무한한 크기의 타입을 만들었습니다! 컴파일러는 다음의 불평을 할 것입니다.

error[E0733]: recursion in an `async fn` requires boxing
 --> src/lib.rs:1:22
  |
1 | async fn recursive() {
  |                      ^ an `async fn` cannot invoke itself directly
  |
  = note: a recursive `async fn` must be rewritten to return a boxed future.

이것을 허용하기 위해서는, Box를 사용하여 우회해야 합니다. 불행히도, 컴파일러의 제한은 단순히 recursive() 에 대한 호출을 Box::pin으로 둘러 싼다고 해결되지 않을 것이라는 의미입니다. 이것을 해결하려면 우리는 .boxed() async 블록을 반환하는 non-async 함수에 재귀를 하도록 해야 합니다 :


#![allow(unused)]
fn main() {
use futures::future::{BoxFuture, FutureExt};

fn recursive() -> BoxFuture<'static, ()> {
    async move {
        recursive().await;
        recursive().await;
    }.boxed()
}
}

async in Traits

현재는, async fn은 트레잇에서 사용될 수 없습니다. 그 이유는 상당히 복잡한데, 앞으로 이러한 제약을 제거할 계획이 있습니다.

그 동안은, 그럼에도 불구하고, 이 크레이트는 잘 동작하고 이 문제를 해결할 것입니다. async_trait crate from crates.io.

이러한 trait 메소드를 사용하면 함수 호출 당 하나씩 힙 할당이 발생합니다. 이것은 대다수의 응용프로그램에게 큰 비용이 아닙니다. 하지만 초당 수백만 번 호출되는 저수준 함수의 공개 API에서 이 기능 사용 여부를 결정할 때 신중히 이 사항을 고려해야합니다.