Commit ec68a03

don't trap on idle in Instance::run_concurrent (#11756)
Previously, `Instance::run_concurrent` returned `Trap::AsyncDeadlock` when all guest tasks and background host tasks had completed and yet the future parameter it was passed still hadn't resolved. The theory was that this indicated a mistake on the host embedder's part, but it turns out there are scenarios where this is actually what the embedder wanted.

For example, consider a host embedder that implements a pool of worker tasks, each of which runs a loop inside an async closure passed to `Instance::run_concurrent`. In this case, each worker accepts jobs (which involve calling guest functions) from a multiple-producer, multiple-consumer job queue, adding them to a `futures::stream::FuturesUnordered` so they can be run concurrently. When all the jobs accepted by a given worker have finished, there may be a lull during which no new jobs are yet available. In that case, the worker _could_ break out of the loop, resolve the future, allow `Instance::run_concurrent` to finish, and wait until the next job arrives before calling `Instance::run_concurrent` again, but that's more awkward (e.g. nested loops, complicated control flow) than a single loop inside `Instance::run_concurrent` that goes idle now and then. In short, the closure passed to `Instance::run_concurrent` might experience delays between when one set of guest tasks has completed and when the next set is ready to start, and that's not necessarily a bug.

Internally, I've added a new `run_concurrent_trap_on_idle` function, which provides the original, trapping behavior, and I'm using it to implement `[Typed]Func::call_async`, in which case it _is_ an error if the event loop goes idle without the future resolving. If this turns out to be useful as part of the public API, we can change the `pub(super)` to `pub`.

Signed-off-by: Joel Dice <[email protected]>
1 parent ddcd9b2 commit ec68a03
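To illustrate the worker-pool scenario described in the commit message, here is a rough sketch (not part of this commit) of a worker loop that stays inside `Instance::run_concurrent` and goes idle between batches of jobs. The `Job` type, the `process` helper, and the `async_channel`-based MPMC queue are hypothetical stand-ins; only `Instance::run_concurrent`, `Accessor`, `Store`, and `FuturesUnordered` come from Wasmtime and the `futures` crate.

use anyhow::Result;
use futures::stream::{FuturesUnordered, StreamExt};
use wasmtime::Store;
use wasmtime::component::{Accessor, Instance};

// Hypothetical description of one unit of work; in practice this would carry
// whatever parameters the guest call needs.
struct Job;

// Hypothetical job body: call one or more guest exports through the accessor,
// e.g. via `Func::call_concurrent` or a bindgen-generated wrapper.
async fn process<T: Send + 'static>(_accessor: &Accessor<T>, _job: Job) -> Result<()> {
    Ok(())
}

async fn worker<T: Send + 'static>(
    instance: Instance,
    mut store: Store<T>,
    jobs: async_channel::Receiver<Job>, // hypothetical MPMC job queue
) -> Result<()> {
    instance
        .run_concurrent(&mut store, async |accessor| {
            loop {
                // Wait for the next job. While we sit here there may be no
                // guest tasks or background host tasks left to poll; with this
                // commit the event loop simply stays idle instead of trapping
                // with `Trap::AsyncDeadlock`.
                let Ok(first) = jobs.recv().await else {
                    break; // queue closed: shut this worker down
                };

                // Run that job, plus any others that are immediately
                // available, concurrently within this instance's event loop.
                let mut in_flight = FuturesUnordered::new();
                in_flight.push(process(accessor, first));
                while let Ok(job) = jobs.try_recv() {
                    in_flight.push(process(accessor, job));
                }
                while let Some(result) = in_flight.next().await {
                    result?;
                }
            }
            anyhow::Ok(())
        })
        .await?
}

With the previous behavior, the idle wait on `jobs.recv()` would have ended the event loop with `Trap::AsyncDeadlock`; the worker would instead have had to return from `run_concurrent` and re-enter it for every batch of jobs.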

File tree

4 files changed (+85, -56 lines)


crates/misc/component-async-tests/tests/scenario/post_return.rs

Lines changed: 44 additions & 18 deletions
@@ -1,12 +1,14 @@
 use super::util::test_run;
 use crate::scenario::util::{config, make_component};
 use anyhow::Result;
+use component_async_tests::util;
 use component_async_tests::{Ctx, sleep};
 use std::future;
 use std::pin::pin;
 use std::sync::{Arc, Mutex};
 use std::task::Poll;
-use wasmtime::component::{Instance, Linker, ResourceTable};
+use std::time::Duration;
+use wasmtime::component::{Accessor, Instance, Linker, ResourceTable};
 use wasmtime::{AsContextMut, Engine, Store, StoreContextMut};
 use wasmtime_wasi::WasiCtxBuilder;

@@ -70,41 +72,65 @@ async fn test_sleep_post_return(components: &[&str]) -> Result<()> {
     );
 
     let instance = linker.instantiate_async(&mut store, &component).await?;
+    let guest = sleep_post_return::SleepPostReturnCallee::new(&mut store, &instance)?;
+
+    async fn run_with(
+        accessor: &Accessor<Ctx>,
+        guest: &sleep_post_return::SleepPostReturnCallee,
+    ) -> Result<()> {
+        // This function should return immediately, then sleep the specified
+        // number of milliseconds after returning, and then finally exit.
+        let exit = guest
+            .local_local_sleep_post_return()
+            .call_run(accessor, 100)
+            .await?
+            .1;
+        // The function has returned, now we wait for it (and any subtasks
+        // it may have spawned) to exit.
+        exit.block(accessor).await;
+        anyhow::Ok(())
+    }
 
-    async fn run(mut store: StoreContextMut<'_, Ctx>, instance: Instance) -> Result<()> {
-        let guest = sleep_post_return::SleepPostReturnCallee::new(&mut store, &instance)?;
+    async fn run(
+        store: StoreContextMut<'_, Ctx>,
+        instance: Instance,
+        guest: &sleep_post_return::SleepPostReturnCallee,
+    ) -> Result<()> {
         instance
             .run_concurrent(store, async |accessor| {
-                // This function should return immediately, then sleep the specified
-                // number of milliseconds after returning, and then finally exit.
-                let exit = guest
-                    .local_local_sleep_post_return()
-                    .call_run(accessor, 100)
-                    .await?
-                    .1;
-                // The function has returned, now we wait for it (and any subtasks
-                // it may have spawned) to exit.
-                exit.block(accessor).await;
+                run_with(accessor, guest).await?;
+
+                // Go idle for a bit before doing it again. This tests that
+                // `Instance::run_concurrent` is okay with having no outstanding
+                // guest or host tasks to poll for a while, trusting that we'll
+                // resolve the future independently, with or without giving it
+                // more work to do.
+                util::sleep(Duration::from_millis(100)).await;
+
+                run_with(accessor, guest).await?;
+
                 anyhow::Ok(())
             })
             .await?
     }
 
-    run(store.as_context_mut(), instance).await?;
+    run(store.as_context_mut(), instance, &guest).await?;
     // At this point, all subtasks should have exited, meaning no waitables,
     // tasks, or other concurrent state should remain present in the instance.
     instance.assert_concurrent_state_empty(&mut store);
 
     // Do it again, but this time cancel the event loop before it exits:
     assert!(
-        future::poll_fn(|cx| Poll::Ready(pin!(run(store.as_context_mut(), instance)).poll(cx)))
-            .await
-            .is_pending()
+        future::poll_fn(|cx| Poll::Ready(
+            pin!(run(store.as_context_mut(), instance, &guest)).poll(cx)
+        ))
+        .await
+        .is_pending()
     );
 
     // Assuming the event loop is cancel-safe, this should complete without
     // errors or panics:
-    run(store.as_context_mut(), instance).await?;
+    run(store.as_context_mut(), instance, &guest).await?;
 
     Ok(())
 }

crates/wasmtime/src/runtime/component/concurrent.rs

Lines changed: 39 additions & 36 deletions
@@ -1034,9 +1034,8 @@ impl Instance {
     /// Run the specified closure `fun` to completion as part of this instance's
     /// event loop.
     ///
-    /// Like [`Self::run`], this will run `fun` as part of this instance's event
-    /// loop until it yields a result _or_ there are no more tasks to run.
-    /// Unlike [`Self::run`], `fun` is provided an [`Accessor`], which provides
+    /// This will run `fun` as part of this instance's event loop until it
+    /// yields a result. `fun` is provided an [`Accessor`], which provides
     /// controlled access to the `Store` and its data.
     ///
     /// This function can be used to invoke [`Func::call_concurrent`] for
@@ -1076,9 +1075,32 @@ impl Instance {
     /// # }
     /// ```
     pub async fn run_concurrent<T, R>(
+        self,
+        store: impl AsContextMut<Data = T>,
+        fun: impl AsyncFnOnce(&Accessor<T>) -> R,
+    ) -> Result<R>
+    where
+        T: Send + 'static,
+    {
+        self.do_run_concurrent(store, fun, false).await
+    }
+
+    pub(super) async fn run_concurrent_trap_on_idle<T, R>(
+        self,
+        store: impl AsContextMut<Data = T>,
+        fun: impl AsyncFnOnce(&Accessor<T>) -> R,
+    ) -> Result<R>
+    where
+        T: Send + 'static,
+    {
+        self.do_run_concurrent(store, fun, true).await
+    }
+
+    async fn do_run_concurrent<T, R>(
         self,
         mut store: impl AsContextMut<Data = T>,
         fun: impl AsyncFnOnce(&Accessor<T>) -> R,
+        trap_on_idle: bool,
     ) -> Result<R>
     where
         T: Send + 'static,
@@ -1112,7 +1134,7 @@ impl Instance {
         // SAFETY: We never move `dropper` nor its `value` field.
         let future = unsafe { Pin::new_unchecked(dropper.value.deref_mut()) };
 
-        self.poll_until(dropper.store.as_context_mut(), future)
+        self.poll_until(dropper.store.as_context_mut(), future, trap_on_idle)
             .await
     }
 
@@ -1161,13 +1183,12 @@ impl Instance {
 
     /// Run this instance's event loop.
     ///
-    /// The returned future will resolve when either the specified future
-    /// completes (in which case we return its result) or no further progress
-    /// can be made (in which case we trap with `Trap::AsyncDeadlock`).
+    /// The returned future will resolve when the specified future completes.
     async fn poll_until<T, R>(
         self,
         mut store: StoreContextMut<'_, T>,
         mut future: Pin<&mut impl Future<Output = R>>,
+        trap_on_idle: bool,
     ) -> Result<R>
     where
         T: Send + 'static,
@@ -1258,35 +1279,17 @@ impl Instance {
                             // futures in `ConcurrentState::futures`,
                             // there are no remaining work items, _and_
                             // the future we were passed as an argument
-                            // still hasn't completed, meaning we're
-                            // stuck, so we return an error. The
-                            // underlying assumption is that `future`
-                            // depends on this component instance making
-                            // such progress, and thus there's no point
-                            // in continuing to poll it given we've run
-                            // out of work to do.
-                            //
-                            // Note that we'd also reach this point if
-                            // the host embedder passed e.g. a
-                            // `std::future::Pending` to
-                            // `Instance::run_concurrent`, in which case
-                            // we'd return a "deadlock" error even when
-                            // any and all tasks have completed
-                            // normally. However, that's not how
-                            // `Instance::run_concurrent` is intended
-                            // (and documented) to be used, so it seems
-                            // reasonable to lump that case in with
-                            // "real" deadlocks.
-                            //
-                            // TODO: Once we've added host APIs for
-                            // cancelling in-progress tasks, we can
-                            // return some other, non-error value here,
-                            // treating it as "normal" and giving the
-                            // host embedder a chance to intervene by
-                            // cancelling one or more tasks and/or
-                            // starting new tasks capable of waking the
-                            // existing ones.
-                            Poll::Ready(Err(anyhow!(crate::Trap::AsyncDeadlock)))
+                            // still hasn't completed.
+                            if trap_on_idle {
+                                // `trap_on_idle` is true, so we exit
+                                // immediately.
+                                Poll::Ready(Err(anyhow!(crate::Trap::AsyncDeadlock)))
+                            } else {
+                                // `trap_on_idle` is false, so we assume
+                                // that future will wake up and give us
+                                // more work to do when it's ready to.
+                                Poll::Pending
+                            }
                         }
                     }
                     // There is at least one pending future in

crates/wasmtime/src/runtime/component/func.rs

Lines changed: 1 addition & 1 deletion
@@ -281,7 +281,7 @@ impl Func {
         #[cfg(feature = "component-model-async")]
         {
             self.instance
-                .run_concurrent(&mut store, async |store| {
+                .run_concurrent_trap_on_idle(&mut store, async |store| {
                     self.call_concurrent_dynamic(store, params, results, false)
                         .await
                         .map(drop)

crates/wasmtime/src/runtime/component/func/typed.rs

Lines changed: 1 addition & 1 deletion
@@ -234,7 +234,7 @@ where
         let result = concurrent::queue_call(wrapper.store.as_context_mut(), prepared)?;
         self.func
             .instance
-            .run_concurrent(wrapper.store.as_context_mut(), async |_| {
+            .run_concurrent_trap_on_idle(wrapper.store.as_context_mut(), async |_| {
                 Ok(result.await?.0)
             })
             .await?
