Skip to content

Commit bdb873a

Browse files
committed
add Stream::wait_until
1 parent 1db1b23 commit bdb873a

File tree

3 files changed

+103
-1
lines changed

3 files changed

+103
-1
lines changed

src/stream/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ pub use stream_ext::StreamExt;
5454
#[doc(inline)]
5555
#[cfg(feature = "alloc")]
5656
pub use stream_group::StreamGroup;
57+
pub use wait_until::WaitUntil;
5758
pub use zip::Zip;
5859

5960
/// A growable group of streams which act as a single unit.
@@ -64,4 +65,5 @@ pub(crate) mod chain;
6465
mod into_stream;
6566
pub(crate) mod merge;
6667
mod stream_ext;
68+
pub(crate) mod wait_until;
6769
pub(crate) mod zip;

src/stream/stream_ext.rs

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
1+
use core::future::IntoFuture;
2+
13
use crate::stream::{IntoStream, Merge};
24
use futures_core::Stream;
35

46
#[cfg(feature = "alloc")]
57
use crate::concurrent_stream::FromStream;
68

7-
use super::{chain::tuple::Chain2, merge::tuple::Merge2, zip::tuple::Zip2, Chain, Zip};
9+
use super::{chain::tuple::Chain2, merge::tuple::Merge2, zip::tuple::Zip2, Chain, WaitUntil, Zip};
810

911
/// An extension trait for the `Stream` trait.
1012
pub trait StreamExt: Stream {
@@ -34,6 +36,41 @@ pub trait StreamExt: Stream {
3436
{
3537
FromStream::new(self)
3638
}
39+
40+
/// Delay the yielding of items from the stream until the given deadline.
41+
///
42+
/// The underlying stream will not be polled until the deadline has expired. In addition
43+
/// to using a time source as a deadline, any future can be used as a
44+
/// deadline too. When used in combination with a multi-consumer channel,
45+
/// this method can be used to synchronize the start of multiple streams and futures.
46+
///
47+
/// # Example
48+
/// ```
49+
/// use async_io::Timer;
50+
/// use futures_concurrency::prelude::*;
51+
/// use futures_lite::{future::block_on, stream};
52+
/// use futures_lite::prelude::*;
53+
/// use std::time::{Duration, Instant};
54+
///
55+
/// block_on(async {
56+
/// let now = Instant::now();
57+
/// let duration = Duration::from_millis(100);
58+
///
59+
/// stream::once("meow")
60+
/// .wait_until(Timer::after(duration))
61+
/// .next()
62+
/// .await;
63+
///
64+
/// assert!(now.elapsed() >= duration);
65+
/// });
66+
/// ```
67+
fn wait_until<D>(self, deadline: D) -> WaitUntil<Self, D::IntoFuture>
68+
where
69+
Self: Sized,
70+
D: IntoFuture,
71+
{
72+
WaitUntil::new(self, deadline.into_future())
73+
}
3774
}
3875

3976
impl<S1> StreamExt for S1

src/stream/wait_until.rs

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
use core::future::Future;
2+
use core::pin::Pin;
3+
use core::task::{Context, Poll};
4+
5+
use futures_core::stream::Stream;
6+
use pin_project::pin_project;
7+
8+
/// Delay execution of a stream once for the specified duration.
9+
///
10+
/// This `struct` is created by the [`wait_until`] method on [`StreamExt`]. See its
11+
/// documentation for more.
12+
///
13+
/// [`wait_until`]: crate::stream::StreamExt::wait_until
14+
/// [`StreamExt`]: crate::stream::StreamExt
15+
#[derive(Debug)]
16+
#[must_use = "streams do nothing unless polled or .awaited"]
17+
#[pin_project]
18+
pub struct WaitUntil<S, D> {
19+
#[pin]
20+
stream: S,
21+
#[pin]
22+
deadline: D,
23+
state: State,
24+
}
25+
26+
#[derive(Debug)]
27+
enum State {
28+
Timer,
29+
Streaming,
30+
}
31+
32+
impl<S, D> WaitUntil<S, D> {
33+
pub(crate) fn new(stream: S, deadline: D) -> Self {
34+
WaitUntil {
35+
stream,
36+
deadline,
37+
state: State::Timer,
38+
}
39+
}
40+
}
41+
42+
impl<S, D> Stream for WaitUntil<S, D>
43+
where
44+
S: Stream,
45+
D: Future,
46+
{
47+
type Item = S::Item;
48+
49+
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
50+
let this = self.project();
51+
52+
match this.state {
53+
State::Timer => match this.deadline.poll(cx) {
54+
Poll::Pending => return Poll::Pending,
55+
Poll::Ready(_) => {
56+
*this.state = State::Streaming;
57+
this.stream.poll_next(cx)
58+
}
59+
},
60+
State::Streaming => this.stream.poll_next(cx),
61+
}
62+
}
63+
}

0 commit comments

Comments
 (0)