diff --git a/Cargo.toml b/Cargo.toml index 23416d4..b8e7c52 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,6 +32,7 @@ serde_json = { workspace = true, optional = true } anyhow.workspace = true clap.workspace = true futures-lite.workspace = true +futures-concurrency.workspace = true humantime.workspace = true serde = { workspace = true, features = ["derive"] } serde_json.workspace = true @@ -63,6 +64,7 @@ cargo_metadata = "0.18.1" clap = { version = "4.5.26", features = ["derive"] } futures-core = "0.3.19" futures-lite = "1.12.0" +futures-concurrency = "7.6" humantime = "2.1.0" heck = "0.5" http = "1.1" diff --git a/src/runtime/block_on.rs b/src/runtime/block_on.rs index 36c6fe1..d38a9de 100644 --- a/src/runtime/block_on.rs +++ b/src/runtime/block_on.rs @@ -37,8 +37,17 @@ where // as awake, reset and poll again. otherwise, block until a // pollable wakes a future. if root.is_awake() { + reactor.nonblock_check_pollables(); root.reset() } else { + // If there are no futures awake or waiting on a WASI + // pollable, its impossible for the reactor to make + // progress, and the only valid behaviors are to sleep + // forever or panic. This should only be reachable if the + // user's Futures are implemented incorrectly. + if !reactor.nonempty_pending_pollables() { + panic!("reactor has no futures which are awake, or are waiting on a WASI pollable to be ready") + } reactor.block_on_pollables() } } diff --git a/src/runtime/reactor.rs b/src/runtime/reactor.rs index 8462bd9..22b34a8 100644 --- a/src/runtime/reactor.rs +++ b/src/runtime/reactor.rs @@ -126,8 +126,66 @@ impl Reactor { } } + /// The reactor tracks the set of WASI pollables which have an associated + /// Future pending on their readiness. This function returns indicating + /// that set of pollables is not empty. + pub(crate) fn nonempty_pending_pollables(&self) -> bool { + !self.inner.borrow().wakers.is_empty() + } + /// Block until at least one pending pollable is ready, waking a pending future. + /// Precondition: self.nonempty_pending_pollables() is true. pub(crate) fn block_on_pollables(&self) { + self.check_pollables(|targets| { + debug_assert_ne!( + targets.len(), + 0, + "Attempting to block on an empty list of pollables - without any pending work, no progress can be made and wasi::io::poll::poll will trap" + ); + wasi::io::poll::poll(targets) + + }) + } + + /// Without blocking, check for any ready pollables and wake the + /// associated futures. + pub(crate) fn nonblock_check_pollables(&self) { + // If there are no pollables with associated pending futures, there is + // no work to do here, so return immediately. + if !self.nonempty_pending_pollables() { + return; + } + // Lazily create a pollable which always resolves to ready. + use std::sync::LazyLock; + static READY_POLLABLE: LazyLock = + LazyLock::new(|| wasi::clocks::monotonic_clock::subscribe_duration(0)); + + self.check_pollables(|targets| { + // Create a new set of targets, with the addition of the ready + // pollable: + let ready_index = targets.len(); + let mut new_targets = Vec::with_capacity(ready_index + 1); + new_targets.extend_from_slice(targets); + new_targets.push(&*READY_POLLABLE); + + // Poll is now guaranteed to return immediately, because at least + // one member is ready: + let mut ready_list = wasi::io::poll::poll(&new_targets); + + // Erase our extra ready pollable from the ready list: + ready_list.retain(|e| *e != ready_index as u32); + ready_list + }) + } + + /// Common core of blocking and nonblocking pollable checks. Wakes any + /// futures which are pending on the pollables, according to the result of + /// the check_ready function. + /// Precondition: self.nonempty_pending_pollables() is true. + fn check_pollables(&self, check_ready: F) + where + F: FnOnce(&[&Pollable]) -> Vec, + { let reactor = self.inner.borrow(); // We're about to wait for a number of pollables. When they wake we get @@ -144,15 +202,9 @@ impl Reactor { targets.push(&reactor.pollables[pollable_index.0]); } - debug_assert_ne!( - targets.len(), - 0, - "Attempting to block on an empty list of pollables - without any pending work, no progress can be made and wasi::io::poll::poll will trap" - ); - - // Now that we have that association, we're ready to poll our targets. - // This will block until an event has completed. - let ready_indexes = wasi::io::poll::poll(&targets); + // Now that we have that association, we're ready to check our targets for readiness. + // (This is either a wasi poll, or the nonblocking variant.) + let ready_indexes = check_ready(&targets); // Once we have the indexes for which pollables are available, we need // to convert it back to the right keys for the wakers. Earlier we @@ -321,4 +373,31 @@ mod test { ); }) } + + #[test] + fn cooperative_concurrency() { + crate::runtime::block_on(async { + let cpu_heavy = async move { + // Simulating a CPU-heavy task that runs for 1 second and yields occasionally + for _ in 0..10 { + std::thread::sleep(std::time::Duration::from_millis(100)); + futures_lite::future::yield_now().await; + } + true + }; + let timeout = async move { + crate::time::Timer::after(crate::time::Duration::from_millis(200)) + .wait() + .await; + false + }; + let mut future_group = futures_concurrency::future::FutureGroup::< + Pin>>, + >::new(); + future_group.insert(Box::pin(cpu_heavy)); + future_group.insert(Box::pin(timeout)); + let result = futures_lite::StreamExt::next(&mut future_group).await; + assert_eq!(result, Some(false), "cpu_heavy task should have timed out"); + }); + } }