diff --git a/examples/actors/most_basic_supervisor.py b/examples/actors/most_basic_supervisor.py new file mode 100644 index 000000000..45dad3450 --- /dev/null +++ b/examples/actors/most_basic_supervisor.py @@ -0,0 +1,86 @@ +import trio +import tractor + + +class Restart(Exception): + """Restart signal""" + + +async def sleep_then_restart(): + actor = tractor.current_actor() + print(f'{actor.uid} starting up!') + await trio.sleep(0.5) + raise Restart('This is a restart signal') + + +async def signal_restart_whole_actor(): + actor = tractor.current_actor() + print(f'{actor.uid} starting up!') + await trio.sleep(0.5) + return 'restart_me' + + +async def respawn_remote_task(portal): + # start a task in the actor at the other end + # of the provided portal, when it signals a restart, + # restart it.. + + # This is much more efficient then restarting the undlerying + # process over and over since the python interpreter runtime + # stays up and we just submit a new task to run (which + # is just the original one we submitted repeatedly. + while True: + try: + await portal.run(sleep_then_restart) + except tractor.RemoteActorError as error: + if 'Restart' in str(error): + # respawn the actor task + continue + + +async def supervisor(): + + async with tractor.open_nursery() as tn: + + p0 = await tn.start_actor('task_restarter', enable_modules=[__name__]) + + # Yes, you can do this from multiple tasks on one actor + # or mulitple lone tasks in multiple subactors. + # We'll show both. + + async with trio.open_nursery() as n: + # we'll doe the first as a lone task restart in a daemon actor + for i in range(4): + n.start_soon(respawn_remote_task, p0) + + # Open another nursery that will respawn sub-actors + + # spawn a set of subactors that will signal restart + # of the group of processes on each failures + portals = [] + + # start initial subactor set + for i in range(4): + p = await tn.run_in_actor(signal_restart_whole_actor) + portals.append(p) + + # now wait on results and respawn actors + # that request it + while True: + + for p in portals: + result = await p.result() + + if result == 'restart_me': + print(f'restarting {p.channel.uid}') + await p.cancel_actor() + await trio.sleep(0.5) + p = await tn.run_in_actor(signal_restart_whole_actor) + portals.append(p) + + # this will block indefinitely so user must + # cancel with ctrl-c + + +if __name__ == '__main__': + trio.run(supervisor) diff --git a/examples/actors/mutate_remote_state.py b/examples/actors/mutate_remote_state.py new file mode 100644 index 000000000..26ab8645e --- /dev/null +++ b/examples/actors/mutate_remote_state.py @@ -0,0 +1,64 @@ +from itertools import cycle +from pprint import pformat +from dataclasses import dataclass, field + +import trio +import tractor + + +@dataclass +class MyProcessStateThing: + state: dict = field(default_factory=dict) + + def update(self, msg: dict): + self.state.update(msg) + + +_actor_state = MyProcessStateThing() + + +async def update_local_state(msg: dict): + """Update process-local state from sent message and exit. + + """ + actor = tractor.current_actor() + + global _actor_state + + + print(f'Yo we got a message {msg}') + + # update the "actor state" + _actor_state.update(msg) + + print(f'New local "state" for {actor.uid} is {pformat(_actor_state.state)}') + + # we're done so exit this task running in the subactor + + +async def main(): + # Main process/thread that spawns one sub-actor and sends messages + # to it to update it's state. + + actor_portals = [] + + # XXX: that subactor can **not** outlive it's parent, this is SC. + async with tractor.open_nursery() as tn: + + portal = await tn.start_actor('even_boy', enable_modules=[__name__]) + actor_portals.append(portal) + + portal = await tn.start_actor('odd_boy', enable_modules=[__name__]) + actor_portals.append(portal) + + for i, (count, portal) in enumerate( + zip(range(100), cycle(actor_portals)) + ): + await portal.run(update_local_state, msg={f'msg_{i}': count}) + + # blocks here indefinitely synce we spawned "daemon actors" using + # .start_actor()`, you'll need to control-c to cancel. + + +if __name__ == '__main__': + trio.run(main) diff --git a/examples/actors/ray_style_classes.py b/examples/actors/ray_style_classes.py new file mode 100644 index 000000000..e304d9307 --- /dev/null +++ b/examples/actors/ray_style_classes.py @@ -0,0 +1,153 @@ +import inspect +from typing import Any +from functools import partial +from contextlib import asynccontextmanager, AsyncExitStack +from itertools import cycle +from pprint import pformat + +import trio +import tractor + + +log = tractor.log.get_logger(__name__) + + +class ActorState: + """Singlteton actor per process. + + """ + # this is a class defined variable and is thus both + # singleton across object instances and task safe. + state: dict = {} + + def update(self, msg: dict) -> None: + _actor = tractor.current_actor() + + print(f'Yo we got a message {msg}') + self.state.update(msg) + + print(f'New local "state" for {_actor.uid} is {pformat(self.state)}') + + def close(self): + # gives headers showing which process and task is active + log.info('Actor state is closing') + + # if we wanted to support spawning or talking to other + # actors we can do that using a portal map collection? + # _portals: dict = {} + + +async def _run_proxy_method( + meth: str, + msg: dict, +) -> Any: + """Update process-local state from sent message and exit. + + """ + # Create a new actor instance per call. + # We can make this persistent by storing it either + # in a global var or are another clas scoped variable? + # If you want it somehow persisted in another namespace + # I'd be interested to know "where". + actor = ActorState() + if meth != 'close': + return getattr(actor, meth)(msg) + else: + actor.close() + + # we're done so exit this task running in the subactor + + +class MethodProxy: + def __init__( + self, + portal: tractor._portal.Portal + ) -> None: + self._portal = portal + + async def _run_method( + self, + *, + meth: str, + msg: dict, + ) -> Any: + return await self._portal.run( + _run_proxy_method, + meth=meth, + msg=msg + ) + + +def get_method_proxy(portal, target=ActorState) -> MethodProxy: + + proxy = MethodProxy(portal) + + # mock all remote methods + for name, method in inspect.getmembers( + target, predicate=inspect.isfunction + ): + if '_' == name[0]: + # skip private methods + continue + + else: + setattr(proxy, name, partial(proxy._run_method, meth=name)) + + return proxy + + +@asynccontextmanager +async def spawn_proxy_actor(name): + + # XXX: that subactor can **not** outlive it's parent, this is SC. + async with tractor.open_nursery( + debug_mode=True, + # loglevel='info', + ) as tn: + + portal = await tn.start_actor(name, enable_modules=[__name__]) + + proxy = get_method_proxy(portal) + + yield proxy + + await proxy.close(msg=None) + + +async def main(): + # Main process/thread that spawns one sub-actor and sends messages + # to it to update it's state. + + try: + stack = AsyncExitStack() + + actors = [] + for name in ['even', 'odd']: + + actor_proxy = await stack.enter_async_context( + spawn_proxy_actor(name + '_boy') + ) + actors.append(actor_proxy) + + # spin through the actors and update their states + for i, (count, actor) in enumerate( + zip(range(100), cycle(actors)) + ): + # Here we call the locally patched `.update()` method of the + # remote instance + + # NOTE: the instance created each call here is currently + # a new object - to persist it across `portal.run()` calls + # we need to store it somewhere in memory for access by + # a new task spawned in the remote actor process. + await actor.update(msg={f'msg_{i}': count}) + + # blocks here indefinitely synce we spawned "daemon actors" using + # .start_actor()`, you'll need to control-c to cancel. + + finally: + await stack.aclose() + + +if __name__ == '__main__': + trio.run(main) diff --git a/examples/parallelism/concurrent_actors_primes.py b/examples/parallelism/concurrent_actors_primes.py new file mode 100644 index 000000000..3ff8dab60 --- /dev/null +++ b/examples/parallelism/concurrent_actors_primes.py @@ -0,0 +1,119 @@ +""" +Demonstration of the prime number detector example from the +``concurrent.futures`` docs: + +https://docs.python.org/3/library/concurrent.futures.html#processpoolexecutor-example + +This uses no extra threads, fancy semaphores or futures; all we need +is ``tractor``'s channels. + +""" +from contextlib import asynccontextmanager +from typing import List, Callable +import itertools +import math +import time + +import tractor +import trio +from async_generator import aclosing + + +PRIMES = [ + 112272535095293, + 112582705942171, + 112272535095293, + 115280095190773, + 115797848077099, + 1099726899285419, +] + + +def is_prime(n): + if n < 2: + return False + if n == 2: + return True + if n % 2 == 0: + return False + + sqrt_n = int(math.floor(math.sqrt(n))) + for i in range(3, sqrt_n + 1, 2): + if n % i == 0: + return False + return True + + +@asynccontextmanager +async def worker_pool(workers=4): + """Though it's a trivial special case for ``tractor``, the well + known "worker pool" seems to be the defacto "but, I want this + process pattern!" for most parallelism pilgrims. + + Yes, the workers stay alive (and ready for work) until you close + the context. + """ + async with tractor.open_nursery() as tn: + + portals = [] + snd_chan, recv_chan = trio.open_memory_channel(len(PRIMES)) + + for i in range(workers): + + # this starts a new sub-actor (process + trio runtime) and + # stores it's "portal" for later use to "submit jobs" (ugh). + portals.append( + await tn.start_actor( + f'worker_{i}', + enable_modules=[__name__], + ) + ) + + async def _map( + worker_func: Callable[[int], bool], + sequence: List[int] + ) -> List[bool]: + + # define an async (local) task to collect results from workers + async def send_result(func, value, portal): + await snd_chan.send((value, await portal.run(func, n=value))) + + async with trio.open_nursery() as n: + + for value, portal in zip(sequence, itertools.cycle(portals)): + n.start_soon( + send_result, + worker_func, + value, + portal + ) + + # deliver results as they arrive + for _ in range(len(sequence)): + yield await recv_chan.receive() + + # deliver the parallel "worker mapper" to user code + yield _map + + # tear down all "workers" on pool close + await tn.cancel() + + +async def main(): + + async with worker_pool() as actor_map: + + start = time.time() + + async with aclosing(actor_map(is_prime, PRIMES)) as results: + async for number, prime in results: + + print(f'{number} is prime: {prime}') + + print(f'processing took {time.time() - start} seconds') + + +if __name__ == '__main__': + start = time.time() + trio.run(main) + print(f'script took {time.time() - start} seconds') diff --git a/examples/parallelism/concurrent_futures_primes.py b/examples/parallelism/concurrent_futures_primes.py new file mode 100644 index 000000000..81ae23d60 --- /dev/null +++ b/examples/parallelism/concurrent_futures_primes.py @@ -0,0 +1,40 @@ +import time +import concurrent.futures +import math + +PRIMES = [ + 112272535095293, + 112582705942171, + 112272535095293, + 115280095190773, + 115797848077099, + 1099726899285419] + +def is_prime(n): + if n < 2: + return False + if n == 2: + return True + if n % 2 == 0: + return False + + sqrt_n = int(math.floor(math.sqrt(n))) + for i in range(3, sqrt_n + 1, 2): + if n % i == 0: + return False + return True + +def main(): + with concurrent.futures.ProcessPoolExecutor() as executor: + start = time.time() + + for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)): + print('%d is prime: %s' % (number, prime)) + + print(f'processing took {time.time() - start} seconds') + +if __name__ == '__main__': + + start = time.time() + main() + print(f'script took {time.time() - start} seconds') diff --git a/examples/parallelism/we_are_processes.py b/examples/parallelism/we_are_processes.py new file mode 100644 index 000000000..8283b9c5b --- /dev/null +++ b/examples/parallelism/we_are_processes.py @@ -0,0 +1,37 @@ +""" +Run with a process monitor from a terminal using: +$TERM -e watch -n 0.1 "pstree -a $$" & python examples/parallelism/we_are_processes.py && kill $! + +""" +from multiprocessing import cpu_count +import os + +import tractor +import trio + + +async def target(): + print(f"Yo, i'm '{tractor.current_actor().name}' " + f"running in pid {os.getpid()}") + await trio.sleep_forever() + + +async def main(): + + async with tractor.open_nursery() as n: + + for i in range(cpu_count()): + await n.run_in_actor(target, name=f'worker_{i}') + + print('This process tree will self-destruct in 1 sec...') + await trio.sleep(1) + + # you could have done this yourself + raise Exception('Self Destructed') + + +if __name__ == '__main__': + try: + trio.run(main) + except Exception: + print('Zombies Contained') diff --git a/tests/test_docs_examples.py b/tests/test_docs_examples.py index ea676a26b..3778e0e9a 100644 --- a/tests/test_docs_examples.py +++ b/tests/test_docs_examples.py @@ -78,13 +78,13 @@ def run(script_code): @pytest.mark.parametrize( 'example_script', - [ - f for f in os.listdir(examples_dir()) - if ( - ('__' not in f) and - ('debugging' not in f) - ) - ], + + # walk yields: (dirpath, dirnames, filenames) + [(p[0], f) for p in os.walk(examples_dir()) for f in p[2] + + if '__' not in f + and 'debugging' not in p[0] + ] ) def test_example(run_example_in_subproc, example_script): """Load and run scripts from this repo's ``examples/`` dir as a user @@ -95,7 +95,7 @@ def test_example(run_example_in_subproc, example_script): test directory and invoke the script as a module with ``python -m test_example``. """ - ex_file = os.path.join(examples_dir(), example_script) + ex_file = os.path.join(*example_script) with open(ex_file, 'r') as ex: code = ex.read() diff --git a/tractor/_exceptions.py b/tractor/_exceptions.py index 63e0d0948..bebaf6662 100644 --- a/tractor/_exceptions.py +++ b/tractor/_exceptions.py @@ -17,6 +17,8 @@ class RemoteActorError(Exception): "Remote actor exception bundled locally" def __init__(self, message, type_str, **msgdata) -> None: super().__init__(message) + self.type_str = type_str + for ns in [builtins, _this_mod, trio]: try: self.type = getattr(ns, type_str) diff --git a/tractor/_forkserver_override.py b/tractor/_forkserver_override.py index 25134ffff..d799bb819 100644 --- a/tractor/_forkserver_override.py +++ b/tractor/_forkserver_override.py @@ -6,6 +6,8 @@ .. note:: There is no type hinting in this code base (yet) to remain as a close as possible to upstream. """ +# type: ignore + import os import socket import signal diff --git a/tractor/_mp_fixup_main.py b/tractor/_mp_fixup_main.py index 7869561be..78d3e9f03 100644 --- a/tractor/_mp_fixup_main.py +++ b/tractor/_mp_fixup_main.py @@ -67,7 +67,7 @@ def _fixup_main_from_name(mod_name: str) -> None: main_module = types.ModuleType("__mp_main__") main_content = runpy.run_module(mod_name, run_name="__mp_main__", - alter_sys=True) + alter_sys=True) # type: ignore main_module.__dict__.update(main_content) sys.modules['__main__'] = sys.modules['__mp_main__'] = main_module @@ -95,6 +95,6 @@ def _fixup_main_from_path(main_path: str) -> None: # old_main_modules.append(current_main) main_module = types.ModuleType("__mp_main__") main_content = runpy.run_path(main_path, - run_name="__mp_main__") + run_name="__mp_main__") # type: ignore main_module.__dict__.update(main_content) sys.modules['__main__'] = sys.modules['__mp_main__'] = main_module