Skip to content

Commit 5764da5

Browse files
dicejalexcrichtonrvolosatovs
authored
Revamp component model stream/future host API (again) (#11515)
* Revamp component model stream/future host API (again) This changes the host APIs for dealing with futures and streams from a "rendezvous"-style API to a callback-oriented one. Previously you would create e.g. a `StreamReader`/`StreamWriter` pair and call their `read` and `write` methods, respectively, and those methods would return `Future`s that resolved when the operation was matched with a corresponding `write` or `read` operation on the other end. With the new API, you instead provide a `StreamProducer` trait implementation whe creating the stream, whose `produce` method will be called as soon as a read happens, giving the implementation a chance to respond immediately without making the reader wait for a rendezvous. Likewise, you can match the read end of a stream to a `StreamConsumer` to respond immediately to writes. This model should reduce scheduling overhead and make it easier to e.g. pipe items to/from `AsyncWrite`/`AsyncRead` or `Sink`/`Stream` implementations without needing to explicitly spawn background tasks. In addition, the new API provides direct access to guest read and write buffers for `stream<u8>` operations, enabling zero-copy operations. Other changes: - I've removed the `HostTaskOutput`; we were using it to run extra code with access to the store after a host task completes, but we can do that more elegantly inside the future using `tls::get`. This also allowed me to simplify `Instance::poll_until` a bit. - I've removed the `watch_{reader,writer}` functionality; it's not needed now given that the runtime will automatically dispose of the producer or consumer when the other end of the stream or future is closed -- no need for embedder code to manage that. - In order to make `UntypedWriteBuffer` `Send`, I had to wrap its raw pointer `buf` field in a `SendSyncPtr`. - I've removed `{Future,Stream}Writer` entirely and moved `Instance::{future,stream}` to `{Future,Stream}Reader::new`, respectively. - I've added a bounds check to the beginnings of `Instance::guest_read` and `Instance::guest_write` so that we need not do it later in `Guest{Source,Destination}::remaining`, meaning those functions can be infallible. Note that I haven't updated `wasmtime-wasi` yet to match; that will happen in one or more follow-up commits. Signed-off-by: Joel Dice <[email protected]> * Add `Accessor::getter`, rename `with_data` to `with_getter` * fixup bindgen invocation Signed-off-by: Roman Volosatovs <[email protected]> * add support for zero-length writes/reads to/from host I've added a test to cover this; it also tests direct buffer access for `stream<u8>`, which I realized I forgot to cover earlier. And of course there was a bug 🤦. Signed-off-by: Joel Dice <[email protected]> * add `{Destination,Source}::remaining` methods This can help `Stream{Producer,Consumer}` implementations determine how many items to write or read, respectively. Signed-off-by: Joel Dice <[email protected]> * wasi: migrate sockets to new API Signed-off-by: Roman Volosatovs <[email protected]> * tests: read the socket stream until EOF Signed-off-by: Roman Volosatovs <[email protected]> * p3-sockets: account for cancellation Signed-off-by: Roman Volosatovs <[email protected]> * p3-sockets: mostly ensure byte buffer cancellation-safety Signed-off-by: Roman Volosatovs <[email protected]> * p3-filesystem: switch to new API Signed-off-by: Roman Volosatovs <[email protected]> * fixup! p3-sockets: mostly ensure byte buffer cancellation-safety * p3-cli: switch to new API Signed-off-by: Roman Volosatovs <[email protected]> * p3: limit maximum buffer size Signed-off-by: Roman Volosatovs <[email protected]> * p3-sockets: remove reuseaddr test loop workaround Signed-off-by: Roman Volosatovs <[email protected]> * p3: drive I/O in `when_ready` Signed-off-by: Roman Volosatovs <[email protected]> * fixup! p3: drive I/O in `when_ready` * Refine `Stream{Producer,Consumer}` APIs Per conversations last week with Roman, Alex, and Lann, I've updated these traits to present a lower-level API based on `poll_{consume,produce}` functions and have documented the implementation requirements for various scenarios which have come up in `wasmtime-wasi`, particularly around graceful cancellation. See the doc comments for those functions for details. Signed-off-by: Joel Dice <[email protected]> * being integration of new API Signed-off-by: Roman Volosatovs <[email protected]> * update wasi/src/p3/filesystem to use new stream API This is totally untested so far; I'll run the tests once we have everything else compiling. Signed-off-by: Joel Dice <[email protected]> * update wasi/src/p3/cli to use new stream API This is totally untested and doesn't even compile yet due to a lifetime issue I don't have time to address yet. I'll follow up later with a fix. Signed-off-by: Joel Dice <[email protected]> * fix: remove `'a` bound on `&self` Signed-off-by: Roman Volosatovs <[email protected]> * finish `wasi:sockets` adaptation Signed-off-by: Roman Volosatovs <[email protected]> * finish `wasi:cli` adaptation Note, that this removes the read optimization - let's get the implementation complete first and optimize later Signed-off-by: Roman Volosatovs <[email protected]> * remove redundant loop in sockets Signed-off-by: Roman Volosatovs <[email protected]> * wasi: buffer on 0-length reads Signed-off-by: Roman Volosatovs <[email protected]> * finish `wasi:filesystem` adaptation Signed-off-by: Roman Volosatovs <[email protected]> * remove `MAX_BUFFER_CAPACITY` Signed-off-by: Roman Volosatovs <[email protected]> * refactor `Cursor` usage Signed-off-by: Roman Volosatovs <[email protected]> * impl Default for VecBuffer Signed-off-by: Roman Volosatovs <[email protected]> * refactor: use consistent import styling Signed-off-by: Roman Volosatovs <[email protected]> * feature-gate fs Arc accessors Signed-off-by: Roman Volosatovs <[email protected]> * Update test expectations --------- Signed-off-by: Joel Dice <[email protected]> Signed-off-by: Roman Volosatovs <[email protected]> Co-authored-by: Alex Crichton <[email protected]> Co-authored-by: Roman Volosatovs <[email protected]>
1 parent e3561d5 commit 5764da5

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

49 files changed

+3691
-2614
lines changed

crates/component-macro/tests/expanded/char_concurrent.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,7 @@ pub mod foo {
218218
"take-char",
219219
move |caller: &wasmtime::component::Accessor<T>, (arg0,): (char,)| {
220220
wasmtime::component::__internal::Box::pin(async move {
221-
let accessor = &caller.with_data(host_getter);
221+
let accessor = &caller.with_getter(host_getter);
222222
let r = <D as HostWithStore>::take_char(accessor, arg0)
223223
.await;
224224
Ok(r)
@@ -229,7 +229,7 @@ pub mod foo {
229229
"return-char",
230230
move |caller: &wasmtime::component::Accessor<T>, (): ()| {
231231
wasmtime::component::__internal::Box::pin(async move {
232-
let accessor = &caller.with_data(host_getter);
232+
let accessor = &caller.with_getter(host_getter);
233233
let r = <D as HostWithStore>::return_char(accessor).await;
234234
Ok((r,))
235235
})

crates/component-macro/tests/expanded/conventions_concurrent.rs

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -286,7 +286,7 @@ pub mod foo {
286286
"kebab-case",
287287
move |caller: &wasmtime::component::Accessor<T>, (): ()| {
288288
wasmtime::component::__internal::Box::pin(async move {
289-
let accessor = &caller.with_data(host_getter);
289+
let accessor = &caller.with_getter(host_getter);
290290
let r = <D as HostWithStore>::kebab_case(accessor).await;
291291
Ok(r)
292292
})
@@ -299,7 +299,7 @@ pub mod foo {
299299
(arg0,): (LudicrousSpeed,)|
300300
{
301301
wasmtime::component::__internal::Box::pin(async move {
302-
let accessor = &caller.with_data(host_getter);
302+
let accessor = &caller.with_getter(host_getter);
303303
let r = <D as HostWithStore>::foo(accessor, arg0).await;
304304
Ok(r)
305305
})
@@ -309,7 +309,7 @@ pub mod foo {
309309
"function-with-dashes",
310310
move |caller: &wasmtime::component::Accessor<T>, (): ()| {
311311
wasmtime::component::__internal::Box::pin(async move {
312-
let accessor = &caller.with_data(host_getter);
312+
let accessor = &caller.with_getter(host_getter);
313313
let r = <D as HostWithStore>::function_with_dashes(accessor)
314314
.await;
315315
Ok(r)
@@ -320,7 +320,7 @@ pub mod foo {
320320
"function-with-no-weird-characters",
321321
move |caller: &wasmtime::component::Accessor<T>, (): ()| {
322322
wasmtime::component::__internal::Box::pin(async move {
323-
let accessor = &caller.with_data(host_getter);
323+
let accessor = &caller.with_getter(host_getter);
324324
let r = <D as HostWithStore>::function_with_no_weird_characters(
325325
accessor,
326326
)
@@ -333,7 +333,7 @@ pub mod foo {
333333
"apple",
334334
move |caller: &wasmtime::component::Accessor<T>, (): ()| {
335335
wasmtime::component::__internal::Box::pin(async move {
336-
let accessor = &caller.with_data(host_getter);
336+
let accessor = &caller.with_getter(host_getter);
337337
let r = <D as HostWithStore>::apple(accessor).await;
338338
Ok(r)
339339
})
@@ -343,7 +343,7 @@ pub mod foo {
343343
"apple-pear",
344344
move |caller: &wasmtime::component::Accessor<T>, (): ()| {
345345
wasmtime::component::__internal::Box::pin(async move {
346-
let accessor = &caller.with_data(host_getter);
346+
let accessor = &caller.with_getter(host_getter);
347347
let r = <D as HostWithStore>::apple_pear(accessor).await;
348348
Ok(r)
349349
})
@@ -353,7 +353,7 @@ pub mod foo {
353353
"apple-pear-grape",
354354
move |caller: &wasmtime::component::Accessor<T>, (): ()| {
355355
wasmtime::component::__internal::Box::pin(async move {
356-
let accessor = &caller.with_data(host_getter);
356+
let accessor = &caller.with_getter(host_getter);
357357
let r = <D as HostWithStore>::apple_pear_grape(accessor)
358358
.await;
359359
Ok(r)
@@ -364,7 +364,7 @@ pub mod foo {
364364
"a0",
365365
move |caller: &wasmtime::component::Accessor<T>, (): ()| {
366366
wasmtime::component::__internal::Box::pin(async move {
367-
let accessor = &caller.with_data(host_getter);
367+
let accessor = &caller.with_getter(host_getter);
368368
let r = <D as HostWithStore>::a0(accessor).await;
369369
Ok(r)
370370
})
@@ -374,7 +374,7 @@ pub mod foo {
374374
"is-XML",
375375
move |caller: &wasmtime::component::Accessor<T>, (): ()| {
376376
wasmtime::component::__internal::Box::pin(async move {
377-
let accessor = &caller.with_data(host_getter);
377+
let accessor = &caller.with_getter(host_getter);
378378
let r = <D as HostWithStore>::is_xml(accessor).await;
379379
Ok(r)
380380
})
@@ -384,7 +384,7 @@ pub mod foo {
384384
"explicit",
385385
move |caller: &wasmtime::component::Accessor<T>, (): ()| {
386386
wasmtime::component::__internal::Box::pin(async move {
387-
let accessor = &caller.with_data(host_getter);
387+
let accessor = &caller.with_getter(host_getter);
388388
let r = <D as HostWithStore>::explicit(accessor).await;
389389
Ok(r)
390390
})
@@ -394,7 +394,7 @@ pub mod foo {
394394
"explicit-kebab",
395395
move |caller: &wasmtime::component::Accessor<T>, (): ()| {
396396
wasmtime::component::__internal::Box::pin(async move {
397-
let accessor = &caller.with_data(host_getter);
397+
let accessor = &caller.with_getter(host_getter);
398398
let r = <D as HostWithStore>::explicit_kebab(accessor).await;
399399
Ok(r)
400400
})
@@ -404,7 +404,7 @@ pub mod foo {
404404
"bool",
405405
move |caller: &wasmtime::component::Accessor<T>, (): ()| {
406406
wasmtime::component::__internal::Box::pin(async move {
407-
let accessor = &caller.with_data(host_getter);
407+
let accessor = &caller.with_getter(host_getter);
408408
let r = <D as HostWithStore>::bool(accessor).await;
409409
Ok(r)
410410
})

crates/component-macro/tests/expanded/dead-code_concurrent.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -228,7 +228,7 @@ pub mod a {
228228
"f",
229229
move |caller: &wasmtime::component::Accessor<T>, (): ()| {
230230
wasmtime::component::__internal::Box::pin(async move {
231-
let accessor = &caller.with_data(host_getter);
231+
let accessor = &caller.with_getter(host_getter);
232232
let r = <D as HostWithStore>::f(accessor).await;
233233
Ok((r,))
234234
})

crates/component-macro/tests/expanded/direct-import_concurrent.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ const _: () = {
184184
"foo",
185185
move |caller: &wasmtime::component::Accessor<T>, (): ()| {
186186
wasmtime::component::__internal::Box::pin(async move {
187-
let accessor = &caller.with_data(host_getter);
187+
let accessor = &caller.with_getter(host_getter);
188188
let r = <D as FooImportsWithStore>::foo(accessor).await;
189189
Ok(r)
190190
})

crates/component-macro/tests/expanded/flags_concurrent.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -351,7 +351,7 @@ pub mod foo {
351351
"roundtrip-flag1",
352352
move |caller: &wasmtime::component::Accessor<T>, (arg0,): (Flag1,)| {
353353
wasmtime::component::__internal::Box::pin(async move {
354-
let accessor = &caller.with_data(host_getter);
354+
let accessor = &caller.with_getter(host_getter);
355355
let r = <D as HostWithStore>::roundtrip_flag1(accessor, arg0)
356356
.await;
357357
Ok((r,))
@@ -362,7 +362,7 @@ pub mod foo {
362362
"roundtrip-flag2",
363363
move |caller: &wasmtime::component::Accessor<T>, (arg0,): (Flag2,)| {
364364
wasmtime::component::__internal::Box::pin(async move {
365-
let accessor = &caller.with_data(host_getter);
365+
let accessor = &caller.with_getter(host_getter);
366366
let r = <D as HostWithStore>::roundtrip_flag2(accessor, arg0)
367367
.await;
368368
Ok((r,))
@@ -373,7 +373,7 @@ pub mod foo {
373373
"roundtrip-flag4",
374374
move |caller: &wasmtime::component::Accessor<T>, (arg0,): (Flag4,)| {
375375
wasmtime::component::__internal::Box::pin(async move {
376-
let accessor = &caller.with_data(host_getter);
376+
let accessor = &caller.with_getter(host_getter);
377377
let r = <D as HostWithStore>::roundtrip_flag4(accessor, arg0)
378378
.await;
379379
Ok((r,))
@@ -384,7 +384,7 @@ pub mod foo {
384384
"roundtrip-flag8",
385385
move |caller: &wasmtime::component::Accessor<T>, (arg0,): (Flag8,)| {
386386
wasmtime::component::__internal::Box::pin(async move {
387-
let accessor = &caller.with_data(host_getter);
387+
let accessor = &caller.with_getter(host_getter);
388388
let r = <D as HostWithStore>::roundtrip_flag8(accessor, arg0)
389389
.await;
390390
Ok((r,))
@@ -395,7 +395,7 @@ pub mod foo {
395395
"roundtrip-flag16",
396396
move |caller: &wasmtime::component::Accessor<T>, (arg0,): (Flag16,)| {
397397
wasmtime::component::__internal::Box::pin(async move {
398-
let accessor = &caller.with_data(host_getter);
398+
let accessor = &caller.with_getter(host_getter);
399399
let r = <D as HostWithStore>::roundtrip_flag16(
400400
accessor,
401401
arg0,
@@ -409,7 +409,7 @@ pub mod foo {
409409
"roundtrip-flag32",
410410
move |caller: &wasmtime::component::Accessor<T>, (arg0,): (Flag32,)| {
411411
wasmtime::component::__internal::Box::pin(async move {
412-
let accessor = &caller.with_data(host_getter);
412+
let accessor = &caller.with_getter(host_getter);
413413
let r = <D as HostWithStore>::roundtrip_flag32(
414414
accessor,
415415
arg0,
@@ -423,7 +423,7 @@ pub mod foo {
423423
"roundtrip-flag64",
424424
move |caller: &wasmtime::component::Accessor<T>, (arg0,): (Flag64,)| {
425425
wasmtime::component::__internal::Box::pin(async move {
426-
let accessor = &caller.with_data(host_getter);
426+
let accessor = &caller.with_getter(host_getter);
427427
let r = <D as HostWithStore>::roundtrip_flag64(
428428
accessor,
429429
arg0,

crates/component-macro/tests/expanded/floats_concurrent.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,7 @@ pub mod foo {
225225
"f32-param",
226226
move |caller: &wasmtime::component::Accessor<T>, (arg0,): (f32,)| {
227227
wasmtime::component::__internal::Box::pin(async move {
228-
let accessor = &caller.with_data(host_getter);
228+
let accessor = &caller.with_getter(host_getter);
229229
let r = <D as HostWithStore>::f32_param(accessor, arg0)
230230
.await;
231231
Ok(r)
@@ -236,7 +236,7 @@ pub mod foo {
236236
"f64-param",
237237
move |caller: &wasmtime::component::Accessor<T>, (arg0,): (f64,)| {
238238
wasmtime::component::__internal::Box::pin(async move {
239-
let accessor = &caller.with_data(host_getter);
239+
let accessor = &caller.with_getter(host_getter);
240240
let r = <D as HostWithStore>::f64_param(accessor, arg0)
241241
.await;
242242
Ok(r)
@@ -247,7 +247,7 @@ pub mod foo {
247247
"f32-result",
248248
move |caller: &wasmtime::component::Accessor<T>, (): ()| {
249249
wasmtime::component::__internal::Box::pin(async move {
250-
let accessor = &caller.with_data(host_getter);
250+
let accessor = &caller.with_getter(host_getter);
251251
let r = <D as HostWithStore>::f32_result(accessor).await;
252252
Ok((r,))
253253
})
@@ -257,7 +257,7 @@ pub mod foo {
257257
"f64-result",
258258
move |caller: &wasmtime::component::Accessor<T>, (): ()| {
259259
wasmtime::component::__internal::Box::pin(async move {
260-
let accessor = &caller.with_data(host_getter);
260+
let accessor = &caller.with_getter(host_getter);
261261
let r = <D as HostWithStore>::f64_result(accessor).await;
262262
Ok((r,))
263263
})

crates/component-macro/tests/expanded/host-world_concurrent.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ const _: () = {
184184
"foo",
185185
move |caller: &wasmtime::component::Accessor<T>, (): ()| {
186186
wasmtime::component::__internal::Box::pin(async move {
187-
let accessor = &caller.with_data(host_getter);
187+
let accessor = &caller.with_getter(host_getter);
188188
let r = <D as Host_ImportsWithStore>::foo(accessor).await;
189189
Ok(r)
190190
})

0 commit comments

Comments
 (0)