Skip to content

Commit d97e6c8

Browse files
committed
Document process execution helpers
1 parent 38a1d46 commit d97e6c8

File tree

7 files changed

+274
-0
lines changed

7 files changed

+274
-0
lines changed

README.md

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,78 @@ order_service.add_node(get_data)
199199
- ✅ Want automatic concurrent async execution
200200
- ✅ Need namespace separation
201201

202+
## 🧵 Run DAGs in Parallel Processes
203+
204+
Some workloads are CPU-bound or need to run completely isolated from the main interpreter. The
205+
helpers `run_sync_in_process` and `run_async_in_process` let you offload an entire DAG evaluation to
206+
`ProcessPoolExecutor` workers, giving you true parallelism across CPU cores and freeing the main
207+
event loop/thread.
208+
209+
**When to use process execution:**
210+
211+
- ✅ CPU-intensive nodes that would block the event loop or main thread
212+
- ✅ Run multiple DAGs in isolated worker processes without interference (each helper call blocks until its result returns, so drive runs from separate threads if they must overlap)
213+
- ✅ Isolate long-running jobs from short-lived interactive work
214+
215+
**Things to keep in mind:**
216+
217+
- All node functions, their inputs, and their outputs must be picklable so they can cross process
218+
boundaries.
219+
- Each process has its own `ExecutionContext`. Cached results are not shared across processes unless
220+
you design explicit shared storage.
221+
- Spawning processes has overhead. Reuse a `ProcessPoolExecutor` when launching many DAG runs. Note that both helpers block the calling thread until the worker process finishes.
222+
223+
```python
224+
import asyncio
225+
from concurrent.futures import ProcessPoolExecutor
226+
227+
from dag_simple import node, run_async_in_process, run_sync_in_process
228+
229+
230+
@node()
231+
def make_numbers(seed: int) -> list[int]:
232+
return [seed + i for i in range(5)]
233+
234+
235+
@node(deps=[make_numbers])
236+
def total_energy(make_numbers: list[int]) -> int:
237+
# Represent a CPU-heavy loop
238+
return sum(value * value for value in make_numbers)
239+
240+
241+
@node()
242+
async def fetch_multiplier() -> int:
243+
await asyncio.sleep(0.1)
244+
return 2
245+
246+
247+
@node(deps=[total_energy, fetch_multiplier])
248+
def scaled(total_energy: int, fetch_multiplier: int) -> int:
249+
return total_energy * fetch_multiplier
250+
251+
252+
def main() -> None:
253+
result = run_sync_in_process(total_energy, seed=10)
254+
print(result) # executes in a worker process
255+
256+
with ProcessPoolExecutor() as pool:
257+
parallel = [
258+
run_sync_in_process(total_energy, executor=pool, seed=seed)
259+
for seed in range(3)
260+
]
261+
print(parallel)
262+
263+
scaled_result = run_async_in_process(scaled, executor=pool, seed=5)
264+
print(scaled_result)
265+
266+
267+
if __name__ == "__main__":
268+
main()
269+
```
270+
271+
See [`examples/process_pool_example.py`](examples/process_pool_example.py) for a complete runnable
272+
walkthrough, including sharing a process pool across many DAG invocations.
273+
202274
## 📖 Core Concepts
203275

204276
### Nodes

examples/process_pool_example.py

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
"""Example showing how to execute DAG runs inside worker processes."""
2+
3+
from __future__ import annotations
4+
5+
import asyncio
6+
from concurrent.futures import ProcessPoolExecutor
7+
8+
from dag_simple import node, run_async_in_process, run_sync_in_process
9+
10+
11+
@node()
12+
def make_numbers(seed: int) -> list[int]:
13+
"""Generate a small range of numbers from a seed."""
14+
return [seed + offset for offset in range(5)]
15+
16+
17+
@node(deps=[make_numbers])
18+
def total_energy(make_numbers: list[int]) -> int:
19+
"""Pretend CPU-bound work that squares numbers and sums them."""
20+
total = 0
21+
for value in make_numbers:
22+
for _ in range(10_000): # small loop to simulate work without being too slow
23+
total += value * value
24+
return total
25+
26+
27+
@node()
28+
async def fetch_multiplier() -> int:
29+
"""Async dependency that might contact a remote service."""
30+
await asyncio.sleep(0.1)
31+
return 2
32+
33+
34+
@node(deps=[total_energy, fetch_multiplier])
35+
def scaled_total(total_energy: int, fetch_multiplier: int) -> int:
36+
"""Combine sync and async dependencies."""
37+
return total_energy * fetch_multiplier
38+
39+
40+
def main() -> None:
41+
"""Show different ways to reuse worker processes for DAG execution."""
42+
print("Single run in a dedicated worker process:")
43+
print(run_sync_in_process(total_energy, seed=10))
44+
45+
print("\nReusing a shared process pool for multiple runs:")
46+
with ProcessPoolExecutor(max_workers=2) as pool:
47+
parallel_runs = [
48+
run_sync_in_process(total_energy, executor=pool, seed=seed)
49+
for seed in range(3)
50+
]
51+
print(parallel_runs)
52+
53+
print("\nMixing sync and async DAGs in the same pool:")
54+
print(run_async_in_process(scaled_total, executor=pool, seed=5))
55+
56+
57+
if __name__ == "__main__":
58+
main()

src/dag_simple/__init__.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,12 @@
1616
MissingDependencyError,
1717
ValidationError,
1818
)
19+
from dag_simple.execution import (
20+
run_async,
21+
run_async_in_process,
22+
run_sync,
23+
run_sync_in_process,
24+
)
1925
from dag_simple.node import Node, input_node, node
2026

