Skip to content

Commit 949bddc

Browse files
committed
Allow futures independent of wasi pollables to progress
This commit moves away from using noop waker for the main task. Instead, it keeps track if the main task is ready to progress again immediately after `Future::poll()`. If it is, the runtime will skip calling `wasi::io::poll::poll` if there are no wasi pollables scheduled, or otherwise call in a non-blocking manner to quickly get an update for all wasi pollables before progressing the main task again.
1 parent 2218692 commit 949bddc

File tree

2 files changed

+108
-14
lines changed

2 files changed

+108
-14
lines changed

src/runtime/block_on.rs

Lines changed: 30 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use core::future::Future;
44
use core::pin::pin;
55
use core::task::Waker;
66
use core::task::{Context, Poll};
7+
use std::sync::atomic::{AtomicBool, Ordering};
78
use std::sync::Arc;
89
use std::task::Wake;
910

@@ -24,30 +25,50 @@ where
2425
let mut fut = pin!(fut);
2526

2627
// Create a new context to be passed to the future.
27-
let waker = noop_waker();
28+
let waker_impl = Arc::new(ReactorWaker::new());
29+
let waker = Waker::from(Arc::clone(&waker_impl));
2830
let mut cx = Context::from_waker(&waker);
2931

3032
// Either the future completes and we return, or some IO is happening
3133
// and we wait.
3234
let res = loop {
3335
match fut.as_mut().poll(&mut cx) {
3436
Poll::Ready(res) => break res,
35-
Poll::Pending => reactor.block_until(),
37+
Poll::Pending => {
38+
reactor.block_until(waker_impl.awake());
39+
waker_impl.set_awake(false);
40+
}
3641
}
3742
};
3843
// Clear the singleton
3944
REACTOR.replace(None);
4045
res
4146
}
4247

