
Commit f19cb90

Add 'stream' and 'future' types
1 parent 4d3cedf commit f19cb90

6 files changed: +2313 -232 lines changed


design/mvp/Async.md

Lines changed: 125 additions & 27 deletions
@@ -17,6 +17,7 @@ summary of the motivation and animated sketch of the design in action.
 * [Current task](#current-task)
 * [Subtask and Supertask](#subtask-and-supertask)
 * [Structured concurrency](#structured-concurrency)
+* [Streams and Futures](#streams-and-futures)
 * [Waiting](#waiting)
 * [Backpressure](#backpressure)
 * [Returning](#returning)
@@ -106,8 +107,30 @@ Thus, backpressure combined with the partitioning of low-level state provided
 by the Component Model enables sync and async code to interoperate while
 preserving the expectations of both.
 
-[TODO](#todo): `future` and `stream` types that can be used in function
-signatures will be added next.
+In addition to being able to define and call whole functions asynchronously,
+the `stream` and `future` types can be used in function signatures to pass
+parameters and results incrementally over time, achieving finer-grained
+concurrency. Streams and futures are thus not defined to be free-standing
+resources with their own internal memory buffers (like a traditional channel or
+pipe) but, rather, more-primitive control-flow mechanisms that synchronize the
+incremental passing of parameters and results during cross-component calls.
+Higher-level resources like channels and pipes can then be defined in terms
+of these lower-level `stream` and `future` primitives, e.g.:
+```wit
+resource pipe {
+  constructor(buffer-size: u32);
+  write: func(bytes: stream<u8>) -> result;
+  read: func() -> stream<u8>;
+}
+```
+but also many other domain-specific concurrent resources like WASI HTTP request
+and response bodies or WASI blobs. Streams and futures are however high-level
+enough to be bound automatically to many source languages' built-in concurrency
+features like futures, promises, streams, generators and iterators, unlike
+lower-level concurrency primitives (like callbacks or `wasi:[email protected]`
+`pollable`s). Thus, the Component Model seeks to provide the lowest-level
+fine-grained concurrency primitives that are high-level and idiomatic enough to
+enable automatic generation of usable language-integrated bindings.
 
 
 ## Concepts
## Concepts
@@ -180,18 +203,80 @@ invocation of an export by the host. Moreover, at any one point in time, the
 set of tasks active in a linked component graph form a forest of async call
 trees which e.g., can be visualized using a traditional flamegraph.
 
-The Canonical ABI's Python code enforces Structured Concurrency by maintaining
-a simple per-[`Task`] `num_async_subtasks` counter that traps if not zero when
-the `Task` finishes.
+The Canonical ABI's Python code enforces Structured Concurrency by incrementing
+a per-[`Task`] counter when a `Subtask` is created, decrementing when a
+`Subtask` is destroyed, and trapping if the counter is not zero when the `Task`
+attempts to exit.
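Review note: the counter guard described in the added paragraph can be pictured with a minimal Python sketch. This is not the Canonical ABI's actual code; the names `Trap`, `num_subtasks`, `exit` and `drop` are assumptions made for illustration only.

```python
class Trap(Exception):
    """Stands in for a wasm trap in this sketch."""


class Task:
    def __init__(self):
        # Hypothetical field name: number of live subtasks owned by this task.
        self.num_subtasks = 0

    def exit(self):
        # Structured concurrency: a task may not exit while subtasks remain.
        if self.num_subtasks != 0:
            raise Trap("task attempted to exit with live subtasks")


class Subtask:
    def __init__(self, supertask):
        self.supertask = supertask
        self.dropped = False
        supertask.num_subtasks += 1  # incremented when the subtask is created

    def drop(self):
        if self.dropped:
            raise Trap("subtask dropped twice")
        self.dropped = True
        self.supertask.num_subtasks -= 1  # decremented when it is destroyed
```

Under these assumptions, calling `exit()` before every subtask has been dropped raises `Trap`, and succeeds once the counter is back at zero.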
+
+### Streams and Futures
+
+Streams and Futures have two "ends": a *readable end* and a *writable end*. When
+*consuming* a `stream` or `future` value as a parameter (of an export call
+with a `stream` or `future` somewhere in the parameter types) or result (of an
+import call with a `stream` or `future` somewhere in the result type), the
+receiver always gets *unique ownership* of the *readable end* of the `stream`
+or `future`. When *producing* a `stream` or `future` value as a parameter (of
+an import call) or result (of an export call), the producer can either
+*transfer ownership* of a readable end it has already received or it can
+create a fresh writable end (via `stream.new` or `future.new`) and lift this
+writable end (maintaining ownership of the writable end, but creating a fresh
+readable end for the receiver). To maintain the invariant that readable ends
+are unique, a writable end can be lifted at most once, trapping otherwise.
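Review note: the "lifted at most once" rule could be modeled roughly as below. The class and attribute names (`WritableEnd`, `ReadableEnd`, `lifted`, `stream_id`) are hypothetical and only illustrate the invariant; they are not taken from the Canonical ABI.

```python
class Trap(Exception):
    """Stands in for a wasm trap in this sketch."""


class ReadableEnd:
    def __init__(self, stream_id):
        self.stream_id = stream_id


class WritableEnd:
    def __init__(self, stream_id):
        self.stream_id = stream_id
        self.lifted = False  # has a readable end already been handed out?

    def lift(self):
        # The producer keeps the writable end; the receiver gets a fresh,
        # uniquely-owned readable end. A second lift would create a second
        # readable end, so it traps instead.
        if self.lifted:
            raise Trap("writable end lifted more than once")
        self.lifted = True
        return ReadableEnd(self.stream_id)
```

The first `lift()` yields the single readable end; any later `lift()` on the same writable end raises `Trap` in this model.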
+
+Based on this, `stream<T>` and `future<T>` values can be passed between
+functions as if they were synchronous `list<T>` and `T` values, resp. For
+example, given `f` and `g` with types:
+```wit
+f: func(x: whatever) -> stream<T>;
+g: func(s: stream<T>) -> stuff;
+```
+`g(f(x))` works as you might hope, concurrently streaming `x` into `f` which
+concurrently streams its results into `g`. (The addition of [`error`](#TODO)
+will provide a generic answer to the question of what happens if `f`
+experiences an error: `f` can close its returned writable stream end with an
+`error` that will be propagated into `g` which should then propagate the error
+somehow into `stuff`.)
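Review note: as a loose analogy only, Python generators show the same `g(f(x))` shape, with elements flowing from `f` into `g` one at a time rather than as a completed list. Generators are pull-based and single-threaded, so this does not capture the concurrency of the real design; the bodies of `f` and `g` are invented for illustration.

```python
from typing import Iterator


def f(x: int) -> Iterator[int]:
    # Plays the role of `f: func(x) -> stream<T>`: elements are produced lazily.
    for i in range(x):
        yield i * i


def g(s: Iterator[int]) -> int:
    # Plays the role of `g: func(s: stream<T>) -> stuff`: elements are consumed
    # as they arrive, without first materializing the whole sequence.
    total = 0
    for item in s:
        total += item
    return total


print(g(f(4)))  # 0 + 1 + 4 + 9 = 14
```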
+
+If a component instance *would* receive the readable end of a stream for which
+it already owns the writable end, the readable end disappears and the existing
+writable end is received instead (since the guest can now handle the whole
+stream more efficiently wholly from within guest code). E.g., if the same
+component instance defined `f` and `g` above, the composition `g(f(x))` would
+just instruct the guest to stream directly from `f` into `g` without crossing a
+component boundary or performing any extra copies. Thus, strengthening the
+previously-mentioned invariant, the readable and writable ends of a stream are
+unique *and never in the same component*.
+
+Given the readable or writable end of a stream, core wasm code can call the
+imported `stream.read` or `stream.write` canonical built-ins, passing the
+pointer and length of a linear-memory buffer to write-into or read-from, resp.
+These built-ins can either return immediately if >0 elements were able to be
+written or read immediately (without blocking) or return a sentinel "blocked"
+value indicating that the read or write will execute concurrently. The
+readable and writable ends of streams and futures each have a well-defined
+parent `Task` that will receive "progress" events on all child streams/futures
+that have previously blocked.
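Review note: the "return a count or a blocked sentinel" behaviour can be sketched as an unbuffered rendezvous between one reader and one writer. Everything here is an assumption for illustration: the `Stream` class, the `BLOCKED` object, the `events` list standing in for "progress" events, and the simplification that each end has at most one outstanding operation with a non-empty buffer. The real built-ins operate on linear-memory pointers and lengths.

```python
BLOCKED = object()  # sentinel standing in for the "blocked" return value


class Stream:
    """Unbuffered rendezvous between one reader and one writer (sketch)."""

    def __init__(self):
        self.pending_write = None  # elements offered by a blocked writer
        self.pending_read = None   # (destination list, capacity) of a blocked reader
        self.events = []           # "progress" events owed to the parent tasks

    def write(self, elems):
        if self.pending_read is not None:
            # A reader is already waiting: copy straight into its buffer.
            dst, capacity = self.pending_read
            n = min(capacity, len(elems))
            dst.extend(elems[:n])
            self.pending_read = None
            self.events.append(("read-progress", n))
            return n
        self.pending_write = list(elems)
        return BLOCKED

    def read(self, dst, capacity):
        if self.pending_write is not None:
            # A writer is already waiting: copy straight out of its buffer.
            n = min(capacity, len(self.pending_write))
            dst.extend(self.pending_write[:n])
            del self.pending_write[:n]
            if not self.pending_write:
                self.pending_write = None
            self.events.append(("write-progress", n))
            return n
        self.pending_read = (dst, capacity)
        return BLOCKED
```

In this model whichever side arrives second completes immediately with a positive count, while the side that blocked earlier learns of the transfer through an entry in `events`.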
+
+From a [structured-concurrency](#structured-concurrency) perspective, the
+readable and writable ends of streams and futures are leaves of the async call
+tree. Unlike subtasks, the parent of the readable ends of streams and futures
+*can* change over time (when transferred via function call, as mentioned
+above). However, there is always *some* parent `Task` and this parent `Task`
+is prevented from orphaning its children using the same reference-counting
+guard mentioned above for subtasks.
 
 ### Waiting
 
 When a component asynchronously lowers an import, it is explicitly requesting
 that, if the import blocks, control flow be returned back to the calling task
-so that it can do something else. Eventually though a task may run out of other
+so that it can do something else. Similarly, if `stream.read` or `stream.write`
+would block, they return a "blocked" code so that the caller can continue to
+make progress on other things. But eventually, a task will run out of other
 things to do and will need to **wait** for progress on one of the task's
-subtasks. While a task is waiting, the runtime can switch to other running
-tasks or start new tasks by invoking exports.
+subtasks, readable stream ends, writable stream ends, readable future ends or
+writable future ends, which are collectively called its **waitables**. While a
+task is waiting on its waitables, the Component Model runtime can switch to
+other running tasks or start new tasks by invoking exports.
 
 The Canonical ABI provides two ways for a task to wait:
 * The task can call the [`task.wait`] built-in to synchronously wait for
@@ -234,13 +319,23 @@ the "started" state.
 
 ### Returning
 
-The way an async Core WebAssembly function returns its value is by calling
-[`task.return`], passing the core values that are to be lifted.
-
-The main reason to have `task.return` is so that a task can continue execution
-after returning its value. This is useful for various finalization tasks (such
-as logging, billing or metrics) that don't need to be on the critical path of
-returning a value to the caller.
+The way an async function returns its value is by calling [`task.return`],
+passing the core values that are to be lifted as *parameters*. Additionally,
+when the `always-task-return` `canonopt` is set, synchronous functions also
+return their values by calling `task.return` (as a more expressive and
+general alternative to `post-return`).
+
+Returning values by calling `task.return` allows a task to continue executing
+even after it has passed its initial results to the caller. This can be useful
+for various finalization tasks (freeing memory or performing logging, billing
+or metrics operations) that don't need to be on the critical path of returning
+a value to the caller, but the major use of executing code after `task.return`
+is to continue to read and write from streams and futures. For example, a
+stream transformer function of type `func(in: stream<T>) -> stream<U>` will
+immediately `task.return` a stream created via `stream.new` and then sit in a
+loop interleaving `stream.read`s (of the readable end passed for `in`) and
+`stream.write`s (of the writable end it `stream.new`ed) before exiting the
+task.
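Review note: the control flow of the stream transformer described above might look roughly like this. The callables `read_in`, `write_out` and `task_return` are hypothetical stand-ins for `stream.read`, `stream.write` and `task.return`; using `None` to signal a closed input and a `log` list to record ordering are assumptions of the sketch, and blocking or waiting is not modeled.

```python
def stream_transformer(read_in, write_out, task_return, convert, log):
    # Hand the caller the readable end of the output stream first ...
    task_return("readable end of out")
    log.append("returned")
    # ... then keep running, interleaving reads of `in` with writes to `out`.
    while True:
        chunk = read_in()              # stands in for `stream.read`
        if chunk is None:              # assumption: None models a closed input
            break
        write_out([convert(x) for x in chunk])  # stands in for `stream.write`
        log.append(f"wrote {len(chunk)}")
    log.append("exit")                 # only now does the task exit
```

The point of the sketch is the ordering: the result is delivered before any element is read, and the task keeps executing until the input is exhausted.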
 
 A task may not call `task.return` unless it is in the "started" state. Once
 `task.return` is called, the task is in the "returned" state. A task can only
@@ -419,21 +514,24 @@ For now, this remains a [TODO](#todo) and validation will reject `async`-lifted
 
 ## TODO
 
-Native async support is being proposed in progressive chunks. The following
-features will be added in future chunks to complete "async" in Preview 3:
-* `future`/`stream`/`error`: add for use in function types for finer-grained
-  concurrency
-* `subtask.cancel`: allow a supertask to signal to a subtask that its result is
-  no longer wanted and to please wrap it up promptly
-* allow "tail-calling" a subtask so that the current wasm instance can be torn
-  down eagerly
-* `task.index`+`task.wake`: allow tasks in the same instance to wait on and
-  wake each other (async condvar-style)
+Native async support is being proposed incrementally. The following features
+will be added in future chunks roughly in the order listed to complete the full
+"async" story:
+* add `error` type that can be included when closing a stream/future
 * `nonblocking` function type attribute: allow a function to declare in its
   type that it will not transitively do anything blocking
+* define what `async` means for `start` functions (top-level await + background
+  tasks), along with cross-task coordination built-ins
+* `subtask.cancel`: allow a supertask to signal to a subtask that its result is
+  no longer wanted and to please wrap it up promptly
+* zero-copy forwarding/splicing and built-in way to "tail-call" a subtask so
+  that the current wasm instance can be torn down eagerly while preserving
+  structured concurrency
+* some way to say "no more elements are coming for a while"
 * `recursive` function type attribute: allow a function to be reentered
-  recursively (instead of trapping)
-* enable `async` `start` functions
+  recursively (instead of trapping) and link inner and outer activations
+* allow pipelining multiple `stream.read`/`write` calls
+* allow chaining multiple async calls together ("promise pipelining")
 * integrate with `shared`: define how to lift and lower functions `async` *and*
   `shared`

design/mvp/Binary.md

Lines changed: 16 additions & 1 deletion
@@ -202,6 +202,8 @@ defvaltype ::= pvt:<primvaltype> => pvt
            | 0x6a t?:<valtype>? u?:<valtype>? => (result t? (error u)?)
            | 0x69 i:<typeidx> => (own i)
            | 0x68 i:<typeidx> => (borrow i)
+           | 0x66 i:<typeidx> => (stream i)
+           | 0x65 i:<typeidx> => (future i)
 labelvaltype ::= l:<label'> t:<valtype> => l t
 case ::= l:<label'> t?:<valtype>? 0x00 => (case l t?)
 label' ::= len:<u32> l:<label> => l (if len = |l|)
@@ -290,7 +292,19 @@ canon ::= 0x00 0x00 f:<core:funcidx> opts:<opts> ft:<typeidx> => (canon lift
       | 0x0a m:<core:memidx> => (canon task.wait (memory m) (core func)) 🔀
       | 0x0b m:<core:memidx> => (canon task.poll (memory m) (core func)) 🔀
       | 0x0c => (canon task.yield (core func)) 🔀
-      | 0x0d => (canon subtask.drop (core func)) 🔀
+      | 0x0d => (canon waitable.drop (core func)) 🔀
+      | 0x0e t:<typeidx> => (canon stream.new t (core func)) 🔀
+      | 0x0f => (canon stream.read (core func)) 🔀
+      | 0x10 => (canon stream.write (core func)) 🔀
+      | 0x11 async?:<async?> => (canon stream.cancel-read async? (core func)) 🔀
+      | 0x12 async?:<async?> => (canon stream.cancel-write async? (core func)) 🔀
+      | 0x13 t:<typeidx> => (canon future.new t (core func)) 🔀
+      | 0x14 => (canon future.read (core func)) 🔀
+      | 0x15 => (canon future.write (core func)) 🔀
+      | 0x16 async?:<async?> => (canon future.cancel-read async? (core func)) 🔀
+      | 0x17 async?:<async?> => (canon future.cancel-write async? (core func)) 🔀
+async? ::= 0x00 =>
+       | 0x01 => async
 opts ::= opt*:vec(<canonopt>) => opt*
 canonopt ::= 0x00 => string-encoding=utf8
          | 0x01 => string-encoding=utf16
@@ -300,6 +314,7 @@ canonopt ::= 0x00 => string-encod
          | 0x05 f:<core:funcidx> => (post-return f)
          | 0x06 => async 🔀
          | 0x07 f:<core:funcidx> => (callback f) 🔀
+         | 0x08 => always-task-return 🔀
 ```
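Review note: to make the new `canon` opcodes in this hunk concrete, here is a small decoding sketch covering only the opcodes `0x0d` through `0x17` shown above. The table layout and the function names `read_u32_leb128` and `decode_canon` are assumptions for illustration; it further assumes `typeidx` is encoded as an unsigned LEB128 `u32`, as indices are in the core WebAssembly binary format, and prints the index numerically.

```python
# Opcode -> (built-in name, kind of immediate that follows), per the hunk above.
CANON_BUILTINS = {
    0x0D: ("waitable.drop", None),
    0x0E: ("stream.new", "typeidx"),
    0x0F: ("stream.read", None),
    0x10: ("stream.write", None),
    0x11: ("stream.cancel-read", "async?"),
    0x12: ("stream.cancel-write", "async?"),
    0x13: ("future.new", "typeidx"),
    0x14: ("future.read", None),
    0x15: ("future.write", None),
    0x16: ("future.cancel-read", "async?"),
    0x17: ("future.cancel-write", "async?"),
}


def read_u32_leb128(data, pos):
    """Decode an unsigned LEB128 integer starting at `pos`."""
    result, shift = 0, 0
    while True:
        byte = data[pos]
        pos += 1
        result |= (byte & 0x7F) << shift
        if byte & 0x80 == 0:
            return result, pos
        shift += 7


def decode_canon(data):
    """Render one of the new canon built-ins in its text form."""
    name, immediate = CANON_BUILTINS[data[0]]
    if immediate == "typeidx":
        idx, _ = read_u32_leb128(data, 1)
        return f"(canon {name} {idx} (core func))"
    if immediate == "async?":
        flag = {0x00: "", 0x01: " async"}[data[1]]  # async? ::= 0x00 | 0x01
        return f"(canon {name}{flag} (core func))"
    return f"(canon {name} (core func))"
```

For example, under these assumptions `decode_canon(bytes([0x11, 0x01]))` yields `(canon stream.cancel-read async (core func))`.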
 Notes:
 * The second `0x00` byte in `canon` stands for the `func` sort and thus the
