Skip to content

Commit 080c7ca

Browse files
authored
Merge pull request #24 from yoshuawuyts/pch/futures-time
import futures-time
2 parents 5ce367a + d0b37f7 commit 080c7ca

File tree

15 files changed

+639
-33
lines changed

15 files changed

+639
-33
lines changed

Cargo.toml

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
name = "wstd"
33
version.workspace = true
44
license.workspace = true
5-
repository = "https://github.com/yoshuawuyts/wstd"
65
documentation = "https://docs.rs/wstd"
76
description = "An async standard library for Wasm Components and WASI 0.2"
87
readme = "README.md"
@@ -16,13 +15,16 @@ authors = [
1615
[features]
1716

1817
[dependencies]
18+
futures-core.workspace = true
1919
http.workspace = true
20+
pin-project-lite.workspace = true
2021
slab.workspace = true
2122
wasi.workspace = true
2223
wstd-macro.workspace = true
2324

2425
[dev-dependencies]
2526
anyhow.workspace = true
27+
futures-lite.workspace = true
2628
serde_json.workspace = true
2729
test-programs-artifacts.workspace = true
2830
wasmtime.workspace = true
@@ -41,12 +43,16 @@ resolver = "2"
4143
version = "0.4.0"
4244
edition = "2021"
4345
license = "MIT OR Apache-2.0 OR Apache-2.0 WITH LLVM-exception"
46+
repository = "https://github.com/yoshuawuyts/wstd"
4447

4548
[workspace.dependencies]
4649
anyhow = "1"
4750
cargo_metadata = "0.18.1"
51+
futures-core = "0.3.19"
52+
futures-lite = "1.12.0"
4853
heck = "0.5"
4954
http = "1.1"
55+
pin-project-lite = "0.2.8"
5056
quote = "1.0"
5157
serde_json = "1"
5258
slab = "0.4.9"

src/future/delay.rs

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
use futures_core::ready;
2+
use std::future::Future;
3+
use std::pin::Pin;
4+
use std::task::{Context, Poll};
5+
6+
use pin_project_lite::pin_project;
7+
8+
pin_project! {
9+
/// Suspends a future until the specified deadline.
10+
///
11+
/// This `struct` is created by the [`delay`] method on [`FutureExt`]. See its
12+
/// documentation for more.
13+
///
14+
/// [`delay`]: crate::future::FutureExt::delay
15+
/// [`FutureExt`]: crate::future::futureExt
16+
#[must_use = "futures do nothing unless polled or .awaited"]
17+
pub struct Delay<F, D> {
18+
#[pin]
19+
future: F,
20+
#[pin]
21+
deadline: D,
22+
state: State,
23+
}
24+
}
25+
26+
/// The internal state
27+
#[derive(Debug)]
28+
enum State {
29+
Started,
30+
PollFuture,
31+
Completed,
32+
}
33+
34+
impl<F, D> Delay<F, D> {
35+
pub(super) fn new(future: F, deadline: D) -> Self {
36+
Self {
37+
future,
38+
deadline,
39+
state: State::Started,
40+
}
41+
}
42+
}
43+
44+
impl<F: Future, D: Future> Future for Delay<F, D> {
45+
type Output = F::Output;
46+
47+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
48+
let mut this = self.project();
49+
loop {
50+
match this.state {
51+
State::Started => {
52+
ready!(this.deadline.as_mut().poll(cx));
53+
*this.state = State::PollFuture;
54+
}
55+
State::PollFuture => {
56+
let value = ready!(this.future.as_mut().poll(cx));
57+
*this.state = State::Completed;
58+
return Poll::Ready(value);
59+
}
60+
State::Completed => panic!("future polled after completing"),
61+
}
62+
}
63+
}
64+
}

