Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 40 additions & 18 deletions crates/guest-rust/src/rt/async_support.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ impl FutureState<'_> {

/// Handles the `event{0,1,2}` event codes and returns a corresponding
/// return code along with a flag whether this future is "done" or not.
fn callback(&mut self, event0: u32, event1: u32, event2: u32) -> (u32, bool) {
fn callback(&mut self, event0: u32, event1: u32, event2: u32) -> CallbackCode {
match event0 {
EVENT_NONE => rtdebug!("EVENT_NONE"),
EVENT_SUBTASK => rtdebug!("EVENT_SUBTASK({event1:#x}, {event2:#x})"),
Expand All @@ -171,7 +171,7 @@ impl FutureState<'_> {
// code/bool indicating we're done. The caller will then
// appropriately deallocate this `FutureState` which will
// transitively run all destructors.
return (CALLBACK_CODE_EXIT, true);
return CallbackCode::Exit;
}
_ => unreachable!(),
}
Expand All @@ -198,7 +198,7 @@ impl FutureState<'_> {
///
/// Returns the code representing what happened along with a boolean as to
/// whether this execution is done.
fn poll(&mut self) -> (u32, bool) {
fn poll(&mut self) -> CallbackCode {
self.with_p3_task_set(|me| {
let mut context = Context::from_waker(&me.waker_clone);

Expand All @@ -223,9 +223,9 @@ impl FutureState<'_> {
assert!(me.tasks.is_empty());
if me.remaining_work() {
let waitable = me.waitable_set.as_ref().unwrap().as_raw();
break (CALLBACK_CODE_WAIT | (waitable << 4), false);
break CallbackCode::Wait(waitable);
} else {
break (CALLBACK_CODE_EXIT, true);
break CallbackCode::Exit;
}
}

Expand All @@ -236,12 +236,18 @@ impl FutureState<'_> {
Poll::Pending => {
assert!(!me.tasks.is_empty());
if me.waker.0.load(Ordering::Relaxed) {
break (CALLBACK_CODE_YIELD, false);
let code = if me.remaining_work() {
let waitable = me.waitable_set.as_ref().unwrap().as_raw();
CallbackCode::Poll(waitable)
} else {
CallbackCode::Yield
};
break code;
}

assert!(me.remaining_work());
let waitable = me.waitable_set.as_ref().unwrap().as_raw();
break (CALLBACK_CODE_WAIT | (waitable << 4), false);
break CallbackCode::Wait(waitable);
}
}
}
Expand Down Expand Up @@ -331,10 +337,24 @@ const EVENT_FUTURE_READ: u32 = 4;
const EVENT_FUTURE_WRITE: u32 = 5;
const EVENT_CANCEL: u32 = 6;

const CALLBACK_CODE_EXIT: u32 = 0;
const CALLBACK_CODE_YIELD: u32 = 1;
const CALLBACK_CODE_WAIT: u32 = 2;
const _CALLBACK_CODE_POLL: u32 = 3;
#[derive(PartialEq, Debug)]
enum CallbackCode {
Exit,
Yield,
Wait(u32),
Poll(u32),
}

impl CallbackCode {
fn encode(self) -> u32 {
match self {
CallbackCode::Exit => 0,
CallbackCode::Yield => 1,
CallbackCode::Wait(waitable) => 2 | (waitable << 4),
CallbackCode::Poll(waitable) => 3 | (waitable << 4),
}
}
}

const STATUS_STARTING: u32 = 0;
const STATUS_STARTED: u32 = 1;
Expand Down Expand Up @@ -425,14 +445,14 @@ pub unsafe fn callback(event0: u32, event1: u32, event2: u32) -> u32 {
// our future so deallocate it. Otherwise put our future back in
// context-local storage and forward the code.
unsafe {
let (rc, done) = (*state).callback(event0, event1, event2);
if done {
let rc = (*state).callback(event0, event1, event2);
if rc == CallbackCode::Exit {
drop(Box::from_raw(state));
} else {
context_set(state.cast());
}
rtdebug!(" => (cb) {rc:#x}");
rc
rtdebug!(" => (cb) {rc:?}");
rc.encode()
}
}

Expand All @@ -449,12 +469,14 @@ pub fn block_on<T: 'static>(future: impl Future<Output = T>) -> T {
let mut event = (EVENT_NONE, 0, 0);
loop {
match state.callback(event.0, event.1, event.2) {
(_, true) => {
CallbackCode::Exit => {
drop(state);
break result.unwrap();
}
(CALLBACK_CODE_YIELD, false) => event = state.waitable_set.as_ref().unwrap().poll(),
_ => event = state.waitable_set.as_ref().unwrap().wait(),
CallbackCode::Yield | CallbackCode::Poll(_) => {
event = state.waitable_set.as_ref().unwrap().poll()
}
CallbackCode::Wait(_) => event = state.waitable_set.as_ref().unwrap().wait(),
}
}
}
Expand Down
4 changes: 3 additions & 1 deletion crates/guest-rust/src/rt/async_support/waitable_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,14 @@ impl WaitableSet {
}

pub fn remove_waitable_from_all_sets(waitable: u32) {
rtdebug!("waitable-set.join({waitable}, 0)");
rtdebug!("waitable.join({waitable}, 0)");
unsafe { join(waitable, 0) }
}

pub fn wait(&self) -> (u32, u32, u32) {
unsafe {
let mut payload = [0; 2];
rtdebug!("waitable-set.wait({}) = ...", self.0.get());
let event0 = wait(self.0.get(), &mut payload);
rtdebug!(
"waitable-set.wait({}) = ({event0}, {:#x}, {:#x})",
Expand All @@ -38,6 +39,7 @@ impl WaitableSet {
pub fn poll(&self) -> (u32, u32, u32) {
unsafe {
let mut payload = [0; 2];
rtdebug!("waitable-set.poll({}) = ...", self.0.get());
let event0 = poll(self.0.get(), &mut payload);
rtdebug!(
"waitable-set.poll({}) = ({event0}, {:#x}, {:#x})",
Expand Down
35 changes: 27 additions & 8 deletions crates/test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -837,7 +837,7 @@ impl Runner<'_> {
// done for async tests at this time to ensure that there's a version of
// composition that's done which is at the same version as wasmparser
// and friends.
let composed = if case.config.wac.is_none() && test_components.len() == 1 {
let composed = if case.config.wac.is_none() {
self.compose_wasm_with_wasm_compose(runner_wasm, test_components)?
} else {
self.compose_wasm_with_wac(case, runner, runner_wasm, test_components)?
Expand Down Expand Up @@ -865,13 +865,32 @@ impl Runner<'_> {
runner_wasm: &Path,
test_components: &[(&Component, &Path)],
) -> Result<Vec<u8>> {
assert!(test_components.len() == 1);
let test_wasm = test_components[0].1;
let mut config = wasm_compose::config::Config::default();
config.definitions = vec![test_wasm.to_path_buf()];
wasm_compose::composer::ComponentComposer::new(runner_wasm, &config)
.compose()
.with_context(|| format!("failed to compose {runner_wasm:?} with {test_wasm:?}"))
assert!(test_components.len() > 0);
let mut last_bytes = None;
let mut path: PathBuf;
for (i, (_component, component_path)) in test_components.iter().enumerate() {
let main = match last_bytes.take() {
Some(bytes) => {
path = runner_wasm.with_extension(&format!("composition{i}.wasm"));
std::fs::write(&path, &bytes)
.with_context(|| format!("failed to write temporary file {path:?}"))?;
path.as_path()
}
None => runner_wasm,
};

let mut config = wasm_compose::config::Config::default();
config.definitions = vec![component_path.to_path_buf()];
last_bytes = Some(
wasm_compose::composer::ComponentComposer::new(main, &config)
.compose()
.with_context(|| {
format!("failed to compose {main:?} with {component_path:?}")
})?,
);
}

Ok(last_bytes.unwrap())
}

fn compose_wasm_with_wac(
Expand Down
13 changes: 13 additions & 0 deletions tests/runtime-async/async/yield-loop-receives-events/leaf.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
include!(env!("BINDINGS"));

struct Component;

export!(Component);

impl crate::exports::test::common::i_middle::Guest for Component {
async fn f() {
for _ in 0..2 {
wit_bindgen::yield_async().await;
}
}
}
34 changes: 34 additions & 0 deletions tests/runtime-async/async/yield-loop-receives-events/middle.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
include!(env!("BINDINGS"));

use crate::test::common::i_middle::f;
use std::task::Poll;

pub struct Component;

export!(Component);

static mut HIT: bool = false;

impl crate::exports::test::common::i_runner::Guest for Component {
async fn f() {
wit_bindgen::spawn(async move {
f().await;
unsafe {
HIT = true;
}
});

// This is an "infinite loop" but it's also effectively a yield which
// should enable not only making progress on sibling rust-level tasks
// but additionally async events should be deliverable.
std::future::poll_fn(|cx| unsafe {
if HIT {
Poll::Ready(())
} else {
cx.waker().wake_by_ref();
Poll::Pending
}
})
.await;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
include!(env!("BINDINGS"));

fn main() {
wit_bindgen::block_on(async {
crate::test::common::i_runner::f().await;
});
}
24 changes: 24 additions & 0 deletions tests/runtime-async/async/yield-loop-receives-events/test.wit
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
//@ dependencies = ['middle', 'leaf']

package test:common;

world runner {
import i-runner;
}

interface i-runner {
f: async func();
}

world middle {
export i-runner;
import i-middle;
}

interface i-middle {
f: async func();
}

world leaf {
export i-middle;
}