2127
__version__ = "0.1.0"
@@ -29,4 +35,8 @@
2935
"ValidationError",
3036
"MissingDependencyError",
3137
"ExecutionContext",
38+
"run_sync",
39+
"run_async",
40+
"run_sync_in_process",
41+
"run_async_in_process",
3242
]

src/dag_simple/execution.py

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
import asyncio
88
import inspect
9+
from concurrent.futures import ProcessPoolExecutor
910
from typing import TYPE_CHECKING, Any, TypeVar, cast
1011

1112
from dag_simple.context import ExecutionContext
@@ -232,3 +233,77 @@ async def _execute_node_without_cache(
232233
validate_output_type(node, result, node.type_hints)
233234

234235
return cast(R, result)
236+
237+
238+
def run_sync_in_process(
239+
node: Node[R],
240+
*,
241+
enable_cache: bool = True,
242+
executor: ProcessPoolExecutor | None = None,
243+
**inputs: Any,
244+
) -> R:
245+
"""Execute ``run_sync`` inside a worker process.
246+
247+
Args:
248+
node: The root node to execute.
249+
enable_cache: Whether to enable caching for this execution.
250+
executor: Optional ``ProcessPoolExecutor`` to submit the work to. When
251+
omitted, a temporary single-worker executor is created for the call.
252+
**inputs: Additional keyword arguments passed as DAG inputs.
253+
254+
Returns:
255+
The result returned by ``run_sync``.
256+
"""
257+
258+
if executor is not None:
259+
future = executor.submit(_run_sync_entry_point, node, enable_cache, inputs)
260+
return future.result()
261+
262+
with ProcessPoolExecutor(max_workers=1) as process_pool:
263+
future = process_pool.submit(_run_sync_entry_point, node, enable_cache, inputs)
264+
return future.result()
265+
266+
267+
def run_async_in_process(
268+
node: Node[R],
269+
*,
270+
enable_cache: bool = True,
271+
executor: ProcessPoolExecutor | None = None,
272+
**inputs: Any,
273+
) -> R:
274+
"""Execute ``run_async`` inside a worker process.
275+
276+
Args:
277+
node: The root node to execute.
278+
enable_cache: Whether to enable caching for this execution.
279+
executor: Optional ``ProcessPoolExecutor`` to submit the work to. When
280+
omitted, a temporary single-worker executor is created for the call.
281+
**inputs: Additional keyword arguments passed as DAG inputs.
282+
283+
Returns:
284+
The result returned by ``run_async``.
285+
"""
286+
287+
if executor is not None:
288+
future = executor.submit(_run_async_entry_point, node, enable_cache, inputs)
289+
return future.result()
290+
291+
with ProcessPoolExecutor(max_workers=1) as process_pool:
292+
future = process_pool.submit(_run_async_entry_point, node, enable_cache, inputs)
293+
return future.result()
294+
295+
296+
def _run_sync_entry_point(
297+
node: Node[R], enable_cache: bool, inputs: dict[str, Any]
298+
) -> R:
299+
"""Process entry point for ``run_sync_in_process``."""
300+
301+
return run_sync(node, enable_cache=enable_cache, **inputs) # pragma: no cover
302+
303+
304+
def _run_async_entry_point(
305+
node: Node[R], enable_cache: bool, inputs: dict[str, Any]
306+
) -> R:
307+
"""Process entry point for ``run_async_in_process``."""
308+
309+
return asyncio.run(run_async(node, enable_cache=enable_cache, **inputs)) # pragma: no cover

tests/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
"""Test package for dag_simple."""

tests/process_nodes.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
from __future__ import annotations
2+
3+
import asyncio
4+
5+
from dag_simple.node import Node
6+
7+
8+
def _base_value() -> int:
9+
return 2
10+
11+
12+
def _double(base_value: int) -> int:
13+
return base_value * 2
14+
15+
16+
async def _add_async(base_value: int) -> int:
17+
await asyncio.sleep(0)
18+
return base_value + 3
19+
20+
21+
base_value = Node(_base_value, name="base_value")
22+
double = Node(_double, name="double", deps=[base_value])
23+
add_async = Node(_add_async, name="add_async", deps=[base_value])

tests/test_process_execution.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
from __future__ import annotations
2+
3+
from concurrent.futures import ProcessPoolExecutor
4+
5+
from dag_simple.execution import run_async_in_process, run_sync_in_process
6+
7+
from .process_nodes import add_async, double
8+
9+
10+
def test_run_sync_in_process_returns_value() -> None:
11+
result = run_sync_in_process(double)
12+
assert result == 4
13+
14+
15+
def test_run_async_in_process_returns_value() -> None:
16+
result = run_async_in_process(add_async)
17+
assert result == 5
18+
19+
20+
def test_run_sync_in_process_with_custom_executor() -> None:
21+
with ProcessPoolExecutor(max_workers=1) as executor:
22+
result_one = run_sync_in_process(double, executor=executor)
23+
result_two = run_sync_in_process(double, executor=executor)
24+
25+
assert result_one == 4
26+
assert result_two == 4
27+
28+
29+
def test_run_async_in_process_with_custom_executor() -> None:
30+
with ProcessPoolExecutor(max_workers=1) as executor:
31+
result_one = run_async_in_process(add_async, executor=executor)
32+
result_two = run_async_in_process(add_async, executor=executor)
33+
34+
assert result_one == 5
35+
assert result_two == 5

0 commit comments

Comments
 (0)