src/future/future_ext.rs

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
use super::{Delay, Timeout};
2+
use std::future::{Future, IntoFuture};
3+
4+
/// Extend `Future` with time-based operations.
5+
pub trait FutureExt: Future {
6+
/// Return an error if a future does not complete within a given time span.
7+
///
8+
/// Typically timeouts are, as the name implies, based on _time_. However
9+
/// this method can time out based on any future. This can be useful in
10+
/// combination with channels, as it allows (long-lived) futures to be
11+
/// cancelled based on some external event.
12+
///
13+
/// When a timeout is returned, the future will be dropped and destructors
14+
/// will be run.
15+
///
16+
/// # Example
17+
///
18+
/// ```no_run
19+
/// use wstd::prelude::*;
20+
/// use wstd::time::{Instant, Duration};
21+
/// use std::io;
22+
///
23+
/// #[wstd::main]
24+
/// async fn main() {
25+
/// let res = async { "meow" }
26+
/// .delay(Duration::from_millis(100)) // longer delay
27+
/// .timeout(Duration::from_millis(50)) // shorter timeout
28+
/// .await;
29+
/// assert_eq!(res.unwrap_err().kind(), io::ErrorKind::TimedOut); // error
30+
///
31+
/// let res = async { "meow" }
32+
/// .delay(Duration::from_millis(50)) // shorter delay
33+
/// .timeout(Duration::from_millis(100)) // longer timeout
34+
/// .await;
35+
/// assert_eq!(res.unwrap(), "meow"); // success
36+
/// }
37+
/// ```
38+
fn timeout<D>(self, deadline: D) -> Timeout<Self, D::IntoFuture>
39+
where
40+
Self: Sized,
41+
D: IntoFuture,
42+
{
43+
Timeout::new(self, deadline.into_future())
44+
}
45+
46+
/// Delay resolving the future until the given deadline.
47+
///
48+
/// The underlying future will not be polled until the deadline has expired. In addition
49+
/// to using a time source as a deadline, any future can be used as a
50+
/// deadline too. When used in combination with a multi-consumer channel,
51+
/// this method can be used to synchronize the start of multiple futures and streams.
52+
///
53+
/// # Example
54+
///
55+
/// ```no_run
56+
/// use wstd::prelude::*;
57+
/// use wstd::time::{Instant, Duration};
58+
///
59+
/// #[wstd::main]
60+
/// async fn main() {
61+
/// let now = Instant::now();
62+
/// let delay = Duration::from_millis(100);
63+
/// let _ = async { "meow" }.delay(delay).await;
64+
/// assert!(now.elapsed() >= delay);
65+
/// }
66+
/// ```
67+
fn delay<D>(self, deadline: D) -> Delay<Self, D::IntoFuture>
68+
where
69+
Self: Sized,
70+
D: IntoFuture,
71+
{
72+
Delay::new(self, deadline.into_future())
73+
}
74+
}
75+
76+
impl<T> FutureExt for T where T: Future {}

src/future/mod.rs

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
//! Asynchronous values.
2+
//!
3+
//! # Cancellation
4+
//!
5+
//! Futures can be cancelled by dropping them before they finish executing. This
6+
//! is useful when we're no longer interested in the result of an operation, as
7+
//! it allows us to stop doing needless work. This also means that a future may cancel at any `.await` point, and so just
8+
//! like with `?` we have to be careful to roll back local state if our future
9+
//! halts there.
10+
//!
11+
//!
12+
//! ```no_run
13+
//! use futures_lite::prelude::*;
14+
//! use wstd::prelude::*;
15+
//! use wstd::time::Duration;
16+
//!
17+
//! #[wstd::main]
18+
//! async fn main() {
19+
//! let mut counter = 0;
20+
//! let value = async { "meow" }
21+
//! .delay(Duration::from_millis(100))
22+
//! .timeout(Duration::from_millis(200))
23+
//! .await;
24+
//!
25+
//! assert_eq!(value.unwrap(), "meow");
26+
//! }
27+
//! ```
28+
29+
mod delay;
30+
mod future_ext;
31+
mod timeout;
32+
33+
pub use delay::Delay;
34+
pub use future_ext::FutureExt;
35+
pub use timeout::Timeout;

