Skip to content

Commit 7e39c25

Browse files
authored
move ConcurrentState from ComponentInstance to Store (#11796)
* move `ConcurrentState` from `ComponentInstance` to `Store` This has a few benefits: - No need to specify an instance when creating or piping from a stream or future. - No need to track the instance in an `Accessor`. - You may now execute tasks for multiple instances in a single event loop. The main drawback is that, if one of several instances within a single store traps, it effectively means all instances have trapped, and the store can't be used to create new instances. The way to avoid that is to use separate stores for instances which must be isolated from others. As a result of this change, a lot of code had to move from e.g. `impl Instance` to e.g. `impl StoreOpaque`, so the diff is pretty huge, but the changes themselves are almost entirely non-functional. Fixes #11226 Signed-off-by: Joel Dice <[email protected]> * fix non-component-model-async build Signed-off-by: Joel Dice <[email protected]> * fix outdated doc comment Signed-off-by: Joel Dice <[email protected]> * address review feedback - restore `ComponentStoreData` encapsulation - avoid conditional code duplication in `LiftContext::new` Signed-off-by: Joel Dice <[email protected]> --------- Signed-off-by: Joel Dice <[email protected]>
1 parent a3d6e40 commit 7e39c25

File tree

36 files changed

+2004
-2215
lines changed

36 files changed

+2004
-2215
lines changed

crates/misc/component-async-tests/src/resource_stream.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,7 @@ impl bindings::local::local::resource_stream::HostWithStore for Ctx {
4444
tx.try_send(access.get().table.push(ResourceStreamX)?)
4545
.unwrap()
4646
}
47-
let instance = access.instance();
48-
Ok(StreamReader::new(instance, access, PipeProducer::new(rx)))
47+
Ok(StreamReader::new(access, PipeProducer::new(rx)))
4948
})
5049
}
5150
}

crates/misc/component-async-tests/tests/scenario/borrowing.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -102,12 +102,14 @@ pub async fn test_run_bool(components: &[&str], v: bool) -> Result<()> {
102102
});
103103
}
104104

105-
let instance = linker.instantiate_async(&mut store, &component).await?;
106105
let borrowing_host =
107-
component_async_tests::borrowing_host::bindings::BorrowingHost::new(&mut store, &instance)?;
106+
component_async_tests::borrowing_host::bindings::BorrowingHost::instantiate_async(
107+
&mut store, &component, &linker,
108+
)
109+
.await?;
108110

