Skip to content

Commit c90abf1

Browse files
committed
Refactor stream/future code to have a single 'state' field instead of 2 bools
1 parent 2c17516 commit c90abf1

File tree

2 files changed

+150
-120
lines changed

2 files changed

+150
-120
lines changed

design/mvp/CanonicalABI.md

Lines changed: 90 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -1422,48 +1422,64 @@ using blocking or completion-based I/O, no buffering is necessary. This
14221422
buffering is analogous to the buffering performed in kernel memory by a
14231423
`pipe()`.
14241424

1425-
Given the above, we can define the `{Readable,Writable}StreamEnd` classes that
1426-
are actually stored in the component instance table. The classes are almost
1427-
entirely symmetric, with the only difference being whether the polymorphic
1428-
`copy` method (used below) calls `read` or `write`:
1425+
The two ends of a stream are stored as separate elements in the component
1426+
instance's table and each end has a separate `CopyState` that reflects what
1427+
*that end* is currently doing or has done. This `state` field is factored
1428+
out into the `CopyEnd` class that is derived below:
14291429
```python
1430-
class StreamEnd(Waitable):
1431-
shared: ReadableStream|WritableStream
1432-
copying: bool
1433-
done: bool
1430+
class CopyState(Enum):
1431+
IDLE = 1
1432+
COPYING = 2
1433+
DONE = 3
14341434

1435-
def __init__(self, shared):
1435+
class CopyEnd(Waitable):
1436+
state: CopyState
1437+
1438+
def __init__(self):
14361439
Waitable.__init__(self)
1437-
self.shared = shared
1438-
self.copying = False
1439-
self.done = False
1440+
self.state = CopyState.IDLE
14401441

14411442
def drop(self):
1442-
trap_if(self.copying)
1443-
self.shared.drop()
1443+
trap_if(self.state == CopyState.COPYING)
14441444
Waitable.drop(self)
1445+
```
1446+
As shown in `drop`, attempting to drop a readable or writable end while a copy
1447+
is in progress traps. This means that client code must take care to wait for
1448+
these operations to finish (potentially cancelling them via
1449+
`stream.cancel-{read,write}`) before dropping.
1450+
1451+
Given the above, we can define the concrete `{Readable,Writable}StreamEnd`
1452+
classes which are almost entirely symmetric, with the only difference being
1453+
whether the polymorphic `copy` method (used below) calls `read` or `write`:
1454+
```python
1455+
class ReadableStreamEnd(CopyEnd):
1456+
shared: ReadableStream
1457+
1458+
def __init__(self, shared):
1459+
CopyEnd.__init__(self)
1460+
self.shared = shared
14451461

1446-
class ReadableStreamEnd(StreamEnd):
14471462
def copy(self, inst, dst, on_copy, on_copy_done):
14481463
self.shared.read(inst, dst, on_copy, on_copy_done)
14491464

1450-
class WritableStreamEnd(StreamEnd):
1465+
def drop(self):
1466+
self.shared.drop()
1467+
CopyEnd.drop(self)
1468+
1469+
class WritableStreamEnd(CopyEnd):
1470+
shared: WritableStream
1471+
1472+
def __init__(self, shared):
1473+
CopyEnd.__init__(self)
1474+
self.shared = shared
1475+
14511476
def copy(self, inst, src, on_copy, on_copy_done):
14521477
self.shared.write(inst, src, on_copy, on_copy_done)
1453-
```
1454-
The `copying` field tracks whether there is an asynchronous read or write in
1455-
progress and is maintained by the definitions of `stream.{read,write}` below.
1456-
The `done` field tracks whether this end has been notified that the other end
1457-
was dropped (via `CopyResult.DROPPED`) and thus no further read/write
1458-
operations are allowed. Importantly, `copying` and `done` are per-*end*, not
1459-
per-*stream* (unlike the fields of `SharedStreamImpl` shown above, which are
1460-
per-stream and shared by both ends via their `shared` field).
14611478

1462-
Dropping a stream end while an asynchronous read or write is in progress traps
1463-
since the async read or write cannot be cancelled without blocking and `drop`
1464-
(called by `stream.drop-{readable,writable}`) is synchronous and non-blocking.
1465-
This means that client code must take care to wait for these operations to
1466-
finish before dropping.
1479+
def drop(self):
1480+
self.shared.drop()
1481+
CopyEnd.drop(self)
1482+
```
14671483

14681484

14691485
#### Future State
@@ -1569,39 +1585,34 @@ Lastly, the `{Readable,Writable}FutureEnd` classes are mostly symmetric with
15691585
`WritableFutureEnd.drop` traps if the writer hasn't successfully written a
15701586
value or been notified of the reader dropping their end:
15711587
```python
1572-
class FutureEnd(Waitable):
1573-
shared: ReadableFuture|WritableFuture
1574-
copying: bool
1575-
done: bool
1588+
class ReadableFutureEnd(CopyEnd):
1589+
shared: ReadableFuture
15761590

