Skip to content
Draft
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 64 additions & 0 deletions examples/actors/mutate_remote_state.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
from itertools import cycle
from pprint import pformat
from dataclasses import dataclass, field

import trio
import tractor


@dataclass
class MyProcessStateThing:
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of this what are you after?

A function that creates some object and then makes that object mutateable from another inbound message?

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe you want something like rays "actors" (which i would argue aren't really "actor model" actors):
https://docs.ray.io/en/latest/actors.html

We can also accomplish this but it will require a slight bit more machinery.

state: dict = field(default_factory=dict)

def update(self, msg: dict):
self.state.update(msg)


_actor_state = MyProcessStateThing()


async def update_local_state(msg: dict):
"""Update process-local state from sent message and exit.

"""
actor = tractor.current_actor()

global _actor_state


print(f'Yo we got a message {msg}')

# update the "actor state"
_actor_state.update(msg)

print(f'New local "state" for {actor.uid} is {pformat(_actor_state.state)}')

# we're done so exit this task running in the subactor


async def main():
# Main process/thread that spawns one sub-actor and sends messages
# to it to update it's state.

actor_portals = []

# XXX: that subactor can **not** outlive it's parent, this is SC.
async with tractor.open_nursery() as tn:

portal = await tn.start_actor('even_boy', enable_modules=[__name__])
actor_portals.append(portal)

portal = await tn.start_actor('odd_boy', enable_modules=[__name__])
actor_portals.append(portal)

for i, (count, portal) in enumerate(
zip(range(100), cycle(actor_portals))
):
await portal.run(update_local_state, msg={f'msg_{i}': count})

# blocks here indefinitely synce we spawned "daemon actors" using
# .start_actor()`, you'll need to control-c to cancel.


if __name__ == '__main__':
trio.run(main)
119 changes: 119 additions & 0 deletions examples/parallelism/concurrent_actors_primes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
"""
Demonstration of the prime number detector example from the
``concurrent.futures`` docs:

https://docs.python.org/3/library/concurrent.futures.html#processpoolexecutor-example

This uses no extra threads, fancy semaphores or futures; all we need
is ``tractor``'s channels.

"""
from contextlib import asynccontextmanager
from typing import List, Callable
import itertools
import math
import time

import tractor
import trio
from async_generator import aclosing


PRIMES = [
112272535095293,
112582705942171,
112272535095293,
115280095190773,
115797848077099,
1099726899285419,
]


def is_prime(n):
if n < 2:
return False
if n == 2:
return True
if n % 2 == 0:
return False

sqrt_n = int(math.floor(math.sqrt(n)))
for i in range(3, sqrt_n + 1, 2):
if n % i == 0:
return False
return True


@asynccontextmanager
async def worker_pool(workers=4):
"""Though it's a trivial special case for ``tractor``, the well
known "worker pool" seems to be the defacto "but, I want this
process pattern!" for most parallelism pilgrims.

Yes, the workers stay alive (and ready for work) until you close
the context.
"""
async with tractor.open_nursery() as tn:

portals = []
snd_chan, recv_chan = trio.open_memory_channel(len(PRIMES))

for i in range(workers):

# this starts a new sub-actor (process + trio runtime) and
# stores it's "portal" for later use to "submit jobs" (ugh).
portals.append(
await tn.start_actor(
f'worker_{i}',
enable_modules=[__name__],
)
)

async def _map(
worker_func: Callable[[int], bool],
sequence: List[int]
) -> List[bool]:

# define an async (local) task to collect results from workers
async def send_result(func, value, portal):
await snd_chan.send((value, await portal.run(func, n=value)))

async with trio.open_nursery() as n:

for value, portal in zip(sequence, itertools.cycle(portals)):
n.start_soon(
send_result,
worker_func,
value,
portal
)

# deliver results as they arrive
for _ in range(len(sequence)):
yield await recv_chan.receive()

# deliver the parallel "worker mapper" to user code
yield _map

# tear down all "workers" on pool close
await tn.cancel()


async def main():

async with worker_pool() as actor_map:

start = time.time()

async with aclosing(actor_map(is_prime, PRIMES)) as results:
async for number, prime in results:

print(f'{number} is prime: {prime}')

print(f'processing took {time.time() - start} seconds')


if __name__ == '__main__':
start = time.time()
trio.run(main)
print(f'script took {time.time() - start} seconds')
40 changes: 40 additions & 0 deletions examples/parallelism/concurrent_futures_primes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import time
import concurrent.futures
import math

PRIMES = [
112272535095293,
112582705942171,
112272535095293,
115280095190773,
115797848077099,
1099726899285419]

def is_prime(n):
if n < 2:
return False
if n == 2:
return True
if n % 2 == 0:
return False

sqrt_n = int(math.floor(math.sqrt(n)))
for i in range(3, sqrt_n + 1, 2):
if n % i == 0:
return False
return True

def main():
with concurrent.futures.ProcessPoolExecutor() as executor:
start = time.time()

for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
print('%d is prime: %s' % (number, prime))

print(f'processing took {time.time() - start} seconds')

if __name__ == '__main__':

start = time.time()
main()
print(f'script took {time.time() - start} seconds')
37 changes: 37 additions & 0 deletions examples/parallelism/we_are_processes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
"""
Run with a process monitor from a terminal using:
$TERM -e watch -n 0.1 "pstree -a $$" & python examples/parallelism/we_are_processes.py && kill $!

"""
from multiprocessing import cpu_count
import os

import tractor
import trio


async def target():
print(f"Yo, i'm '{tractor.current_actor().name}' "
f"running in pid {os.getpid()}")
await trio.sleep_forever()


async def main():

async with tractor.open_nursery() as n:

for i in range(cpu_count()):
await n.run_in_actor(target, name=f'worker_{i}')

print('This process tree will self-destruct in 1 sec...')
await trio.sleep(1)

# you could have done this yourself
raise Exception('Self Destructed')


if __name__ == '__main__':
try:
trio.run(main)
except Exception:
print('Zombies Contained')
16 changes: 8 additions & 8 deletions tests/test_docs_examples.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,13 @@ def run(script_code):

@pytest.mark.parametrize(
'example_script',
[
f for f in os.listdir(examples_dir())
if (
('__' not in f) and
('debugging' not in f)
)
],

# walk yields: (dirpath, dirnames, filenames)
[(p[0], f) for p in os.walk(examples_dir()) for f in p[2]

if '__' not in f
and 'debugging' not in p[0]
]
)
def test_example(run_example_in_subproc, example_script):
"""Load and run scripts from this repo's ``examples/`` dir as a user
Expand All @@ -95,7 +95,7 @@ def test_example(run_example_in_subproc, example_script):
test directory and invoke the script as a module with ``python -m
test_example``.
"""
ex_file = os.path.join(examples_dir(), example_script)
ex_file = os.path.join(*example_script)
with open(ex_file, 'r') as ex:
code = ex.read()

Expand Down
2 changes: 2 additions & 0 deletions tractor/_forkserver_override.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
.. note:: There is no type hinting in this code base (yet) to remain as
a close as possible to upstream.
"""
# type: ignore

import os
import socket
import signal
Expand Down
4 changes: 2 additions & 2 deletions tractor/_mp_fixup_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def _fixup_main_from_name(mod_name: str) -> None:
main_module = types.ModuleType("__mp_main__")
main_content = runpy.run_module(mod_name,
run_name="__mp_main__",
alter_sys=True)
alter_sys=True) # type: ignore
main_module.__dict__.update(main_content)
sys.modules['__main__'] = sys.modules['__mp_main__'] = main_module

Expand Down Expand Up @@ -95,6 +95,6 @@ def _fixup_main_from_path(main_path: str) -> None:
# old_main_modules.append(current_main)
main_module = types.ModuleType("__mp_main__")
main_content = runpy.run_path(main_path,
run_name="__mp_main__")
run_name="__mp_main__") # type: ignore
main_module.__dict__.update(main_content)
sys.modules['__main__'] = sys.modules['__mp_main__'] = main_module