diff --git a/src/runtime/block_on.rs b/src/runtime/block_on.rs index 9a96d98..d0bbc5c 100644 --- a/src/runtime/block_on.rs +++ b/src/runtime/block_on.rs @@ -4,6 +4,7 @@ use core::future::Future; use core::pin::pin; use core::task::Waker; use core::task::{Context, Poll}; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::task::Wake; @@ -24,7 +25,8 @@ where let mut fut = pin!(fut); // Create a new context to be passed to the future. - let waker = noop_waker(); + let waker_impl = Arc::new(ReactorWaker::new()); + let waker = Waker::from(Arc::clone(&waker_impl)); let mut cx = Context::from_waker(&waker); // Either the future completes and we return, or some IO is happening @@ -32,7 +34,10 @@ where let res = loop { match fut.as_mut().poll(&mut cx) { Poll::Ready(res) => break res, - Poll::Pending => reactor.block_until(), + Poll::Pending => { + reactor.block_until(waker_impl.awake()); + waker_impl.set_awake(false); + } } }; // Clear the singleton @@ -40,14 +45,30 @@ where res } -/// Construct a new no-op waker -// NOTE: we can remove this once lands -fn noop_waker() -> Waker { - struct NoopWaker; +struct ReactorWaker { + awake: AtomicBool, +} + +impl ReactorWaker { + fn new() -> Self { + Self { + awake: AtomicBool::new(false), + } + } - impl Wake for NoopWaker { - fn wake(self: Arc) {} + #[inline] + fn set_awake(&self, awake: bool) { + self.awake.store(awake, Ordering::Relaxed); } - Waker::from(Arc::new(NoopWaker)) + #[inline] + fn awake(&self) -> bool { + self.awake.load(Ordering::Relaxed) + } +} + +impl Wake for ReactorWaker { + fn wake(self: Arc) { + self.set_awake(true); + } } diff --git a/src/runtime/reactor.rs b/src/runtime/reactor.rs index afe3deb..9222861 100644 --- a/src/runtime/reactor.rs +++ b/src/runtime/reactor.rs @@ -5,8 +5,11 @@ use core::future; use core::pin::Pin; use core::task::{Context, Poll, Waker}; use slab::Slab; +use std::cell::LazyCell; use std::collections::HashMap; use std::rc::Rc; +use std::sync::Arc; +use std::task::Wake; use wasi::io::poll::Pollable; /// A key for a `Pollable`, which is an index into the `Slab` in `Reactor`. @@ -100,6 +103,7 @@ pub struct Reactor { struct InnerReactor { pollables: Slab, wakers: HashMap, + immediate: LazyCell<(Pollable, Waker)>, } impl Reactor { @@ -122,6 +126,12 @@ impl Reactor { inner: Rc::new(RefCell::new(InnerReactor { pollables: Slab::new(), wakers: HashMap::new(), + immediate: LazyCell::new(|| { + ( + wasi::clocks::monotonic_clock::subscribe_duration(0), + noop_waker(), + ) + }), })), } } @@ -131,30 +141,49 @@ impl Reactor { /// # On Wakers and single-threaded runtimes /// /// At first glance it might seem silly that this goes through the motions - /// of calling the wakers. The main waker we create here is a `noop` waker: - /// it does nothing. However, it is common and encouraged to use wakers to + /// of calling the wakers. However, it is common and encouraged to use wakers to /// distinguish between events. Concurrency primitives may construct their /// own wakers to keep track of identity and wake more precisely. We do not /// control the wakers construted by other libraries, and it is for this /// reason that we have to call all the wakers - even if by default they /// will do nothing. - pub(crate) fn block_until(&self) { + /// + /// The [awake] argument indicates whether the main task is ready to be re-polled + /// to make progress. If it is, we will just poll all pollables without blocking + /// by including an always ready pollable, or choose to skip calling poll at all + /// if no pollables are registered. + pub(crate) fn block_until(&self, awake: bool) { let reactor = self.inner.borrow(); + // If no tasks are interested in any pollables currently, and the main task + // is already awake, run the next poll loop instead + if reactor.wakers.is_empty() && awake { + return; + } + // We're about to wait for a number of pollables. When they wake we get // the *indexes* back for the pollables whose events were available - so // we need to be able to associate the index with the right waker. // We start by iterating over the pollables, and keeping note of which // pollable belongs to which waker - let mut indexed_wakers = Vec::with_capacity(reactor.wakers.len()); - let mut targets = Vec::with_capacity(reactor.wakers.len()); + let capacity = reactor.wakers.len() + 1; + let mut indexed_wakers = Vec::with_capacity(capacity); + let mut targets = Vec::with_capacity(capacity); for (waitee, waker) in reactor.wakers.iter() { let pollable_index = waitee.pollable.0.key; indexed_wakers.push(waker); targets.push(&reactor.pollables[pollable_index.0]); } + // If the task is already awake, don't blockingly wait for the pollables, + // instead ask the host to give us an update immediately + if awake { + let (immediate, waker) = &*reactor.immediate; + indexed_wakers.push(waker); + targets.push(immediate); + } + debug_assert_ne!( targets.len(), 0, @@ -209,6 +238,18 @@ impl Reactor { } } +/// Construct a new no-op waker +// NOTE: we can remove this once lands +fn noop_waker() -> Waker { + struct NoopWaker; + + impl Wake for NoopWaker { + fn wake(self: Arc) {} + } + + Waker::from(Arc::new(NoopWaker)) +} + #[cfg(test)] mod test { use super::*; @@ -281,4 +322,36 @@ mod test { .await; }) } + + #[test] + fn progresses_wasi_independent_futures() { + crate::runtime::block_on(async { + let reactor = Reactor::current(); + let later = wasi::clocks::monotonic_clock::subscribe_duration(1_000_000_000); + let later = reactor.schedule(later); + let mut polled_before = false; + let wasi_independent_future = futures_lite::future::poll_fn(|cx| { + if polled_before { + std::task::Poll::Ready(true) + } else { + polled_before = true; + cx.waker().wake_by_ref(); + std::task::Poll::Pending + } + }); + let later = async { + later.wait_for().await; + false + }; + let wasi_independent_future_won = + futures_lite::future::race(wasi_independent_future, later).await; + assert!( + wasi_independent_future_won, + "wasi_independent_future should win the race" + ); + let soon = wasi::clocks::monotonic_clock::subscribe_duration(10_000_000); + let soon = reactor.schedule(soon); + soon.wait_for().await; + }) + } }