Skip to content

Commit 1db1b23

Browse files
committed
add Future::wait_until
1 parent 6949e9b commit 1db1b23

File tree

4 files changed

+99
-0
lines changed

4 files changed

+99
-0
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ slab = { version = "0.4.8", optional = true }
3939
smallvec = { version = "1.11.0", optional = true }
4040

4141
[dev-dependencies]
42+
async-io = "2.3.2"
4243
async-std = { version = "1.12.0", features = ["attributes"] }
4344
criterion = { version = "0.3", features = [
4445
"async",

src/future/futures_ext.rs

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use futures_core::Future;
55

66
use super::join::tuple::Join2;
77
use super::race::tuple::Race2;
8+
use super::WaitUntil;
89

910
/// An extension trait for the `Future` trait.
1011
pub trait FutureExt: Future {
@@ -19,6 +20,40 @@ pub trait FutureExt: Future {
1920
where
2021
Self: Future<Output = T> + Sized,
2122
S2: IntoFuture<Output = T>;
23+
24+
/// Delay resolving the future until the given deadline.
25+
///
26+
/// The underlying future will not be polled until the deadline has expired. In addition
27+
/// to using a time source as a deadline, any future can be used as a
28+
/// deadline too. When used in combination with a multi-consumer channel,
29+
/// this method can be used to synchronize the start of multiple futures and streams.
30+
///
31+
/// # Example
32+
///
33+
/// ```
34+
/// use async_io::Timer;
35+
/// use futures_concurrency::prelude::*;
36+
/// use futures_lite::future::block_on;
37+
/// use std::time::{Duration, Instant};
38+
///
39+
/// block_on(async {
40+
/// let now = Instant::now();
41+
/// let duration = Duration::from_millis(100);
42+
///
43+
/// async { "meow" }
44+
/// .wait_until(Timer::after(duration))
45+
/// .await;
46+
///
47+
/// assert!(now.elapsed() >= duration);
48+
/// });
49+
/// ```
50+
fn wait_until<D>(self, deadline: D) -> WaitUntil<Self, D::IntoFuture>
51+
where
52+
Self: Sized,
53+
D: IntoFuture,
54+
{
55+
WaitUntil::new(self, deadline.into_future())
56+
}
2257
}
2358

2459
impl<F1> FutureExt for F1

src/future/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ pub use join::Join;
7676
pub use race::Race;
7777
pub use race_ok::RaceOk;
7878
pub use try_join::TryJoin;
79+
pub use wait_until::WaitUntil;
7980

8081
/// A growable group of futures which act as a single unit.
8182
#[cfg(feature = "alloc")]
@@ -86,3 +87,4 @@ pub(crate) mod join;
8687
pub(crate) mod race;
8788
pub(crate) mod race_ok;
8889
pub(crate) mod try_join;
90+
pub(crate) mod wait_until;

src/future/wait_until.rs

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
use core::future::Future;
2+
use core::pin::Pin;
3+
use core::task::{ready, Context, Poll};
4+
5+
/// Suspends a future until the specified deadline.
6+
///
7+
/// This `struct` is created by the [`wait_until`] method on [`FutureExt`]. See its
8+
/// documentation for more.
9+
///
10+
/// [`wait_until`]: crate::future::FutureExt::wait_until
11+
/// [`FutureExt`]: crate::future::FutureExt
12+
#[derive(Debug)]
13+
#[pin_project::pin_project]
14+
#[must_use = "futures do nothing unless polled or .awaited"]
15+
pub struct WaitUntil<F, D> {
16+
#[pin]
17+
future: F,
18+
#[pin]
19+
deadline: D,
20+
state: State,
21+
}
22+
23+
/// The internal state
24+
#[derive(Debug)]
25+
enum State {
26+
Started,
27+
PollFuture,
28+
Completed,
29+
}
30+
31+
impl<F, D> WaitUntil<F, D> {
32+
pub(super) fn new(future: F, deadline: D) -> Self {
33+
Self {
34+
future,
35+
deadline,
36+
state: State::Started,
37+
}
38+
}
39+
}
40+
41+
impl<F: Future, D: Future> Future for WaitUntil<F, D> {
42+
type Output = F::Output;
43+
44+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
45+
let mut this = self.project();
46+
loop {
47+
match this.state {
48+
State::Started => {
49+
ready!(this.deadline.as_mut().poll(cx));
50+
*this.state = State::PollFuture;
51+
}
52+
State::PollFuture => {
53+
let value = ready!(this.future.as_mut().poll(cx));
54+
*this.state = State::Completed;
55+
return Poll::Ready(value);
56+
}
57+
State::Completed => panic!("future polled after completing"),
58+
}
59+
}
60+
}
61+
}

0 commit comments

Comments
 (0)