Skip to content

Commit 3f10ea4

Browse files
committed
Allow sync task.{wait,yield,poll} and {stream,future}.{read,write}
1 parent 3a6ba35 commit 3f10ea4

File tree

5 files changed

+178
-152
lines changed

5 files changed

+178
-152
lines changed

design/mvp/Binary.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -289,9 +289,9 @@ canon ::= 0x00 0x00 f:<core:funcidx> opts:<opts> ft:<typeidx> => (canon lift
289289
| 0x06 => (canon thread.hw_concurrency (core func)) 🧵
290290
| 0x08 => (canon task.backpressure (core func)) 🔀
291291
| 0x09 ft:<core:typeidx> => (canon task.return ft (core func)) 🔀
292-
| 0x0a m:<core:memdix> => (canon task.wait (memory m) (core func)) 🔀
293-
| 0x0b m:<core:memidx> => (canon task.poll (memory m) (core func)) 🔀
294-
| 0x0c => (canon task.yield (core func)) 🔀
292+
| 0x0a async?:<async>? m:<core:memdix> => (canon task.wait async? (memory m) (core func)) 🔀
293+
| 0x0b async?:<async>? m:<core:memidx> => (canon task.poll async? (memory m) (core func)) 🔀
294+
| 0x0c async?:<async>? => (canon task.yield async? (core func)) 🔀
295295
| 0x0d => (canon waitable.drop (core func)) 🔀
296296
| 0x0e t:<typeidx> => (canon stream.new t (core func)) 🔀
297297
| 0x0f t:<typeidx> opts:<opts> => (canon stream.read t opts (core func)) 🔀

design/mvp/CanonicalABI.md

Lines changed: 85 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -528,21 +528,34 @@ The conditions in `may_enter` ensure two invariants:
528528

529529
The `wait_on` method, called by `wait` and `yield_` below, blocks the
530530
current task until the given future is resolved, allowing other tasks to make
531-
progress in the meantime. While blocked, another asynchronous task can make a
532-
synchronous import call via `call_sync`, in which case, to preserve
533-
synchronicity, `wait_on` must wait until the synchronous import call is
534-
finished (signalled by `interrupt` being re-set).
535-
```python
536-
async def wait_on(self, f):
531+
progress in the meantime. If called with `sync` set, `interruptible` is
532+
cleared to ensure that no other tasks are allowed to start or resume,
533+
emulating a traditional synchronous system call. If `sync` is not set, then
534+
it's possible that between blocking and resuming, some other task executed and
535+
cleared `interruptible`, and thus `wait_on` must wait until `interruptible` is
536+
set again. If `interruptible` is already clear when `wait_on` is called, then
537+
it is already part of a synchronous call and so there's nothing extra to do.
538+
```python
539+
async def wait_on(self, sync, f):
537540
self.maybe_start_pending_task()
538541
if self.inst.interruptible.is_set():
542+
if sync:
543+
self.inst.interruptible.clear()
539544
v = await self.on_block(f)
540-
while not self.inst.interruptible.is_set():
541-
await self.on_block(self.inst.interruptible.wait())
545+
if sync:
546+
self.inst.interruptible.set()
547+
else:
548+
while not self.inst.interruptible.is_set():
549+
await self.on_block(self.inst.interruptible.wait())
542550
else:
543551
v = await self.on_block(f)
544552
return v
553+
```
545554

555+
A task can also make a synchronous call (to a `canon` built-in or another
556+
component) via `call_sync` which, like `wait_on`, clears the `interruptible`
557+
flag to block new tasks from starting or resuming.
558+
```python
546559
async def call_sync(self, callee, *args):
547560
if self.inst.interruptible.is_set():
548561
self.inst.interruptible.clear()
@@ -551,19 +564,15 @@ finished (signalled by `interrupt` being re-set).
551564
else:
552565
await callee(*args, self.on_block)
553566
```
554-
If `wait_on` or `call_sync` are called when `interruptible` is *initially*
555-
clear, then the current task must have been created for a synchronously-lifted
556-
export call and so there are no other tasks to worry about and thus `wait_on`
557-
*must not* wait for `interrupt` to be re-set (which won't happen until the
558-
current task finishes via `exit`, defined below).
559567

560568
While a task is running, it may call `wait` (via `canon task.wait` or when
561569
using a `callback`, by returning to the event loop) to learn about progress
562-
made by async subtasks which are reported to this task by `notify`.
570+
made by async subtasks, streams or futures which are reported to this task by
571+
`notify`.Queue`. (The definition of `wait_on`, used by `wait` here, is next.)
563572
```python
564-
async def wait(self) -> EventTuple:
573+
async def wait(self, sync) -> EventTuple:
565574
while True:
566-
await self.wait_on(self.has_events.wait())
575+
await self.wait_on(sync, self.has_events.wait())
567576
if (e := self.maybe_next_event()):
568577
return e
569578

@@ -585,19 +594,20 @@ flexibility allows multiple redundant events to be collapsed into one (e.g.,
585594
when a `Subtask` advances `CallState` multiple times before the event enqueued
586595
by the initial state change is delivered) and also for events to be
587596
retroactively removed (e.g., when a `stream.cancel-read` "steals" a pending
588-
`STREAM_READ` event that was enqueued but not yet delivered). Although this
589-
Python code represents `events` as a list of closures, an optimizing
590-
implementation should be able to avoid dynamically allocating this list and
591-
instead represent `events` as a linked list embedded in the elements of the
592-
`waitables` table (noting that, by design, any given `waitables` element can be
593-
in the `events` list at most once).
597+
`STREAM_READ` event that was enqueued but not yet delivered).
598+
599+
Although this Python code represents `events` as an `asyncio.Queue` of
600+
closures, an optimizing implementation should be able to avoid dynamically
601+
allocating anything and instead represent `events` as a linked list embedded
602+
in the elements of the `waitables` table (noting that, by design, any given
603+
`waitables` element can be in the `events` list at most once).
594604

595605
A task may also cooperatively yield (via `canon task.yield`), allowing the
596606
runtime to switch execution to another task without having to wait for any
597607
external I/O (as emulated in the Python code by awaiting `sleep(0)`:
598608
```python
599-
async def yield_(self):
600-
await self.wait_on(asyncio.sleep(0))
609+
async def yield_(self, sync):
610+
await self.wait_on(sync, asyncio.sleep(0))
601611
```
602612

603613
Putting these together, a task may also poll (via `canon task.poll`) for an
@@ -606,8 +616,8 @@ Importantly, `poll` starts by yielding execution (to avoid unintentionally
606616
starving other tasks) which means that the code calling `task.poll` must
607617
assume other tasks can execute, just like with `task.wait`.
608618
```python
609-
async def poll(self) -> Optional[EventTuple]:
610-
await self.yield_()
619+
async def poll(self, sync) -> Optional[EventTuple]:
620+
await self.yield_(sync)
611621
return self.maybe_next_event()
612622
```
613623

@@ -2603,10 +2613,10 @@ async def canon_lift(opts, inst, ft, callee, caller, on_start, on_return, on_blo
26032613
is_yield = bool(packed_ctx & 1)
26042614
ctx = packed_ctx & ~1
26052615
if is_yield:
2606-
await task.yield_()
2616+
await task.yield_(sync = False)
26072617
event, p1, p2 = (EventCode.YIELDED, 0, 0)
26082618
else:
2609-
event, p1, p2 = await task.wait()
2619+
event, p1, p2 = await task.wait(sync = False)
26102620
[packed_ctx] = await call_and_trap_on_throw(opts.callback, task, [ctx, event, p1, p2])
26112621
task.exit()
26122622
```
@@ -2875,7 +2885,7 @@ required here.
28752885

28762886
For a canonical definition:
28772887
```wasm
2878-
(canon task.wait (core func $f))
2888+
(canon task.wait $async? (memory $mem) (core func $f))
28792889
```
28802890
validation specifies:
28812891
* `$f` is given type `(func (param i32) (result i32))`
@@ -2884,30 +2894,31 @@ Calling `$f` waits for progress to be made in a subtask of the current task,
28842894
returning the event (which is currently simply a `CallState` value) and
28852895
writing the subtask index as an outparam:
28862896
```python
2887-
async def canon_task_wait(opts, task, ptr):
2897+
async def canon_task_wait(sync, mem, task, ptr):
28882898
trap_if(not task.inst.may_leave)
2889-
trap_if(task.opts.callback is not None)
2890-
event, p1, p2 = await task.wait()
2891-
cx = LiftLowerContext(opts, None, None)
2899+
event, p1, p2 = await task.wait(sync)
2900+
cx = LiftLowerContext(CanonicalOptions(memory = mem), None, None)
28922901
store(cx, p1, U32Type(), ptr)
28932902
store(cx, p2, U32Type(), ptr + 4)
28942903
return [event]
28952904
```
2896-
The `trap_if` ensures that, when a component uses a `callback` all events flow
2897-
through the event loop at the base of the stack.
2905+
If `async` is not set, no other tasks may execute during `task.wait`, which
2906+
can be useful for producer toolchains in situations where interleaving is not
2907+
supported. However, this is generally worse for concurrency and thus producer
2908+
toolchains should set `async` when possible. When `$async` is set, `task.wait`
2909+
will only block the current `Task`, allowing other tasks to start or resume.
28982910

2899-
Note that `task.wait` will only block the current `Task`, allowing other tasks
2900-
to run. Note also that `task.wait` can be called from a synchronously-lifted
2901-
export so that even synchronous code can make concurrent import calls. In these
2902-
synchronous cases, though, the automatic backpressure (applied by `Task.enter`)
2903-
will ensure there is only ever at most once synchronously-lifted task executing
2904-
in a component instance at a time.
2911+
`task.wait` can be called from a synchronously-lifted export so that even
2912+
synchronous code can make concurrent import calls. In these synchronous cases,
2913+
though, the automatic backpressure (applied by `Task.enter`) will ensure there
2914+
is only ever at most once synchronously-lifted task executing in a component
2915+
instance at a time.
29052916

29062917
### 🔀 `canon task.poll`
29072918

29082919
For a canonical definition:
29092920
```wasm
2910-
(canon task.poll (core func $f))
2921+
(canon task.poll $async? (memory $mem) (core func $f))
29112922
```
29122923
validation specifies:
29132924
* `$f` is given type `(func (param i32) (result i32))`
@@ -2916,36 +2927,38 @@ Calling `$f` does a non-blocking check for whether an event is already
29162927
available, returning whether or not there was such an event as a boolean and,
29172928
if there was an event, storing the `i32` event and payloads as outparams.
29182929
```python
2919-
async def canon_task_poll(opts, task, ptr):
2930+
async def canon_task_poll(sync, mem, task, ptr):
29202931
trap_if(not task.inst.may_leave)
2921-
ret = await task.poll()
2932+
ret = await task.poll(sync)
29222933
if ret is None:
29232934
return [0]
2924-
cx = LiftLowerContext(opts, None, None)
2935+
cx = LiftLowerContext(CanonicalOptions(memory = mem), None, None)
29252936
store(cx, ret, TupleType([U32Type(), U32Type(), U32Type()]), ptr)
29262937
return [1]
29272938
```
2928-
Note that the `await` of `task.poll` indicates that `task.poll` can yield to
2929-
other tasks (in this or other components) as part of polling for an event.
2939+
When `async` is set, `task.poll` can yield to other tasks (in this or other
2940+
components) as part of polling for an event.
29302941

29312942
### 🔀 `canon task.yield`
29322943

29332944
For a canonical definition:
29342945
```wasm
2935-
(canon task.yield (core func $f))
2946+
(canon task.yield $async? (core func $f))
29362947
```
29372948
validation specifies:
29382949
* `$f` is given type `(func)`
29392950

2940-
Calling `$f` calls `Task.yield_`, trapping if called when there is a `callback`.
2941-
(When there is a callback, yielding is achieved by returning with the LSB set.)
2951+
Calling `$f` calls `Task.yield_` to allow other tasks to execute:
29422952
```python
2943-
async def canon_task_yield(task):
2953+
async def canon_task_yield(sync, task):
29442954
trap_if(not task.inst.may_leave)
2945-
trap_if(task.opts.callback is not None)
2946-
await task.yield_()
2955+
await task.yield_(sync)
29472956
return []
29482957
```
2958+
If `async` is set, no other tasks *in the same component instance* can
2959+
execute, however tasks in *other* component instances may execute. This allows
2960+
a long-running task in one component to avoid starving other components
2961+
without needing support full reentrancy.
29492962

29502963
### 🔀 `canon {stream,future}.new`
29512964

@@ -3032,21 +3045,25 @@ async def async_copy(HandleT, BufferT, t, opts, event_code, task, i, ptr, n):
30323045
if h.stream.closed():
30333046
flat_results = [CLOSED]
30343047
else:
3035-
async def do_copy(on_block):
3036-
await h.copy(buffer, on_block)
3037-
def copy_event():
3038-
if h.copying_buffer is buffer:
3039-
h.copying_buffer = None
3040-
return (event_code, i, pack_async_copy_result(buffer, h))
3041-
else:
3042-
return None
3043-
h.call.task().notify(copy_event)
3044-
match await call_and_handle_blocking(do_copy):
3045-
case Blocked():
3046-
h.copying_buffer = buffer
3047-
flat_results = [BLOCKED]
3048-
case Returned():
3049-
flat_results = [pack_async_copy_result(buffer, h)]
3048+
if opts.sync:
3049+
await task.call_sync(h.copy, buffer)
3050+
flat_results = [pack_async_copy_result(buffer, h)]
3051+
else:
3052+
async def do_copy(on_block):
3053+
await h.copy(buffer, on_block)
3054+
def copy_event():
3055+
if h.copying_buffer is buffer:
3056+
h.copying_buffer = None
3057+
return (event_code, i, pack_async_copy_result(buffer, h))
3058+
else:
3059+
return None
3060+
h.call.task().notify(copy_event)
3061+
match await call_and_handle_blocking(do_copy):
3062+
case Blocked():
3063+
h.copying_buffer = buffer
3064+
flat_results = [BLOCKED]
3065+
case Returned():
3066+
flat_results = [pack_async_copy_result(buffer, h)]
30503067
return flat_results
30513068
```
30523069
The trap if `not h.call` prevents `write`s on the writable end of streams or

design/mvp/Explainer.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1369,9 +1369,9 @@ canon ::= ...
13691369
| (canon resource.rep <typeidx> (core func <id>?))
13701370
| (canon task.backpressure (core func <id>?)) 🔀
13711371
| (canon task.return <core:typeidx> (core func <id>?)) 🔀
1372-
| (canon task.wait (memory <core:memidx>) (core func <id>?)) 🔀
1373-
| (canon task.poll (memory <core:memidx>) (core func <id>?)) 🔀
1374-
| (canon task.yield (core func <id>?)) 🔀
1372+
| (canon task.wait async? (memory <core:memidx>) (core func <id>?)) 🔀
1373+
| (canon task.poll async? (memory <core:memidx>) (core func <id>?)) 🔀
1374+
| (canon task.yield async? (core func <id>?)) 🔀
13751375
| (canon stream.new <typeidx> (core func <id>?)) 🔀
13761376
| (canon stream.read <typeidx> <canonopt>* (core func <id>?)) 🔀
13771377
| (canon stream.write <typeidx> <canonopt>* (core func <id>?)) 🔀

0 commit comments

Comments
 (0)