반복자와 동시성

동기 Iterator와 유사하게 Stream에서 값을 처리하고 반복하는 방법에는 여러 가지가 있습니다. 콤비네이터 스타일의 map, filterfold와 early-exit-on-error 류의 try_map, try_filtertry_fold 메소드가 있습니다.

불행히도 for 루프는 Stream과 함께 사용할 수 없지만 imperative-style 코드, whilenext/try_next 함수는 사용될 수 있습니다 :

async fn sum_with_next(mut stream: Pin<&mut dyn Stream<Item = i32>>) -> i32 {
    use futures::stream::StreamExt; // for `next`
    let mut sum = 0;
    while let Some(item) = stream.next().await {
        sum += item;
    }
    sum
}

async fn sum_with_try_next(
    mut stream: Pin<&mut dyn Stream<Item = Result<i32, io::Error>>>,
) -> Result<i32, io::Error> {
    use futures::stream::TryStreamExt; // for `try_next`
    let mut sum = 0;
    while let Some(item) = stream.try_next().await? {
        sum += item;
    }
    Ok(sum)
}

그러나 한 번에 하나의 요소만 처리하면 잠재적으로 동시성에 대한 기회를 버리게 되고 결국 처음으로 돌아가서 우리가 왜 비동기 코드를 작성하는지를 묻게 됩니다. 스트림에서 여러 항목을 동시에 처리하려면 for_each_concurrenttry_for_each_concurrent를 사용하십시오 :

async fn jump_around(
    mut stream: Pin<&mut dyn Stream<Item = Result<u8, io::Error>>>,
) -> Result<(), io::Error> {
    use futures::stream::TryStreamExt; // for `try_for_each_concurrent`
    const MAX_CONCURRENT_JUMPERS: usize = 100;

    stream.try_for_each_concurrent(MAX_CONCURRENT_JUMPERS, |num| async move {
        jump_n_times(num).await?;
        report_n_jumps(num).await?;
        Ok(())
    }).await?;

    Ok(())
}