Skip to content

Commit 1cf72be

Browse files
authored
Merge pull request #78 from bytecodealliance/pch/fix_73
nonblocking check of pollables when reactor is busy
2 parents 383902a + 18d97a2 commit 1cf72be

File tree

3 files changed

+99
-9
lines changed

3 files changed

+99
-9
lines changed

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ serde_json = { workspace = true, optional = true }
3232
anyhow.workspace = true
3333
clap.workspace = true
3434
futures-lite.workspace = true
35+
futures-concurrency.workspace = true
3536
humantime.workspace = true
3637
serde = { workspace = true, features = ["derive"] }
3738
serde_json.workspace = true
@@ -63,6 +64,7 @@ cargo_metadata = "0.18.1"
6364
clap = { version = "4.5.26", features = ["derive"] }
6465
futures-core = "0.3.19"
6566
futures-lite = "1.12.0"
67+
futures-concurrency = "7.6"
6668
humantime = "2.1.0"
6769
heck = "0.5"
6870
http = "1.1"

src/runtime/block_on.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,17 @@ where
3737
// as awake, reset and poll again. otherwise, block until a
3838
// pollable wakes a future.
3939
if root.is_awake() {
40+
reactor.nonblock_check_pollables();
4041
root.reset()
4142
} else {
43+
// If there are no futures awake or waiting on a WASI
44+
// pollable, its impossible for the reactor to make
45+
// progress, and the only valid behaviors are to sleep
46+
// forever or panic. This should only be reachable if the
47+
// user's Futures are implemented incorrectly.
48+
if !reactor.nonempty_pending_pollables() {
49+
panic!("reactor has no futures which are awake, or are waiting on a WASI pollable to be ready")
50+
}
4251
reactor.block_on_pollables()
4352
}
4453
}

src/runtime/reactor.rs

Lines changed: 88 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -126,8 +126,66 @@ impl Reactor {
126126
}
127127
}
128128

129+
/// The reactor tracks the set of WASI pollables which have an associated
130+
/// Future pending on their readiness. This function returns indicating
131+
/// that set of pollables is not empty.
132+
pub(crate) fn nonempty_pending_pollables(&self) -> bool {
133+
!self.inner.borrow().wakers.is_empty()
134+
}
135+
129136
/// Block until at least one pending pollable is ready, waking a pending future.
137+
/// Precondition: self.nonempty_pending_pollables() is true.
130138
pub(crate) fn block_on_pollables(&self) {
139+
self.check_pollables(|targets| {
140+
debug_assert_ne!(
141+
targets.len(),
142+
0,
143+
"Attempting to block on an empty list of pollables - without any pending work, no progress can be made and wasi::io::poll::poll will trap"
144+
);
145+
wasi::io::poll::poll(targets)
146+
147+
})
148+
}
149+
150+
/// Without blocking, check for any ready pollables and wake the
151+
/// associated futures.
152+
pub(crate) fn nonblock_check_pollables(&self) {
153+
// If there are no pollables with associated pending futures, there is
154+
// no work to do here, so return immediately.
155+
if !self.nonempty_pending_pollables() {
156+
return;
157+
}
158+
// Lazily create a pollable which always resolves to ready.
159+
use std::sync::LazyLock;
160+
static READY_POLLABLE: LazyLock<Pollable> =
161+
LazyLock::new(|| wasi::clocks::monotonic_clock::subscribe_duration(0));
162+
163+
self.check_pollables(|targets| {
164+
// Create a new set of targets, with the addition of the ready
165+
// pollable:
166+
let ready_index = targets.len();
167+
let mut new_targets = Vec::with_capacity(ready_index + 1);
168+
new_targets.extend_from_slice(targets);
169+
new_targets.push(&*READY_POLLABLE);
170+
171+
// Poll is now guaranteed to return immediately, because at least
172+
// one member is ready:
173+
let mut ready_list = wasi::io::poll::poll(&new_targets);
174+
175+
// Erase our extra ready pollable from the ready list:
176+
ready_list.retain(|e| *e != ready_index as u32);
177+
ready_list
178+
})
179+
}
180+
181+
/// Common core of blocking and nonblocking pollable checks. Wakes any
182+
/// futures which are pending on the pollables, according to the result of
183+
/// the check_ready function.
184+
/// Precondition: self.nonempty_pending_pollables() is true.
185+
fn check_pollables<F>(&self, check_ready: F)
186+
where
187+
F: FnOnce(&[&Pollable]) -> Vec<u32>,
188+
{
131189
let reactor = self.inner.borrow();
132190

133191
// We're about to wait for a number of pollables. When they wake we get
@@ -144,15 +202,9 @@ impl Reactor {
144202
targets.push(&reactor.pollables[pollable_index.0]);
145203
}
146204

147-
debug_assert_ne!(
148-
targets.len(),
149-
0,
150-
"Attempting to block on an empty list of pollables - without any pending work, no progress can be made and wasi::io::poll::poll will trap"
151-
);
152-
153-
// Now that we have that association, we're ready to poll our targets.
154-
// This will block until an event has completed.
155-
let ready_indexes = wasi::io::poll::poll(&targets);
205+
// Now that we have that association, we're ready to check our targets for readiness.
206+
// (This is either a wasi poll, or the nonblocking variant.)
207+
let ready_indexes = check_ready(&targets);
156208

157209
// Once we have the indexes for which pollables are available, we need
158210
// to convert it back to the right keys for the wakers. Earlier we
@@ -321,4 +373,31 @@ mod test {
321373
);
322374
})
323375
}
376+
377+
#[test]
378+
fn cooperative_concurrency() {
379+
crate::runtime::block_on(async {
380+
let cpu_heavy = async move {
381+
// Simulating a CPU-heavy task that runs for 1 second and yields occasionally
382+
for _ in 0..10 {
383+
std::thread::sleep(std::time::Duration::from_millis(100));
384+
futures_lite::future::yield_now().await;
385+
}
386+
true
387+
};
388+
let timeout = async move {
389+
crate::time::Timer::after(crate::time::Duration::from_millis(200))
390+
.wait()
391+
.await;
392+
false
393+
};
394+
let mut future_group = futures_concurrency::future::FutureGroup::<
395+
Pin<Box<dyn std::future::Future<Output = bool>>>,
396+
>::new();
397+
future_group.insert(Box::pin(cpu_heavy));
398+
future_group.insert(Box::pin(timeout));
399+
let result = futures_lite::StreamExt::next(&mut future_group).await;
400+
assert_eq!(result, Some(false), "cpu_heavy task should have timed out");
401+
});
402+
}
324403
}

0 commit comments

Comments
 (0)