Skip to content

Commit 3053229

Browse files
authored
Merge pull request #35 from yoshuawuyts/pch/new_reactor
Re-implement reactor
2 parents 260a42a + 04950cd commit 3053229

File tree

11 files changed

+325
-249
lines changed

11 files changed

+325
-249
lines changed

src/runtime/mod.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,10 @@
1111
#![warn(missing_docs, unreachable_pub)]
1212

1313
mod block_on;
14-
mod polling;
1514
mod reactor;
1615

1716
pub use block_on::block_on;
18-
pub use reactor::Reactor;
17+
pub use reactor::{AsyncPollable, Reactor, WaitFor};
1918
use std::cell::RefCell;
2019

2120
// There are no threads in WASI 0.2, so this is just a safe way to thread a single reactor to all

src/runtime/polling.rs

Lines changed: 0 additions & 88 deletions
This file was deleted.

src/runtime/reactor.rs

Lines changed: 221 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,88 @@
1-
use super::{
2-
polling::{EventKey, Poller},
3-
REACTOR,
4-
};
1+
use super::REACTOR;
52

63
use core::cell::RefCell;
74
use core::future;
8-
use core::task::Poll;
9-
use core::task::Waker;
5+
use core::pin::Pin;
6+
use core::task::{Context, Poll, Waker};
7+
use slab::Slab;
108
use std::collections::HashMap;
119
use std::rc::Rc;
1210
use wasi::io::poll::Pollable;
1311