src/future/timeout.rs

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
use crate::time::utils::timeout_err;
2+
3+
use std::future::Future;
4+
use std::io;
5+
use std::pin::Pin;
6+
use std::task::{Context, Poll};
7+
8+
use pin_project_lite::pin_project;
9+
10+
pin_project! {
11+
/// A future that times out after a duration of time.
12+
///
13+
/// This `struct` is created by the [`timeout`] method on [`FutureExt`]. See its
14+
/// documentation for more.
15+
///
16+
/// [`timeout`]: crate::future::FutureExt::timeout
17+
/// [`FutureExt`]: crate::future::futureExt
18+
#[must_use = "futures do nothing unless polled or .awaited"]
19+
pub struct Timeout<F, D> {
20+
#[pin]
21+
future: F,
22+
#[pin]
23+
deadline: D,
24+
completed: bool,
25+
}
26+
}
27+
28+
impl<F, D> Timeout<F, D> {
29+
pub(super) fn new(future: F, deadline: D) -> Self {
30+
Self {
31+
future,
32+
deadline,
33+
completed: false,
34+
}
35+
}
36+
}
37+
38+
impl<F: Future, D: Future> Future for Timeout<F, D> {
39+
type Output = io::Result<F::Output>;
40+
41+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
42+
let this = self.project();
43+
44+
assert!(!*this.completed, "future polled after completing");
45+
46+
match this.future.poll(cx) {
47+
Poll::Ready(v) => {
48+
*this.completed = true;
49+
Poll::Ready(Ok(v))
50+
}
51+
Poll::Pending => match this.deadline.poll(cx) {
52+
Poll::Ready(_) => {
53+
*this.completed = true;
54+
Poll::Ready(Err(timeout_err("future timed out")))
55+
}
56+
Poll::Pending => Poll::Pending,
57+
},
58+
}
59+
}
60+
}

src/lib.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,8 @@
11
#![allow(async_fn_in_trait)]
2+
#![warn(future_incompatible, unreachable_pub)]
3+
//#![deny(missing_debug_implementations)]
4+
//#![warn(missing_docs)]
5+
//#![forbid(rustdoc::missing_doc_code_examples)]
26

37
//! An async standard library for Wasm Components and WASI 0.2
48
//!
@@ -44,12 +48,21 @@
4448
//! These are unique capabilities provided by WASI 0.2, and because this library
4549
//! is specific to that are exposed from here.
4650
51+
pub mod future;
4752
pub mod http;
4853
pub mod io;
4954
pub mod iter;
5055
pub mod net;
5156
pub mod rand;
5257
pub mod runtime;
58+
pub mod task;
5359
pub mod time;
5460

5561
pub use wstd_macro::attr_macro_main as main;
62+
63+
pub mod prelude {
64+
pub use crate::future::FutureExt as _;
65+
pub use crate::http::Body as _;
66+
pub use crate::io::AsyncRead as _;
67+
pub use crate::io::AsyncWrite as _;
68+
}

src/runtime/reactor.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,6 @@ impl Reactor {
7575
pub async fn wait_for(&self, pollable: Pollable) {
7676
let mut pollable = Some(pollable);
7777
let mut key = None;
78-
7978
// This function is the core loop of our function; it will be called
8079
// multiple times as the future is resolving.
8180
future::poll_fn(|cx| {

src/task/mod.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
//! Types and Traits for working with asynchronous tasks.
2+
3+
mod sleep;
4+
mod sleep_until;
5+
6+
pub use sleep::{sleep, Sleep};
7+
pub use sleep_until::{sleep_until, SleepUntil};

src/task/sleep.rs

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
use std::future::Future;
2+
use std::pin::Pin;
3+
use std::task::{Context, Poll};
4+
5+
use crate::time::Timer as AsyncTimer;
6+
use pin_project_lite::pin_project;
7+
8+
use crate::time::{Duration, Instant};
9+
10+
/// Sleeps for the specified amount of time.
11+
///
12+
/// This future can be `push_deadline` to be moved
13+
pub fn sleep(dur: Duration) -> Sleep {
14+
Sleep {
15+
dur,
16+
timer: AsyncTimer::after(dur.into()),
17+
completed: false,
18+
}
19+
}
20+
21+
pin_project! {
22+
/// Sleeps for the specified amount of time.
23+
#[must_use = "futures do nothing unless polled or .awaited"]
24+
pub struct Sleep {
25+
#[pin]
26+
timer: AsyncTimer,
27+
completed: bool,
28+
dur: Duration,
29+
}
30+
}
31+
32+
impl Future for Sleep {
33+
type Output = Instant;
34+
35+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
36+
assert!(!self.completed, "future polled after completing");
37+
let this = self.project();
38+
match this.timer.poll(cx) {
39+
Poll::Ready(instant) => {
40+
*this.completed = true;
41+
Poll::Ready(instant.into())
42+
}
43+
Poll::Pending => Poll::Pending,
44+
}
45+
}
46+
}

0 commit comments

Comments
 (0)