Skip to content

Allow non-pollable woken futures to make progress #71

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jul 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,13 @@ jobs:
uses: actions-rs/cargo@v1
with:
command: test
args: -p wstd --target wasm32-wasip2
args: -p wstd --target wasm32-wasip2 -- --nocapture

- name: example tests
uses: actions-rs/cargo@v1
with:
command: test
args: -p test-programs-artifacts
args: -p test-programs-artifacts -- --nocapture


check_fmt_and_docs:
Expand Down
55 changes: 39 additions & 16 deletions src/runtime/block_on.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
use super::{Reactor, REACTOR};

use core::future::Future;
use core::pin::pin;
use core::task::Waker;
use core::task::{Context, Poll};
use std::future::Future;
use std::pin::pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::Wake;
use std::task::{Context, Poll, Wake, Waker};

/// Start the event loop
pub fn block_on<Fut>(fut: Fut) -> Fut::Output
Expand All @@ -24,30 +23,54 @@ where
let mut fut = pin!(fut);

// Create a new context to be passed to the future.
let waker = noop_waker();
let root = Arc::new(RootWaker::new());
let waker = Waker::from(root.clone());
let mut cx = Context::from_waker(&waker);

// Either the future completes and we return, or some IO is happening
// and we wait.
let res = loop {
match fut.as_mut().poll(&mut cx) {
Poll::Ready(res) => break res,
Poll::Pending => reactor.block_until(),
Poll::Pending => {
// If some non-pollable based future has marked the root task
// as awake, reset and poll again. otherwise, block until a
// pollable wakes a future.
if root.is_awake() {
root.reset()
} else {
reactor.block_on_pollables()
}
}
}
};
// Clear the singleton
REACTOR.replace(None);
res
}

/// Construct a new no-op waker
// NOTE: we can remove this once <https://github.com/rust-lang/rust/issues/98286> lands
fn noop_waker() -> Waker {
struct NoopWaker;

impl Wake for NoopWaker {
fn wake(self: Arc<Self>) {}
/// This waker is used in the Context of block_on. If a Future executing in
/// the block_on calls context.wake(), it sets this boolean state so that
/// block_on's Future is polled again immediately, rather than waiting for
/// an external (WASI pollable) event before polling again.
struct RootWaker {
wake: AtomicBool,
}
impl RootWaker {
fn new() -> Self {
Self {
wake: AtomicBool::new(false),
}
}
fn is_awake(&self) -> bool {
self.wake.load(Ordering::Relaxed)
}
fn reset(&self) {
self.wake.store(false, Ordering::Relaxed);
}
}
impl Wake for RootWaker {
fn wake(self: Arc<Self>) {
self.wake.store(true, Ordering::Relaxed);
}

Waker::from(Arc::new(NoopWaker))
}
66 changes: 53 additions & 13 deletions src/runtime/reactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,19 +126,8 @@ impl Reactor {
}
}

/// Block until new events are ready. Calls the respective wakers once done.
///
/// # On Wakers and single-threaded runtimes
///
/// At first glance it might seem silly that this goes through the motions
/// of calling the wakers. The main waker we create here is a `noop` waker:
/// it does nothing. However, it is common and encouraged to use wakers to
/// distinguish between events. Concurrency primitives may construct their
/// own wakers to keep track of identity and wake more precisely. We do not
/// control the wakers construted by other libraries, and it is for this
/// reason that we have to call all the wakers - even if by default they
/// will do nothing.
pub(crate) fn block_until(&self) {
/// Block until at least one pending pollable is ready, waking a pending future.
pub(crate) fn block_on_pollables(&self) {
let reactor = self.inner.borrow();

// We're about to wait for a number of pollables. When they wake we get
Expand Down Expand Up @@ -281,4 +270,55 @@ mod test {
.await;
})
}

#[test]
fn progresses_wasi_independent_futures() {
crate::runtime::block_on(async {
let start = wasi::clocks::monotonic_clock::now();

let reactor = Reactor::current();
const LONG_DURATION: u64 = 1_000_000_000;
let later = wasi::clocks::monotonic_clock::subscribe_duration(LONG_DURATION);
let later = reactor.schedule(later);
let mut polled_before = false;
// This is basically futures_lite::future::yield_now, except with a boolean
// `polled_before` so we can definitively observe what happened
let wasi_independent_future = futures_lite::future::poll_fn(|cx| {
if polled_before {
std::task::Poll::Ready(true)
} else {
polled_before = true;
cx.waker().wake_by_ref();
std::task::Poll::Pending
}
});
let later = async {
later.wait_for().await;
false
};
let wasi_independent_future_won =
futures_lite::future::race(wasi_independent_future, later).await;
assert!(
wasi_independent_future_won,
"wasi_independent_future should win the race"
);
const SHORT_DURATION: u64 = LONG_DURATION / 100;
let soon = wasi::clocks::monotonic_clock::subscribe_duration(SHORT_DURATION);
let soon = reactor.schedule(soon);
soon.wait_for().await;

let end = wasi::clocks::monotonic_clock::now();

let duration = end - start;
assert!(
duration > SHORT_DURATION,
"{duration} greater than short duration shows awaited for `soon` properly"
);
// Upper bound is high enough that even the very poor windows CI machines meet it
assert!(
duration < (5 * SHORT_DURATION),
"{duration} less than a reasonable multiple of short duration {SHORT_DURATION} shows did not await for `later`"
);
})
}
}
Loading