Skip to content

Commit 3cd2229

Browse files
committed
Decouple actor-state from low-level ipc-server
As much as is possible given we currently do some graceful cancellation join-waiting on any connected sub-actors whenever an active `local_nursery: AcrtorNursery` in the post-rpc teardown sequence of `handle_stream_from_peer()` is detected. In such cases we try to allow the higher level inter-actor (task) context(s) to fully cancelled-ack before conducting IPC machinery shutdown. The main immediate motivation for all this is to support unit testing the `.ipc._server` APIs but in the future may be useful for anyone wanting to use our modular IPC transport layer sin-"actors". Impl deats, - drop passing an `actor: Actor` ref from as many routines in `.ipc._server` as possible instead opting to use `._state.current_actor()` where abs needed; thus the fns dropping an `actor` input param are: - `open_ipc_server()` - `IPCServer.listen_on()` - `._serve_ipc_eps()` - `.handle_stream_from_peer()` - factor the above mentioned graceful remote-cancel-ack waiting into a new `maybe_wait_on_canced_subs()` which is called from `handle_stream_from_peer()` and delivers a maybe-`local_nursery: ActorNursery` for downstream logic; it's this new fn which primarily still needs to call `current_actor()`. - in `handle_stream_from_peer()` also use `current_actor()` to check if a handshake is needed (or if it was called as part of some actor-runtime-less operation like our unit test suite!). - also don't pass an `actor` to `._rpc.process_messages()` see how-n-why below.. Surrounding ipc-server client/caller adjustments, - `._rpc.process_messages()` no longer takes an `actor` input and now calls `current_actor()` instead. - `._portal.open_portal()` is adjusted to ^. - `._runtime.async_main()` is adjusted to the `.ipc._server`'s removal of `actor` ref passing. Also, - drop some server `log.info()`s to `.runtime()`
1 parent 2ea703c commit 3cd2229

File tree

4 files changed

+243
-198
lines changed

4 files changed

+243
-198
lines changed

tractor/_portal.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -582,8 +582,7 @@ async def open_portal(
582582
msg_loop_cs = await tn.start(
583583
partial(
584584
_rpc.process_messages,
585-
actor,
586-
channel,
585+
chan=channel,
587586
# if the local task is cancelled we want to keep
588587
# the msg loop running until our block ends
589588
shield=True,

tractor/_rpc.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -869,7 +869,6 @@ async def try_ship_error_to_remote(
869869

870870

871871
async def process_messages(
872-
actor: Actor,
873872
chan: Channel,
874873
shield: bool = False,
875874
task_status: TaskStatus[CancelScope] = trio.TASK_STATUS_IGNORED,
@@ -907,6 +906,7 @@ async def process_messages(
907906
(as utilized inside `Portal.cancel_actor()` ).
908907
909908
'''
909+
actor: Actor = _state.current_actor()
910910
assert actor._service_n # runtime state sanity
911911

912912
# TODO: once `trio` get's an "obvious way" for req/resp we

tractor/_runtime.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1262,6 +1262,10 @@ async def async_main(
12621262
the actor's "runtime" and all thus all ongoing RPC tasks.
12631263
12641264
'''
1265+
# XXX NOTE, `_state._current_actor` **must** be set prior to
1266+
# calling this core runtime entrypoint!
1267+
assert actor is _state.current_actor()
1268+
12651269
actor._task: trio.Task = trio.lowlevel.current_task()
12661270

12671271
# attempt to retreive ``trio``'s sigint handler and stash it
@@ -1321,7 +1325,6 @@ async def async_main(
13211325
) as service_nursery,
13221326

13231327
_server.open_ipc_server(
1324-
actor=actor,
13251328
parent_tn=service_nursery,
13261329
stream_handler_tn=service_nursery,
13271330
) as ipc_server,
@@ -1375,7 +1378,6 @@ async def async_main(
13751378
'Booting IPC server'
13761379
)
13771380
eps: list = await ipc_server.listen_on(
1378-
actor=actor,
13791381
accept_addrs=accept_addrs,
13801382
stream_handler_nursery=service_nursery,
13811383
)
@@ -1460,8 +1462,7 @@ async def async_main(
14601462
await root_nursery.start(
14611463
partial(
14621464
_rpc.process_messages,
1463-
actor,
1464-
actor._parent_chan,
1465+
chan=actor._parent_chan,
14651466
shield=True,
14661467
)
14671468
)

0 commit comments

Comments
 (0)