Skip to content

Commit dbc176e

Browse files
committed
add stream and future tests
Signed-off-by: Joel Dice <joel.dice@fermyon.com>
1 parent b072b0c commit dbc176e

File tree

4 files changed

+642
-1
lines changed

4 files changed

+642
-1
lines changed
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package a:b;
2+
3+
interface x {
4+
resource thing {
5+
constructor(s: string);
6+
get: async func() -> string;
7+
}
8+
9+
echo-stream-u8: async func(s: stream<u8>) -> stream<u8>;
10+
echo-future-string: async func(f: future<string>) -> future<string>;
11+
short-reads: async func(s: stream<thing>) -> stream<thing>;
12+
}
13+
14+
world caller {
15+
import x;
16+
17+
export run: async func();
18+
}
19+
20+
world callee {
21+
export x;
22+
}
Lines changed: 209 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,209 @@
1+
#![allow(
2+
unsafe_code,
3+
reason = "needed to create streams and futures from vtables"
4+
)]
5+
6+
use std::collections::HashMap;
7+
use std::sync::{
8+
Mutex,
9+
atomic::{AtomicUsize, Ordering::Relaxed},
10+
};
11+
use test_programs::*;
12+
use wit_bindgen::{FutureReader, StreamReader, StreamResult, rt::async_support};
13+
14+
static NEXT_THING: AtomicUsize = AtomicUsize::new(1);
15+
static THINGS: Mutex<Option<HashMap<usize, String>>> = Mutex::new(None);
16+
17+
export_test!(struct MyInterpreter);
18+
19+
impl TestCase for MyInterpreter {
20+
fn call_export(
21+
_wit: Wit,
22+
func: ExportFunction,
23+
mut args: impl ExactSizeIterator<Item = Val>,
24+
) -> Option<Val> {
25+
assert_eq!(func.interface(), Some("a:b/x"));
26+
27+
match func.name() {
28+
"[constructor]thing" => {
29+
assert_eq!(func.params().len(), 1);
30+
assert!(matches!(func.params().next(), Some(Type::String)));
31+
assert!(matches!(func.result(), Some(Type::Own(_))));
32+
assert_eq!(args.len(), 1);
33+
34+
let Some(Type::Own(ty)) = func.result() else {
35+
unreachable!();
36+
};
37+
38+
let Some(Val::String(value)) = args.next() else {
39+
unreachable!()
40+
};
41+
42+
let rep = NEXT_THING.fetch_add(1, Relaxed);
43+
44+
THINGS
45+
.lock()
46+
.unwrap()
47+
.get_or_insert_default()
48+
.insert(rep, value);
49+
50+
Some(Val::Own(Own::new(ty, rep)))
51+
}
52+
53+
other => panic!("unknown function {other:?}"),
54+
}
55+
}
56+
57+
async fn call_export_async(
58+
_wit: Wit,
59+
func: ExportFunction,
60+
mut args: impl ExactSizeIterator<Item = Val>,
61+
) -> Option<Val> {
62+
assert_eq!(func.interface(), Some("a:b/x"));
63+
64+
match func.name() {
65+
"[async]echo-stream-u8" => {
66+
assert_eq!(func.params().len(), 1);
67+
assert!(matches!(func.params().next(), Some(Type::Stream(_))));
68+
assert!(matches!(func.result(), Some(Type::Stream(_))));
69+
assert_eq!(args.len(), 1);
70+
71+
let Some(Type::Stream(ty)) = func.params().next() else {
72+
unreachable!()
73+
};
74+
75+
let vtable = get_stream_vtable::<u8>(ty);
76+
77+
let (mut tx, result) = unsafe { async_support::stream_new(vtable) };
78+
79+
let Some(Val::Stream(rx)) = args.next() else {
80+
unreachable!()
81+
};
82+
83+
let mut rx = StreamReader::new(rx, vtable);
84+
85+
async_support::spawn(async move {
86+
let mut chunk = Vec::with_capacity(1024);
87+
loop {
88+
let (status, buf) = rx.read(chunk).await;
89+
chunk = buf;
90+
match status {
91+
StreamResult::Complete(_) => {
92+
chunk = tx.write_all(chunk).await;
93+
assert!(chunk.is_empty());
94+
}
95+
StreamResult::Dropped => break,
96+
StreamResult::Cancelled => unreachable!(),
97+
}
98+
}
99+
});
100+
101+
Some(Val::Stream(result.take_handle()))
102+
}
103+
104+
"[async]echo-future-string" => {
105+
assert_eq!(func.params().len(), 1);
106+
assert!(matches!(func.params().next(), Some(Type::Future(_))));
107+
assert!(matches!(func.result(), Some(Type::Future(_))));
108+
assert_eq!(args.len(), 1);
109+
110+
let Some(Type::Future(ty)) = func.params().next() else {
111+
unreachable!()
112+
};
113+
114+
let vtable = get_future_vtable::<String>(ty);
115+
116+
let (tx, result) = unsafe { async_support::future_new(|| unreachable!(), vtable) };
117+
118+
let Some(Val::Future(rx)) = args.next() else {
119+
unreachable!()
120+
};
121+
122+
let rx = FutureReader::new(rx, vtable);
123+
124+
async_support::spawn(async move { tx.write(rx.await).await.unwrap() });
125+
126+
Some(Val::Future(result.take_handle()))
127+
}
128+
129+
"[async method]thing.get" => {
130+
assert_eq!(func.params().len(), 1);
131+
assert!(matches!(func.params().next(), Some(Type::Borrow(_))));
132+
assert!(matches!(func.result(), Some(Type::String)));
133+
assert_eq!(args.len(), 1);
134+
135+
let Some(Val::Borrow(Borrow::Rep(rep))) = args.next() else {
136+
unreachable!()
137+
};
138+
139+
let value = THINGS
140+
.lock()
141+
.unwrap()
142+
.as_ref()
143+
.unwrap()
144+
.get(&usize::try_from(rep).unwrap())
145+
.unwrap()
146+
.clone();
147+
Some(Val::String(value))
148+
}
149+
150+
"[async]short-reads" => {
151+
assert_eq!(func.params().len(), 1);
152+
assert!(matches!(func.params().next(), Some(Type::Stream(_))));
153+
assert!(matches!(func.result(), Some(Type::Stream(_))));
154+
assert_eq!(args.len(), 1);
155+
156+
let Some(Type::Stream(ty)) = func.params().next() else {
157+
unreachable!()
158+
};
159+
160+
let vtable = get_stream_vtable::<Val>(ty);
161+
162+
let (mut tx, result) = unsafe { async_support::stream_new(vtable) };
163+
164+
let Some(Val::Stream(rx)) = args.next() else {
165+
unreachable!()
166+
};
167+
168+
let mut rx = StreamReader::new(rx, vtable);
169+
170+
async_support::spawn(async move {
171+
// Read only one item at a time, forcing the sender to
172+
// retake ownership of any unwritten items.
173+
let mut received_things = Vec::new();
174+
loop {
175+
let (status, buffer) = rx.read(Vec::with_capacity(1)).await;
176+
received_things.extend(buffer);
177+
match status {
178+
StreamResult::Complete(_) => {}
179+
StreamResult::Dropped => break,
180+
StreamResult::Cancelled => unreachable!(),
181+
}
182+
}
183+
184+
// Write the items all at once. The receiver will only read
185+
// them one at a time, forcing us to retake ownership of the
186+
// unwritten items between writes.
187+
let buffer = tx.write_all(received_things).await;
188+
assert!(buffer.is_empty());
189+
});
190+
191+
Some(Val::Stream(result.take_handle()))
192+
}
193+
194+
other => panic!("unknown function {other:?}"),
195+
}
196+
}
197+
198+
fn resource_dtor(_: Resource, handle: usize) {
199+
assert!(
200+
THINGS
201+
.lock()
202+
.unwrap()
203+
.as_mut()
204+
.unwrap()
205+
.remove(&handle)
206+
.is_some()
207+
);
208+
}
209+
}

0 commit comments

Comments
 (0)