Skip to content

Commit 79965bd

Browse files
tylerhawkestaiki-e
authored andcommitted
New poll_immediate functions to immediately return from a poll (#2452)
1 parent 660530f commit 79965bd

File tree

4 files changed

+212
-0
lines changed

4 files changed

+212
-0
lines changed

futures-util/src/future/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,9 @@ pub use self::option::OptionFuture;
6868
mod poll_fn;
6969
pub use self::poll_fn::{poll_fn, PollFn};
7070

71+
mod poll_immediate;
72+
pub use self::poll_immediate::{poll_immediate, PollImmediate};
73+
7174
mod ready;
7275
pub use self::ready::{err, ok, ready, Ready};
7376

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
use super::assert_future;
2+
use core::pin::Pin;
3+
use futures_core::task::{Context, Poll};
4+
use futures_core::{FusedFuture, Future, Stream};
5+
use pin_project_lite::pin_project;
6+
7+
pin_project! {
8+
/// Future for the [`poll_immediate`](poll_immediate()) function.
9+
///
10+
/// It will never return [Poll::Pending](core::task::Poll::Pending)
11+
#[derive(Debug, Clone)]
12+
#[must_use = "futures do nothing unless you `.await` or poll them"]
13+
pub struct PollImmediate<T> {
14+
#[pin]
15+
future: Option<T>
16+
}
17+
}
18+
19+
impl<T, F> Future for PollImmediate<F>
20+
where
21+
F: Future<Output = T>,
22+
{
23+
type Output = Option<T>;
24+
25+
#[inline]
26+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
27+
let mut this = self.project();
28+
let inner =
29+
this.future.as_mut().as_pin_mut().expect("PollImmediate polled after completion");
30+
match inner.poll(cx) {
31+
Poll::Ready(t) => {
32+
this.future.set(None);
33+
Poll::Ready(Some(t))
34+
}
35+
Poll::Pending => Poll::Ready(None),
36+
}
37+
}
38+
}
39+
40+
impl<T: Future> FusedFuture for PollImmediate<T> {
41+
fn is_terminated(&self) -> bool {
42+
self.future.is_none()
43+
}
44+
}
45+
46+
/// A [Stream](crate::stream::Stream) implementation that can be polled repeatedly until the future is done.
47+
/// The stream will never return [Poll::Pending](core::task::Poll::Pending)
48+
/// so polling it in a tight loop is worse than using a blocking synchronous function.
49+
/// ```
50+
/// # futures::executor::block_on(async {
51+
/// use futures::task::Poll;
52+
/// use futures::{StreamExt, future, pin_mut};
53+
/// use future::FusedFuture;
54+
///
55+
/// let f = async { 1_u32 };
56+
/// pin_mut!(f);
57+
/// let mut r = future::poll_immediate(f);
58+
/// assert_eq!(r.next().await, Some(Poll::Ready(1)));
59+
///
60+
/// let f = async {futures::pending!(); 42_u8};
61+
/// pin_mut!(f);
62+
/// let mut p = future::poll_immediate(f);
63+
/// assert_eq!(p.next().await, Some(Poll::Pending));
64+
/// assert!(!p.is_terminated());
65+
/// assert_eq!(p.next().await, Some(Poll::Ready(42)));
66+
/// assert!(p.is_terminated());
67+
/// assert_eq!(p.next().await, None);
68+
/// # });
69+
/// ```
70+
impl<T, F> Stream for PollImmediate<F>
71+
where
72+
F: Future<Output = T>,
73+
{
74+
type Item = Poll<T>;
75+
76+
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
77+
let mut this = self.project();
78+
match this.future.as_mut().as_pin_mut() {
79+
// inner is gone, so we can signal that the stream is closed.
80+
None => Poll::Ready(None),
81+
Some(fut) => Poll::Ready(Some(fut.poll(cx).map(|t| {
82+
this.future.set(None);
83+
t
84+
}))),
85+
}
86+
}
87+
}
88+
89+
/// Creates a future that is immediately ready with an Option of a value.
90+
/// Specifically this means that [poll](core::future::Future::poll()) always returns [Poll::Ready](core::task::Poll::Ready).
91+
///
92+
/// # Caution
93+
///
94+
/// When consuming the future by this function, note the following:
95+
///
96+
/// - This function does not guarantee that the future will run to completion, so it is generally incompatible with passing the non-cancellation-safe future by value.
97+
/// - Even if the future is cancellation-safe, creating and dropping new futures frequently may lead to performance problems.
98+
///
99+
/// # Examples
100+
///
101+
/// ```
102+
/// # futures::executor::block_on(async {
103+
/// use futures::future;
104+
///
105+
/// let r = future::poll_immediate(async { 1_u32 });
106+
/// assert_eq!(r.await, Some(1));
107+
///
108+
/// let p = future::poll_immediate(future::pending::<i32>());
109+
/// assert_eq!(p.await, None);
110+
/// # });
111+
/// ```
112+
///
113+
/// ### Reusing a future
114+
///
115+
/// ```
116+
/// # futures::executor::block_on(async {
117+
/// use futures::{future, pin_mut};
118+
/// let f = async {futures::pending!(); 42_u8};
119+
/// pin_mut!(f);
120+
/// assert_eq!(None, future::poll_immediate(&mut f).await);
121+
/// assert_eq!(42, f.await);
122+
/// # });
123+
/// ```
124+
pub fn poll_immediate<F: Future>(f: F) -> PollImmediate<F> {
125+
assert_future::<Option<F::Output>, PollImmediate<F>>(PollImmediate { future: Some(f) })
126+
}