12+
/// A key for a Pollable, which is an index into the Slab<Pollable> in Reactor.
13+
#[repr(transparent)]
14+
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)]
15+
pub(crate) struct EventKey(pub(crate) usize);
16+
17+
/// A Registration is a reference to the Reactor's owned Pollable. When the registration is
18+
/// dropped, the reactor will drop the Pollable resource.
19+
#[derive(Debug, PartialEq, Eq, Hash)]
20+
struct Registration {
21+
key: EventKey,
22+
}
23+
24+
impl Drop for Registration {
25+
fn drop(&mut self) {
26+
Reactor::current().deregister_event(self.key)
27+
}
28+
}
29+
30+
/// An AsyncPollable is a reference counted Registration. It can be cloned, and used to create
31+
/// as many WaitFor futures on a Pollable that the user needs.
32+
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
33+
pub struct AsyncPollable(Rc<Registration>);
34+
35+
impl AsyncPollable {
36+
/// Create a Future that waits for the Pollable's readiness.
37+
pub fn wait_for(&self) -> WaitFor {
38+
use std::sync::atomic::{AtomicUsize, Ordering};
39+
static COUNTER: AtomicUsize = AtomicUsize::new(0);
40+
let unique = COUNTER.fetch_add(1, Ordering::Relaxed);
41+
WaitFor {
42+
waitee: Waitee {
43+
pollable: self.clone(),
44+
unique,
45+
},
46+
needs_deregistration: false,
47+
}
48+
}
49+
}
50+
51+
#[derive(Debug, PartialEq, Eq, Hash, Clone)]
52+
struct Waitee {
53+
/// This needs to be a reference counted registration, because it may outlive the AsyncPollable
54+
/// &self that it was created from.
55+
pollable: AsyncPollable,
56+
unique: usize,
57+
}
58+
59+
/// A Future that waits for the Pollable's readiness.
60+
#[must_use = "futures do nothing unless polled or .awaited"]
61+
#[derive(Debug)]
62+
pub struct WaitFor {
63+
waitee: Waitee,
64+
needs_deregistration: bool,
65+
}
66+
impl future::Future for WaitFor {
67+
type Output = ();
68+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
69+
let reactor = Reactor::current();
70+
if reactor.ready(&self.as_ref().waitee, cx.waker()) {
71+
Poll::Ready(())
72+
} else {
73+
self.as_mut().needs_deregistration = true;
74+
Poll::Pending
75+
}
76+
}
77+
}
78+
impl Drop for WaitFor {
79+
fn drop(&mut self) {
80+
if self.needs_deregistration {
81+
Reactor::current().deregister_waitee(&self.waitee)
82+
}
83+
}
84+
}
85+
1486
/// Manage async system resources for WASI 0.2
1587
#[derive(Debug, Clone)]
1688
pub struct Reactor {
@@ -21,8 +93,8 @@ pub struct Reactor {
2193
/// a lock of the whole.
2294
#[derive(Debug)]
2395
struct InnerReactor {
24-
poller: Poller,
25-
wakers: HashMap<EventKey, Waker>,
96+
pollables: Slab<Pollable>,
97+
wakers: HashMap<Waitee, Waker>,
2698
}
2799

28100
impl Reactor {
@@ -43,7 +115,7 @@ impl Reactor {
43115
pub(crate) fn new() -> Self {
44116
Self {
45117
inner: Rc::new(RefCell::new(InnerReactor {
46-
poller: Poller::new(),
118+
pollables: Slab::new(),
47119
wakers: HashMap::new(),
48120
})),
49121
}
@@ -62,41 +134,152 @@ impl Reactor {
62134
/// reason that we have to call all the wakers - even if by default they
63135
/// will do nothing.
64136
pub(crate) fn block_until(&self) {
137+
let reactor = self.inner.borrow();
138+
139+
// We're about to wait for a number of pollables. When they wake we get
140+
// the *indexes* back for the pollables whose events were available - so
141+
// we need to be able to associate the index with the right waker.
142+
143+
// We start by iterating over the pollables, and keeping note of which
144+
// pollable belongs to which waker
145+
let mut indexed_wakers = Vec::with_capacity(reactor.wakers.len());
146+
let mut targets = Vec::with_capacity(reactor.wakers.len());
147+
for (waitee, waker) in reactor.wakers.iter() {
148+
let pollable_index = waitee.pollable.0.key;
149+
indexed_wakers.push(waker);
150+
targets.push(&reactor.pollables[pollable_index.0]);
151+
}
152+
153+
debug_assert_ne!(
154+
targets.len(),
155+
0,
156+
"Attempting to block on an empty list of pollables - without any pending work, no progress can be made and wasi::io::poll::poll will trap"
157+
);
158+
159+
// Now that we have that association, we're ready to poll our targets.
160+
// This will block until an event has completed.
161+
let ready_indexes = wasi::io::poll::poll(&targets);
162+
163+
// Once we have the indexes for which pollables are available, we need
164+
// to convert it back to the right keys for the wakers. Earlier we
165+
// established a positional index -> waker key relationship, so we can
166+
// go right ahead and perform a lookup there.
167+
let ready_wakers = ready_indexes
168+
.into_iter()
169+
.map(|index| indexed_wakers[index as usize]);
170+
171+
for waker in ready_wakers {
172+
waker.wake_by_ref()
173+
}
174+
}
175+
176+
/// Turn a Wasi [`Pollable`] into an [`AsyncPollable`]
177+
pub fn schedule(&self, pollable: Pollable) -> AsyncPollable {
178+
let mut reactor = self.inner.borrow_mut();
179+
let key = EventKey(reactor.pollables.insert(pollable));
180+
AsyncPollable(Rc::new(Registration { key }))
181+
}
182+
183+
fn deregister_event(&self, key: EventKey) {
184+
let mut reactor = self.inner.borrow_mut();
185+
reactor.pollables.remove(key.0);
186+
}
187+
188+
fn deregister_waitee(&self, waitee: &Waitee) {
189+
let mut reactor = self.inner.borrow_mut();
190+
reactor.wakers.remove(waitee);
191+
}
192+
193+
fn ready(&self, waitee: &Waitee, waker: &Waker) -> bool {
65194
let mut reactor = self.inner.borrow_mut();
66-
for key in reactor.poller.block_until() {
67-
match reactor.wakers.get(&key) {
68-
Some(waker) => waker.wake_by_ref(),
69-
None => panic!("tried to wake the waker for non-existent `{:?}`", key),
70-
}
195+
let ready = reactor
196+
.pollables
197+
.get(waitee.pollable.0.key.0)
198+
.expect("only live EventKey can be checked for readiness")
199+
.ready();
200+
if !ready {
201+
reactor.wakers.insert(waitee.clone(), waker.clone());
71202
}
203+
ready
72204
}
73205

74206
/// Wait for the pollable to resolve.
75207
pub async fn wait_for(&self, pollable: Pollable) {
76-
let mut pollable = Some(pollable);
77-
let mut key = None;
78-
// This function is the core loop of our function; it will be called
79-
// multiple times as the future is resolving.
80-
future::poll_fn(|cx| {
81-
// Start by taking a lock on the reactor. This is single-threaded
82-
// and short-lived, so it will never be contended.
83-
let mut reactor = self.inner.borrow_mut();
84-
85-
// Schedule interest in the `pollable` on the first iteration. On
86-
// every iteration, register the waker with the reactor.
87-
let key = key.get_or_insert_with(|| reactor.poller.insert(pollable.take().unwrap()));
88-
reactor.wakers.insert(*key, cx.waker().clone());
89-
90-
// Check whether we're ready or need to keep waiting. If we're
91-
// ready, we clean up after ourselves.
92-
if reactor.poller.get(key).unwrap().ready() {
93-
reactor.poller.remove(*key);
94-
reactor.wakers.remove(key);
95-
Poll::Ready(())
96-
} else {
97-
Poll::Pending
98-
}
208+
let p = self.schedule(pollable);
209+
p.wait_for().await
210+
}
211+
}
212+
213+
#[cfg(test)]
214+
mod test {
215+
use super::*;
216+
// Using WASMTIME_LOG, observe that this test doesn't even call poll() - the pollable is ready
217+
// immediately.
218+
#[test]
219+
fn subscribe_no_duration() {
220+
crate::runtime::block_on(async {
221+
let reactor = Reactor::current();
222+
let pollable = wasi::clocks::monotonic_clock::subscribe_duration(0);
223+
let sched = reactor.schedule(pollable);
224+
sched.wait_for().await;
225+
})
226+
}
227+
// Using WASMTIME_LOG, observe that this test calls poll() until the timer is ready.
228+
#[test]
229+
fn subscribe_some_duration() {
230+
crate::runtime::block_on(async {
231+
let reactor = Reactor::current();
232+
let pollable = wasi::clocks::monotonic_clock::subscribe_duration(10_000_000);
233+
let sched = reactor.schedule(pollable);
234+
sched.wait_for().await;
235+
})
236+
}
237+
238+
// Using WASMTIME_LOG, observe that this test results in a single poll() on the second
239+
// subscription, rather than spinning in poll() with first subscription, which is instantly
240+
// ready, but not what the waker requests.
241+
#[test]
242+
fn subscribe_multiple_durations() {
243+
crate::runtime::block_on(async {
244+
let reactor = Reactor::current();
245+
let now = wasi::clocks::monotonic_clock::subscribe_duration(0);
246+
let soon = wasi::clocks::monotonic_clock::subscribe_duration(10_000_000);
247+
let now = reactor.schedule(now);
248+
let soon = reactor.schedule(soon);
249+
soon.wait_for().await;
250+
drop(now)
251+
})
252+
}
253+
254+
// Using WASMTIME_LOG, observe that this test results in two calls to poll(), one with both
255+
// pollables because both are awaiting, and one with just the later pollable.
256+
#[test]
257+
fn subscribe_multiple_durations_zipped() {
258+
crate::runtime::block_on(async {
259+
let reactor = Reactor::current();
260+
let start = wasi::clocks::monotonic_clock::now();
261+
let soon = wasi::clocks::monotonic_clock::subscribe_duration(10_000_000);
262+
let later = wasi::clocks::monotonic_clock::subscribe_duration(40_000_000);
263+
let soon = reactor.schedule(soon);
264+
let later = reactor.schedule(later);
265+
266+
futures_lite::future::zip(
267+
async move {
268+
soon.wait_for().await;
269+
println!(
270+
"*** subscribe_duration(soon) ready ({})",
271+
wasi::clocks::monotonic_clock::now() - start
272+
);
273+
},
274+
async move {
275+
later.wait_for().await;
276+
println!(
277+
"*** subscribe_duration(later) ready ({})",
278+
wasi::clocks::monotonic_clock::now() - start
279+
);
280+
},
281+
)
282+
.await;
99283
})
100-
.await
101284
}
102285
}

0 commit comments

Comments
 (0)