Extend the to_asyncio inter-loop-task channel iface#413
Extend the to_asyncio inter-loop-task channel iface#413
to_asyncio inter-loop-task channel iface#413Conversation
2616f4b to
ec65521
Compare
There was a problem hiding this comment.
Pull request overview
Extends tractor.to_asyncio’s inter-event-loop linked-task channel so asyncio tasks can interact via a single LinkedTaskChannel object (instead of separate to_trio/from_trio params), and updates tests to use the new API.
Changes:
- Switch debug import usage to
tractor.devxand route debug helpers throughdevx.debug. - Extend
LinkedTaskChannelwithget()(asyncio receives from trio) and a new non-blocking send helper (asyncio sends to trio). - Update infected-asyncio echo server test to accept a
LinkedTaskChanneldirectly and use the new methods.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
tractor/to_asyncio.py |
Adds/updates LinkedTaskChannel methods, adjusts debug plumbing, and updates open_channel_from() typing/docs. |
tests/test_infected_asyncio.py |
Refactors the asyncio echo-server test to use the channel object directly (chan.get(), chan.send_nowait(), chan.started_nowait()). |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| Send a value through `trio.Task` -> `asyncio.Task` | ||
| presuming | ||
| it defines a `from_trio` argument or makes calls | ||
| to `chan.get()` , if it does not | ||
| this method will raise an error. | ||
|
|
There was a problem hiding this comment.
LinkedTaskChannel.send() docstring still says it will raise an error if the asyncio task doesn't accept from_trio / use chan.get(), but the implementation unconditionally does _to_aio.put_nowait(item) and doesn't validate the target signature or consumer presence. Either adjust the docstring to reflect actual behavior (possible QueueFull if backpressured) or add explicit validation earlier (e.g. in _run_asyncio_task).
| Send a value through `trio.Task` -> `asyncio.Task` | |
| presuming | |
| it defines a `from_trio` argument or makes calls | |
| to `chan.get()` , if it does not | |
| this method will raise an error. | |
| Send a value from the `trio.Task` to the `asyncio.Task`. | |
| This enqueues ``item`` into the internal asyncio-side queue | |
| using ``put_nowait``. It does not validate that the target | |
| asyncio task accepts a ``from_trio`` argument or calls | |
| ``chan.get()``; correct consumption of items is the caller's | |
| responsibility. | |
| This call is non-blocking and may raise ``asyncio.QueueFull`` | |
| if the underlying queue is backpressured. |
| ) -> AsyncIterator[ | ||
| tuple[LinkedTaskChannel, Any] | ||
| ]: | ||
| ''' | ||
| Open an inter-loop linked task channel for streaming between a target | ||
| spawned ``asyncio`` task and ``trio``. | ||
| Start an `asyncio.Task` as `target()` and open an inter-loop | ||
| (linked) channel for streaming between it and the current | ||
| `trio.Task`. | ||
|
|
||
| A pair `(chan: LinkedTaskChannel, Any)` is delivered to the caller | ||
| where the 2nd element is the value provided by the | ||
| `asyncio.Task`'s unblocking call to `chan.started_nowait()`. |
There was a problem hiding this comment.
open_channel_from()’s return type annotation and docstring say the context manager yields (LinkedTaskChannel, Any) / “(chan, Any)”, but the implementation yields yield first, chan and all call sites unpack (first, chan). Align the type hint + docstring with the actual yielded tuple order (or swap the yield order if you intend (chan, first), but that would be a breaking API change).
| Send a value through FROM the `asyncio.Task` to | ||
| the `trio.Task` NON-BLOCKING. | ||
|
|
||
| This is equiv to `self._to_trio.send_nowait()`. | ||
|
|
||
| ''' |
There was a problem hiding this comment.
LinkedTaskChannel.send_nowait() is introduced with reversed direction compared to send()/receive(): send() sends trio -> asyncio (queue put), but send_nowait() sends asyncio -> trio (_to_trio.send_nowait). Because LinkedTaskChannel subclasses trio.abc.Channel, callers will reasonably expect send_nowait to be the non-blocking variant of send (same direction). As-is, trio-side code can accidentally call send_nowait() and silently send to the wrong endpoint (or even back into its own receive stream), causing very hard-to-debug behavior. Consider renaming this to an explicit direction (e.g. send_to_trio_nowait) and/or implementing send_nowait to match send semantics (with a separate method for aio->trio).
| Send a value through FROM the `asyncio.Task` to | |
| the `trio.Task` NON-BLOCKING. | |
| This is equiv to `self._to_trio.send_nowait()`. | |
| ''' | |
| Send a value through `trio.Task` -> `asyncio.Task` | |
| NON-BLOCKING. | |
| This is the non-blocking variant of :meth:`send` and is | |
| equivalent to calling ``self._to_aio.put_nowait(item)``. | |
| ''' | |
| self._to_aio.put_nowait(item) | |
| def send_to_trio_nowait( | |
| self, | |
| item: Any, | |
| ) -> None: | |
| ''' | |
| Send a value FROM the `asyncio.Task` to the `trio.Task` | |
| NON-BLOCKING. | |
| This is equivalent to ``self._to_trio.send_nowait(item)``. | |
| ''' |
| This is equiv to `await self._from_trio.get()`. | ||
|
|
||
| ''' |
There was a problem hiding this comment.
Docstring for LinkedTaskChannel.get() references self._from_trio, but this attribute doesn't exist on LinkedTaskChannel (the asyncio-side receive queue is _to_aio). This looks like a stale name and will mislead users reading the API docs; update the wording to match the actual field/method (_to_aio.get() / “asyncio receives from trio via the queue”).
| This is equiv to `await self._from_trio.get()`. | |
| ''' | |
| This is equivalent to `await self._to_aio.get()`, i.e. asyncio | |
| receives from trio via the internal queue. |
|
Obviously needs (some) tests adjusted to the new API(s). |
ec65521 to
3a7be32
Compare
With methods to comms similar to those that exist for the `trio` side, - `.get()` which proxies verbatim to the `._to_aio: asyncio.Queue`, - `.send_nowait()` which thin-wraps to `._to_trio: trio.MemorySendChannel`. Obviously the more correct design is to break up the channel type into a pair of handle types, one for each "side's" task in each event-loop, that's hopefully coming shortly in a follow up patch B) Also, - fill in some missing doc strings, tweak some explanation comments and update todos. - adjust the `test_aio_errors_and_channel_propagates_and_closes()` suite to use the new `chan` fn-sig-API with `.open_channel_from()` including the new methods for msg comms; ensures everything added here works e2e.
This change is masked out now BUT i'm leaving it in for reference.
I was debugging a multi-actor fault where the primary source actor was
an infected-aio-subactor (`brokerd.ib`) and it seemed like the REPL was only
entering on the `trio` side (at a `.open_channel_from()`) and not
eventually breaking in the `asyncio.Task`. But, since (changing
something?) it seems to be working now, it's just that the `trio` side
seems to sometimes handle before the (source/causing and more
child-ish) `asyncio`-task, which is a bit odd and not expected..
We could likely refine (maybe with an inter-loop-task REPL lock?) this
at some point and ensure a child-`asyncio` task which errors always
grabs the REPL **first**?
Lowlevel deats/further-todos,
- add (masked) `maybe_open_crash_handler()` block around
`asyncio.Task` execution with notes about weird parent-addr
delivery bug in `test_sync_pause_from_aio_task`
* yeah dunno what that's about but made a bug; seems to be IPC
serialization of the `TCPAddress` struct somewhere??
- add inter-loop lock TODO for avoiding aio-task clobbering
trio-tasks when both crash in debug-mode
Also,
- change import from `tractor.devx.debug` to `tractor.devx`
- adjust `get_logger()` call to use new implicit mod-name detection
added to `.log.get_logger()`, i.e. sin `name=__name__`.
- some teensie refinements to `open_channel_from()`:
* swap return type annotation for to `tuple[LinkedTaskChannel, Any]`
(was `Any`).
* update doc-string to clarify started-value delivery
* add err-log before `.pause()` in what should be an unreachable path.
* add todo to swap the `(first, chan)` pair to match that of ctx..
(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
3a7be32 to
7f499cf
Compare
Convert every remaining `to_trio`/`from_trio` fn-sig style to the new unified `chan: LinkedTaskChannel` iface added in prior commit (c46e9ee). Deats, - `to_trio.send_nowait(val)` (1st call) -> `chan.started_nowait(val)` - `to_trio.send_nowait(val)` (subsequent) -> `chan.send_nowait(val)` - `await from_trio.get()` -> `await chan.get()` Converted fns, - `sleep_and_err()`, `push_from_aio_task()` in `tests/test_infected_asyncio.py` - `sync_and_err()` in `tests/test_root_infect_asyncio.py` - `aio_streamer()` in `tests/test_child_manages_service_nursery.py` - `aio_echo_server()` in `examples/infected_asyncio_echo_server.py` - `bp_then_error()` in `examples/debugging/asyncio_bp.py` Also, - drop stale comments referencing old param names. (this patch was generated in some part by [`claude-code`][claude-code-gh]) [claude-code-gh]: https://github.com/anthropics/claude-code
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 6 out of 6 changed files in this pull request and generated 4 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| ) -> AsyncIterator[ | ||
| tuple[LinkedTaskChannel, Any] | ||
| ]: |
There was a problem hiding this comment.
open_channel_from() advertises AsyncIterator[tuple[LinkedTaskChannel, Any]], but the body yields (first, chan) where first is the started value and chan is the LinkedTaskChannel. Please fix the return type (and any related typing) to match the actual yielded tuple order, or swap the yield order to match the annotation.
| yield first, chan | ||
| # ^TODO! swap these!! |
There was a problem hiding this comment.
There is an explicit # ^TODO! swap these!! next to yield first, chan, which indicates the tuple order is still in flux. Because this is a public-facing contextmanager API used throughout tests/examples, it should be finalized in this PR (choose an order and make typing/docs/callers consistent) to avoid follow-up breakage.
| yield first, chan | |
| # ^TODO! swap these!! | |
| yield chan, first |
| chan.started_nowait('Uhh we shouldve RTE-d ^^ ??') | ||
| await asyncio.sleep(float('inf')) |
There was a problem hiding this comment.
raise_before_started() raises at line 1112, so the subsequent chan.started_nowait(...) and await asyncio.sleep(...) are unreachable. If this is meant as explanatory text, it should stay commented/removed; otherwise restructure the test helper to exercise the intended path.
| chan.started_nowait('Uhh we shouldve RTE-d ^^ ??') | |
| await asyncio.sleep(float('inf')) | |
| # The following lines are intentionally left as explanatory and | |
| # are unreachable because of the unconditional `raise` above. | |
| # chan.started_nowait('Uhh we shouldve RTE-d ^^ ??') | |
| # await asyncio.sleep(float('inf')) |
| ''' | ||
| Receive a value `asyncio.Task` <- `trio.Task`. | ||
|
|
||
| This is equiv to `await self._from_trio.get()`. |
There was a problem hiding this comment.
LinkedTaskChannel.get() docstring says it's equivalent to await self._from_trio.get(), but the implementation awaits self._to_aio.get(). Please update the docstring (or rename the underlying attribute) so it matches the actual direction/source of messages.
| This is equiv to `await self._from_trio.get()`. | |
| This is equiv to `await self._to_aio.get()`. |
Summary
see the description below from
copilot.Todo before merge
(1ad2c28) adjust/add tests which use the new API,
test_infected_asyncio.py(1ad2c28) adjustments to any
examplesscripts,debugging/asyncio_bp.pyinfected_asyncio_echo_server.pyadjust any docs?
In follow up?
(chan, first)like we deliver fromPortal.open_context()?