Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 83 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,89 @@ order_service.add_node(get_data)
- ✅ Want automatic concurrent async execution
- ✅ Need namespace separation

## 🧵 Run DAGs in Parallel Processes

Some workloads are CPU-bound or need to run completely isolated from the main interpreter. The
helpers `run_sync_in_process` and `run_async_in_process` let you offload an entire DAG evaluation to
`ProcessPoolExecutor` workers, giving you true parallelism across CPU cores and freeing the main
event loop/thread.

**When to use process execution:**

- ✅ CPU-intensive nodes that would block the event loop or main thread
- ✅ Run multiple DAGs in isolated worker processes without interference (note: each helper call blocks until its result is ready, so to overlap runs submit the calls from separate threads)
- ✅ Isolate long-running jobs from short-lived interactive work

**Things to keep in mind:**

- All node functions, their inputs, and their outputs must be picklable so they can cross process
boundaries.
- Each process has its own `ExecutionContext`. Cached results are not shared across processes unless
you design explicit shared storage.
- Spawning processes has overhead. Reuse a `ProcessPoolExecutor` when launching many DAG runs.

The synchronous helper will raise an error if the selected target or any of its
dependencies are asynchronous. In the example below we call it on
``total_energy``, whose branch is entirely synchronous, and use
``run_async_in_process`` for the combined branch that includes the async
``fetch_multiplier`` node.

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor

from dag_simple import node, run_async_in_process, run_sync_in_process


@node()
def make_numbers(seed: int) -> list[int]:
return [seed + i for i in range(5)]


@node(deps=[make_numbers])
def total_energy(make_numbers: list[int]) -> int:
# Represent a CPU-heavy loop
return sum(value * value for value in make_numbers)


# The synchronous helper can only target DAGs made of synchronous nodes.
# We keep the async dependency in a separate branch for the async example below.
@node()
async def fetch_multiplier() -> int:
await asyncio.sleep(0.1)
return 2


@node(deps=[total_energy, fetch_multiplier])
def scaled(total_energy: int, fetch_multiplier: int) -> int:
return total_energy * fetch_multiplier


def main() -> None:
result = run_sync_in_process(total_energy, seed=10)
print(result) # executes in a worker process
assert result == 730

with ProcessPoolExecutor() as pool:
parallel = [
run_sync_in_process(total_energy, executor=pool, seed=seed)
for seed in range(3)
]
print(parallel)
assert parallel == [30, 55, 90]

scaled_result = run_async_in_process(scaled, executor=pool, seed=5)
print(scaled_result)
assert scaled_result == 510


