Skip to content

Commit 91abf8c

Browse files
dicejalexcrichton
andauthored
trap on read from and write to intra-component stream/future (#12117)
The spec says we need to do this for streams and futures that have a payload type. Note that `p3_http_middleware_host_to_host` triggers this trap, which is expected given how it functions, so now we assert accordingly. Thanks to Alex for the tests! Fixes #12108 Signed-off-by: Joel Dice <[email protected]> Co-authored-by: Alex Crichton <[email protected]>
1 parent dd06e7a commit 91abf8c

File tree

5 files changed

+144
-4
lines changed

5 files changed

+144
-4
lines changed

crates/wasi-http/tests/all/p3/mod.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -330,8 +330,15 @@ async fn p3_http_middleware() -> Result<()> {
330330
}
331331

332332
#[test_log::test(tokio::test(flavor = "multi_thread"))]
333-
async fn p3_http_middleware_host_to_host() -> Result<()> {
334-
test_http_middleware(true).await
333+
async fn p3_http_middleware_host_to_host() {
334+
let error = format!("{:?}", test_http_middleware(true).await.unwrap_err());
335+
336+
let expected = "cannot read from and write to intra-component stream with non-unit payload";
337+
338+
assert!(
339+
error.contains(expected),
340+
"expected `{expected}`; got `{error}`"
341+
);
335342
}
336343

337344
async fn test_http_middleware(host_to_host: bool) -> Result<()> {

crates/wasmtime/src/runtime/component/concurrent.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3608,6 +3608,7 @@ impl<T: 'static> VMComponentAsyncStore for StoreInner<T> {
36083608
instance
36093609
.guest_write(
36103610
StoreContextMut(self),
3611+
caller,
36113612
TransmitIndex::Future(ty),
36123613
options,
36133614
None,
@@ -3631,6 +3632,7 @@ impl<T: 'static> VMComponentAsyncStore for StoreInner<T> {
36313632
instance
36323633
.guest_read(
36333634
StoreContextMut(self),
3635+
caller,
36343636
TransmitIndex::Future(ty),
36353637
options,
36363638
None,
@@ -3655,6 +3657,7 @@ impl<T: 'static> VMComponentAsyncStore for StoreInner<T> {
36553657
instance
36563658
.guest_write(
36573659
StoreContextMut(self),
3660+
caller,
36583661
TransmitIndex::Stream(ty),
36593662
options,
36603663
None,
@@ -3679,6 +3682,7 @@ impl<T: 'static> VMComponentAsyncStore for StoreInner<T> {
36793682
instance
36803683
.guest_read(
36813684
StoreContextMut(self),
3685+
caller,
36823686
TransmitIndex::Stream(ty),
36833687
options,
36843688
None,
@@ -3716,6 +3720,7 @@ impl<T: 'static> VMComponentAsyncStore for StoreInner<T> {
37163720
instance
37173721
.guest_write(
37183722
StoreContextMut(self),
3723+
caller,
37193724
TransmitIndex::Stream(ty),
37203725
options,
37213726
Some(FlatAbi {
@@ -3745,6 +3750,7 @@ impl<T: 'static> VMComponentAsyncStore for StoreInner<T> {
37453750
instance
37463751
.guest_read(
37473752
StoreContextMut(self),
3753+
caller,
37483754
TransmitIndex::Stream(ty),
37493755
options,
37503756
Some(FlatAbi {

crates/wasmtime/src/runtime/component/concurrent/futures_and_streams.rs

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1916,6 +1916,7 @@ enum WriteState {
19161916
/// The write end is owned by a guest task and a write is pending.
19171917
GuestReady {
19181918
instance: Instance,
1919+
caller: RuntimeComponentInstanceIndex,
19191920
ty: TransmitIndex,
19201921
flat_abi: Option<FlatAbi>,
19211922
options: OptionsIndex,
@@ -1953,6 +1954,7 @@ enum ReadState {
19531954
/// The read end is owned by a guest task and a read is pending.
19541955
GuestReady {
19551956
ty: TransmitIndex,
1957+
caller: RuntimeComponentInstanceIndex,
19561958
flat_abi: Option<FlatAbi>,
19571959
instance: Instance,
19581960
options: OptionsIndex,
@@ -2672,6 +2674,7 @@ async fn write<D: 'static, P: Send + 'static, T: func::Lower + 'static, B: Write
26722674
count,
26732675
handle,
26742676
instance,
2677+
caller,
26752678
} => {
26762679
let guest_offset = guest_offset.unwrap();
26772680

@@ -2746,6 +2749,7 @@ async fn write<D: 'static, P: Send + 'static, T: func::Lower + 'static, B: Write
27462749
count,
27472750
handle,
27482751
instance,
2752+
caller,
27492753
};
27502754

27512755
anyhow::Ok(())
@@ -2916,9 +2920,11 @@ impl Instance {
29162920
self,
29172921
mut store: StoreContextMut<T>,
29182922
flat_abi: Option<FlatAbi>,
2923+
write_caller: RuntimeComponentInstanceIndex,
29192924
write_ty: TransmitIndex,
29202925
write_options: OptionsIndex,
29212926
write_address: usize,
2927+
read_caller: RuntimeComponentInstanceIndex,
29222928
read_ty: TransmitIndex,
29232929
read_options: OptionsIndex,
29242930
read_address: usize,
@@ -2930,8 +2936,15 @@ impl Instance {
29302936
(TransmitIndex::Future(write_ty), TransmitIndex::Future(read_ty)) => {
29312937
assert_eq!(count, 1);
29322938

2933-
let val = types[types[write_ty].ty]
2934-
.payload
2939+
let payload = types[types[write_ty].ty].payload;
2940+
2941+
if write_caller == read_caller && payload.is_some() {
2942+
bail!(
2943+
"cannot read from and write to intra-component future with non-unit payload"
2944+
)
2945+
}
2946+
2947+
let val = payload
29352948
.map(|ty| {
29362949
let lift =
29372950
&mut LiftContext::new(store.0.store_opaque_mut(), write_options, self);
@@ -2968,6 +2981,12 @@ impl Instance {
29682981
}
29692982
(TransmitIndex::Stream(write_ty), TransmitIndex::Stream(read_ty)) => {
29702983
if let Some(flat_abi) = flat_abi {
2984+
if write_caller == read_caller && types[types[write_ty].ty].payload.is_some() {
2985+
bail!(
2986+
"cannot read from and write to intra-component stream with non-unit payload"
2987+
)
2988+
}
2989+
29712990
// Fast path memcpy for "flat" (i.e. no pointers or handles) payloads:
29722991
let length_in_bytes = usize::try_from(flat_abi.size).unwrap() * count;
29732992
if length_in_bytes > 0 {
@@ -3086,6 +3105,7 @@ impl Instance {
30863105
pub(super) fn guest_write<T: 'static>(
30873106
self,
30883107
mut store: StoreContextMut<T>,
3108+
caller: RuntimeComponentInstanceIndex,
30893109
ty: TransmitIndex,
30903110
options: OptionsIndex,
30913111
flat_abi: Option<FlatAbi>,
@@ -3137,6 +3157,7 @@ impl Instance {
31373157
);
31383158
transmit.write = WriteState::GuestReady {
31393159
instance: self,
3160+
caller,
31403161
ty,
31413162
flat_abi,
31423163
options,
@@ -3156,6 +3177,7 @@ impl Instance {
31563177
count: read_count,
31573178
handle: read_handle,
31583179
instance: read_instance,
3180+
caller: read_caller,
31593181
} => {
31603182
assert_eq!(flat_abi, read_flat_abi);
31613183

@@ -3195,9 +3217,11 @@ impl Instance {
31953217
self.copy(
31963218
store.as_context_mut(),
31973219
flat_abi,
3220+
caller,
31983221
ty,
31993222
options,
32003223
address,
3224+
read_caller,
32013225
read_ty,
32023226
read_options,
32033227
read_address,
@@ -3238,6 +3262,7 @@ impl Instance {
32383262
count: read_count - count,
32393263
handle: read_handle,
32403264
instance: read_instance,
3265+
caller: read_caller,
32413266
};
32423267
}
32433268

@@ -3308,6 +3333,7 @@ impl Instance {
33083333
pub(super) fn guest_read<T: 'static>(
33093334
self,
33103335
mut store: StoreContextMut<T>,
3336+
caller: RuntimeComponentInstanceIndex,
33113337
ty: TransmitIndex,
33123338
options: OptionsIndex,
33133339
flat_abi: Option<FlatAbi>,
@@ -3362,6 +3388,7 @@ impl Instance {
33623388
count,
33633389
handle,
33643390
instance: self,
3391+
caller,
33653392
};
33663393
Ok::<_, crate::Error>(())
33673394
};
@@ -3375,6 +3402,7 @@ impl Instance {
33753402
address: write_address,
33763403
count: write_count,
33773404
handle: write_handle,
3405+
caller: write_caller,
33783406
} => {
33793407
assert_eq!(flat_abi, write_flat_abi);
33803408

@@ -3397,9 +3425,11 @@ impl Instance {
33973425
self.copy(
33983426
store.as_context_mut(),
33993427
flat_abi,
3428+
write_caller,
34003429
write_ty,
34013430
write_options,
34023431
write_address,
3432+
caller,
34033433
ty,
34043434
options,
34053435
address,
@@ -3440,6 +3470,7 @@ impl Instance {
34403470
let transmit = concurrent_state.get_mut(transmit_id)?;
34413471
transmit.write = WriteState::GuestReady {
34423472
instance: self,
3473+
caller: write_caller,
34433474
ty: write_ty,
34443475
flat_abi: write_flat_abi,
34453476
options: write_options,
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
;;! component_model_async = true
2+
;;! component_model_async_builtins = true
3+
4+
(component
5+
(core module $libc (memory (export "m") 1))
6+
(core instance $libc (instantiate $libc))
7+
8+
(type $s (future u32))
9+
(core func $future.new (canon future.new $s))
10+
(core func $future.read (canon future.read $s async (memory $libc "m")))
11+
(core func $future.write (canon future.write $s async (memory $libc "m")))
12+
13+
(core module $m
14+
(import "" "future.new" (func $future.new (result i64)))
15+
(import "" "future.read" (func $future.read (param i32 i32) (result i32)))
16+
(import "" "future.write" (func $future.write (param i32 i32) (result i32)))
17+
18+
(func (export "run")
19+
(local $tmp i64)
20+
(local $r i32)
21+
(local $w i32)
22+
(local.set $tmp (call $future.new))
23+
24+
(local.set $r (i32.wrap_i64 (local.get $tmp)))
25+
(local.set $w (i32.wrap_i64 (i64.shr_u (local.get $tmp) (i64.const 32))))
26+
27+
(call $future.read (local.get $r) (i32.const 0))
28+
i32.const -1 ;; BLOCKED
29+
i32.ne
30+
if unreachable end
31+
32+
(call $future.write (local.get $w) (i32.const 0))
33+
drop
34+
)
35+
)
36+
37+
(core instance $i (instantiate $m
38+
(with "" (instance
39+
(export "future.new" (func $future.new))
40+
(export "future.read" (func $future.read))
41+
(export "future.write" (func $future.write))
42+
))
43+
))
44+
45+
(func (export "run") (canon lift (core func $i "run")))
46+
)
47+
48+
(assert_trap (invoke "run") "cannot read from and write to intra-component future with non-unit payload")
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
;;! component_model_async = true
2+
;;! component_model_async_builtins = true
3+
4+
(component
5+
(core module $libc (memory (export "m") 1))
6+
(core instance $libc (instantiate $libc))
7+
8+
(type $s (stream u32))
9+
(core func $stream.new (canon stream.new $s))
10+
(core func $stream.read (canon stream.read $s async (memory $libc "m")))
11+
(core func $stream.write (canon stream.write $s async (memory $libc "m")))
12+
13+
(core module $m
14+
(import "" "stream.new" (func $stream.new (result i64)))
15+
(import "" "stream.read" (func $stream.read (param i32 i32 i32) (result i32)))
16+
(import "" "stream.write" (func $stream.write (param i32 i32 i32) (result i32)))
17+
18+
(func (export "run")
19+
(local $tmp i64)
20+
(local $r i32)
21+
(local $w i32)
22+
(local.set $tmp (call $stream.new))
23+
24+
(local.set $r (i32.wrap_i64 (local.get $tmp)))
25+
(local.set $w (i32.wrap_i64 (i64.shr_u (local.get $tmp) (i64.const 32))))
26+
27+
(call $stream.read (local.get $r) (i32.const 0) (i32.const 4))
28+
i32.const -1 ;; BLOCKED
29+
i32.ne
30+
if unreachable end
31+
32+
(call $stream.write (local.get $w) (i32.const 0) (i32.const 4))
33+
drop
34+
)
35+
)
36+
37+
(core instance $i (instantiate $m
38+
(with "" (instance
39+
(export "stream.new" (func $stream.new))
40+
(export "stream.read" (func $stream.read))
41+
(export "stream.write" (func $stream.write))
42+
))
43+
))
44+
45+
(func (export "run") (canon lift (core func $i "run")))
46+
)
47+
48+
(assert_trap (invoke "run") "cannot read from and write to intra-component stream with non-unit payload")

0 commit comments

Comments
 (0)