109-
instance
110-
.run_concurrent(&mut store, async move |accessor| {
111+
store
112+
.run_concurrent(async move |accessor| {
111113
// Start three concurrent calls and then join them all:
112114
let mut futures = FuturesUnordered::new();
113115
for _ in 0..3 {

crates/misc/component-async-tests/tests/scenario/post_return.rs

Lines changed: 18 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use std::pin::pin;
88
use std::sync::{Arc, Mutex};
99
use std::task::Poll;
1010
use std::time::Duration;
11-
use wasmtime::component::{Accessor, Instance, Linker, ResourceTable};
11+
use wasmtime::component::{Accessor, Linker, ResourceTable};
1212
use wasmtime::{AsContextMut, Engine, Store, StoreContextMut};
1313
use wasmtime_wasi::WasiCtxBuilder;
1414

@@ -71,8 +71,10 @@ async fn test_sleep_post_return(components: &[&str]) -> Result<()> {
7171
},
7272
);
7373

74-
let instance = linker.instantiate_async(&mut store, &component).await?;
75-
let guest = sleep_post_return::SleepPostReturnCallee::new(&mut store, &instance)?;
74+
let guest = sleep_post_return::SleepPostReturnCallee::instantiate_async(
75+
&mut store, &component, &linker,
76+
)
77+
.await?;
7678

7779
async fn run_with(
7880
accessor: &Accessor<Ctx>,
@@ -93,18 +95,17 @@ async fn test_sleep_post_return(components: &[&str]) -> Result<()> {
9395

9496
async fn run(
9597
store: StoreContextMut<'_, Ctx>,
96-
instance: Instance,
9798
guest: &sleep_post_return::SleepPostReturnCallee,
9899
) -> Result<()> {
99-
instance
100-
.run_concurrent(store, async |accessor| {
100+
store
101+
.run_concurrent(async |accessor| {
101102
run_with(accessor, guest).await?;
102103

103104
// Go idle for a bit before doing it again. This tests that
104-
// `Instance::run_concurrent` is okay with having no outstanding
105-
// guest or host tasks to poll for a while, trusting that we'll
106-
// resolve the future independently, with or without giving it
107-
// more work to do.
105+
// `StoreContextMut::run_concurrent` is okay with having no
106+
// outstanding guest or host tasks to poll for a while, trusting
107+
// that we'll resolve the future independently, with or without
108+
// giving it more work to do.
108109
util::sleep(Duration::from_millis(100)).await;
109110

110111
run_with(accessor, guest).await?;
@@ -114,23 +115,21 @@ async fn test_sleep_post_return(components: &[&str]) -> Result<()> {
114115
.await?
115116
}
116117

117-
run(store.as_context_mut(), instance, &guest).await?;
118+
run(store.as_context_mut(), &guest).await?;
118119
// At this point, all subtasks should have exited, meaning no waitables,
119-
// tasks, or other concurrent state should remain present in the instance.
120-
instance.assert_concurrent_state_empty(&mut store);
120+
// tasks, or other concurrent state should remain present in the store.
121+
store.assert_concurrent_state_empty();
121122

122123
// Do it again, but this time cancel the event loop before it exits:
123124
assert!(
124-
future::poll_fn(|cx| Poll::Ready(
125-
pin!(run(store.as_context_mut(), instance, &guest)).poll(cx)
126-
))
127-
.await
128-
.is_pending()
125+
future::poll_fn(|cx| Poll::Ready(pin!(run(store.as_context_mut(), &guest)).poll(cx)))
126+
.await
127+
.is_pending()
129128
);
130129

131130
// Assuming the event loop is cancel-safe, this should complete without
132131
// errors or panics:
133-
run(store.as_context_mut(), instance, &guest).await?;
132+
run(store.as_context_mut(), &guest).await?;
134133

135134
Ok(())
136135
}

crates/misc/component-async-tests/tests/scenario/round_trip.rs

Lines changed: 21 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -236,9 +236,9 @@ pub async fn test_round_trip(
236236
component_async_tests::round_trip::bindings::RoundTrip::new(&mut store, &instance)?;
237237

238238
if call_style == 0 || !cfg!(miri) {
239-
// Run the test using `Instance::run_concurrent`:
240-
instance
241-
.run_concurrent(&mut store, {
239+
// Run the test using `StoreContextMut::run_concurrent`:
240+
store
241+
.run_concurrent({
242242
let inputs_and_outputs = inputs_and_outputs
243243
.iter()
244244
.map(|(a, b)| (String::from(*a), String::from(*b)))
@@ -266,7 +266,7 @@ pub async fn test_round_trip(
266266
})
267267
.await??;
268268

269-
instance.assert_concurrent_state_empty(&mut store);
269+
store.assert_concurrent_state_empty();
270270
}
271271

272272
if call_style == 1 || !cfg!(miri) {
@@ -309,23 +309,18 @@ pub async fn test_round_trip(
309309
}
310310

311311
let (tx, rx) = oneshot::channel();
312-
instance.spawn(
313-
&mut store,
314-
Task {
315-
instance,
316-
inputs_and_outputs: inputs_and_outputs
317-
.iter()
318-
.map(|(a, b)| (String::from(*a), String::from(*b)))
319-
.collect::<Vec<_>>(),
320-
tx,
321-
},
322-
);
323-
324-
instance
325-
.run_concurrent(&mut store, async |_| rx.await)
326-
.await??;
327-
328-
instance.assert_concurrent_state_empty(&mut store);
312+
store.spawn(Task {
313+
instance,
314+
inputs_and_outputs: inputs_and_outputs
315+
.iter()
316+
.map(|(a, b)| (String::from(*a), String::from(*b)))
317+
.collect::<Vec<_>>(),
318+
tx,
319+
});
320+
321+
store.run_concurrent(async |_| rx.await).await??;
322+
323+
store.assert_concurrent_state_empty();
329324
}
330325

331326
if call_style == 2 || !cfg!(miri) {
@@ -345,7 +340,7 @@ pub async fn test_round_trip(
345340
);
346341
}
347342

348-
instance.assert_concurrent_state_empty(&mut store);
343+
store.assert_concurrent_state_empty();
349344
}
350345
}
351346

@@ -382,8 +377,8 @@ pub async fn test_round_trip(
382377
.ok_or_else(|| anyhow!("can't find `[async]foo` in instance"))?;
383378

384379
if call_style == 3 || !cfg!(miri) {
385-
instance
386-
.run_concurrent(&mut store, async |store| {
380+
store
381+
.run_concurrent(async |store| {
387382
// Start three concurrent calls and then join them all:
388383
let mut futures = FuturesUnordered::new();
389384
for (input, output) in inputs_and_outputs {
@@ -411,7 +406,7 @@ pub async fn test_round_trip(
411406
})
412407
.await??;
413408

414-
instance.assert_concurrent_state_empty(&mut store);
409+
store.assert_concurrent_state_empty();
415410
}
416411

417412
if call_style == 4 || !cfg!(miri) {
@@ -432,7 +427,7 @@ pub async fn test_round_trip(
432427
foo_function.post_return_async(&mut store).await?;
433428
}
434429

435-
instance.assert_concurrent_state_empty(&mut store);
430+
store.assert_concurrent_state_empty();
436431
}
437432
}
438433

crates/misc/component-async-tests/tests/scenario/round_trip_direct.rs

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -60,13 +60,14 @@ async fn test_round_trip_direct(
6060

6161
let mut store = make_store();
6262

63-
let instance = linker.instantiate_async(&mut store, &component).await?;
64-
let round_trip = component_async_tests::round_trip_direct::bindings::RoundTripDirect::new(
65-
&mut store, &instance,
66-
)?;
67-
68-
instance
69-
.run_concurrent(&mut store, {
63+
let round_trip =
64+
component_async_tests::round_trip_direct::bindings::RoundTripDirect::instantiate_async(
65+
&mut store, &component, &linker,
66+
)
67+
.await?;
68+
69+
store
70+
.run_concurrent({
7071
let input = input.to_owned();
7172
let expected_output = expected_output.to_owned();
7273
async move |accessor| {
@@ -115,8 +116,8 @@ async fn test_round_trip_direct(
115116
.ok_or_else(|| anyhow!("can't find `foo` in instance"))?;
116117

117118
// Start three concurrent calls and then join them all:
118-
instance
119-
.run_concurrent(&mut store, async |store| -> wasmtime::Result<_> {
119+
store
120+
.run_concurrent(async |store| -> wasmtime::Result<_> {
120121
let mut futures = FuturesUnordered::new();
121122
for _ in 0..3 {
122123
futures.push(async move {

crates/misc/component-async-tests/tests/scenario/round_trip_many.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -250,8 +250,8 @@ async fn test_round_trip_many(
250250
)?;
251251

252252
if call_style == 0 {
253-
instance
254-
.run_concurrent(&mut store, {
253+
store
254+
.run_concurrent({
255255
let c = c.clone();
256256
let e = e.clone();
257257
let f = f.clone();
@@ -293,7 +293,7 @@ async fn test_round_trip_many(
293293
})
294294
.await??;
295295

296-
instance.assert_concurrent_state_empty(&mut store);
296+
store.assert_concurrent_state_empty();
297297
}
298298

299299
if call_style == 1 {
@@ -329,7 +329,7 @@ async fn test_round_trip_many(
329329
);
330330
}
331331

332-
instance.assert_concurrent_state_empty(&mut store);
332+
store.assert_concurrent_state_empty();
333333
}
334334
}
335335

@@ -391,8 +391,8 @@ async fn test_round_trip_many(
391391
};
392392

393393
if call_style == 2 {
394-
instance
395-
.run_concurrent(&mut store, async |store| -> wasmtime::Result<_> {
394+
store
395+
.run_concurrent(async |store| -> wasmtime::Result<_> {
396396
// Start three concurrent calls and then join them all:
397397
let mut futures = FuturesUnordered::new();
398398
for (input, output) in inputs_and_outputs {
@@ -416,7 +416,7 @@ async fn test_round_trip_many(
416416
})
417417
.await??;
418418

419-
instance.assert_concurrent_state_empty(&mut store);
419+
store.assert_concurrent_state_empty();
420420
}
421421

422422
if call_style == 3 {
@@ -433,7 +433,7 @@ async fn test_round_trip_many(
433433
foo_function.post_return_async(&mut store).await?;
434434
}
435435

436-
instance.assert_concurrent_state_empty(&mut store);
436+
store.assert_concurrent_state_empty();
437437
}
438438
}
439439

crates/misc/component-async-tests/tests/scenario/streams.rs

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -143,18 +143,18 @@ pub async fn async_closed_streams() -> Result<()> {
143143
let (mut input_tx, input_rx) = mpsc::channel(1);
144144
let (output_tx, mut output_rx) = mpsc::channel(1);
145145
let reader = if direct_producer {
146-
StreamReader::new(instance, &mut store, DirectPipeProducer(input_rx))
146+
StreamReader::new(&mut store, DirectPipeProducer(input_rx))
147147
} else {
148-
StreamReader::new(instance, &mut store, PipeProducer::new(input_rx))
148+
StreamReader::new(&mut store, PipeProducer::new(input_rx))
149149
};
150150
if direct_consumer {
151151
reader.pipe(&mut store, DirectPipeConsumer(output_tx));
152152
} else {
153153
reader.pipe(&mut store, PipeConsumer::new(output_tx));
154154
}
155155

156-
instance
157-
.run_concurrent(&mut store, async |_| {
156+
store
157+
.run_concurrent(async |_| {
158158
let (a, b) = future::join(
159159
async {
160160
for &value in &values {
@@ -183,11 +183,11 @@ pub async fn async_closed_streams() -> Result<()> {
183183
{
184184
let (input_tx, input_rx) = oneshot::channel();
185185
let (output_tx, output_rx) = oneshot::channel();
186-
FutureReader::new(instance, &mut store, OneshotProducer::new(input_rx))
186+
FutureReader::new(&mut store, OneshotProducer::new(input_rx))
187187
.pipe(&mut store, OneshotConsumer::new(output_tx));
188188

189-
instance
190-
.run_concurrent(&mut store, async |_| {
189+
store
190+
.run_concurrent(async |_| {
191191
_ = input_tx.send(value);
192192
assert_eq!(value, output_rx.await?);
193193
anyhow::Ok(())
@@ -198,14 +198,14 @@ pub async fn async_closed_streams() -> Result<()> {
198198
// Next, test stream host->guest
199199
{
200200
let (mut tx, rx) = mpsc::channel(1);
201-
let rx = StreamReader::new(instance, &mut store, PipeProducer::new(rx));
201+
let rx = StreamReader::new(&mut store, PipeProducer::new(rx));
202202

203203
let closed_streams = closed_streams::bindings::ClosedStreams::new(&mut store, &instance)?;
204204

205205
let values = values.clone();
206206

207-
instance
208-
.run_concurrent(&mut store, async move |accessor| {
207+
store
208+
.run_concurrent(async move |accessor| {
209209
let (a, b) = future::join(
210210
async {
211211
for &value in &values {
@@ -230,14 +230,14 @@ pub async fn async_closed_streams() -> Result<()> {
230230
// Next, test futures host->guest
231231
{
232232
let (tx, rx) = oneshot::channel();
233-
let rx = FutureReader::new(instance, &mut store, OneshotProducer::new(rx));
233+
let rx = FutureReader::new(&mut store, OneshotProducer::new(rx));
234234
let (_, rx_ignored) = oneshot::channel();
235-
let rx_ignored = FutureReader::new(instance, &mut store, OneshotProducer::new(rx_ignored));
235+
let rx_ignored = FutureReader::new(&mut store, OneshotProducer::new(rx_ignored));
236236

237237
let closed_streams = closed_streams::bindings::ClosedStreams::new(&mut store, &instance)?;
238238

239-
instance
240-
.run_concurrent(&mut store, async move |accessor| {
239+
store
240+
.run_concurrent(async move |accessor| {
241241
_ = tx.send(value);
242242
closed_streams
243243
.local_local_closed()
@@ -284,8 +284,8 @@ pub async fn async_closed_stream() -> Result<()> {
284284

285285
let instance = linker.instantiate_async(&mut store, &component).await?;
286286
let guest = closed_stream::ClosedStreamGuest::new(&mut store, &instance)?;
287-
instance
288-
.run_concurrent(&mut store, async move |accessor| {
287+
store
288+
.run_concurrent(async move |accessor| {
289289
let stream = guest.local_local_closed_stream().call_get(accessor).await?;
290290

291291
let (tx, mut rx) = mpsc::channel(1);

0 commit comments

Comments
 (0)