43-
/// Construct a new no-op waker
44-
// NOTE: we can remove this once <https://github.com/rust-lang/rust/issues/98286> lands
45-
fn noop_waker() -> Waker {
46-
struct NoopWaker;
48+
struct ReactorWaker {
49+
awake: AtomicBool,
50+
}
51+
52+
impl ReactorWaker {
53+
fn new() -> Self {
54+
Self {
55+
awake: AtomicBool::new(false),
56+
}
57+
}
4758

48-
impl Wake for NoopWaker {
49-
fn wake(self: Arc<Self>) {}
59+
#[inline]
60+
fn set_awake(&self, awake: bool) {
61+
self.awake.store(awake, Ordering::Relaxed);
5062
}
5163

52-
Waker::from(Arc::new(NoopWaker))
64+
#[inline]
65+
fn awake(&self) -> bool {
66+
self.awake.load(Ordering::Relaxed)
67+
}
68+
}
69+
70+
impl Wake for ReactorWaker {
71+
fn wake(self: Arc<Self>) {
72+
self.set_awake(true);
73+
}
5374
}

src/runtime/reactor.rs

Lines changed: 78 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,11 @@ use core::future;
55
use core::pin::Pin;
66
use core::task::{Context, Poll, Waker};
77
use slab::Slab;
8+
use std::cell::LazyCell;
89
use std::collections::HashMap;
910
use std::rc::Rc;
11+
use std::sync::Arc;
12+
use std::task::Wake;
1013
use wasi::io::poll::Pollable;
1114

1215
/// A key for a `Pollable`, which is an index into the `Slab<Pollable>` in `Reactor`.
@@ -100,6 +103,7 @@ pub struct Reactor {
100103
struct InnerReactor {
101104
pollables: Slab<Pollable>,
102105
wakers: HashMap<Waitee, Waker>,
106+
immediate: LazyCell<(Pollable, Waker)>,
103107
}
104108

105109
impl Reactor {
@@ -122,6 +126,12 @@ impl Reactor {
122126
inner: Rc::new(RefCell::new(InnerReactor {
123127
pollables: Slab::new(),
124128
wakers: HashMap::new(),
129+
immediate: LazyCell::new(|| {
130+
(
131+
wasi::clocks::monotonic_clock::subscribe_duration(0),
132+
noop_waker(),
133+
)
134+
}),
125135
})),
126136
}
127137
}
@@ -131,30 +141,49 @@ impl Reactor {
131141
/// # On Wakers and single-threaded runtimes
132142
///
133143
/// At first glance it might seem silly that this goes through the motions
134-
/// of calling the wakers. The main waker we create here is a `noop` waker:
135-
/// it does nothing. However, it is common and encouraged to use wakers to
144+
/// of calling the wakers. However, it is common and encouraged to use wakers to
136145
/// distinguish between events. Concurrency primitives may construct their
137146
/// own wakers to keep track of identity and wake more precisely. We do not
138147
/// control the wakers construted by other libraries, and it is for this
139148
/// reason that we have to call all the wakers - even if by default they
140149
/// will do nothing.
141-
pub(crate) fn block_until(&self) {
150+
///
151+
/// The [awake] argument indicates whether the main task is ready to be re-polled
152+
/// to make progress. If it is, we will just poll all pollables without blocking
153+
/// by including an always ready pollable, or choose to skip calling poll at all
154+
/// if no pollables are registered.
155+
pub(crate) fn block_until(&self, awake: bool) {
142156
let reactor = self.inner.borrow();
143157

158+
// If no tasks are interested in any pollables currently, and the main task
159+
// is already awake, run the next poll loop instead
160+
if reactor.wakers.is_empty() && awake {
161+
return;
162+
}
163+
144164
// We're about to wait for a number of pollables. When they wake we get
145165
// the *indexes* back for the pollables whose events were available - so
146166
// we need to be able to associate the index with the right waker.
147167

148168
// We start by iterating over the pollables, and keeping note of which
149169
// pollable belongs to which waker
150-
let mut indexed_wakers = Vec::with_capacity(reactor.wakers.len());
151-
let mut targets = Vec::with_capacity(reactor.wakers.len());
170+
let capacity = reactor.wakers.len() + 1;
171+
let mut indexed_wakers = Vec::with_capacity(capacity);
172+
let mut targets = Vec::with_capacity(capacity);
152173
for (waitee, waker) in reactor.wakers.iter() {
153174
let pollable_index = waitee.pollable.0.key;
154175
indexed_wakers.push(waker);
155176
targets.push(&reactor.pollables[pollable_index.0]);
156177
}
157178

179+
// If the task is already awake, don't blockingly wait for the pollables,
180+
// instead ask the host to give us an update immediately
181+
if awake {
182+
let (immediate, waker) = &*reactor.immediate;
183+
indexed_wakers.push(waker);
184+
targets.push(immediate);
185+
}
186+
158187
debug_assert_ne!(
159188
targets.len(),
160189
0,
@@ -209,6 +238,18 @@ impl Reactor {
209238
}
210239
}
211240

241+
/// Construct a new no-op waker
242+
// NOTE: we can remove this once <https://github.com/rust-lang/rust/issues/98286> lands
243+
fn noop_waker() -> Waker {
244+
struct NoopWaker;
245+
246+
impl Wake for NoopWaker {
247+
fn wake(self: Arc<Self>) {}
248+
}
249+
250+
Waker::from(Arc::new(NoopWaker))
251+
}
252+
212253
#[cfg(test)]
213254
mod test {
214255
use super::*;
@@ -281,4 +322,36 @@ mod test {
281322
.await;
282323
})
283324
}
325+
326+
#[test]
327+
fn progresses_wasi_independent_futures() {
328+
crate::runtime::block_on(async {
329+
let reactor = Reactor::current();
330+
let later = wasi::clocks::monotonic_clock::subscribe_duration(1_000_000_000);
331+
let later = reactor.schedule(later);
332+
let mut polled_before = false;
333+
let wasi_independent_future = futures_lite::future::poll_fn(|cx| {
334+
if polled_before {
335+
std::task::Poll::Ready(true)
336+
} else {
337+
polled_before = true;
338+
cx.waker().wake_by_ref();
339+
std::task::Poll::Pending
340+
}
341+
});
342+
let later = async {
343+
later.wait_for().await;
344+
false
345+
};
346+
let wasi_independent_future_won =
347+
futures_lite::future::race(wasi_independent_future, later).await;
348+
assert!(
349+
wasi_independent_future_won,
350+
"wasi_independent_future should win the race"
351+
);
352+
let soon = wasi::clocks::monotonic_clock::subscribe_duration(10_000_000);
353+
let soon = reactor.schedule(soon);
354+
soon.wait_for().await;
355+
})
356+
}
284357
}

0 commit comments

Comments
 (0)