select!

futures::select 매크로는 어떤 future가 완료하자마자 사용자가 응답하는 것을 허락하는 다양한 future들을 실행합니다.


#![allow(unused)]
fn main() {
use futures::{
    future::FutureExt, // for `.fuse()`
    pin_mut,
    select,
};

async fn task_one() { /* ... */ }
async fn task_two() { /* ... */ }

async fn race_tasks() {
    let t1 = task_one().fuse();
    let t2 = task_two().fuse();

    pin_mut!(t1, t2);

    select! {
        () = t1 => println!("task one completed first"),
        () = t2 => println!("task two completed first"),
    }
}
}

위의 함수는 동시에 t1t2를 실행할것입니다.

t1t2가 끝날때, 해당하는 핸들러는 println!을 호출할것입니다.

그리고 함수는 남아있는 작업을 완료하는것 없이 끝낼것입니다.

select의 기본 구문은 당신이 선택하고 싶은만큼의 많은 future들로 반복 되어지는
<pattern> = <expression> => <code>,입니다.

default => ... and complete => ...

select 또한 defalutcomplete분기를 지원합니다.

default분기는 select된 future들이 아무것도 완료되지 않으면 실행할것입니다. default는 준비된 다른 future들이 없다면 실행되므로 default분기와 함께 select은 항상 즉시 반환할것입니다.

complete분기들은 select가 된 모든 future들이 완료되어 더 이상 진행되지 않는 경우에 처리를 하는데 사용될 수 있습니다. 이것은 select!에 대해서 looping 할 때 꽤 편합니다.


#![allow(unused)]
fn main() {
use futures::{future, select};

async fn count() {
    let mut a_fut = future::ready(4);
    let mut b_fut = future::ready(6);
    let mut total = 0;

    loop {
        select! {
            a = a_fut => total += a,
            b = b_fut => total += b,
            complete => break,
            default => unreachable!(), // never runs (futures are ready, then complete)
        };
    }
    assert_eq!(total, 10);
}
}

UnpinFusedFuture와의 상호작용

위의 첫번째 예제에서 당신이 알렸을지도 모르는 것은 우리가 pin_mut로 future들을 고정할뿐만 아니라 두개의 async fn로부터 반환되는 future들에서 .fuse()를 불렀어야 했던것입니다. 이 두 호출은 select에서 사용되는 future들이 Unpin트레잇 과 FusedFutre 트레잇 둘 다 구현해야 하기때문에 필연적입니다.

Unpinselect로부터 사용되는 future들이 값으로 가지는것이 아니라 mutable reference로 되는 것이기 때문에 필수적입니다. future의 소유권을 가지지 않음으로써, 완료되지 않은 future들은 select를 부른 이 후에 사용 될 수 있습니다.

마찬가지로, select가 완료한 후의 future를 폴링해서는 안되기 때문에 FusedFuture트레잇은 필요합니다. FusedFuture은 완료 여부를 추적하는 future들에 의해 구현됩니다. 이것은 아직 완료되지 않은 future들만 폴링하는 loop에서 select를 사용할 수 있게 합니다. 이것은 loop를 통해서 a_fut 또는 b_fut이 두번째 완료되는 곳을 위의 예제에서 볼 수 있습니다. future::ready로부터 리턴되는 future는 FusedFuture를 구현하기 때문에, 다시 poll하지 않는 select를 말할 수 있습니다.

스트림들은 상응하는 FusedStream을 가지는것을 알아야 합니다. 이 트레잇을 구현하거나 .fuse()를 사용하여 감싸진 스트림들은 스트림들의 .next()/ .try_next() 결합자들로부터 FusedFuture를 yiled할 것입니다.


#![allow(unused)]
fn main() {
use futures::{
    stream::{Stream, StreamExt, FusedStream},
    select,
};

async fn add_two_streams(
    mut s1: impl Stream<Item = u8> + FusedStream + Unpin,
    mut s2: impl Stream<Item = u8> + FusedStream + Unpin,
) -> u8 {
    let mut total = 0;

    loop {
        let item = select! {
            x = s1.next() => x,
            x = s2.next() => x,
            complete => break,
        };
        if let Some(next_num) = item {
            total += next_num;
        }
    }

    total
}
}

Concurrent tasks in a select loop with Fuse and FuturesUnordered

