diff --git a/futures-util/src/stream/forward.rs b/futures-util/src/stream/forward.rs index 0ae2dfe52f..3a4e215853 100644 --- a/futures-util/src/stream/forward.rs +++ b/futures-util/src/stream/forward.rs @@ -9,16 +9,10 @@ use pin_utils::{unsafe_pinned, unsafe_unpinned}; const INVALID_POLL: &str = "polled `Forward` after completion"; /// Future for the `Stream::forward` combinator, which sends a stream of values -/// to a sink and then flushes the sink. -/// -/// Note: this is only usable with `Unpin` sinks, so `Sink`s that aren't `Unpin` -/// will need to be pinned in order to be used with this combinator. -// -// This limitation is necessary in order to return the sink after the forwarding -// has completed so that it can be used again. +/// to a sink and then flushes and closes the sink. #[derive(Debug)] #[must_use = "steams do nothing unless polled"] -pub struct Forward { +pub struct Forward { sink: Option, stream: Fuse, buffered_item: Option, @@ -28,7 +22,7 @@ impl Unpin for Forward {} impl Forward where - Si: Sink + Unpin, + Si: Sink, St: Stream>, { unsafe_pinned!(sink: Option); @@ -68,10 +62,10 @@ impl FusedFuture for Forward { impl Future for Forward where - Si: Sink + Unpin, + Si: Sink, St: Stream>, { - type Output = Result; + type Output = Result<(), Si::SinkError>; fn poll( mut self: Pin<&mut Self>, @@ -91,7 +85,8 @@ where Poll::Ready(None) => { try_ready!(self.as_mut().sink().as_pin_mut().expect(INVALID_POLL) .poll_close(waker)); - return Poll::Ready(Ok(self.as_mut().sink().take().unwrap())) + self.as_mut().sink().set(None); + return Poll::Ready(Ok(())) } Poll::Pending => { try_ready!(self.as_mut().sink().as_pin_mut().expect(INVALID_POLL) diff --git a/futures-util/src/stream/mod.rs b/futures-util/src/stream/mod.rs index 28308246d8..c3eafdcfc5 100644 --- a/futures-util/src/stream/mod.rs +++ b/futures-util/src/stream/mod.rs @@ -970,21 +970,15 @@ pub trait StreamExt: Stream { } /// A future that completes after the given stream has been fully processed - /// into the sink, including flushing. + /// into the sink and the sink has been flushed and closed. /// /// This future will drive the stream to keep producing items until it is - /// exhausted, sending each item to the sink. It will complete once both the - /// stream is exhausted and the sink has received and flushed all items. - /// Note that the sink is **not** closed. - /// - /// On completion, the sink is returned. - /// - /// Note that this combinator is only usable with `Unpin` sinks. - /// Sinks that are not `Unpin` will need to be pinned in order to be used - /// with `forward`. + /// exhausted, sending each item to the sink. It will complete once the + /// stream is exhausted, the sink has received and flushed all items, and + /// the sink is closed. fn forward(self, sink: S) -> Forward where - S: Sink + Unpin, + S: Sink, Self: Stream> + Sized, { Forward::new(self, sink)