diff --git a/tokio/src/task/coop/mod.rs b/tokio/src/task/coop/mod.rs index 6c4588ccf92..70cf6ea9673 100644 --- a/tokio/src/task/coop/mod.rs +++ b/tokio/src/task/coop/mod.rs @@ -306,74 +306,38 @@ cfg_coop! { /// /// # Examples /// - /// This example shows a simple countdown latch that uses [`poll_proceed`] to participate in - /// cooperative scheduling. + /// This example wraps the `futures::channel::mpsc::UnboundedReceiver` to + /// cooperate with the Tokio scheduler. Each time a value is received, task budget + /// is consumed. If no budget is available, the task yields to the scheduler. /// /// ``` - /// use std::future::{Future}; /// use std::pin::Pin; - /// use std::task::{ready, Context, Poll, Waker}; + /// use std::task::{ready, Context, Poll}; /// use tokio::task::coop; + /// use futures::stream::{Stream, StreamExt}; + /// use futures::channel::mpsc::UnboundedReceiver; /// - /// struct CountdownLatch { - /// counter: usize, - /// value: Option, - /// waker: Option + /// struct CoopUnboundedReceiver { + /// receiver: UnboundedReceiver, /// } /// - /// impl CountdownLatch { - /// fn new(value: T, count: usize) -> Self { - /// CountdownLatch { - /// counter: count, - /// value: Some(value), - /// waker: None - /// } - /// } - /// fn count_down(&mut self) { - /// if self.counter <= 0 { - /// return; - /// } - /// - /// self.counter -= 1; - /// if self.counter == 0 { - /// if let Some(w) = self.waker.take() { - /// w.wake(); - /// } - /// } - /// } - /// } - /// - /// impl Future for CountdownLatch { - /// type Output = T; - /// - /// fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - /// // `poll_proceed` checks with the runtime if this task is still allowed to proceed - /// // with performing work. - /// // If not, `Pending` is returned and `ready!` ensures this function returns. - /// // If we are allowed to proceed, coop now represents the budget consumption + /// impl Stream for CoopUnboundedReceiver { + /// type Item = T; + /// fn poll_next( + /// mut self: Pin<&mut Self>, + /// cx: &mut Context<'_> + /// ) -> Poll> { /// let coop = ready!(coop::poll_proceed(cx)); - /// - /// // Get a mutable reference to the CountdownLatch - /// let this = Pin::get_mut(self); - /// - /// // Next we check if the latch is ready to release its value - /// if this.counter == 0 { - /// let t = this.value.take(); - /// // The latch made progress so call `made_progress` to ensure the budget - /// // is not reverted. - /// coop.made_progress(); - /// Poll::Ready(t.unwrap()) - /// } else { - /// // If the latch is not ready so return pending and simply drop `coop`. - /// // This will restore the budget making it available again to perform any - /// // other work. - /// this.waker = Some(cx.waker().clone()); - /// Poll::Pending - /// } + /// match self.receiver.poll_next_unpin(cx) { + /// Poll::Ready(v) => { + /// // We received a value, so consume budget. + /// coop.made_progress(); + /// Poll::Ready(v) + /// } + /// Poll::Pending => Poll::Pending, + /// } /// } /// } - /// - /// impl Unpin for CountdownLatch {} /// ``` #[inline] pub fn poll_proceed(cx: &mut Context<'_>) -> Poll {