if __name__ == "__main__":
main()
```

See [`examples/process_pool_example.py`](examples/process_pool_example.py) for a complete runnable
walkthrough, including sharing a process pool across many DAG invocations.

## 📖 Core Concepts

### Nodes
Expand Down
58 changes: 58 additions & 0 deletions examples/process_pool_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
"""Example showing how to execute DAG runs inside worker processes."""

from __future__ import annotations

import asyncio
from concurrent.futures import ProcessPoolExecutor

from dag_simple import node, run_async_in_process, run_sync_in_process


@node()
def make_numbers(seed: int) -> list[int]:
    """Produce the five consecutive integers starting at ``seed``."""
    return list(range(seed, seed + 5))


@node(deps=[make_numbers])
def total_energy(make_numbers: list[int]) -> int:
    """Pretend CPU-bound work that squares numbers and sums them.

    The repetition only simulates load: the result equals
    ``10_000 * sum(n * n for n in make_numbers)``.
    """
    return sum(
        number * number
        for number in make_numbers
        for _ in range(10_000)  # simulated effort, small enough to stay quick
    )


@node()
async def fetch_multiplier() -> int:
    """Asynchronous dependency standing in for a remote service call."""
    delay_seconds = 0.1
    await asyncio.sleep(delay_seconds)
    return 2


@node(deps=[total_energy, fetch_multiplier])
def scaled_total(total_energy: int, fetch_multiplier: int) -> int:
    """Multiply the synchronous total by the asynchronously fetched factor."""
    product = total_energy * fetch_multiplier
    return product


def main() -> None:
    """Show different ways to reuse worker processes for DAG execution."""
    print("Single run in a dedicated worker process:")
    print(run_sync_in_process(total_energy, seed=10))

    print("\nReusing a shared process pool for multiple runs:")
    with ProcessPoolExecutor(max_workers=2) as pool:
        # NOTE(review): each helper call blocks for its own result, so these
        # runs execute one after another even though the pool is shared.
        results = []
        for current_seed in range(3):
            results.append(
                run_sync_in_process(total_energy, executor=pool, seed=current_seed)
            )
        print(results)

        print("\nMixing sync and async DAGs in the same pool:")
        print(run_async_in_process(scaled_total, executor=pool, seed=5))


if __name__ == "__main__":
main()
10 changes: 10 additions & 0 deletions src/dag_simple/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@
MissingDependencyError,
ValidationError,
)
from dag_simple.execution import (
run_async,
run_async_in_process,
run_sync,
run_sync_in_process,
)
from dag_simple.node import Node, input_node, node

__version__ = "0.1.0"
Expand All @@ -29,4 +35,8 @@
"ValidationError",
"MissingDependencyError",
"ExecutionContext",
"run_sync",
"run_async",
"run_sync_in_process",
"run_async_in_process",
]
75 changes: 75 additions & 0 deletions src/dag_simple/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import asyncio
import inspect
from concurrent.futures import ProcessPoolExecutor
from typing import TYPE_CHECKING, Any, TypeVar, cast

from dag_simple.context import ExecutionContext
Expand Down Expand Up @@ -232,3 +233,77 @@ async def _execute_node_without_cache(
validate_output_type(node, result, node.type_hints)

return cast(R, result)


def run_sync_in_process(
    node: Node[R],
    *,
    enable_cache: bool = True,
    executor: ProcessPoolExecutor | None = None,
    **inputs: Any,
) -> R:
    """Execute ``run_sync`` inside a worker process and wait for the result.

    This call blocks the current thread until the worker finishes (the
    returned future's ``result()`` is awaited synchronously).

    Args:
        node: The root node to execute.
        enable_cache: Whether to enable caching for this execution.
        executor: Optional ``ProcessPoolExecutor`` that receives the work.
            When omitted, a throwaway single-worker pool is created just
            for this call.
        **inputs: Additional keyword arguments passed as DAG inputs.

    Returns:
        The result returned by ``run_sync``.
    """

    if executor is None:
        # No pool supplied: own a temporary one for the lifetime of the call.
        with ProcessPoolExecutor(max_workers=1) as owned_pool:
            return owned_pool.submit(
                _run_sync_entry_point, node, enable_cache, inputs
            ).result()
    return executor.submit(_run_sync_entry_point, node, enable_cache, inputs).result()


def run_async_in_process(
    node: Node[R],
    *,
    enable_cache: bool = True,
    executor: ProcessPoolExecutor | None = None,
    **inputs: Any,
) -> R:
    """Execute ``run_async`` inside a worker process and wait for the result.

    This call blocks the current thread until the worker finishes (the
    returned future's ``result()`` is awaited synchronously).

    Args:
        node: The root node to execute.
        enable_cache: Whether to enable caching for this execution.
        executor: Optional ``ProcessPoolExecutor`` that receives the work.
            When omitted, a throwaway single-worker pool is created just
            for this call.
        **inputs: Additional keyword arguments passed as DAG inputs.

    Returns:
        The result returned by ``run_async``.
    """

    if executor is None:
        # No pool supplied: own a temporary one for the lifetime of the call.
        with ProcessPoolExecutor(max_workers=1) as owned_pool:
            return owned_pool.submit(
                _run_async_entry_point, node, enable_cache, inputs
            ).result()
    return executor.submit(_run_async_entry_point, node, enable_cache, inputs).result()


def _run_sync_entry_point(
    node: Node[R], enable_cache: bool, inputs: dict[str, Any]
) -> R:
    """Process entry point for ``run_sync_in_process``.

    Defined at module level so the process pool can pickle it for
    submission to a worker.  The body runs in the child process, which is
    why coverage is told to skip it.
    """

    return run_sync(node, enable_cache=enable_cache, **inputs)  # pragma: no cover


def _run_async_entry_point(
    node: Node[R], enable_cache: bool, inputs: dict[str, Any]
) -> R:
    """Process entry point for ``run_async_in_process``.

    Defined at module level so the process pool can pickle it.  Each worker
    drives its own event loop via ``asyncio.run``; the body runs in the
    child process, which is why coverage is told to skip it.
    """

    return asyncio.run(run_async(node, enable_cache=enable_cache, **inputs))  # pragma: no cover
1 change: 1 addition & 0 deletions tests/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Test package for dag_simple."""
23 changes: 23 additions & 0 deletions tests/process_nodes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from __future__ import annotations

import asyncio

from dag_simple.node import Node


def _base_value() -> int:
    """Root of the test DAG: a fixed starting value."""
    return 2


def _double(base_value: int) -> int:
    """Synchronous branch: twice the upstream value."""
    return 2 * base_value


async def _add_async(base_value: int) -> int:
    """Async branch: upstream value plus three, yielding control once."""
    await asyncio.sleep(0)
    return 3 + base_value


# Module-level Node instances wiring the small DAG used by the
# process-execution tests: base_value feeds both a sync branch (double)
# and an async branch (add_async).  NOTE(review): presumably kept at
# module scope in this separate helper module so the nodes pickle cleanly
# when sent to ProcessPoolExecutor workers — confirm against the helpers.
base_value = Node(_base_value, name="base_value")
double = Node(_double, name="double", deps=[base_value])
add_async = Node(_add_async, name="add_async", deps=[base_value])
35 changes: 35 additions & 0 deletions tests/test_process_execution.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
from __future__ import annotations

from concurrent.futures import ProcessPoolExecutor

from dag_simple.execution import run_async_in_process, run_sync_in_process

from .process_nodes import add_async, double


def test_run_sync_in_process_returns_value() -> None:
    """A temporary single-worker pool yields the sync DAG's value."""
    assert run_sync_in_process(double) == 4


def test_run_async_in_process_returns_value() -> None:
    """A temporary single-worker pool resolves the async branch."""
    assert run_async_in_process(add_async) == 5


def test_run_sync_in_process_with_custom_executor() -> None:
    """A caller-supplied pool can be reused across sync invocations."""
    with ProcessPoolExecutor(max_workers=1) as executor:
        outcomes = [
            run_sync_in_process(double, executor=executor) for _ in range(2)
        ]

    assert outcomes == [4, 4]


def test_run_async_in_process_with_custom_executor() -> None:
    """A caller-supplied pool can be reused across async invocations."""
    with ProcessPoolExecutor(max_workers=1) as executor:
        outcomes = [
            run_async_in_process(add_async, executor=executor) for _ in range(2)
        ]

    assert outcomes == [5, 5]
Loading