15771591
def __init__(self, shared):
1578-
Waitable.__init__(self)
1592+
CopyEnd.__init__(self)
15791593
self.shared = shared
1580-
self.copying = False
1581-
self.done = False
1582-
1583-
def drop(self):
1584-
trap_if(self.copying)
1585-
Waitable.drop(self)
15861594

1587-
class ReadableFutureEnd(FutureEnd):
15881595
def copy(self, inst, src_buffer, on_copy_done):
15891596
self.shared.read(inst, src_buffer, on_copy_done)
15901597

15911598
def drop(self):
15921599
self.shared.drop()
1593-
FutureEnd.drop(self)
1600+
CopyEnd.drop(self)
1601+
1602+
class WritableFutureEnd(CopyEnd):
1603+
shared: WritableFuture
1604+
1605+
def __init__(self, shared):
1606+
CopyEnd.__init__(self)
1607+
self.shared = shared
15941608

1595-
class WritableFutureEnd(FutureEnd):
15961609
def copy(self, inst, dst_buffer, on_copy_done):
15971610
self.shared.write(inst, dst_buffer, on_copy_done)
15981611

15991612
def drop(self):
1600-
trap_if(not self.done)
1601-
FutureEnd.drop(self)
1613+
trap_if(self.state != CopyState.DONE)
1614+
CopyEnd.drop(self)
16021615
```
1603-
The `copying` and `done` fields are maintained by the `future` built-ins
1604-
defined below.
16051616

16061617

16071618
### Despecialization
@@ -2066,8 +2077,8 @@ transitively-borrowed handle.
20662077

20672078
Streams and futures are entirely symmetric, transferring ownership of the
20682079
readable end from the lifting component to the host or lowering component and
2069-
trapping if the readable end is in the middle of `copying` (which would create
2070-
a dangling-pointer situation) or is already `done` (in which case the only
2080+
trapping if the readable end is in the middle of copying (which would create
2081+
a dangling-pointer situation) or is in the `DONE` state (in which case the only
20712082
valid operation is `{stream,future}.drop-{readable,writable}`).
20722083
```python
20732084
def lift_stream(cx, i, t):
@@ -2081,8 +2092,7 @@ def lift_async_value(ReadableEndT, cx, i, t):
20812092
e = cx.inst.table.remove(i)
20822093
trap_if(not isinstance(e, ReadableEndT))
20832094
trap_if(e.shared.t != t)
2084-
trap_if(e.copying)
2085-
trap_if(e.done)
2095+
trap_if(e.state != CopyState.IDLE)
20862096
return e.shared
20872097
```
20882098

@@ -3953,16 +3963,16 @@ def canon_stream_write(stream_t, opts, task, i, ptr, n):
39533963
```
39543964

