Skip to content

Commit 90b4a8e

Browse files
authored
Add TryStreamExt::try_buffered (#2245)
1 parent 7667251 commit 90b4a8e

File tree

4 files changed

+172
-3
lines changed

4 files changed

+172
-3
lines changed

futures-util/src/stream/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ pub use self::try_stream::IntoAsyncRead;
5555

5656
#[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))]
5757
#[cfg(feature = "alloc")]
58-
pub use self::try_stream::{TryBufferUnordered, TryForEachConcurrent};
58+
pub use self::try_stream::{TryBufferUnordered, TryBuffered, TryForEachConcurrent};
5959

6060
// Primitive streams
6161

futures-util/src/stream/try_stream/mod.rs

Lines changed: 81 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,12 @@ cfg_target_has_atomic! {
114114
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
115115
pub use self::try_buffer_unordered::TryBufferUnordered;
116116

117+
#[cfg(feature = "alloc")]
118+
mod try_buffered;
119+
#[cfg(feature = "alloc")]
120+
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
121+
pub use self::try_buffered::TryBuffered;
122+
117123
#[cfg(feature = "alloc")]
118124
mod try_for_each_concurrent;
119125
#[cfg(feature = "alloc")]
@@ -773,7 +779,7 @@ pub trait TryStreamExt: TryStream {
773779
assert_future::<Result<Self::Ok, Self::Error>, _>(TryConcat::new(self))
774780
}
775781

776-
/// Attempt to execute several futures from a stream concurrently.
782+
/// Attempt to execute several futures from a stream concurrently (unordered).
777783
///
778784
/// This stream's `Ok` type must be a [`TryFuture`](futures_core::future::TryFuture) with an `Error` type
779785
/// that matches the stream's `Error` type.
@@ -842,6 +848,80 @@ pub trait TryStreamExt: TryStream {
842848
)
843849
}
844850

851+
/// Attempt to execute several futures from a stream concurrently.
852+
///
853+
/// This stream's `Ok` type must be a [`TryFuture`](futures_core::future::TryFuture) with an `Error` type
854+
/// that matches the stream's `Error` type.
855+
///
856+
/// This adaptor will buffer up to `n` futures and then return their
857+
/// outputs in the order. If the underlying stream returns an error, it will
858+
/// be immediately propagated.
859+
///
860+
/// The returned stream will be a stream of results, each containing either
861+
/// an error or a future's output. An error can be produced either by the
862+
/// underlying stream itself or by one of the futures it yielded.
863+
///
864+
/// This method is only available when the `std` or `alloc` feature of this
865+
/// library is activated, and it is activated by default.
866+
///
867+
/// # Examples
868+
///
869+
/// Results are returned in the order of addition:
870+
/// ```
871+
/// # futures::executor::block_on(async {
872+
/// use futures::channel::oneshot;
873+
/// use futures::future::lazy;
874+
/// use futures::stream::{self, StreamExt, TryStreamExt};
875+
///
876+
/// let (send_one, recv_one) = oneshot::channel();
877+
/// let (send_two, recv_two) = oneshot::channel();
878+
///
879+
/// let mut buffered = lazy(move |cx| {
880+
/// let stream_of_futures = stream::iter(vec![Ok(recv_one), Ok(recv_two)]);
881+
///
882+
/// let mut buffered = stream_of_futures.try_buffered(10);
883+
///
884+
/// assert!(buffered.try_poll_next_unpin(cx).is_pending());
885+
///
886+
/// send_two.send(2i32)?;
887+
/// assert!(buffered.try_poll_next_unpin(cx).is_pending());
888+
/// Ok::<_, i32>(buffered)
889+
/// }).await?;
890+
///
891+
/// send_one.send(1i32)?;
892+
/// assert_eq!(buffered.next().await, Some(Ok(1i32)));
893+
/// assert_eq!(buffered.next().await, Some(Ok(2i32)));
894+
///
895+
/// assert_eq!(buffered.next().await, None);
896+
/// # Ok::<(), i32>(()) }).unwrap();
897+
/// ```
898+
///
899+
/// Errors from the underlying stream itself are propagated:
900+
/// ```
901+
/// # futures::executor::block_on(async {
902+
/// use futures::channel::mpsc;
903+
/// use futures::stream::{StreamExt, TryStreamExt};
904+
///
905+
/// let (sink, stream_of_futures) = mpsc::unbounded();
906+
/// let mut buffered = stream_of_futures.try_buffered(10);
907+
///
908+
/// sink.unbounded_send(Ok(async { Ok(7i32) }))?;
909+
/// assert_eq!(buffered.next().await, Some(Ok(7i32)));
910+
///
911+
/// sink.unbounded_send(Err("error in the stream"))?;
912+
/// assert_eq!(buffered.next().await, Some(Err("error in the stream")));
913+
/// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
914+
/// ```
915+
#[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))]
916+
#[cfg(feature = "alloc")]
917+
fn try_buffered(self, n: usize) -> TryBuffered<Self>
918+
where
919+
Self::Ok: TryFuture<Error = Self::Error>,
920+
Self: Sized,
921+
{
922+
TryBuffered::new(self, n)
923+
}
924+
845925
// TODO: false positive warning from rustdoc. Verify once #43466 settles
846926
//
847927
/// A convenience method for calling [`TryStream::try_poll_next`] on [`Unpin`]
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
use crate::stream::{Fuse, FuturesOrdered, StreamExt, IntoStream};
2+
use crate::future::{IntoFuture, TryFutureExt};
3+
use futures_core::future::TryFuture;
4+
use futures_core::stream::{Stream, TryStream};
5+
use futures_core::task::{Context, Poll};
6+
#[cfg(feature = "sink")]
7+
use futures_sink::Sink;
8+
use pin_project::pin_project;
9+
use core::pin::Pin;
10+
11+
/// Stream for the [`try_buffered`](super::TryStreamExt::try_buffered) method.
12+
#[pin_project]
13+
#[derive(Debug)]
14+
#[must_use = "streams do nothing unless polled"]
15+
pub struct TryBuffered<St>
16+
where
17+
St: TryStream,
18+
St::Ok: TryFuture,
19+
{
20+
#[pin]
21+
stream: Fuse<IntoStream<St>>,
22+
in_progress_queue: FuturesOrdered<IntoFuture<St::Ok>>,
23+
max: usize,
24+
}
25+
26+
impl<St> TryBuffered<St>
27+
where
28+
St: TryStream,
29+
St::Ok: TryFuture,
30+
{
31+
pub(super) fn new(stream: St, n: usize) -> TryBuffered<St> {
32+
TryBuffered {
33+
stream: IntoStream::new(stream).fuse(),
34+
in_progress_queue: FuturesOrdered::new(),
35+
max: n,
36+
}
37+
}
38+
39+
delegate_access_inner!(stream, St, (. .));
40+
}
41+
42+
impl<St> Stream for TryBuffered<St>
43+
where
44+
St: TryStream,
45+
St::Ok: TryFuture<Error = St::Error>,
46+
{
47+
type Item = Result<<St::Ok as TryFuture>::Ok, St::Error>;
48+
49+
fn poll_next(
50+
self: Pin<&mut Self>,
51+
cx: &mut Context<'_>,
52+
) -> Poll<Option<Self::Item>> {
53+
let mut this = self.project();
54+
55+
// First up, try to spawn off as many futures as possible by filling up
56+
// our queue of futures. Propagate errors from the stream immediately.
57+
while this.in_progress_queue.len() < *this.max {
58+
match this.stream.as_mut().poll_next(cx)? {
59+
Poll::Ready(Some(fut)) => this.in_progress_queue.push(fut.into_future()),
60+
Poll::Ready(None) | Poll::Pending => break,
61+
}
62+
}
63+
64+
// Attempt to pull the next value from the in_progress_queue
65+
match this.in_progress_queue.poll_next_unpin(cx) {
66+
x @ Poll::Pending | x @ Poll::Ready(Some(_)) => return x,
67+
Poll::Ready(None) => {}
68+
}
69+
70+
// If more values are still coming from the stream, we're not done yet
71+
if this.stream.is_done() {
72+
Poll::Ready(None)
73+
} else {
74+
Poll::Pending
75+
}
76+
}
77+
}
78+
79+
// Forwarding impl of Sink from the underlying stream
80+
#[cfg(feature = "sink")]
81+
impl<S, Item, E> Sink<Item> for TryBuffered<S>
82+
where
83+
S: TryStream + Sink<Item, Error = E>,
84+
S::Ok: TryFuture<Error = E>,
85+
{
86+
type Error = E;
87+
88+
delegate_sink!(stream, Item);
89+
}

futures/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -498,7 +498,7 @@ pub mod stream {
498498
#[cfg(feature = "alloc")]
499499
pub use futures_util::stream::{
500500
// For TryStreamExt:
501-
TryBufferUnordered, TryForEachConcurrent,
501+
TryBufferUnordered, TryBuffered, TryForEachConcurrent,
502502
};
503503

504504
#[cfg(feature = "std")]

0 commit comments

Comments
 (0)