Skip to content

Allow futures independent of wasi pollables to progress #70

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 30 additions & 9 deletions src/runtime/block_on.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use core::future::Future;
use core::pin::pin;
use core::task::Waker;
use core::task::{Context, Poll};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::Wake;

Expand All @@ -24,30 +25,50 @@ where
let mut fut = pin!(fut);

// Create a new context to be passed to the future.
let waker = noop_waker();
let waker_impl = Arc::new(ReactorWaker::new());
let waker = Waker::from(Arc::clone(&waker_impl));
let mut cx = Context::from_waker(&waker);

// Either the future completes and we return, or some IO is happening
// and we wait.
let res = loop {
match fut.as_mut().poll(&mut cx) {
Poll::Ready(res) => break res,
Poll::Pending => reactor.block_until(),
Poll::Pending => {
reactor.block_until(waker_impl.awake());
waker_impl.set_awake(false);
}
}
};
// Clear the singleton
REACTOR.replace(None);
res
}

/// Construct a new no-op waker
// NOTE: we can remove this once <https://github.com/rust-lang/rust/issues/98286> lands
fn noop_waker() -> Waker {
struct NoopWaker;
struct ReactorWaker {
awake: AtomicBool,
}

impl ReactorWaker {
fn new() -> Self {
Self {
awake: AtomicBool::new(false),
}
}

impl Wake for NoopWaker {
fn wake(self: Arc<Self>) {}
#[inline]
fn set_awake(&self, awake: bool) {
self.awake.store(awake, Ordering::Relaxed);
}

Waker::from(Arc::new(NoopWaker))
#[inline]
fn awake(&self) -> bool {
self.awake.load(Ordering::Relaxed)
}
}

impl Wake for ReactorWaker {
fn wake(self: Arc<Self>) {
self.set_awake(true);
}
}
83 changes: 78 additions & 5 deletions src/runtime/reactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@ use core::future;
use core::pin::Pin;
use core::task::{Context, Poll, Waker};
use slab::Slab;
use std::cell::LazyCell;
use std::collections::HashMap;
use std::rc::Rc;
use std::sync::Arc;
use std::task::Wake;
use wasi::io::poll::Pollable;

/// A key for a `Pollable`, which is an index into the `Slab<Pollable>` in `Reactor`.
Expand Down Expand Up @@ -100,6 +103,7 @@ pub struct Reactor {
struct InnerReactor {
pollables: Slab<Pollable>,
wakers: HashMap<Waitee, Waker>,
immediate: LazyCell<(Pollable, Waker)>,
}

impl Reactor {
Expand All @@ -122,6 +126,12 @@ impl Reactor {
inner: Rc::new(RefCell::new(InnerReactor {
pollables: Slab::new(),
wakers: HashMap::new(),
immediate: LazyCell::new(|| {
(
wasi::clocks::monotonic_clock::subscribe_duration(0),
noop_waker(),
)
}),
})),
}
}
Expand All @@ -131,30 +141,49 @@ impl Reactor {
/// # On Wakers and single-threaded runtimes
///
/// At first glance it might seem silly that this goes through the motions
/// of calling the wakers. The main waker we create here is a `noop` waker:
/// it does nothing. However, it is common and encouraged to use wakers to
/// of calling the wakers. However, it is common and encouraged to use wakers to
/// distinguish between events. Concurrency primitives may construct their
/// own wakers to keep track of identity and wake more precisely. We do not
/// control the wakers construted by other libraries, and it is for this
/// reason that we have to call all the wakers - even if by default they
/// will do nothing.
pub(crate) fn block_until(&self) {
///
/// The [awake] argument indicates whether the main task is ready to be re-polled
/// to make progress. If it is, we will just poll all pollables without blocking
/// by including an always ready pollable, or choose to skip calling poll at all
/// if no pollables are registered.
pub(crate) fn block_until(&self, awake: bool) {
let reactor = self.inner.borrow();

// If no tasks are interested in any pollables currently, and the main task
// is already awake, run the next poll loop instead
if reactor.wakers.is_empty() && awake {
return;
}

// We're about to wait for a number of pollables. When they wake we get
// the *indexes* back for the pollables whose events were available - so
// we need to be able to associate the index with the right waker.

// We start by iterating over the pollables, and keeping note of which
// pollable belongs to which waker
let mut indexed_wakers = Vec::with_capacity(reactor.wakers.len());
let mut targets = Vec::with_capacity(reactor.wakers.len());
let capacity = reactor.wakers.len() + 1;
let mut indexed_wakers = Vec::with_capacity(capacity);
let mut targets = Vec::with_capacity(capacity);
for (waitee, waker) in reactor.wakers.iter() {
let pollable_index = waitee.pollable.0.key;
indexed_wakers.push(waker);
targets.push(&reactor.pollables[pollable_index.0]);
}

// If the task is already awake, don't blockingly wait for the pollables,
// instead ask the host to give us an update immediately
if awake {
let (immediate, waker) = &*reactor.immediate;
indexed_wakers.push(waker);
targets.push(immediate);
}

debug_assert_ne!(
targets.len(),
0,
Expand Down Expand Up @@ -209,6 +238,18 @@ impl Reactor {
}
}

/// Construct a new no-op waker
// NOTE: we can remove this once <https://github.com/rust-lang/rust/issues/98286> lands
fn noop_waker() -> Waker {
struct NoopWaker;

impl Wake for NoopWaker {
fn wake(self: Arc<Self>) {}
}

Waker::from(Arc::new(NoopWaker))
}

#[cfg(test)]
mod test {
use super::*;
Expand Down Expand Up @@ -281,4 +322,36 @@ mod test {
.await;
})
}

#[test]
fn progresses_wasi_independent_futures() {
crate::runtime::block_on(async {
let reactor = Reactor::current();
let later = wasi::clocks::monotonic_clock::subscribe_duration(1_000_000_000);
let later = reactor.schedule(later);
let mut polled_before = false;
let wasi_independent_future = futures_lite::future::poll_fn(|cx| {
if polled_before {
std::task::Poll::Ready(true)
} else {
polled_before = true;
cx.waker().wake_by_ref();
std::task::Poll::Pending
}
});
let later = async {
later.wait_for().await;
false
};
let wasi_independent_future_won =
futures_lite::future::race(wasi_independent_future, later).await;
assert!(
wasi_independent_future_won,
"wasi_independent_future should win the race"
);
let soon = wasi::clocks::monotonic_clock::subscribe_duration(10_000_000);
let soon = reactor.schedule(soon);
soon.wait_for().await;
})
}
}