FuseFuturesUnordered 와 함께 select 루프에서 동시발생 작업들

다소 발견하기 어렵지만 편리한 함수는 Fuse::terminated()입니다, 이 함수는 이미 종료된 텅빈 future를 생성하는것을 허락하며, 나중에 실행해야 하는 future로 채워질 수 있습니다.

이것은 select loop 내부에 스스로 만들어지고 select loop 동안 실행되야할 작업이 있을때, 편리할 수 있습니다.

.select_next_some()의 사용을 알아야합니다. 이것은 None들을 무시하는 스트림에서 반환되는 Some(_)값들의 분기만을 실행하기 위해 select와 함께 사용될 수 있습니다.


#![allow(unused)]
fn main() {
use futures::{
    future::{Fuse, FusedFuture, FutureExt},
    stream::{FusedStream, Stream, StreamExt},
    pin_mut,
    select,
};

async fn get_new_num() -> u8 { /* ... */ 5 }

async fn run_on_new_num(_: u8) { /* ... */ }

async fn run_loop(
    mut interval_timer: impl Stream<Item = ()> + FusedStream + Unpin,
    starting_num: u8,
) {
    let run_on_new_num_fut = run_on_new_num(starting_num).fuse();
    let get_new_num_fut = Fuse::terminated();
    pin_mut!(run_on_new_num_fut, get_new_num_fut);
    loop {
        select! {
            () = interval_timer.select_next_some() => {
                // The timer has elapsed. Start a new `get_new_num_fut`
                // if one was not already running.
                if get_new_num_fut.is_terminated() {
                    get_new_num_fut.set(get_new_num().fuse());
                }
            },
            new_num = get_new_num_fut => {
                // A new number has arrived-- start a new `run_on_new_num_fut`,
                // dropping the old one.
                run_on_new_num_fut.set(run_on_new_num(new_num).fuse());
            },
            // Run the `run_on_new_num_fut`
            () = run_on_new_num_fut => {},
            // panic if everything completed, since the `interval_timer` should
            // keep yielding values indefinitely.
            complete => panic!("`interval_timer` completed unexpectedly"),
        }
    }
}
}

같은 future들의 많은 복사본들은 동시에 실행될때, FutureUnordered 타입을 사용하세요.

아래의 예제는 위에것과 비슷하지만, 새로운 것이 만들어질때 중단하는것보다 `run_on_new_num_fut`의 각 복사본을 완료할것입니다. 또한 `run_on_new_num_fut`으로 리턴된 값을 출력할 것입니다.

#![allow(unused)]
fn main() {
use futures::{
    future::{Fuse, FusedFuture, FutureExt},
    stream::{FusedStream, FuturesUnordered, Stream, StreamExt},
    pin_mut,
    select,
};

async fn get_new_num() -> u8 { /* ... */ 5 }

async fn run_on_new_num(_: u8) -> u8 { /* ... */ 5 }

// Runs `run_on_new_num` with the latest number
// retrieved from `get_new_num`.
//
// `get_new_num` is re-run every time a timer elapses,
// immediately cancelling the currently running
// `run_on_new_num` and replacing it with the newly
// returned value.
async fn run_loop(
    mut interval_timer: impl Stream<Item = ()> + FusedStream + Unpin,
    starting_num: u8,
) {
    let mut run_on_new_num_futs = FuturesUnordered::new();
    run_on_new_num_futs.push(run_on_new_num(starting_num));
    let get_new_num_fut = Fuse::terminated();
    pin_mut!(get_new_num_fut);
    loop {
        select! {
            () = interval_timer.select_next_some() => {
                // The timer has elapsed. Start a new `get_new_num_fut`
                // if one was not already running.
                if get_new_num_fut.is_terminated() {
                    get_new_num_fut.set(get_new_num().fuse());
                }
            },
            new_num = get_new_num_fut => {
                // A new number has arrived-- start a new `run_on_new_num_fut`.
                run_on_new_num_futs.push(run_on_new_num(new_num));
            },
            // Run the `run_on_new_num_futs` and check if any have completed
            res = run_on_new_num_futs.select_next_some() => {
                println!("run_on_new_num_fut returned {:?}", res);
            },
            // panic if everything completed, since the `interval_timer` should
            // keep yielding values indefinitely.
            complete => panic!("`interval_timer` completed unexpectedly"),
        }
    }
}

}