Skip to content

Commit 81c33bf

Browse files
committed
Extend .to_asyncio.LinkedTaskChannel for aio side
With methods to comms similar to those that exist for the `trio` side, - `.get()` which proxies verbatim to the `._to_aio: asyncio.Queue`, - `.send_nowait()` which thin-wraps to `._to_trio: trio.MemorySendChannel`. Obviously the more correct design is to break up the channel type into a pair of handle types, one for each "side's" task in each event-loop, that's hopefully coming shortly in a follow up patch B) Also, - fill in some missing doc strings, tweak some explanation comments and update todos. - adjust the `test_aio_errors_and_channel_propagates_and_closes()` suite to use the new `chan` fn-sig-API with `.open_channel_from()` including the new methods for msg comms; ensures everything added here works e2e.
1 parent fee1ee3 commit 81c33bf

File tree

2 files changed

+62
-20
lines changed

2 files changed

+62
-20
lines changed

tests/test_infected_asyncio.py

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -732,23 +732,29 @@ async def main():
732732

733733

734734
async def aio_echo_server(
735-
to_trio: trio.MemorySendChannel,
736-
from_trio: asyncio.Queue,
735+
chan: to_asyncio.LinkedTaskChannel,
737736
) -> None:
737+
'''
738+
An IPC-msg "echo server" with msgs received and relayed by
739+
a parent `trio.Task` into a child `asyncio.Task`
740+
and then repeated back to that local parent (`trio.Task`)
741+
and sent again back to the original calling remote actor.
738742
739-
to_trio.send_nowait('start')
743+
'''
744+
# same semantics as `trio.TaskStatus.started()`
745+
chan.started_nowait('start')
740746

741747
while True:
742748
try:
743-
msg = await from_trio.get()
749+
msg = await chan.get()
744750
except to_asyncio.TrioTaskExited:
745751
print(
746752
'breaking aio echo loop due to `trio` exit!'
747753
)
748754
break
749755

750756
# echo the msg back
751-
to_trio.send_nowait(msg)
757+
chan.send_nowait(msg)
752758

753759
# if we get the terminate sentinel
754760
# break the echo loop
@@ -765,7 +771,10 @@ async def trio_to_aio_echo_server(
765771
):
766772
async with to_asyncio.open_channel_from(
767773
aio_echo_server,
768-
) as (first, chan):
774+
) as (
775+
first, # value from `chan.started_nowait()` above
776+
chan,
777+
):
769778
assert first == 'start'
770779

771780
await ctx.started(first)
@@ -776,7 +785,8 @@ async def trio_to_aio_echo_server(
776785
await chan.send(msg)
777786

778787
out = await chan.receive()
779-
# echo back to parent actor-task
788+
789+
# echo back to parent-actor's remote parent-ctx-task!
780790
await stream.send(out)
781791

782792
if out is None:
@@ -1090,14 +1100,12 @@ async def main():
10901100

10911101

10921102
# ?TODO asyncio.Task fn-deco?
1093-
# -[ ] do sig checkingat import time like @context?
1094-
# -[ ] maybe name it @aio_task ??
10951103
# -[ ] chan: to_asyncio.InterloopChannel ??
1104+
# -[ ] do fn-sig checking at import time like @context?
1105+
# |_[ ] maybe name it @a(sync)io_task ??
1106+
# @asyncio_task <- not bad ??
10961107
async def raise_before_started(
1097-
# from_trio: asyncio.Queue,
1098-
# to_trio: trio.abc.SendChannel,
10991108
chan: to_asyncio.LinkedTaskChannel,
1100-
11011109
) -> None:
11021110
'''
11031111
`asyncio.Task` entry point which RTEs before calling

tractor/to_asyncio.py

Lines changed: 42 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -94,10 +94,14 @@
9494
QueueShutDown = False
9595

9696

97-
# TODO, generally speaking we can generalize this abstraction, a "SC linked
98-
# parent->child task pair", as the same "supervision scope primitive"
99-
# **that is** our `._context.Context` with the only difference being
100-
# in how the tasks conduct msg-passing comms.
97+
# TODO, generally speaking we can generalize this abstraction as,
98+
#
99+
# > A "SC linked, inter-event-loop" channel for comms between
100+
# > a `parent: trio.Task` -> `child: asyncio.Task` pair.
101+
#
102+
# It is **very similar** in terms of its operation as a "supervision
103+
# scope primitive" to that of our `._context.Context` with the only
104+
# difference being in how the tasks conduct msg-passing comms.
101105
#
102106
# For `LinkedTaskChannel` we are passing the equivalent of (once you
103107
# include all the recently added `._trio/aio_to_raise`
@@ -122,6 +126,7 @@ class LinkedTaskChannel(
122126
task scheduled in the host loop.
123127
124128
'''
129+
# ?TODO, rename as `._aio_q` since it's 2-way?
125130
_to_aio: asyncio.Queue
126131
_from_aio: trio.MemoryReceiveChannel
127132

@@ -235,9 +240,11 @@ def started_nowait(
235240
#
236241
async def receive(self) -> Any:
237242
'''
238-
Receive a value from the paired `asyncio.Task` with
243+
Receive a value `trio.Task` <- `asyncio.Task`.
244+
245+
Note the tasks in each loop are "SC linked" as a pair with
239246
exception/cancel handling to teardown both sides on any
240-
unexpected error.
247+
unexpected error or cancellation.
241248
242249
'''
243250
try:
@@ -261,15 +268,42 @@ async def receive(self) -> Any:
261268
):
262269
raise err
263270

271+
async def get(self) -> Any:
272+
'''
273+
Receive a value `asyncio.Task` <- `trio.Task`.
274+
275+
This is equiv to `await self._from_trio.get()`.
276+
277+
'''
278+
return await self._to_aio.get()
279+
264280
async def send(self, item: Any) -> None:
265281
'''
266-
Send a value through to the asyncio task presuming
267-
it defines a ``from_trio`` argument, if it does not
282+
Send a value through `trio.Task` -> `asyncio.Task`
283+
presuming
284+
it defines a `from_trio` argument or makes calls
285+
to `chan.get()` , if it does not
268286
this method will raise an error.
269287
270288
'''
271289
self._to_aio.put_nowait(item)
272290

291+
# TODO? could we only compile-in this method on an instance
292+
# handed to the `asyncio`-side, i.e. the fn invoked with
293+
# `.open_channel_from()`.
294+
def send_nowait(
295+
self,
296+
item: Any,
297+
) -> None:
298+
'''
299+
Send a value through FROM the `asyncio.Task` to
300+
the `trio.Task` NON-BLOCKING.
301+
302+
This is equiv to `self._to_trio.send_nowait()`.
303+
304+
'''
305+
self._to_trio.send_nowait(item)
306+
273307
# TODO? needed?
274308
# async def wait_aio_complete(self) -> None:
275309
# await self._aio_task_complete.wait()

0 commit comments

Comments
 (0)