Skip to content

Commit 112ed27

Browse files
committed
Move peer-tracking attrs from Actor -> IPCServer
Namely transferring the `Actor` peer-`Channel` tracking attrs, - `._peers` which maps the uids to client channels (with duplicates apparently..) - the `._peer_connected: dict[tuple[str, str], trio.Event]` child-peer syncing table mostly used by parent actors to wait on subs to connect back during spawn. - the `._no_more_peers = trio.Event()` level triggered state signal. Further we move over with some minor reworks, - `.wait_for_peer()` verbatim (adjusting all dependants). - factor the no-more-peers shielded wait branch-block out of the end of `async_main()` into 2 new server meths, * `.has_peers()` with optional chan-connected checking flag. * `.wait_for_no_more_peers()` which *just* does the maybe-shielded `._no_more_peers.wait()`
1 parent 42cf9e1 commit 112ed27

File tree

7 files changed

+178
-88
lines changed

7 files changed

+178
-88
lines changed

tests/test_spawning.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,11 @@ async def spawn(
5757
)
5858

5959
assert len(an._children) == 1
60-
assert portal.channel.uid in tractor.current_actor()._peers
60+
assert (
61+
portal.channel.uid
62+
in
63+
tractor.current_actor().ipc_server._peers
64+
)
6165

6266
# get result from child subactor
6367
result = await portal.result()

tractor/_discovery.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848

4949
if TYPE_CHECKING:
5050
from ._runtime import Actor
51+
from .ipc._server import IPCServer
5152

5253

