196 changes: 196 additions & 0 deletions CLAUDE.md
# CLAUDE.md

This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.

## Overview

Pydra is a dataflow engine for constructing and executing directed acyclic graphs (DAGs) of tasks, and is the core engine for Nipype 2.0. It requires Python 3.11+, uses `attrs` extensively for dataclass-like task definitions, and builds with `hatchling` + `hatch-vcs` (the version is derived from git tags).

## Commands

### Install

```bash
pip install -e ".[dev]" # full dev install (includes test + lint deps)
pip install -e ".[test]" # test deps only
pip install -e ".[doc]" # doc deps only
```

### Testing

```bash
pytest pydra # full test suite (parallel, with coverage)
pytest pydra/engine/tests/test_job.py # single test file
pytest pydra/engine/tests/test_job.py::test_my_func # single test
pytest pydra --only-worker=debug # single-process worker (good for debugging)
pytest pydra --only-worker=cf # ConcurrentFutures worker
pytest pydra --only-worker=slurm # SLURM (requires sbatch)
pytest pydra --with-dask # Dask worker
```

Set `_PYTEST_RAISE=1` for IDE breakpoint-friendly exception propagation.

Tests include doctests (`--doctest-modules` is enabled). `xfail_strict = true` means tests marked `xfail` that unexpectedly pass are reported as failures.

### Linting / Formatting

```bash
tox -e style # check style (ruff)
tox -e style-fix # auto-fix style
tox -e spellcheck # codespell check
pre-commit run --all-files # black + flake8 + codespell + nbstripout
black pydra
flake8 # max-line-length=105, ignores E203/W503/F541
```

### Tox Environments

```bash
tox -e py311-latest # Python 3.11, latest deps
tox -e py313-pre # Python 3.13, pre-release deps
tox -e py311-min # Python 3.11, minimum pinned deps
```

### Docs

```bash
make -C docs html
```

### CLI

```bash
pydracli crash <crashfile> # display crash file
pydracli crash <crashfile> --rerun # rerun crashed job
```

## Architecture

### Layer Overview

```
compose/ Task definition (decorators + task types)
engine/ Execution engine (graph, jobs, state, submitter)
workers/ Execution backends (cf, debug, slurm, sge, + plugins)
environments/ Execution environments (native, docker, singularity, lmod)
utils/ Hashing, typing helpers, plugin discovery, profiling
tasks/ Built-in reusable tasks
scripts/ CLI entry points
```

### Task Definition (`compose/`)

Three task flavors, each defined by a decorator:

**Python tasks** — wrap a Python function:
```python
@python.define
def Add(a: int, b: int) -> int:
return a + b
```

**Shell tasks** — wrap a CLI command:
```python
@shell.define
class BET(shell.Task["BET.Outputs"]):
executable = "bet"
input_image: File = shell.arg(argstr="{input_image}", position=1)
```

**Workflow tasks** — compose other tasks into a DAG:
```python
@workflow.define
def MyWorkflow(x: int) -> int:
node_a = workflow.add(Add(a=x, b=1))
return node_a.out
```

The decorator machinery is in `compose/base/builder.py` (`build_task_class()`). It converts `Arg`/`Out` field specs into `attrs` fields and dynamically creates a `Task` class + a paired `Outputs` class.

Key base types: `compose.base.Field`, `Arg`, `Out`, `Task[OutputType]`, `Outputs`.
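
The same idea can be sketched with only the stdlib, using `type()` in place of the real `attrs`-based machinery. This `build_task_class` is a toy stand-in with a hypothetical signature, not Pydra's actual function:

```python
# Illustrative sketch of dynamic task-class creation. The real
# build_task_class() converts Arg/Out field specs into attrs fields;
# here we only show the "build a class at runtime" idea.

def build_task_class(name, inputs, run):
    """Create a task class with one attribute per declared input."""

    def __init__(self, **kwargs):
        for field in inputs:
            setattr(self, field, kwargs[field])

    def __call__(self):
        # Gather the stored inputs and invoke the wrapped function
        return run(**{f: getattr(self, f) for f in inputs})

    return type(name, (), {"__init__": __init__, "__call__": __call__})


Add = build_task_class("Add", ["a", "b"], lambda a, b: a + b)
print(Add(a=1, b=2)())  # → 3
```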

### Execution Engine (`engine/`)

**Workflow construction** (`engine/workflow.py`): `Workflow.construct(task)` runs the workflow definition function to discover nodes and wire the `DiGraph`. Constructed workflows are cached by content hash.

**Node** (`engine/node.py`): Wraps a `Task` inside a workflow. Holds the task, its `State`, optional `Environment`, and `TaskHooks`. Exposes `lzout` — a lazy output proxy for wiring downstream nodes.

**LazyField** (`engine/lazy.py`): Promises between nodes. `node.lzout.x` returns a `LazyOutField`. Assigning it to another node's input creates a dataflow edge.
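
A toy illustration of the proxy pattern (these classes borrow Pydra's names but none of its real behaviour):

```python
# Accessing an attribute on lzout records a promise (node, field) rather
# than returning a value; assigning that promise to another node's input
# is what creates a dataflow edge.

class LazyOutField:
    def __init__(self, node, field):
        self.node, self.field = node, field


class LazyOut:
    def __init__(self, node):
        self._node = node

    def __getattr__(self, field):
        # Only called for attributes not found normally, i.e. output names
        return LazyOutField(self._node, field)


class Node:
    def __init__(self, name):
        self.name = name
        self.lzout = LazyOut(self)


a = Node("a")
promise = a.lzout.x
print(promise.node.name, promise.field)  # → a x
```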

**State / Splitter / Combiner** (`engine/state.py`): Implements map-reduce semantics. A node can be split over an iterable input (producing parallel jobs) and combined (reducing the results). Splitters can be scalar (`zip`-like) or outer (Cartesian product), and are stored internally in reverse Polish notation (RPN). Each concrete state index corresponds to one `Job`.
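
The two splitter flavours map onto plain `zip` and `itertools.product`; a sketch of the state indices they generate:

```python
import itertools

a = [1, 2]
b = [10, 20]

# Scalar splitter ("a", "b"): inputs are zipped pairwise
scalar = list(zip(a, b))
# Outer splitter ["a", "b"]: Cartesian product of the inputs
outer = list(itertools.product(a, b))

print(scalar)  # [(1, 10), (2, 20)]
print(outer)   # [(1, 10), (1, 20), (2, 10), (2, 20)]
```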

**Job** (`engine/job.py`): The concrete unit of work submitted to a worker. Holds the fully-resolved `Task`, a `cache_dir` (from content hash of task inputs + definition), and uses `filelock.SoftFileLock` for safe parallel execution.

**Submitter** (`engine/submitter.py`): The async dispatch loop. Constructs the `DiGraph` of `NodeExecution` objects, drives an asyncio event loop to submit ready jobs to the configured `Worker`, handles caching (skips jobs with a valid cached result), and manages concurrency.

```python
with Submitter(worker="cf", cache_root="/tmp/cache") as sub:
result = sub(my_task)
```
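
The dispatch loop can be sketched as a plain asyncio coroutine that repeatedly submits every job whose predecessors have finished (a toy model, not Pydra's `Submitter`):

```python
import asyncio


async def run_graph(graph, run_job):
    """graph maps node name -> set of predecessor names (a DAG)."""
    done = set()
    while len(done) < len(graph):
        # Ready = not yet run, and all predecessors already done
        ready = [n for n, deps in graph.items()
                 if n not in done and deps <= done]
        await asyncio.gather(*(run_job(n) for n in ready))
        done.update(ready)
    return done


async def fake_job(name):
    await asyncio.sleep(0)  # stand-in for real work on a worker
    return name


graph = {"a": set(), "b": {"a"}, "c": {"a"}, "d": {"b", "c"}}
print(sorted(asyncio.run(run_graph(graph, fake_job))))  # → ['a', 'b', 'c', 'd']
```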

### Workers (`workers/`)

| Class | Module | Description |
|---|---|---|
| `ConcurrentFuturesWorker` | `cf.py` | `ProcessPoolExecutor`-based (default) |
| `DebugWorker` | `debug.py` | Single-process, synchronous |
| `SlurmWorker` | `slurm.py` | Submits via `sbatch`, polls with `sacct` |
| `SGEWorker` | `sge.py` | SGE qsub |

Workers are discovered via a plugin system (`get_plugin_classes` in `utils/general.py`). External workers (`pydra-workers-psij`, `pydra-workers-dask`) are installable as separate packages.

### Environments (`environments/`)

Control *how* shell tasks execute: `native.py` (bare OS), `docker.py`, `singularity.py`, `lmod.py` (load environment modules before executing).

### Caching (`utils/hash.py`, `engine/job.py`)

Cache keys are content hashes of the `Task` (all inputs + task definition). Results are stored as cloudpickled files under `~/.cache/pydra/<version>/run-cache/` (via `platformdirs`). Lock files prevent race conditions.
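
The scheme can be sketched with stdlib `hashlib` + `pickle` (Pydra's real hashing lives in `utils/hash.py`, and it stores results with cloudpickle under lock files):

```python
import hashlib
import pickle
import tempfile
from pathlib import Path


def cache_key(inputs: dict) -> str:
    """Deterministic content hash of a task's inputs."""
    blob = pickle.dumps(sorted(inputs.items()))
    return hashlib.blake2b(blob, digest_size=16).hexdigest()


def run_cached(cache_root: Path, inputs: dict, func):
    path = cache_root / cache_key(inputs)
    if path.exists():  # cache hit: skip execution entirely
        return pickle.loads(path.read_bytes())
    result = func(**inputs)
    path.write_bytes(pickle.dumps(result))  # store under the content hash
    return result


root = Path(tempfile.mkdtemp())
print(run_cached(root, {"a": 1, "b": 2}, lambda a, b: a + b))  # → 3
print(run_cached(root, {"a": 1, "b": 2}, lambda a, b: 0))      # → 3 (cached)
```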

### Provenance Tracking (`engine/audit.py`)

Optional JSON-LD provenance tracking controlled via `AuditFlag` bits. Messengers: `PrintMessenger`, `FileMessenger`, `RemoteRESTMessenger`. Schema at `schema/context.jsonld`.

### Data Flow Summary

```
User code
├─ @python.define / @shell.define / @workflow.define
│ └─> compose/base/builder.py: build_task_class()
│ creates Task(attrs) + Outputs(attrs)
├─ Submitter(worker="cf", cache_root=...)
│ ├─ Workflow.construct(task) → DiGraph of Nodes (engine/graph.py)
│ ├─ State resolution → list of state indices (engine/state.py)
│ ├─ per state-index: Job(task, cache_dir) (engine/job.py)
│ │ └─ if not cached → Worker.run(job) (workers/)
│ │ └─ Environment.execute(job) (environments/)
│ └─ Result(outputs, runtime, cache_dir) (engine/result.py)
└─ LazyField wiring between Nodes (engine/lazy.py)
```

## Key Files

| File | Purpose |
|---|---|
| `pyproject.toml` | Build, deps, pytest config, coverage config |
| `tox.ini` | tox envs: test, style, style-fix, spellcheck, build, publish |
| `.flake8` | Flake8: max-line-length=105 |
| `pydra/conftest.py` | `worker`/`any_worker` fixtures; `--only-worker`, `--with-dask` flags |
| `pydra/compose/base/builder.py` | Decorator machinery (`build_task_class`) |
| `pydra/compose/base/field.py` | `Field`, `Arg`, `Out`, `NO_DEFAULT`, `Requirement` |
| `pydra/compose/base/task.py` | `Task` and `Outputs` base classes |
| `pydra/compose/python.py` | `@python.define` |
| `pydra/compose/shell/task.py` | Shell `Task`, CLI construction |
| `pydra/compose/workflow.py` | `@workflow.define`, `workflow.add`, `workflow.this` |
| `pydra/engine/submitter.py` | Async dispatch loop |
| `pydra/engine/job.py` | Single unit of work, caching, locking |
| `pydra/engine/state.py` | Splitter/combiner map-reduce |
| `pydra/engine/lazy.py` | `LazyField` — dataflow wiring |
| `pydra/engine/workflow.py` | DAG construction and caching |
| `pydra/engine/node.py` | `Node` — task wrapper in workflow graph |
| `pydra/utils/hash.py` | Content hashing for cache keys |
| `pydra/utils/general.py` | Plugin discovery, cache root, platform utils |
| `pydra/utils/typing.py` | `StateArray`, `TypeParser`, type helpers |
43 changes: 42 additions & 1 deletion pydra/engine/job.py
@@ -59,7 +59,11 @@ class Job(ty.Generic[TaskType]):
"""

_api_version: str = "0.0.1" # Should generally not be touched by subclasses
_etelemetry_version_data = None # class variable to store etelemetry information
# Sentinel meaning "check not yet performed". Distinct from None so that a
# failed/empty etelemetry response (which returns None) does not cause the
# network check to be repeated on every subsequent task invocation.
_ETELEMETRY_UNCHECKED = object()
_etelemetry_version_data = _ETELEMETRY_UNCHECKED # class variable
_version: str # Version of tool being wrapped
_task_version: ty.Optional[str] = None
# Job writers encouraged to define and increment when implementation changes sufficiently
@@ -141,6 +145,10 @@ def __init__(
self.hooks = hooks if hooks is not None else TaskHooks()
self._errored = False
self._lzout = None
# In-memory result cache: avoids reading back from disk the result that
# was just written by run(). Not included in __getstate__ so it is
# never pickled (subprocess workers will still use the disk path).
self._cached_result = None

# Save the submitter attributes needed to run the job later
self.audit = submitter.audit
@@ -179,11 +187,18 @@ def __str__(self):
def __getstate__(self):
state = self.__dict__.copy()
state["task"] = cp.dumps(state["task"])
# Never serialise the in-memory result cache: subprocess workers must
# go through the normal disk-based load_result() path.
state.pop("_cached_result", None)
return state

def __setstate__(self, state):
state["task"] = cp.loads(state["task"])
self.__dict__.update(state)
# _cached_result is excluded from __getstate__; ensure it always exists
# so that result() works correctly on deserialized jobs.
if "_cached_result" not in self.__dict__:
self._cached_result = None

@property
def errored(self):
@@ -349,6 +364,10 @@ def run(self, rerun: bool = False):
# Check for any changes to the input hashes that have occurred during the execution
# of the job
self._check_for_hash_changes()
# Cache the completed result in memory so that callers who immediately
# call job.result() (e.g. Submitter.__call__) do not need to deserialise
# it back from disk.
self._cached_result = result
return result

async def run_async(self, rerun: bool = False) -> Result:
@@ -487,6 +506,11 @@ def result(self, return_inputs=False):
task=self.task,
)

# Fast path: return the in-memory result cached by run() to avoid
# deserialising from disk for callers in the same process.
if self._cached_result is not None and not return_inputs:
return self._cached_result

checksum = self.checksum
result = load_result(checksum, self.all_caches)
if result and result.errored:
@@ -504,6 +528,23 @@ def _check_for_hash_changes(self):
return result

def _check_for_hash_changes(self):
from pydra.utils.typing import TypeParser

# For tasks whose input fields contain no FileSet types, hashes cannot
# change during execution (scalar/pure-Python values are immutable from
# Pydra's perspective). Skip the expensive full recomputation in that
# common case.
if not any(
TypeParser.contains_type(FileSet, f.type) for f in get_fields(self.task)
):
logger.debug(
"Input values and hashes for '%s' %s node:\n%s\n%s",
self.name,
type(self).__name__,
self.task,
self.task._hashes,
)
return
hash_changes = self.task._hash_changes()
details = ""
for changed in hash_changes:
2 changes: 1 addition & 1 deletion pydra/engine/submitter.py
@@ -115,7 +115,7 @@ def __init__(

from pydra.utils.etelemetry import check_latest_version

if Job._etelemetry_version_data is None:
if Job._etelemetry_version_data is Job._ETELEMETRY_UNCHECKED:
Job._etelemetry_version_data = check_latest_version()

self.audit = Audit(