39553965
Introducing the `stream_copy` function in chunks, `stream_copy` first checks
3956-
that the element at index `i` is of the right type, not already `copying`, and
3957-
not already `done` (as defined next). (In the future, the `copying` trap could
3958-
be relaxed, allowing a finite number of pipelined reads or writes.)
3966+
that the element at index `i` is of the right type and allowed to start a new
3967+
copy. (In the future, the "trap if not `IDLE`" condition could be relaxed to
3968+
allow multiple pipelined reads or writes.)
39593969
```python
39603970
def stream_copy(EndT, BufferT, event_code, stream_t, opts, task, i, ptr, n):
39613971
trap_if(not task.inst.may_leave)
39623972
e = task.inst.table.get(i)
39633973
trap_if(not isinstance(e, EndT))
39643974
trap_if(e.shared.t != stream_t.t)
3965-
trap_if(e.copying or e.done)
3975+
trap_if(e.state != CopyState.IDLE)
39663976
```
39673977
Then a readable or writable buffer is created which (in `Buffer`'s constructor)
39683978
eagerly checks the alignment and bounds of (`i`, `n`). (In the future, the
@@ -3982,18 +3992,19 @@ event is delivered to core wasm. `stream_event` first calls `reclaim_buffer` to
39823992
regain ownership of `buffer` and prevent any further partial reads/writes.
39833993
Thus, up until event delivery, the other end of the stream is free to
39843994
repeatedly read/write from/to `buffer`, ideally filling it up and minimizing
3985-
context switches. Next, `copying` is cleared to reenable future
3986-
`stream.{read,write}` calls. However, if the `CopyResult` is `DROPPED`, `done`
3987-
is set to disallow all future use of this stream end. Lastly, `stream_event`
3988-
packs the `CopyResult` and number of elements copied up until this point into a
3989-
single `i32` payload for core wasm.
3995+
context switches. Next, the stream's `state` is updated based on the result
3996+
being delivered to core wasm so that, once a stream end has been notified that
3997+
the other end dropped, calling anything other than `stream.drop-*` traps.
3998+
Lastly, `stream_event` packs the `CopyResult` and number of elements copied up
3999+
until this point into a single `i32` payload for core wasm.
39904000
```python
39914001
def stream_event(result, reclaim_buffer):
39924002
reclaim_buffer()
3993-
assert(e.copying)
3994-
e.copying = False
4003+
assert(e.state == CopyState.COPYING)
39954004
if result == CopyResult.DROPPED:
3996-
e.done = True
4005+
e.state = CopyState.DONE
4006+
else:
4007+
e.state = CopyState.IDLE
39974008
assert(0 <= result < 2**4)
39984009
assert(buffer.progress <= Buffer.MAX_LENGTH < 2**28)
39994010
packed_result = result | (buffer.progress << 4)
@@ -4005,7 +4016,7 @@ single `i32` payload for core wasm.
40054016
def on_copy_done(result):
40064017
e.set_pending_event(partial(stream_event, result, reclaim_buffer = lambda:()))
40074018

4008-
e.copying = True
4019+
e.state = CopyState.COPYING
40094020
e.copy(task.inst, buffer, on_copy, on_copy_done)
40104021
```
40114022

@@ -4063,7 +4074,7 @@ def future_copy(EndT, BufferT, event_code, future_t, opts, task, i, ptr):
40634074
e = task.inst.table.get(i)
40644075
trap_if(not isinstance(e, EndT))
40654076
trap_if(e.shared.t != future_t.t)
4066-
trap_if(e.copying or e.done)
4077+
trap_if(e.state != CopyState.IDLE)
40674078

40684079
assert(not contains_borrow(future_t))
40694080
cx = LiftLowerContext(opts, task.inst, borrow_scope = None)
@@ -4072,27 +4083,28 @@ def future_copy(EndT, BufferT, event_code, future_t, opts, task, i, ptr):
40724083
Next, the `copy` method of `{Readable,Writable}FutureEnd.copy` is called to
40734084
perform the actual read/write. Other than the simplifications allowed by the
40744085
absence of repeated partial copies, the main difference in the following code
4075-
from the stream code is that `future_event` sets the `done` flag for *both* the
4076-
`DROPPED` and `COMPLETED` results, whereas `stream_event` sets `done` only for
4077-
`DROPPED`. This ensures that futures are read/written at most once and futures
4078-
are only passed to other components in a state where they are ready to be
4079-
read/written. Another important difference is that, since the buffer length is
4080-
always implied by the `CopyResult`, the number of elements copied is not packed
4081-
in the high 28 bits; they're always zero.
4086+
from the stream code is that `future_event` transitions the end to the `DONE`
4087+
state (in which the only valid operation is to call `future.drop-*`) on
4088+
*either* the `DROPPED` and `COMPLETED` results. This ensures that futures are
4089+
read/written at most once and futures are only passed to other components in a
4090+
state where they are ready to be read/written. Another important difference is
4091+
that, since the buffer length is always implied by the `CopyResult`, the number
4092+
of elements copied is not packed in the high 28 bits; they're always zero.
40824093
```python
40834094
def future_event(result):
40844095
assert((buffer.remain() == 0) == (result == CopyResult.COMPLETED))
4085-
assert(e.copying)
4086-
e.copying = False
4096+
assert(e.state == CopyState.COPYING)
40874097
if result == CopyResult.DROPPED or result == CopyResult.COMPLETED:
4088-
e.done = True
4098+
e.state = CopyState.DONE
4099+
else:
4100+
e.state = CopyState.IDLE
40894101
return (event_code, i, result)
40904102

40914103
def on_copy_done(result):
40924104
assert(result != CopyResult.DROPPED or event_code == EventCode.FUTURE_WRITE)
40934105
e.set_pending_event(partial(future_event, result))
40944106

4095-
e.copying = True
4107+
e.state = CopyState.COPYING
40964108
e.copy(task.inst, buffer, on_copy_done)
40974109
```
40984110

@@ -4144,7 +4156,7 @@ def cancel_copy(EndT, event_code, stream_or_future_t, sync, task, i):
41444156
e = task.inst.table.get(i)
41454157
trap_if(not isinstance(e, EndT))
41464158
trap_if(e.shared.t != stream_or_future_t.t)
4147-
trap_if(not e.copying)
4159+
trap_if(e.state != CopyState.COPYING)
41484160
if not e.has_pending_event():
41494161
e.shared.cancel()
41504162
if not e.has_pending_event():
@@ -4153,7 +4165,7 @@ def cancel_copy(EndT, event_code, stream_or_future_t, sync, task, i):
41534165
else:
41544166
return [BLOCKED]
41554167
code,index,payload = e.get_pending_event()
4156-
assert(not e.copying and code == event_code and index == i)
4168+
assert(e.state != CopyState.COPYING and code == event_code and index == i)
41574169
return [payload]
41584170
```
41594171
The *first* check for `e.has_pending_event()` catches the case where the copy has

0 commit comments

Comments
 (0)