Skip to content

Commit aca2a57

Browse files
authored
make Instance::run_concurrent cancel-safe (#11718)
This ensures that the `ConcurrentState::futures` field is restored if the event loop future is dropped while that field was taken; otherwise, the next attempt to run the event loop would panic. Signed-off-by: Joel Dice <[email protected]>
1 parent 9362e47 commit aca2a57

File tree

2 files changed

+70
-31
lines changed

2 files changed

+70
-31
lines changed

crates/misc/component-async-tests/tests/scenario/post_return.rs

Lines changed: 38 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,12 @@ use super::util::test_run;
22
use crate::scenario::util::{config, make_component};
33
use anyhow::Result;
44
use component_async_tests::{Ctx, sleep};
5+
use std::future;
6+
use std::pin::pin;
57
use std::sync::{Arc, Mutex};
6-
use wasmtime::component::{Linker, ResourceTable};
7-
use wasmtime::{Engine, Store};
8+
use std::task::Poll;
9+
use wasmtime::component::{Instance, Linker, ResourceTable};
10+
use wasmtime::{AsContextMut, Engine, Store, StoreContextMut};
811
use wasmtime_wasi::WasiCtxBuilder;
912

1013
mod sleep_post_return {
@@ -67,24 +70,41 @@ async fn test_sleep_post_return(components: &[&str]) -> Result<()> {
6770
);
6871

6972
let instance = linker.instantiate_async(&mut store, &component).await?;
70-
let guest = sleep_post_return::SleepPostReturnCallee::new(&mut store, &instance)?;
71-
instance
72-
.run_concurrent(&mut store, async |accessor| {
73-
// This function should return immediately, then sleep the specified
74-
// number of milliseconds after returning, and then finally exit.
75-
let exit = guest
76-
.local_local_sleep_post_return()
77-
.call_run(accessor, 100)
78-
.await?
79-
.1;
80-
// The function has returned, now we wait for it (and any subtasks
81-
// it may have spawned) to exit.
82-
exit.block(accessor).await;
83-
anyhow::Ok(())
84-
})
85-
.await??;
73+
74+
async fn run(mut store: StoreContextMut<'_, Ctx>, instance: Instance) -> Result<()> {
75+
let guest = sleep_post_return::SleepPostReturnCallee::new(&mut store, &instance)?;
76+
instance
77+
.run_concurrent(store, async |accessor| {
78+
// This function should return immediately, then sleep the specified
79+
// number of milliseconds after returning, and then finally exit.
80+
let exit = guest
81+
.local_local_sleep_post_return()
82+
.call_run(accessor, 100)
83+
.await?
84+
.1;
85+
// The function has returned, now we wait for it (and any subtasks
86+
// it may have spawned) to exit.
87+
exit.block(accessor).await;
88+
anyhow::Ok(())
89+
})
90+
.await?
91+
}
92+
93+
run(store.as_context_mut(), instance).await?;
8694
// At this point, all subtasks should have exited, meaning no waitables,
8795
// tasks, or other concurrent state should remain present in the instance.
8896
instance.assert_concurrent_state_empty(&mut store);
97+
98+
// Do it again, but this time cancel the event loop before it exits:
99+
assert!(
100+
future::poll_fn(|cx| Poll::Ready(pin!(run(store.as_context_mut(), instance)).poll(cx)))
101+
.await
102+
.is_pending()
103+
);
104+
105+
// Assuming the event loop is cancel-safe, this should complete without
106+
// errors or panics:
107+
run(store.as_context_mut(), instance).await?;
108+
89109
Ok(())
90110
}

crates/wasmtime/src/runtime/component/concurrent.rs

Lines changed: 32 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1170,31 +1170,50 @@ impl Instance {
11701170
mut future: Pin<&mut impl Future<Output = R>>,
11711171
) -> Result<R>
11721172
where
1173-
T: Send,
1173+
T: Send + 'static,
11741174
{
1175+
struct Reset<'a, T: 'static> {
1176+
store: StoreContextMut<'a, T>,
1177+
instance: Instance,
1178+
futures: Option<FuturesUnordered<HostTaskFuture>>,
1179+
}
1180+
1181+
impl<'a, T> Drop for Reset<'a, T> {
1182+
fn drop(&mut self) {
1183+
if let Some(futures) = self.futures.take() {
1184+
*self
1185+
.instance
1186+
.concurrent_state_mut(self.store.0)
1187+
.futures
1188+
.get_mut() = Some(futures);
1189+
}
1190+
}
1191+
}
1192+
11751193
loop {
11761194
// Take `ConcurrentState::futures` out of the instance so we can
11771195
// poll it while also safely giving any of the futures inside access
11781196
// to `self`.
1179-
let mut futures = self
1180-
.concurrent_state_mut(store.0)
1181-
.futures
1182-
.get_mut()
1183-
.take()
1184-
.unwrap();
1185-
let mut next = pin!(futures.next());
1197+
let futures = self.concurrent_state_mut(store.0).futures.get_mut().take();
1198+
let mut reset = Reset {
1199+
store: store.as_context_mut(),
1200+
instance: self,
1201+
futures,
1202+
};
1203+
let mut next = pin!(reset.futures.as_mut().unwrap().next());
11861204

11871205
let result = future::poll_fn(|cx| {
11881206
// First, poll the future we were passed as an argument and
11891207
// return immediately if it's ready.
1190-
if let Poll::Ready(value) = self.set_tls(store.0, || future.as_mut().poll(cx)) {
1208+
if let Poll::Ready(value) = self.set_tls(reset.store.0, || future.as_mut().poll(cx))
1209+
{
11911210
return Poll::Ready(Ok(Either::Left(value)));
11921211
}
11931212

11941213
// Next, poll `ConcurrentState::futures` (which includes any
11951214
// pending host tasks and/or background tasks), returning
11961215
// immediately if one of them fails.
1197-
let next = match self.set_tls(store.0, || next.as_mut().poll(cx)) {
1216+
let next = match self.set_tls(reset.store.0, || next.as_mut().poll(cx)) {
11981217
Poll::Ready(Some(output)) => {
11991218
match output {
12001219
Err(e) => return Poll::Ready(Err(e)),
@@ -1206,7 +1225,7 @@ impl Instance {
12061225
Poll::Pending => Poll::Pending,
12071226
};
12081227

1209-
let mut instance = self.id().get_mut(store.0);
1228+
let mut instance = self.id().get_mut(reset.store.0);
12101229

12111230
// Next, check the "high priority" work queue and return
12121231
// immediately if it has at least one item.
@@ -1231,7 +1250,7 @@ impl Instance {
12311250
// in case one of `ConcurrentState::futures` had
12321251
// the side effect of unblocking it.
12331252
if let Poll::Ready(value) =
1234-
self.set_tls(store.0, || future.as_mut().poll(cx))
1253+
self.set_tls(reset.store.0, || future.as_mut().poll(cx))
12351254
{
12361255
Poll::Ready(Ok(Either::Left(value)))
12371256
} else {
@@ -1290,7 +1309,7 @@ impl Instance {
12901309
// Put the `ConcurrentState::futures` back into the instance before
12911310
// we return or handle any work items since one or more of those
12921311
// items might append more futures.
1293-
*self.concurrent_state_mut(store.0).futures.get_mut() = Some(futures);
1312+
drop(reset);
12941313

12951314
match result? {
12961315
// The future we were passed as an argument completed, so we

0 commit comments

Comments
 (0)