diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 7547e67..62d6569 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -42,13 +42,13 @@ jobs: uses: actions-rs/cargo@v1 with: command: test - args: -p wstd --target wasm32-wasip2 + args: -p wstd --target wasm32-wasip2 -- --nocapture - name: example tests uses: actions-rs/cargo@v1 with: command: test - args: -p test-programs-artifacts + args: -p test-programs-artifacts -- --nocapture check_fmt_and_docs: diff --git a/src/runtime/block_on.rs b/src/runtime/block_on.rs index 9a96d98..36c6fe1 100644 --- a/src/runtime/block_on.rs +++ b/src/runtime/block_on.rs @@ -1,11 +1,10 @@ use super::{Reactor, REACTOR}; -use core::future::Future; -use core::pin::pin; -use core::task::Waker; -use core::task::{Context, Poll}; +use std::future::Future; +use std::pin::pin; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; -use std::task::Wake; +use std::task::{Context, Poll, Wake, Waker}; /// Start the event loop pub fn block_on(fut: Fut) -> Fut::Output @@ -24,7 +23,8 @@ where let mut fut = pin!(fut); // Create a new context to be passed to the future. - let waker = noop_waker(); + let root = Arc::new(RootWaker::new()); + let waker = Waker::from(root.clone()); let mut cx = Context::from_waker(&waker); // Either the future completes and we return, or some IO is happening @@ -32,7 +32,16 @@ where let res = loop { match fut.as_mut().poll(&mut cx) { Poll::Ready(res) => break res, - Poll::Pending => reactor.block_until(), + Poll::Pending => { + // If some non-pollable based future has marked the root task + // as awake, reset and poll again. otherwise, block until a + // pollable wakes a future. + if root.is_awake() { + root.reset() + } else { + reactor.block_on_pollables() + } + } } }; // Clear the singleton @@ -40,14 +49,28 @@ where res } -/// Construct a new no-op waker -// NOTE: we can remove this once lands -fn noop_waker() -> Waker { - struct NoopWaker; - - impl Wake for NoopWaker { - fn wake(self: Arc) {} +/// This waker is used in the Context of block_on. If a Future executing in +/// the block_on calls context.wake(), it sets this boolean state so that +/// block_on's Future is polled again immediately, rather than waiting for +/// an external (WASI pollable) event before polling again. +struct RootWaker { + wake: AtomicBool, +} +impl RootWaker { + fn new() -> Self { + Self { + wake: AtomicBool::new(false), + } + } + fn is_awake(&self) -> bool { + self.wake.load(Ordering::Relaxed) + } + fn reset(&self) { + self.wake.store(false, Ordering::Relaxed); + } +} +impl Wake for RootWaker { + fn wake(self: Arc) { + self.wake.store(true, Ordering::Relaxed); } - - Waker::from(Arc::new(NoopWaker)) } diff --git a/src/runtime/reactor.rs b/src/runtime/reactor.rs index afe3deb..8462bd9 100644 --- a/src/runtime/reactor.rs +++ b/src/runtime/reactor.rs @@ -126,19 +126,8 @@ impl Reactor { } } - /// Block until new events are ready. Calls the respective wakers once done. - /// - /// # On Wakers and single-threaded runtimes - /// - /// At first glance it might seem silly that this goes through the motions - /// of calling the wakers. The main waker we create here is a `noop` waker: - /// it does nothing. However, it is common and encouraged to use wakers to - /// distinguish between events. Concurrency primitives may construct their - /// own wakers to keep track of identity and wake more precisely. We do not - /// control the wakers construted by other libraries, and it is for this - /// reason that we have to call all the wakers - even if by default they - /// will do nothing. - pub(crate) fn block_until(&self) { + /// Block until at least one pending pollable is ready, waking a pending future. + pub(crate) fn block_on_pollables(&self) { let reactor = self.inner.borrow(); // We're about to wait for a number of pollables. When they wake we get @@ -281,4 +270,55 @@ mod test { .await; }) } + + #[test] + fn progresses_wasi_independent_futures() { + crate::runtime::block_on(async { + let start = wasi::clocks::monotonic_clock::now(); + + let reactor = Reactor::current(); + const LONG_DURATION: u64 = 1_000_000_000; + let later = wasi::clocks::monotonic_clock::subscribe_duration(LONG_DURATION); + let later = reactor.schedule(later); + let mut polled_before = false; + // This is basically futures_lite::future::yield_now, except with a boolean + // `polled_before` so we can definitively observe what happened + let wasi_independent_future = futures_lite::future::poll_fn(|cx| { + if polled_before { + std::task::Poll::Ready(true) + } else { + polled_before = true; + cx.waker().wake_by_ref(); + std::task::Poll::Pending + } + }); + let later = async { + later.wait_for().await; + false + }; + let wasi_independent_future_won = + futures_lite::future::race(wasi_independent_future, later).await; + assert!( + wasi_independent_future_won, + "wasi_independent_future should win the race" + ); + const SHORT_DURATION: u64 = LONG_DURATION / 100; + let soon = wasi::clocks::monotonic_clock::subscribe_duration(SHORT_DURATION); + let soon = reactor.schedule(soon); + soon.wait_for().await; + + let end = wasi::clocks::monotonic_clock::now(); + + let duration = end - start; + assert!( + duration > SHORT_DURATION, + "{duration} greater than short duration shows awaited for `soon` properly" + ); + // Upper bound is high enough that even the very poor windows CI machines meet it + assert!( + duration < (5 * SHORT_DURATION), + "{duration} less than a reasonable multiple of short duration {SHORT_DURATION} shows did not await for `later`" + ); + }) + } }