futures-util/src/stream/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,9 @@ pub use self::pending::{pending, Pending};
8888
mod poll_fn;
8989
pub use self::poll_fn::{poll_fn, PollFn};
9090

91+
mod poll_immediate;
92+
pub use self::poll_immediate::{poll_immediate, PollImmediate};
93+
9194
mod select;
9295
pub use self::select::{select, Select};
9396

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
use core::pin::Pin;
2+
use futures_core::task::{Context, Poll};
3+
use futures_core::Stream;
4+
use pin_project_lite::pin_project;
5+
6+
pin_project! {
7+
/// Stream for the [poll_immediate](poll_immediate()) function.
8+
///
9+
/// It will never return [Poll::Pending](core::task::Poll::Pending)
10+
#[derive(Debug, Clone)]
11+
#[must_use = "futures do nothing unless you `.await` or poll them"]
12+
pub struct PollImmediate<S> {
13+
#[pin]
14+
stream: Option<S>
15+
}
16+
}
17+
18+
impl<T, S> Stream for PollImmediate<S>
19+
where
20+
S: Stream<Item = T>,
21+
{
22+
type Item = Poll<T>;
23+
24+
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
25+
let mut this = self.project();
26+
let stream = match this.stream.as_mut().as_pin_mut() {
27+
// inner is gone, so we can continue to signal that the stream is closed.
28+
None => return Poll::Ready(None),
29+
Some(inner) => inner,
30+
};
31+
32+
match stream.poll_next(cx) {
33+
Poll::Ready(Some(t)) => Poll::Ready(Some(Poll::Ready(t))),
34+
Poll::Ready(None) => {
35+
this.stream.set(None);
36+
Poll::Ready(None)
37+
}
38+
Poll::Pending => Poll::Ready(Some(Poll::Pending)),
39+
}
40+
}
41+
42+
fn size_hint(&self) -> (usize, Option<usize>) {
43+
self.stream.as_ref().map_or((0, Some(0)), Stream::size_hint)
44+
}
45+
}
46+
47+
impl<S: Stream> super::FusedStream for PollImmediate<S> {
48+
fn is_terminated(&self) -> bool {
49+
self.stream.is_none()
50+
}
51+
}
52+
53+
/// Creates a new stream that always immediately returns [Poll::Ready](core::task::Poll::Ready) when awaiting it.
54+
///
55+
/// This is useful when immediacy is more important than waiting for the next item to be ready.
56+
///
57+
/// # Examples
58+
///
59+
/// ```
60+
/// # futures::executor::block_on(async {
61+
/// use futures::stream::{self, StreamExt};
62+
/// use futures::task::Poll;
63+
///
64+
/// let mut r = stream::poll_immediate(Box::pin(stream::iter(1_u32..3)));
65+
/// assert_eq!(r.next().await, Some(Poll::Ready(1)));
66+
/// assert_eq!(r.next().await, Some(Poll::Ready(2)));
67+
/// assert_eq!(r.next().await, None);
68+
///
69+
/// let mut p = stream::poll_immediate(Box::pin(stream::once(async {
70+
/// futures::pending!();
71+
/// 42_u8
72+
/// })));
73+
/// assert_eq!(p.next().await, Some(Poll::Pending));
74+
/// assert_eq!(p.next().await, Some(Poll::Ready(42)));
75+
/// assert_eq!(p.next().await, None);
76+
/// # });
77+
/// ```
78+
pub fn poll_immediate<S: Stream>(s: S) -> PollImmediate<S> {
79+
super::assert_stream::<Poll<S::Item>, PollImmediate<S>>(PollImmediate { stream: Some(s) })
80+
}

0 commit comments

Comments
 (0)