From d97e6c84432c39b06f88251cdabad0baf3bc8019 Mon Sep 17 00:00:00 2001 From: Jeff Bloss <50282451+JBloss1517@users.noreply.github.com> Date: Wed, 29 Oct 2025 13:24:04 -0500 Subject: [PATCH 1/3] Document process execution helpers --- README.md | 72 ++++++++++++++++++++++++++++++ examples/process_pool_example.py | 58 ++++++++++++++++++++++++ src/dag_simple/__init__.py | 10 +++++ src/dag_simple/execution.py | 75 ++++++++++++++++++++++++++++++++ tests/__init__.py | 1 + tests/process_nodes.py | 23 ++++++++++ tests/test_process_execution.py | 35 +++++++++++++++ 7 files changed, 274 insertions(+) create mode 100644 examples/process_pool_example.py create mode 100644 tests/__init__.py create mode 100644 tests/process_nodes.py create mode 100644 tests/test_process_execution.py diff --git a/README.md b/README.md index 67b2fa6..fbd0427 100644 --- a/README.md +++ b/README.md @@ -199,6 +199,78 @@ order_service.add_node(get_data) - โœ… Want automatic concurrent async execution - โœ… Need namespace separation +## ๐Ÿงต Run DAGs in Parallel Processes + +Some workloads are CPU-bound or need to run completely isolated from the main interpreter. The +helpers `run_sync_in_process` and `run_async_in_process` let you offload an entire DAG evaluation to +`ProcessPoolExecutor` workers, giving you true parallelism across CPU cores and freeing the main +event loop/thread. + +**When to use process execution:** + +- โœ… CPU-intensive nodes that would block the event loop or main thread +- โœ… Run multiple DAGs side-by-side without interference +- โœ… Isolate long-running jobs from short-lived interactive work + +**Things to keep in mind:** + +- All node functions, their inputs, and their outputs must be picklable so they can cross process + boundaries. +- Each process has its own `ExecutionContext`. Cached results are not shared across processes unless + you design explicit shared storage. +- Spawning processes has overhead. Reuse a `ProcessPoolExecutor` when launching many DAG runs. + +```python +import asyncio +from concurrent.futures import ProcessPoolExecutor + +from dag_simple import node, run_async_in_process, run_sync_in_process + + +@node() +def make_numbers(seed: int) -> list[int]: + return [seed + i for i in range(5)] + + +@node(deps=[make_numbers]) +def total_energy(make_numbers: list[int]) -> int: + # Represent a CPU-heavy loop + return sum(value * value for value in make_numbers) + + +@node() +async def fetch_multiplier() -> int: + await asyncio.sleep(0.1) + return 2 + + +@node(deps=[total_energy, fetch_multiplier]) +def scaled(total_energy: int, fetch_multiplier: int) -> int: + return total_energy * fetch_multiplier + + +def main() -> None: + result = run_sync_in_process(total_energy, seed=10) + print(result) # executes in a worker process + + with ProcessPoolExecutor() as pool: + parallel = [ + run_sync_in_process(total_energy, executor=pool, seed=seed) + for seed in range(3) + ] + print(parallel) + + scaled_result = run_async_in_process(scaled, executor=pool, seed=5) + print(scaled_result) + + +if __name__ == "__main__": + main() +``` + +See [`examples/process_pool_example.py`](examples/process_pool_example.py) for a complete runnable +walkthrough, including sharing a process pool across many DAG invocations. + ## ๐Ÿ“– Core Concepts ### Nodes diff --git a/examples/process_pool_example.py b/examples/process_pool_example.py new file mode 100644 index 0000000..462752d --- /dev/null +++ b/examples/process_pool_example.py @@ -0,0 +1,58 @@ +"""Example showing how to execute DAG runs inside worker processes.""" + +from __future__ import annotations + +import asyncio +from concurrent.futures import ProcessPoolExecutor + +from dag_simple import node, run_async_in_process, run_sync_in_process + + +@node() +def make_numbers(seed: int) -> list[int]: + """Generate a small range of numbers from a seed.""" + return [seed + offset for offset in range(5)] + + +@node(deps=[make_numbers]) +def total_energy(make_numbers: list[int]) -> int: + """Pretend CPU-bound work that squares numbers and sums them.""" + total = 0 + for value in make_numbers: + for _ in range(10_000): # small loop to simulate work without being too slow + total += value * value + return total + + +@node() +async def fetch_multiplier() -> int: + """Async dependency that might contact a remote service.""" + await asyncio.sleep(0.1) + return 2 + + +@node(deps=[total_energy, fetch_multiplier]) +def scaled_total(total_energy: int, fetch_multiplier: int) -> int: + """Combine sync and async dependencies.""" + return total_energy * fetch_multiplier + + +def main() -> None: + """Show different ways to reuse worker processes for DAG execution.""" + print("Single run in a dedicated worker process:") + print(run_sync_in_process(total_energy, seed=10)) + + print("\nReusing a shared process pool for multiple runs:") + with ProcessPoolExecutor(max_workers=2) as pool: + parallel_runs = [ + run_sync_in_process(total_energy, executor=pool, seed=seed) + for seed in range(3) + ] + print(parallel_runs) + + print("\nMixing sync and async DAGs in the same pool:") + print(run_async_in_process(scaled_total, executor=pool, seed=5)) + + +if __name__ == "__main__": + main() diff --git a/src/dag_simple/__init__.py b/src/dag_simple/__init__.py index bdbe9bb..8456362 100644 --- a/src/dag_simple/__init__.py +++ b/src/dag_simple/__init__.py @@ -16,6 +16,12 @@ MissingDependencyError, ValidationError, ) +from dag_simple.execution import ( + run_async, + run_async_in_process, + run_sync, + run_sync_in_process, +) from dag_simple.node import Node, input_node, node __version__ = "0.1.0" @@ -29,4 +35,8 @@ "ValidationError", "MissingDependencyError", "ExecutionContext", + "run_sync", + "run_async", + "run_sync_in_process", + "run_async_in_process", ] diff --git a/src/dag_simple/execution.py b/src/dag_simple/execution.py index 8e38003..084c2b6 100644 --- a/src/dag_simple/execution.py +++ b/src/dag_simple/execution.py @@ -6,6 +6,7 @@ import asyncio import inspect +from concurrent.futures import ProcessPoolExecutor from typing import TYPE_CHECKING, Any, TypeVar, cast from dag_simple.context import ExecutionContext @@ -232,3 +233,77 @@ async def _execute_node_without_cache( validate_output_type(node, result, node.type_hints) return cast(R, result) + + +def run_sync_in_process( + node: Node[R], + *, + enable_cache: bool = True, + executor: ProcessPoolExecutor | None = None, + **inputs: Any, +) -> R: + """Execute ``run_sync`` inside a worker process. + + Args: + node: The root node to execute. + enable_cache: Whether to enable caching for this execution. + executor: Optional ``ProcessPoolExecutor`` to submit the work to. When + omitted, a temporary single-worker executor is created for the call. + **inputs: Additional keyword arguments passed as DAG inputs. + + Returns: + The result returned by ``run_sync``. + """ + + if executor is not None: + future = executor.submit(_run_sync_entry_point, node, enable_cache, inputs) + return future.result() + + with ProcessPoolExecutor(max_workers=1) as process_pool: + future = process_pool.submit(_run_sync_entry_point, node, enable_cache, inputs) + return future.result() + + +def run_async_in_process( + node: Node[R], + *, + enable_cache: bool = True, + executor: ProcessPoolExecutor | None = None, + **inputs: Any, +) -> R: + """Execute ``run_async`` inside a worker process. + + Args: + node: The root node to execute. + enable_cache: Whether to enable caching for this execution. + executor: Optional ``ProcessPoolExecutor`` to submit the work to. When + omitted, a temporary single-worker executor is created for the call. + **inputs: Additional keyword arguments passed as DAG inputs. + + Returns: + The result returned by ``run_async``. + """ + + if executor is not None: + future = executor.submit(_run_async_entry_point, node, enable_cache, inputs) + return future.result() + + with ProcessPoolExecutor(max_workers=1) as process_pool: + future = process_pool.submit(_run_async_entry_point, node, enable_cache, inputs) + return future.result() + + +def _run_sync_entry_point( + node: Node[R], enable_cache: bool, inputs: dict[str, Any] +) -> R: + """Process entry point for ``run_sync_in_process``.""" + + return run_sync(node, enable_cache=enable_cache, **inputs) # pragma: no cover + + +def _run_async_entry_point( + node: Node[R], enable_cache: bool, inputs: dict[str, Any] +) -> R: + """Process entry point for ``run_async_in_process``.""" + + return asyncio.run(run_async(node, enable_cache=enable_cache, **inputs)) # pragma: no cover diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..f8d8de3 --- /dev/null +++ b/tests/__init__.py @@ -0,0 +1 @@ +"""Test package for dag_simple.""" diff --git a/tests/process_nodes.py b/tests/process_nodes.py new file mode 100644 index 0000000..cb83cb3 --- /dev/null +++ b/tests/process_nodes.py @@ -0,0 +1,23 @@ +from __future__ import annotations + +import asyncio + +from dag_simple.node import Node + + +def _base_value() -> int: + return 2 + + +def _double(base_value: int) -> int: + return base_value * 2 + + +async def _add_async(base_value: int) -> int: + await asyncio.sleep(0) + return base_value + 3 + + +base_value = Node(_base_value, name="base_value") +double = Node(_double, name="double", deps=[base_value]) +add_async = Node(_add_async, name="add_async", deps=[base_value]) diff --git a/tests/test_process_execution.py b/tests/test_process_execution.py new file mode 100644 index 0000000..21aee02 --- /dev/null +++ b/tests/test_process_execution.py @@ -0,0 +1,35 @@ +from __future__ import annotations + +from concurrent.futures import ProcessPoolExecutor + +from dag_simple.execution import run_async_in_process, run_sync_in_process + +from .process_nodes import add_async, double + + +def test_run_sync_in_process_returns_value() -> None: + result = run_sync_in_process(double) + assert result == 4 + + +def test_run_async_in_process_returns_value() -> None: + result = run_async_in_process(add_async) + assert result == 5 + + +def test_run_sync_in_process_with_custom_executor() -> None: + with ProcessPoolExecutor(max_workers=1) as executor: + result_one = run_sync_in_process(double, executor=executor) + result_two = run_sync_in_process(double, executor=executor) + + assert result_one == 4 + assert result_two == 4 + + +def test_run_async_in_process_with_custom_executor() -> None: + with ProcessPoolExecutor(max_workers=1) as executor: + result_one = run_async_in_process(add_async, executor=executor) + result_two = run_async_in_process(add_async, executor=executor) + + assert result_one == 5 + assert result_two == 5 From c9bf6e2f959e4f3f36549b6941b44a992ec97740 Mon Sep 17 00:00:00 2001 From: Jeff Bloss <50282451+JBloss1517@users.noreply.github.com> Date: Wed, 29 Oct 2025 13:51:45 -0500 Subject: [PATCH 2/3] Clarify process execution example for async nodes --- README.md | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/README.md b/README.md index fbd0427..92e2eae 100644 --- a/README.md +++ b/README.md @@ -220,6 +220,12 @@ event loop/thread. you design explicit shared storage. - Spawning processes has overhead. Reuse a `ProcessPoolExecutor` when launching many DAG runs. +The synchronous helper will raise an error if the selected target or any of its +dependencies are asynchronous. In the example below we call it on +``total_energy``, whose branch is entirely synchronous, and use +``run_async_in_process`` for the combined branch that includes the async +``fetch_multiplier`` node. + ```python import asyncio from concurrent.futures import ProcessPoolExecutor @@ -238,6 +244,8 @@ def total_energy(make_numbers: list[int]) -> int: return sum(value * value for value in make_numbers) +# The synchronous helper can only target DAGs made of synchronous nodes. +# We keep the async dependency in a separate branch for the async example below. @node() async def fetch_multiplier() -> int: await asyncio.sleep(0.1) From f64411970d31881209cf5715cc2b346a3ab475b6 Mon Sep 17 00:00:00 2001 From: Jeff Bloss <50282451+JBloss1517@users.noreply.github.com> Date: Wed, 29 Oct 2025 13:51:49 -0500 Subject: [PATCH 3/3] Assert process README example outputs --- README.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/README.md b/README.md index 92e2eae..238f81a 100644 --- a/README.md +++ b/README.md @@ -260,6 +260,7 @@ def scaled(total_energy: int, fetch_multiplier: int) -> int: def main() -> None: result = run_sync_in_process(total_energy, seed=10) print(result) # executes in a worker process + assert result == 730 with ProcessPoolExecutor() as pool: parallel = [ @@ -267,9 +268,11 @@ def main() -> None: for seed in range(3) ] print(parallel) + assert parallel == [30, 55, 90] scaled_result = run_async_in_process(scaled, executor=pool, seed=5) print(scaled_result) + assert scaled_result == 510 if __name__ == "__main__":