Skip to content

Commit 104921d

Browse files
committed
Break waitable.drop into subtask.drop and {stream,future}.close-{readable,writable}
1 parent 487757d commit 104921d

File tree

5 files changed

+189
-103
lines changed

5 files changed

+189
-103
lines changed

design/mvp/Binary.md

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -292,17 +292,21 @@ canon ::= 0x00 0x00 f:<core:funcidx> opts:<opts> ft:<typeidx> => (canon lift
292292
| 0x0a async?:<async>? m:<core:memdix> => (canon task.wait async? (memory m) (core func)) 🔀
293293
| 0x0b async?:<async>? m:<core:memidx> => (canon task.poll async? (memory m) (core func)) 🔀
294294
| 0x0c async?:<async>? => (canon task.yield async? (core func)) 🔀
295-
| 0x0d => (canon waitable.drop (core func)) 🔀
295+
| 0x0d => (canon subtask.drop (core func)) 🔀
296296
| 0x0e t:<typeidx> => (canon stream.new t (core func)) 🔀
297297
| 0x0f t:<typeidx> opts:<opts> => (canon stream.read t opts (core func)) 🔀
298298
| 0x10 t:<typeidx> opts:<opts> => (canon stream.write t opts (core func)) 🔀
299299
| 0x11 async?:<async?> => (canon stream.cancel-read async? (core func)) 🔀
300300
| 0x12 async?:<async?> => (canon stream.cancel-write async? (core func)) 🔀
301-
| 0x13 t:<typeidx> => (canon future.new t (core func)) 🔀
302-
| 0x14 t:<typeidx> opts:<opts> => (canon future.read t opts (core func)) 🔀
303-
| 0x15 t:<typeidx> opts:<opts> => (canon future.write t opts (core func)) 🔀
304-
| 0x16 async?:<async?> => (canon future.cancel-read async? (core func)) 🔀
305-
| 0x17 async?:<async?> => (canon future.cancel-write async? (core func)) 🔀
301+
| 0x13 t:<typeidx> => (canon stream.close-readable t (core func)) 🔀
302+
| 0x14 t:<typeidx> => (canon stream.close-writable t (core func)) 🔀
303+
| 0x15 t:<typeidx> => (canon future.new t (core func)) 🔀
304+
| 0x16 t:<typeidx> opts:<opts> => (canon future.read t opts (core func)) 🔀
305+
| 0x17 t:<typeidx> opts:<opts> => (canon future.write t opts (core func)) 🔀
306+
| 0x18 async?:<async?> => (canon future.cancel-read async? (core func)) 🔀
307+
| 0x19 async?:<async?> => (canon future.cancel-write async? (core func)) 🔀
308+
| 0x1a t:<typeidx> => (canon future.close-readable t (core func)) 🔀
309+
| 0x1b t:<typeidx> => (canon future.close-writable t (core func)) 🔀
306310
async? ::= 0x00 =>
307311
| 0x01 => async
308312
opts ::= opt*:vec(<canonopt>) => opt*

design/mvp/CanonicalABI.md

Lines changed: 79 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -791,8 +791,8 @@ counter is used below to record the number of unmet obligations to drop the
791791
streams and futures connected to this `Subtask`.
792792
```python
793793
def drop(self):
794-
trap_if(self.enqueued)
795794
trap_if(self.state != CallState.DONE)
795+
assert(not self.enqueued)
796796
self.supertask.todo -= 1
797797
```
798798

@@ -1058,24 +1058,29 @@ and ensures that streams-of-borrows are dropped before the end of the call,
10581058
just like normal `borrow` handles.
10591059

10601060
Given the above logic, the [readable and writable ends] of a stream can be
1061-
concretely implemented by the following two classes. The `copy`, `cancel_copy`
1062-
and `drop` methods are called polymorphically by the common `async_copy`
1063-
routine shared by the `stream.read` and `stream.write` built-ins below.
1061+
concretely implemented by the following two classes. The readable end
1062+
inherits `StreamHandle`'s constructor, which takes an already-created abstract
1063+
`ReadableStream` passed into the component. In contrast, constructing a
1064+
writable end constructs a fresh `ReadableStreamGuestImpl` that will later
1065+
be given to the readable end paired with this writable end. The `copy`,
1066+
`cancel_copy` and `drop` methods are called polymorphically by the common
1067+
`async_copy` routine shared by the `stream.read` and `stream.write` built-ins
1068+
below.
10641069
```python
10651070
class ReadableStreamHandle(StreamHandle):
10661071
async def copy(self, dst, on_block):
10671072
await self.stream.read(dst, on_block)
10681073
async def cancel_copy(self, dst, on_block):
10691074
await self.stream.cancel_read(dst, on_block)
10701075

1071-
class WritableStreamHandle(ReadableStreamGuestImpl, StreamHandle):
1076+
class WritableStreamHandle(StreamHandle):
10721077
def __init__(self, t, inst):
1073-
ReadableStreamGuestImpl.__init__(self, inst)
1074-
StreamHandle.__init__(self, self, t)
1078+
stream = ReadableStreamGuestImpl(inst)
1079+
StreamHandle.__init__(self, stream, t)
10751080
async def copy(self, src, on_block):
1076-
await self.write(src, on_block)
1081+
await self.stream.write(src, on_block)
10771082
async def cancel_copy(self, src, on_block):
1078-
await self.cancel_write(src, on_block)
1083+
await self.stream.cancel_write(src, on_block)
10791084
```
10801085

10811086
Given the above definitions of how `stream` works, a `future` can simply be
@@ -1097,24 +1102,24 @@ class ReadableFutureHandle(FutureHandle):
10971102
if dst.remain() == 0:
10981103
self.stream.close()
10991104

1100-
class WritableFutureHandle(ReadableStreamGuestImpl, FutureHandle):
1105+
class WritableFutureHandle(FutureHandle):
11011106
def __init__(self, t, inst):
1102-
ReadableStreamGuestImpl.__init__(self, inst)
1103-
FutureHandle.__init__(self, self, t)
1107+
stream = ReadableStreamGuestImpl(inst)
1108+
FutureHandle.__init__(self, stream, t)
11041109

11051110
async def copy(self, src, on_block):
11061111
assert(src.remain() == 1)
1107-
await self.write(src, on_block)
1112+
await self.stream.write(src, on_block)
11081113
if src.remain() == 0:
1109-
self.close()
1114+
self.stream.close()
11101115

11111116
async def cancel_copy(self, src, on_block):
11121117
await self.cancel_write(src, on_block)
11131118
if src.remain() == 0:
1114-
self.close()
1119+
self.stream.close()
11151120

11161121
def drop(self):
1117-
trap_if(not self.closed())
1122+
trap_if(not self.stream.closed())
11181123
FutureHandle.drop(self)
11191124
```
11201125
The overridden `WritableFutureHandle.drop` method traps if the internal stream
@@ -2061,12 +2066,13 @@ def lower_future(cx, v, t):
20612066

20622067
def lower_async_value(ReadableHandleT, WritableHandleT, cx, v, t):
20632068
assert(isinstance(v, ReadableStream))
2064-
if isinstance(v, WritableHandleT) and cx.inst is v.impl:
2065-
i = cx.inst.waitables.array.index(v)
2066-
assert(v.paired)
2067-
v.paired = False
2069+
if isinstance(v, ReadableStreamGuestImpl) and cx.inst is v.impl:
2070+
[h] = [h for h in cx.inst.waitables.array if h and h.stream is v]
2071+
assert(h.paired)
2072+
h.paired = False
20682073
if contains_borrow(t):
2069-
v.borrow_scope = None
2074+
h.borrow_scope = None
2075+
i = cx.inst.waitables.array.index(h)
20702076
assert(2**31 > Table.MAX_LENGTH >= i)
20712077
return i | (2**31)
20722078
else:
@@ -2091,12 +2097,14 @@ it itself holds the `WritableStreamHandle` for. Without specially handling
20912097
this case, this would lead to copies from a single linear memory into itself
20922098
which is both inefficient and raises subtle semantic interleaving questions
20932099
that we would rather avoid. To avoid both, this case is detected and the
2094-
`ReadableStream` is "unwrapped" to writable handle, returning the existing
2100+
`ReadableStream` is "unwrapped" to the writable handle, returning the existing
20952101
index of it in the `waitables` table, setting the high bit to signal this fact
20962102
to guest code. Guest code must therefore handle this special case by
20972103
collapsing the two ends of the stream to work fully without guest code (since
20982104
the Canonical ABI is now wholly unnecessary to pass values from writer to
2099-
reader).
2105+
reader). The O(N) searches through the `waitables` table are expected to be
2106+
optimized away by instead storing a pointer or index of the writable handle in
2107+
the stream itself (alongside the `impl` field).
21002108

21012109

21022110
### Flattening
@@ -2969,6 +2977,27 @@ execute, however tasks in *other* component instances may execute. This allows
29692977
a long-running task in one component to avoid starving other components
29702978
without needing support full reentrancy.
29712979

2980+
### 🔀 `canon subtask.drop`
2981+
2982+
For a canonical definition:
2983+
```wasm
2984+
(canon subtask.drop (core func $f))
2985+
```
2986+
validation specifies:
2987+
* `$f` is given type `(func (param i32))`
2988+
2989+
Calling `$f` removes the subtask at the given index from the current
2990+
component instance's `watiable` table, performing the guards and bookkeeping
2991+
defined by `Subtask.drop()`.
2992+
```python
2993+
async def canon_subtask_drop(task, i):
2994+
trap_if(not task.inst.may_leave)
2995+
h = task.inst.waitables.remove(i)
2996+
trap_if(not isinstance(h, Subtask))
2997+
h.drop()
2998+
return []
2999+
```
3000+
29723001
### 🔀 `canon {stream,future}.new`
29733002

29743003
For canonical definitions:
@@ -3177,22 +3206,41 @@ new `waitable` element allocated; the cancellation is simply reported as a
31773206
normal `{STREAM,FUTURE}_{READ,WRITE}` event by the original, now-unblocked
31783207
`read` or `write`.
31793208

3180-
### 🔀 `canon waitable.drop`
3209+
### 🔀 `canon {stream,future}.close-{readable,writable}`
31813210

3182-
For a canonical definition:
3211+
For canonical definitions:
31833212
```wasm
3184-
(canon waitable.drop (core func $f))
3213+
(canon stream.close-readable $t (core func $f))
3214+
(canon stream.close-writable $t (core func $f))
3215+
(canon future.close-readable $t (core func $f))
3216+
(canon future.close-writable $t (core func $f))
31853217
```
31863218
validation specifies:
31873219
* `$f` is given type `(func (param i32))`
31883220

3189-
Calling `$f` removes the indicated waitable (subtask, stream or future) from
3190-
the instance's table, trapping if various conditions aren't met in the
3191-
waitable's `drop()` method.
3221+
Calling `$f` removes the readable or writable end of the stream or future at
3222+
the given index from the current component instance's `waitable` table,
3223+
performing the guards and bookkeeping defined by
3224+
`{Readable,Writable}{Stream,Future}Handle.drop()` above.
31923225
```python
3193-
async def canon_waitable_drop(task, i):
3226+
async def canon_stream_close_readable(t, task, i):
3227+
return await close_async_value(ReadableStreamHandle, t, task, i)
3228+
3229+
async def canon_stream_close_writable(t, task, i):
3230+
return await close_async_value(WritableStreamHandle, t, task, i)
3231+
3232+
async def canon_future_close_readable(t, task, i):
3233+
return await close_async_value(ReadableFutureHandle, t, task, i)
3234+
3235+
async def canon_future_close_writable(t, task, i):
3236+
return await close_async_value(WritableFutureHandle, t, task, i)
3237+
3238+
async def close_async_value(HandleT, t, task, i):
31943239
trap_if(not task.inst.may_leave)
3195-
task.inst.waitables.remove(i).drop()
3240+
h = task.inst.waitables.remove(i)
3241+
trap_if(not isinstance(h, HandleT))
3242+
trap_if(h.t != t)
3243+
h.drop()
31963244
return []
31973245
```
31983246

design/mvp/Explainer.md

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1372,16 +1372,21 @@ canon ::= ...
13721372
| (canon task.wait async? (memory <core:memidx>) (core func <id>?)) 🔀
13731373
| (canon task.poll async? (memory <core:memidx>) (core func <id>?)) 🔀
13741374
| (canon task.yield async? (core func <id>?)) 🔀
1375+
| (canon subtask.drop (core func <id>?)) 🔀
13751376
| (canon stream.new <typeidx> (core func <id>?)) 🔀
13761377
| (canon stream.read <typeidx> <canonopt>* (core func <id>?)) 🔀
13771378
| (canon stream.write <typeidx> <canonopt>* (core func <id>?)) 🔀
13781379
| (canon stream.cancel-read async? (core func <id>?)) 🔀
13791380
| (canon stream.cancel-write async? (core func <id>?)) 🔀
1381+
| (canon stream.close-readable <typeidx> (core func <id>?)) 🔀
1382+
| (canon stream.close-writable <typeidx> (core func <id>?)) 🔀
13801383
| (canon future.new <typeidx> (core func <id>?)) 🔀
13811384
| (canon future.read <typeidx> <canonopt>* (core func <id>?)) 🔀
13821385
| (canon future.write <typeidx> <canonopt>* (core func <id>?)) 🔀
13831386
| (canon future.cancel-read async? (core func <id>?)) 🔀
13841387
| (canon future.cancel-write async? (core func <id>?)) 🔀
1388+
| (canon future.close-readable <typeidx> (core func <id>?)) 🔀
1389+
| (canon future.close-writable <typeidx> (core func <id>?)) 🔀
13851390
| (canon waitable.drop (core func <id>?)) 🔀
13861391
| (canon thread.spawn <typeidx> (core func <id>?)) 🧵
13871392
| (canon thread.hw_concurrency (core func <id>?)) 🧵
@@ -1470,6 +1475,10 @@ switch to another task, allowing a long-running computation to cooperatively
14701475
interleave with other tasks. (See also [`canon_task_yield`] in the Canonical
14711476
ABI explainer.)
14721477

1478+
The `subtask.drop` built-in has type `[i32] -> []` and removes the indicated
1479+
[subtask](Async.md#subtask-and-supertask) from the current instance's subtask
1480+
table, trapping if the subtask isn't done.
1481+
14731482
The `{stream,future}.new` built-ins have type `[] -> [i32]` and return a new
14741483
[writable end](Async.md#streams-and-futures) of a stream or future. (See
14751484
[`canon_stream_new`] in the Canonical ABI explainer for details.)
@@ -1500,11 +1509,11 @@ blocks, the return value is the sentinel "`BLOCKED`" value and the caller must
15001509
or `write`. (See [`canon_stream_cancel_read`] in the Canonical ABI explainer
15011510
for details.)
15021511

1503-
The `waitable.drop` built-in has type `[i32] -> []` and removes the indicated
1504-
[subtask](Async.md#subtask-and-supertask) or [stream or future](Async.md#streams-and-futures)
1505-
from the current instance's [waitables](Async.md#waiting) table, trapping if
1506-
the subtask isn't done or the stream or future is in the middle of reading
1507-
or writing.
1512+
The `{stream,future}.close-{readable,writable}` built-ins have type
1513+
`[i32] -> []` and removes the indicated [stream or future](Async.md#streams-and-futures)
1514+
from the current component instance's [waitables](Async.md#waiting) table,
1515+
trapping if the stream or future has a mismatched direction or type or are in
1516+
the middle of a `read` or `write`.
15081517

15091518
##### 🧵 Threading built-ins
15101519

0 commit comments

Comments
 (0)