5354
log = get_logger(__name__)
@@ -79,7 +80,7 @@ async def get_registry(
7980
)
8081
else:
8182
# TODO: try to look pre-existing connection from
82-
# `Actor._peers` and use it instead?
83+
# `IPCServer._peers` and use it instead?
8384
async with (
8485
_connect_chan(addr) as chan,
8586
open_portal(chan) as regstr_ptl,
@@ -111,14 +112,15 @@ def get_peer_by_name(
111112
) -> list[Channel]|None: # at least 1
112113
'''
113114
Scan for an existing connection (set) to a named actor
114-
and return any channels from `Actor._peers`.
115+
and return any channels from `IPCServer._peers: dict`.
115116
116117
This is an optimization method over querying the registrar for
117118
the same info.
118119
119120
'''
120121
actor: Actor = current_actor()
121-
to_scan: dict[tuple, list[Channel]] = actor._peers.copy()
122+
server: IPCServer = actor.ipc_server
123+
to_scan: dict[tuple, list[Channel]] = server._peers.copy()
122124
pchan: Channel|None = actor._parent_chan
123125
if pchan:
124126
to_scan[pchan.uid].append(pchan)

tractor/_runtime.py

Lines changed: 20 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,7 @@
4040
from contextlib import (
4141
ExitStack,
4242
)
43-
from collections import defaultdict
4443
from functools import partial
45-
from itertools import chain
4644
import importlib
4745
import importlib.util
4846
import os
@@ -76,6 +74,7 @@
7674
)
7775
from .ipc import (
7876
Channel,
77+
# IPCServer, # causes cycles atm..
7978
_server,
8079
)
8180
from ._addr import (
@@ -156,7 +155,6 @@ def is_registrar(self) -> bool:
156155
_root_n: Nursery|None = None
157156
_service_n: Nursery|None = None
158157

159-
# XXX moving to IPCServer!
160158
_ipc_server: _server.IPCServer|None = None
161159

162160
@property
@@ -246,14 +244,6 @@ def __init__(
246244
# by the user (currently called the "arbiter")
247245
self._spawn_method: str = spawn_method
248246

249-
self._peers: defaultdict[
250-
str, # uaid
251-
list[Channel], # IPC conns from peer
252-
] = defaultdict(list)
253-
self._peer_connected: dict[tuple[str, str], trio.Event] = {}
254-
self._no_more_peers = trio.Event()
255-
self._no_more_peers.set()
256-
257247
# RPC state
258248
self._ongoing_rpc_tasks = trio.Event()
259249
self._ongoing_rpc_tasks.set()
@@ -338,7 +328,12 @@ def pformat(self) -> str:
338328
parent_uid: tuple|None = None
339329
if rent_chan := self._parent_chan:
340330
parent_uid = rent_chan.uid
341-
peers: list[tuple] = list(self._peer_connected)
331+
332+
peers: list = []
333+
server: _server.IPCServer = self.ipc_server
334+
if server:
335+
peers: list[tuple] = list(server._peer_connected)
336+
342337
fmtstr: str = (
343338
f' |_id: {self.aid!r}\n'
344339
# f" aid{ds}{self.aid!r}\n"
@@ -394,25 +389,6 @@ def reg_addrs(
394389

395390
self._reg_addrs = addrs
396391

397-
async def wait_for_peer(
398-
self,
399-
uid: tuple[str, str],
400-
401-
) -> tuple[trio.Event, Channel]:
402-
'''
403-
Wait for a connection back from a (spawned sub-)actor with
404-
a `uid` using a `trio.Event` for sync.
405-
406-
'''
407-
log.debug(f'Waiting for peer {uid!r} to connect')
408-
event = self._peer_connected.setdefault(uid, trio.Event())
409-
await event.wait()
410-
log.debug(f'{uid!r} successfully connected back to us')
411-
return (
412-
event,
413-
self._peers[uid][-1],
414-
)
415-
416392
def load_modules(
417393
self,
418394
# debug_mode: bool = False,
@@ -724,7 +700,7 @@ async def _from_parent(
724700
)
725701
assert isinstance(chan, Channel)
726702

727-
# Initial handshake: swap names.
703+
# init handshake: swap actor-IDs.
728704
await chan._do_handshake(aid=self.aid)
729705

730706
accept_addrs: list[UnwrappedAddress]|None = None
@@ -1620,16 +1596,18 @@ async def async_main(
16201596
)
16211597

16221598
# Ensure all peers (actors connected to us as clients) are finished
1623-
if not actor._no_more_peers.is_set():
1624-
if any(
1625-
chan.connected() for chan in chain(*actor._peers.values())
1626-
):
1627-
teardown_report += (
1628-
f'-> Waiting for remaining peers {actor._peers} to clear..\n'
1629-
)
1630-
log.runtime(teardown_report)
1631-
with CancelScope(shield=True):
1632-
await actor._no_more_peers.wait()
1599+
if (
1600+
(ipc_server := actor.ipc_server)
1601+
and
1602+
ipc_server.has_peers(check_chans=True)
1603+
):
1604+
teardown_report += (
1605+
f'-> Waiting for remaining peers {ipc_server._peers} to clear..\n'
1606+
)
1607+
log.runtime(teardown_report)
1608+
await ipc_server.wait_for_no_more_peers(
1609+
shield=True,
1610+
)
16331611

16341612
teardown_report += (
16351613
'-> All peer channels are complete\n'

tractor/_spawn.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,9 +58,11 @@
5858

5959

6060
if TYPE_CHECKING:
61+
from ipc import IPCServer
6162
from ._supervise import ActorNursery
6263
ProcessType = TypeVar('ProcessType', mp.Process, trio.Process)
6364

65+
6466
log = get_logger('tractor')
6567

6668
# placeholder for an mp start context if so using that backend
@@ -481,6 +483,7 @@ async def trio_proc(
481483

482484
cancelled_during_spawn: bool = False
483485
proc: trio.Process|None = None
486+
ipc_server: IPCServer = actor_nursery._actor.ipc_server
484487
try:
485488
try:
486489
proc: trio.Process = await trio.lowlevel.open_process(spawn_cmd, **proc_kwargs)
@@ -492,7 +495,7 @@ async def trio_proc(
492495
# wait for actor to spawn and connect back to us
493496
# channel should have handshake completed by the
494497
# local actor by the time we get a ref to it
495-
event, chan = await actor_nursery._actor.wait_for_peer(
498+
event, chan = await ipc_server.wait_for_peer(
496499
subactor.uid
497500
)
498501

@@ -724,11 +727,12 @@ async def mp_proc(
724727

725728
log.runtime(f"Started {proc}")
726729

730+
ipc_server: IPCServer = actor_nursery._actor.ipc_server
727731
try:
728732
# wait for actor to spawn and connect back to us
729733
# channel should have handshake completed by the
730734
# local actor by the time we get a ref to it
731-
event, chan = await actor_nursery._actor.wait_for_peer(
735+
event, chan = await ipc_server.wait_for_peer(
732736
subactor.uid,
733737
)
734738

tractor/_supervise.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,9 @@
5353

5454
if TYPE_CHECKING:
5555
import multiprocessing as mp
56+
# from .ipc._server import IPCServer
57+
from .ipc import IPCServer
58+
5659

5760
log = get_logger(__name__)
5861

@@ -315,6 +318,9 @@ async def cancel(
315318
children: dict = self._children
316319
child_count: int = len(children)
317320
msg: str = f'Cancelling actor nursery with {child_count} children\n'
321+
322+
server: IPCServer = self._actor.ipc_server
323+
318324
with trio.move_on_after(3) as cs:
319325
async with trio.open_nursery(
320326
strict_exception_groups=False,
@@ -337,7 +343,7 @@ async def cancel(
337343

338344
else:
339345
if portal is None: # actor hasn't fully spawned yet
340-
event = self._actor._peer_connected[subactor.uid]
346+
event: trio.Event = server._peer_connected[subactor.uid]
341347
log.warning(
342348
f"{subactor.uid} never 't finished spawning?"
343349
)
@@ -353,7 +359,7 @@ async def cancel(
353359
if portal is None:
354360
# cancelled while waiting on the event
355361
# to arrive
356-
chan = self._actor._peers[subactor.uid][-1]
362+
chan = server._peers[subactor.uid][-1]
357363
if chan:
358364
portal = Portal(chan)
359365
else: # there's no other choice left

tractor/devx/_debug.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,11 @@
9292
if TYPE_CHECKING:
9393
from trio.lowlevel import Task
9494
from threading import Thread
95-
from tractor.ipc import Channel
95+
from tractor.ipc import (
96+
Channel,
97+
IPCServer,
98+
# _server, # TODO? export at top level?
99+
)
96100
from tractor._runtime import (
97101
Actor,
98102
)
@@ -1434,6 +1438,7 @@ def any_connected_locker_child() -> bool:
14341438
14351439
'''
14361440
actor: Actor = current_actor()
1441+
server: IPCServer = actor.ipc_server
14371442

14381443
if not is_root_process():
14391444
raise InternalError('This is a root-actor only API!')
@@ -1443,7 +1448,7 @@ def any_connected_locker_child() -> bool:
14431448
and
14441449
(uid_in_debug := ctx.chan.uid)
14451450
):
1446-
chans: list[tractor.Channel] = actor._peers.get(
1451+
chans: list[tractor.Channel] = server._peers.get(
14471452
tuple(uid_in_debug)
14481453
)
14491454
if chans:

0 commit comments

Comments
 (0)