From d2c1a7da529cb7cf76f60944580f0a084fbb0616 Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Tue, 25 Nov 2025 12:55:34 -0800 Subject: [PATCH] rust: Fix receiving async events in busy yield loops This commit fixes an issue with the rust async support where when a yield-loop was detected it suspended with `CALLBACK_CODE_YIELD` which meant that it wasn't possible to deliver async events for other async operations in progress. The fix is to instead return `CALLBACK_CODE_POLL` with the waitable-set that has all the waitables inside of it. This enables delivery of async events from that set to ensure that if the yielding task is waiting on something else that it gets delivered. This also refactors a few things here and there, such as `CALLBACK_CODE_*`, to be less error prone. Support was added for composing 3+ components with `wasm-compose` since `wac` doesn't yet use the same `wasm-tools`. --- crates/guest-rust/src/rt/async_support.rs | 58 +++++++++++++------ .../src/rt/async_support/waitable_set.rs | 4 +- crates/test/src/lib.rs | 35 ++++++++--- .../async/yield-loop-receives-events/leaf.rs | 13 +++++ .../yield-loop-receives-events/middle.rs | 34 +++++++++++ .../yield-loop-receives-events/runner.rs | 7 +++ .../async/yield-loop-receives-events/test.wit | 24 ++++++++ 7 files changed, 148 insertions(+), 27 deletions(-) create mode 100644 tests/runtime-async/async/yield-loop-receives-events/leaf.rs create mode 100644 tests/runtime-async/async/yield-loop-receives-events/middle.rs create mode 100644 tests/runtime-async/async/yield-loop-receives-events/runner.rs create mode 100644 tests/runtime-async/async/yield-loop-receives-events/test.wit diff --git a/crates/guest-rust/src/rt/async_support.rs b/crates/guest-rust/src/rt/async_support.rs index 624953816..1e0168e3a 100644 --- a/crates/guest-rust/src/rt/async_support.rs +++ b/crates/guest-rust/src/rt/async_support.rs @@ -156,7 +156,7 @@ impl FutureState<'_> { /// Handles the `event{0,1,2}` event codes and returns a corresponding /// return code along with a flag whether this future is "done" or not. - fn callback(&mut self, event0: u32, event1: u32, event2: u32) -> (u32, bool) { + fn callback(&mut self, event0: u32, event1: u32, event2: u32) -> CallbackCode { match event0 { EVENT_NONE => rtdebug!("EVENT_NONE"), EVENT_SUBTASK => rtdebug!("EVENT_SUBTASK({event1:#x}, {event2:#x})"), @@ -171,7 +171,7 @@ impl FutureState<'_> { // code/bool indicating we're done. The caller will then // appropriately deallocate this `FutureState` which will // transitively run all destructors. - return (CALLBACK_CODE_EXIT, true); + return CallbackCode::Exit; } _ => unreachable!(), } @@ -198,7 +198,7 @@ impl FutureState<'_> { /// /// Returns the code representing what happened along with a boolean as to /// whether this execution is done. - fn poll(&mut self) -> (u32, bool) { + fn poll(&mut self) -> CallbackCode { self.with_p3_task_set(|me| { let mut context = Context::from_waker(&me.waker_clone); @@ -223,9 +223,9 @@ impl FutureState<'_> { assert!(me.tasks.is_empty()); if me.remaining_work() { let waitable = me.waitable_set.as_ref().unwrap().as_raw(); - break (CALLBACK_CODE_WAIT | (waitable << 4), false); + break CallbackCode::Wait(waitable); } else { - break (CALLBACK_CODE_EXIT, true); + break CallbackCode::Exit; } } @@ -236,12 +236,18 @@ impl FutureState<'_> { Poll::Pending => { assert!(!me.tasks.is_empty()); if me.waker.0.load(Ordering::Relaxed) { - break (CALLBACK_CODE_YIELD, false); + let code = if me.remaining_work() { + let waitable = me.waitable_set.as_ref().unwrap().as_raw(); + CallbackCode::Poll(waitable) + } else { + CallbackCode::Yield + }; + break code; } assert!(me.remaining_work()); let waitable = me.waitable_set.as_ref().unwrap().as_raw(); - break (CALLBACK_CODE_WAIT | (waitable << 4), false); + break CallbackCode::Wait(waitable); } } } @@ -331,10 +337,24 @@ const EVENT_FUTURE_READ: u32 = 4; const EVENT_FUTURE_WRITE: u32 = 5; const EVENT_CANCEL: u32 = 6; -const CALLBACK_CODE_EXIT: u32 = 0; -const CALLBACK_CODE_YIELD: u32 = 1; -const CALLBACK_CODE_WAIT: u32 = 2; -const _CALLBACK_CODE_POLL: u32 = 3; +#[derive(PartialEq, Debug)] +enum CallbackCode { + Exit, + Yield, + Wait(u32), + Poll(u32), +} + +impl CallbackCode { + fn encode(self) -> u32 { + match self { + CallbackCode::Exit => 0, + CallbackCode::Yield => 1, + CallbackCode::Wait(waitable) => 2 | (waitable << 4), + CallbackCode::Poll(waitable) => 3 | (waitable << 4), + } + } +} const STATUS_STARTING: u32 = 0; const STATUS_STARTED: u32 = 1; @@ -425,14 +445,14 @@ pub unsafe fn callback(event0: u32, event1: u32, event2: u32) -> u32 { // our future so deallocate it. Otherwise put our future back in // context-local storage and forward the code. unsafe { - let (rc, done) = (*state).callback(event0, event1, event2); - if done { + let rc = (*state).callback(event0, event1, event2); + if rc == CallbackCode::Exit { drop(Box::from_raw(state)); } else { context_set(state.cast()); } - rtdebug!(" => (cb) {rc:#x}"); - rc + rtdebug!(" => (cb) {rc:?}"); + rc.encode() } } @@ -449,12 +469,14 @@ pub fn block_on(future: impl Future) -> T { let mut event = (EVENT_NONE, 0, 0); loop { match state.callback(event.0, event.1, event.2) { - (_, true) => { + CallbackCode::Exit => { drop(state); break result.unwrap(); } - (CALLBACK_CODE_YIELD, false) => event = state.waitable_set.as_ref().unwrap().poll(), - _ => event = state.waitable_set.as_ref().unwrap().wait(), + CallbackCode::Yield | CallbackCode::Poll(_) => { + event = state.waitable_set.as_ref().unwrap().poll() + } + CallbackCode::Wait(_) => event = state.waitable_set.as_ref().unwrap().wait(), } } } diff --git a/crates/guest-rust/src/rt/async_support/waitable_set.rs b/crates/guest-rust/src/rt/async_support/waitable_set.rs index a1c805bfd..8d7e4a268 100644 --- a/crates/guest-rust/src/rt/async_support/waitable_set.rs +++ b/crates/guest-rust/src/rt/async_support/waitable_set.rs @@ -17,13 +17,14 @@ impl WaitableSet { } pub fn remove_waitable_from_all_sets(waitable: u32) { - rtdebug!("waitable-set.join({waitable}, 0)"); + rtdebug!("waitable.join({waitable}, 0)"); unsafe { join(waitable, 0) } } pub fn wait(&self) -> (u32, u32, u32) { unsafe { let mut payload = [0; 2]; + rtdebug!("waitable-set.wait({}) = ...", self.0.get()); let event0 = wait(self.0.get(), &mut payload); rtdebug!( "waitable-set.wait({}) = ({event0}, {:#x}, {:#x})", @@ -38,6 +39,7 @@ impl WaitableSet { pub fn poll(&self) -> (u32, u32, u32) { unsafe { let mut payload = [0; 2]; + rtdebug!("waitable-set.poll({}) = ...", self.0.get()); let event0 = poll(self.0.get(), &mut payload); rtdebug!( "waitable-set.poll({}) = ({event0}, {:#x}, {:#x})", diff --git a/crates/test/src/lib.rs b/crates/test/src/lib.rs index a63198a68..6c6601e82 100644 --- a/crates/test/src/lib.rs +++ b/crates/test/src/lib.rs @@ -837,7 +837,7 @@ impl Runner<'_> { // done for async tests at this time to ensure that there's a version of // composition that's done which is at the same version as wasmparser // and friends. - let composed = if case.config.wac.is_none() && test_components.len() == 1 { + let composed = if case.config.wac.is_none() { self.compose_wasm_with_wasm_compose(runner_wasm, test_components)? } else { self.compose_wasm_with_wac(case, runner, runner_wasm, test_components)? @@ -865,13 +865,32 @@ impl Runner<'_> { runner_wasm: &Path, test_components: &[(&Component, &Path)], ) -> Result> { - assert!(test_components.len() == 1); - let test_wasm = test_components[0].1; - let mut config = wasm_compose::config::Config::default(); - config.definitions = vec![test_wasm.to_path_buf()]; - wasm_compose::composer::ComponentComposer::new(runner_wasm, &config) - .compose() - .with_context(|| format!("failed to compose {runner_wasm:?} with {test_wasm:?}")) + assert!(test_components.len() > 0); + let mut last_bytes = None; + let mut path: PathBuf; + for (i, (_component, component_path)) in test_components.iter().enumerate() { + let main = match last_bytes.take() { + Some(bytes) => { + path = runner_wasm.with_extension(&format!("composition{i}.wasm")); + std::fs::write(&path, &bytes) + .with_context(|| format!("failed to write temporary file {path:?}"))?; + path.as_path() + } + None => runner_wasm, + }; + + let mut config = wasm_compose::config::Config::default(); + config.definitions = vec![component_path.to_path_buf()]; + last_bytes = Some( + wasm_compose::composer::ComponentComposer::new(main, &config) + .compose() + .with_context(|| { + format!("failed to compose {main:?} with {component_path:?}") + })?, + ); + } + + Ok(last_bytes.unwrap()) } fn compose_wasm_with_wac( diff --git a/tests/runtime-async/async/yield-loop-receives-events/leaf.rs b/tests/runtime-async/async/yield-loop-receives-events/leaf.rs new file mode 100644 index 000000000..98179227b --- /dev/null +++ b/tests/runtime-async/async/yield-loop-receives-events/leaf.rs @@ -0,0 +1,13 @@ +include!(env!("BINDINGS")); + +struct Component; + +export!(Component); + +impl crate::exports::test::common::i_middle::Guest for Component { + async fn f() { + for _ in 0..2 { + wit_bindgen::yield_async().await; + } + } +} diff --git a/tests/runtime-async/async/yield-loop-receives-events/middle.rs b/tests/runtime-async/async/yield-loop-receives-events/middle.rs new file mode 100644 index 000000000..ab701b7e6 --- /dev/null +++ b/tests/runtime-async/async/yield-loop-receives-events/middle.rs @@ -0,0 +1,34 @@ +include!(env!("BINDINGS")); + +use crate::test::common::i_middle::f; +use std::task::Poll; + +pub struct Component; + +export!(Component); + +static mut HIT: bool = false; + +impl crate::exports::test::common::i_runner::Guest for Component { + async fn f() { + wit_bindgen::spawn(async move { + f().await; + unsafe { + HIT = true; + } + }); + + // This is an "infinite loop" but it's also effectively a yield which + // should enable not only making progress on sibling rust-level tasks + // but additionally async events should be deliverable. + std::future::poll_fn(|cx| unsafe { + if HIT { + Poll::Ready(()) + } else { + cx.waker().wake_by_ref(); + Poll::Pending + } + }) + .await; + } +} diff --git a/tests/runtime-async/async/yield-loop-receives-events/runner.rs b/tests/runtime-async/async/yield-loop-receives-events/runner.rs new file mode 100644 index 000000000..853e5a1ce --- /dev/null +++ b/tests/runtime-async/async/yield-loop-receives-events/runner.rs @@ -0,0 +1,7 @@ +include!(env!("BINDINGS")); + +fn main() { + wit_bindgen::block_on(async { + crate::test::common::i_runner::f().await; + }); +} diff --git a/tests/runtime-async/async/yield-loop-receives-events/test.wit b/tests/runtime-async/async/yield-loop-receives-events/test.wit new file mode 100644 index 000000000..0e94e22fe --- /dev/null +++ b/tests/runtime-async/async/yield-loop-receives-events/test.wit @@ -0,0 +1,24 @@ +//@ dependencies = ['middle', 'leaf'] + +package test:common; + +world runner { + import i-runner; +} + +interface i-runner { + f: async func(); +} + +world middle { + export i-runner; + import i-middle; +} + +interface i-middle { + f: async func(); +} + +world leaf { + export i-middle; +}