diff --git a/futures-util/src/stream/stream/buffer_unordered.rs b/futures-util/src/stream/stream/buffer_unordered.rs index 22a525cde..8a7056321 100644 --- a/futures-util/src/stream/stream/buffer_unordered.rs +++ b/futures-util/src/stream/stream/buffer_unordered.rs @@ -1,4 +1,5 @@ use crate::stream::{Fuse, FuturesUnordered, StreamExt}; +use alloc::vec::Vec; use core::fmt; use core::num::NonZeroUsize; use core::pin::Pin; @@ -13,20 +14,23 @@ pin_project! { /// Stream for the [`buffer_unordered`](super::StreamExt::buffer_unordered) /// method. #[must_use = "streams do nothing unless polled"] - pub struct BufferUnordered + pub struct BufferUnordered where - St: Stream, + St: Stream, + F: Future, { #[pin] stream: Fuse, in_progress_queue: FuturesUnordered, + ready_queue: Vec, max: Option, } } -impl fmt::Debug for BufferUnordered +impl fmt::Debug for BufferUnordered where - St: Stream + fmt::Debug, + St: Stream + fmt::Debug, + F: Future, { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("BufferUnordered") @@ -37,15 +41,16 @@ where } } -impl BufferUnordered +impl BufferUnordered where - St: Stream, - St::Item: Future, + St: Stream, + F: Future, { pub(super) fn new(stream: St, n: Option) -> Self { Self { stream: super::Fuse::new(stream), in_progress_queue: FuturesUnordered::new(), + ready_queue: Vec::new(), max: n.and_then(NonZeroUsize::new), } } @@ -53,10 +58,10 @@ where delegate_access_inner!(stream, St, (.)); } -impl Stream for BufferUnordered +impl Stream for BufferUnordered where - St: Stream, - St::Item: Future, + St: Stream, + F: Future, { type Item = ::Output; @@ -72,14 +77,21 @@ where } } - // Attempt to pull the next value from the in_progress_queue - match this.in_progress_queue.poll_next_unpin(cx) { - x @ Poll::Pending | x @ Poll::Ready(Some(_)) => return x, - Poll::Ready(None) => {} + // Try to poll all ready futures in the in_progress_queue. + loop { + match this.in_progress_queue.poll_next_unpin(cx) { + Poll::Ready(Some(output)) => { + this.ready_queue.push(output); + } + Poll::Ready(None) => break, + Poll::Pending => break, + } } - // If more values are still coming from the stream, we're not done yet - if this.stream.is_done() { + if let Some(output) = this.ready_queue.pop() { + // If we have any ready outputs, return the first one. + Poll::Ready(Some(output)) + } else if this.stream.is_done() && this.in_progress_queue.is_empty() { Poll::Ready(None) } else { Poll::Pending @@ -98,10 +110,10 @@ where } } -impl FusedStream for BufferUnordered +impl FusedStream for BufferUnordered where - St: Stream, - St::Item: Future, + St: Stream, + F: Future, { fn is_terminated(&self) -> bool { self.in_progress_queue.is_terminated() && self.stream.is_terminated() @@ -110,10 +122,10 @@ where // Forwarding impl of Sink from the underlying stream #[cfg(feature = "sink")] -impl Sink for BufferUnordered +impl Sink for BufferUnordered where - S: Stream + Sink, - S::Item: Future, + S: Stream + Sink, + F: Future, { type Error = S::Error; diff --git a/futures-util/src/stream/stream/buffered.rs b/futures-util/src/stream/stream/buffered.rs index 41e294863..803c9ceba 100644 --- a/futures-util/src/stream/stream/buffered.rs +++ b/futures-util/src/stream/stream/buffered.rs @@ -1,9 +1,9 @@ use crate::stream::{Fuse, FusedStream, FuturesOrdered, StreamExt}; +use alloc::collections::VecDeque; use core::fmt; use core::num::NonZeroUsize; use core::pin::Pin; use futures_core::future::Future; -use futures_core::ready; use futures_core::stream::Stream; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] @@ -13,22 +13,23 @@ use pin_project_lite::pin_project; pin_project! { /// Stream for the [`buffered`](super::StreamExt::buffered) method. #[must_use = "streams do nothing unless polled"] - pub struct Buffered + pub struct Buffered where - St: Stream, - St::Item: Future, + St: Stream, + F: Future, { #[pin] stream: Fuse, in_progress_queue: FuturesOrdered, + ready_queue: VecDeque, max: Option, } } -impl fmt::Debug for Buffered +impl fmt::Debug for Buffered where - St: Stream + fmt::Debug, - St::Item: Future, + St: Stream + fmt::Debug, + F: Future, { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Buffered") @@ -39,15 +40,16 @@ where } } -impl Buffered +impl Buffered where - St: Stream, - St::Item: Future, + St: Stream, + F: Future, { pub(super) fn new(stream: St, n: Option) -> Self { Self { stream: super::Fuse::new(stream), in_progress_queue: FuturesOrdered::new(), + ready_queue: VecDeque::new(), max: n.and_then(NonZeroUsize::new), } } @@ -55,10 +57,10 @@ where delegate_access_inner!(stream, St, (.)); } -impl Stream for Buffered +impl Stream for Buffered where - St: Stream, - St::Item: Future, + St: Stream, + F: Future, { type Item = ::Output; @@ -74,14 +76,21 @@ where } } - // Attempt to pull the next value from the in_progress_queue - let res = this.in_progress_queue.poll_next_unpin(cx); - if let Some(val) = ready!(res) { - return Poll::Ready(Some(val)); + // Try to poll all ready futures in the in_progress_queue. + loop { + match this.in_progress_queue.poll_next_unpin(cx) { + Poll::Ready(Some(output)) => { + this.ready_queue.push_back(output); + } + Poll::Ready(None) => break, + Poll::Pending => break, + } } - // If more values are still coming from the stream, we're not done yet - if this.stream.is_done() { + if let Some(output) = this.ready_queue.pop_front() { + // If we have any ready outputs, return the first one. + Poll::Ready(Some(output)) + } else if this.stream.is_done() && this.in_progress_queue.is_empty() { Poll::Ready(None) } else { Poll::Pending @@ -100,10 +109,10 @@ where } } -impl FusedStream for Buffered +impl FusedStream for Buffered where - St: Stream, - St::Item: Future, + St: Stream, + F: Future, { fn is_terminated(&self) -> bool { self.stream.is_done() && self.in_progress_queue.is_terminated() @@ -112,10 +121,10 @@ where // Forwarding impl of Sink from the underlying stream #[cfg(feature = "sink")] -impl Sink for Buffered +impl Sink for Buffered where - S: Stream + Sink, - S::Item: Future, + S: Stream + Sink, + F: Future, { type Error = S::Error; diff --git a/futures-util/src/stream/stream/mod.rs b/futures-util/src/stream/stream/mod.rs index ee30f8da6..c018e4b24 100644 --- a/futures-util/src/stream/stream/mod.rs +++ b/futures-util/src/stream/stream/mod.rs @@ -1487,10 +1487,11 @@ pub trait StreamExt: Stream { /// library is activated, and it is activated by default. #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] - fn buffered(self, n: impl Into>) -> Buffered + fn buffered(self, n: impl Into>) -> Buffered where - Self::Item: Future, Self: Sized, + Self: Stream, + F: Future, { assert_stream::<::Output, _>(Buffered::new(self, n.into())) } @@ -1536,10 +1537,11 @@ pub trait StreamExt: Stream { /// ``` #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] - fn buffer_unordered(self, n: impl Into>) -> BufferUnordered + fn buffer_unordered(self, n: impl Into>) -> BufferUnordered where - Self::Item: Future, Self: Sized, + Self: Stream, + F: Future, { assert_stream::<::Output, _>(BufferUnordered::new(self, n.into())) } diff --git a/futures/tests/auto_traits.rs b/futures/tests/auto_traits.rs index 8d15fa28e..ee481b3be 100644 --- a/futures/tests/auto_traits.rs +++ b/futures/tests/auto_traits.rs @@ -1112,24 +1112,27 @@ mod stream { assert_not_impl!(AndThen: Unpin); assert_not_impl!(AndThen<(), PhantomPinned, ()>: Unpin); - assert_impl!(BufferUnordered>: Send); - assert_not_impl!(BufferUnordered: Send); - assert_not_impl!(BufferUnordered: Send); - assert_impl!(BufferUnordered>: Sync); - assert_not_impl!(BufferUnordered: Sync); - assert_not_impl!(BufferUnordered: Sync); - assert_impl!(BufferUnordered: Unpin); - assert_not_impl!(BufferUnordered: Unpin); - - assert_impl!(Buffered>>: Send); - assert_not_impl!(Buffered>: Send); - assert_not_impl!(Buffered>: Send); - assert_not_impl!(Buffered>>: Send); - assert_impl!(Buffered>>: Sync); - assert_not_impl!(Buffered>>: Sync); - assert_not_impl!(Buffered>>: Sync); - assert_impl!(Buffered>: Unpin); - assert_not_impl!(Buffered>: Unpin); + assert_impl!(BufferUnordered>, SendFuture<()>>: Send); + assert_not_impl!(BufferUnordered, SendFuture>: Send); + assert_not_impl!(BufferUnordered, LocalFuture>: Send); + assert_not_impl!(BufferUnordered, LocalFuture>: Send); + assert_impl!(BufferUnordered>, SendSyncFuture<()>>: Sync); + assert_not_impl!(BufferUnordered>, SyncFuture<()>>: Sync); + assert_not_impl!(BufferUnordered, LocalFuture>: Sync); + assert_not_impl!(BufferUnordered, LocalFuture>: Sync); + assert_impl!(BufferUnordered, UnpinFuture>: Unpin); + assert_not_impl!(BufferUnordered, PinnedFuture>: Unpin); + + assert_impl!(Buffered>, SendFuture<()>>: Send); + assert_not_impl!(Buffered, SendFuture>: Send); + assert_not_impl!(Buffered, LocalFuture>: Send); + assert_not_impl!(Buffered, LocalFuture>: Send); + assert_impl!(Buffered>, SendSyncFuture<()>>: Sync); + assert_not_impl!(Buffered>, SyncFuture<()>>: Sync); + assert_not_impl!(Buffered, LocalFuture>: Sync); + assert_not_impl!(Buffered, LocalFuture>: Sync); + assert_impl!(Buffered, UnpinFuture>: Unpin); + assert_not_impl!(Buffered, PinnedFuture>: Unpin); assert_impl!(CatchUnwind: Send); assert_not_impl!(CatchUnwind: Send);