Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
39d31e6
An initial stab at a flow decorator; some tests
vineetbansal Oct 20, 2025
1459ac0
Merge branch 'main' into vb/flow_decorator
vineetbansal Oct 23, 2025
5c7e780
More extensive tests and cases covered.
vineetbansal Oct 24, 2025
4170f6f
Merge pull request #1 from vineetbansal/vb/flow_decorator_fixes
vineetbansal Oct 27, 2025
c6fd65e
simplifications based on discussions in PR815; nested flow decorators…
vineetbansal Nov 13, 2025
79d78dd
Merge branch 'main' into vb/flow_decorator
vineetbansal Nov 13, 2025
0f1817a
whitespace format
vineetbansal Nov 13, 2025
a697c6b
Removed xfails for dynamic jobs (seem to work okay?)
vineetbansal Nov 13, 2025
907944b
Some tweaks and an additional test based on discussion in PR815
vineetbansal Nov 17, 2025
61d43c9
Merge branch 'main' into vb/flow_decorator
vineetbansal Nov 19, 2025
30b06c1
getitem for Job delegates to its output
vineetbansal Nov 21, 2025
48611de
Merge branch 'vb/job_getitem' into vb/flow_decorator
vineetbansal Nov 21, 2025
f938106
allow Flow inside @flow decorator
gpetretto Nov 25, 2025
7479678
Merge branch 'test_flow_dec' into vb/flow_decorator
vineetbansal Dec 3, 2025
2f2edeb
re-added the automatic replacement of Job/Flow output from @flow to i…
vineetbansal Dec 3, 2025
36c9ecf
Revert "getitem for Job delegates to its output"
vineetbansal Dec 8, 2025
24b8c38
docstring tweaks and tweaks to some tests, now that Job/Flow are adde…
vineetbansal Dec 8, 2025
472c77b
Merge branch 'main' into vb/flow_decorator
vineetbansal Dec 8, 2025
0f7ed09
Merge branch 'materialsproject:main' into vb/flow_decorator
vineetbansal Dec 15, 2025
c5c0b2e
Merge branch 'main' into vb/flow_decorator
utf Feb 1, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ keywords = ["high-throughput", "workflow"]
license = { text = "modified BSD" }
authors = [{ name = "Alex Ganose", email = "a.ganose@imperial.ac.uk" }]
dynamic = ["version"]

classifiers = [
"Development Status :: 5 - Production/Stable",
"Intended Audience :: Information Technology",
Expand Down
2 changes: 1 addition & 1 deletion src/jobflow/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""Jobflow is a package for writing dynamic and connected workflows."""

from jobflow._version import __version__
from jobflow.core.flow import Flow, JobOrder
from jobflow.core.flow import Flow, JobOrder, flow
from jobflow.core.job import Job, JobConfig, Response, job
from jobflow.core.maker import Maker
from jobflow.core.reference import OnMissing, OutputReference
Expand Down
111 changes: 110 additions & 1 deletion src/jobflow/core/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

import logging
import warnings
from contextlib import contextmanager
from contextvars import ContextVar
from copy import deepcopy
from typing import TYPE_CHECKING

Expand Down Expand Up @@ -155,6 +157,12 @@ def __init__(
self.add_jobs(jobs)
self.output = output

# If we're running inside a `DecoratedFlow`, add *this* Flow to the
# context.
current_flow_children_list = _current_flow_context.get()
if current_flow_children_list is not None:
current_flow_children_list.append(self)

def __len__(self) -> int:
"""Get the number of jobs or subflows in the flow."""
return len(self.jobs)
Expand Down Expand Up @@ -828,7 +836,7 @@ def add_jobs(self, jobs: Job | Flow | Sequence[Flow | Job]) -> None:
if job.host is not None and job.host != self.uuid:
raise ValueError(
f"{type(job).__name__} {job.name} ({job.uuid}) already belongs "
f"to another flow."
f"to another flow: {job.host}."
)
if job.uuid in job_ids:
raise ValueError(
Expand Down Expand Up @@ -921,3 +929,104 @@ def get_flow(
)

return flow


class DecoratedFlow(Flow):
"""A DecoratedFlow is a Flow that is returned on using the @flow decorator."""

def __init__(self, fn, *args, **kwargs):
from jobflow import Maker

self.fn = fn
self.args = args
self.kwargs = kwargs

# Collect the jobs and flows that are used in the function
children_list = []
with flow_build_context(children_list):
output = self.fn(*self.args, **self.kwargs)

# From the collected items, remove those that have already been assigned
# to another Flow during the call of the function.
# This handles the case where a Flow object is instantiated inside
# the decorated function
children_list = [c for c in children_list if c.host is None]

name = getattr(self.fn, "__qualname__", self.fn.__name__)

# if decorates a make() in a Maker use that as a name
if (
len(self.args) > 0
and name.split(".")[-1] == "make"
and getattr(args[0], self.fn.__name__, None)
and isinstance(args[0], Maker)
):
name = args[0].name

if isinstance(output, (jobflow.Job, jobflow.Flow)):
warnings.warn(
f"@flow decorated function '{name}' contains a Flow or"
f"Job as an output. Usually the output should be the output of"
f"a Job or another Flow (e.g. job.output). Replacing the"
f"output of the @flow with the output of the Flow/Job."
f"If this message is unexpected then double check the outputs"
f"of your @flow decorated function.",
stacklevel=2,
)
output = output.output

super().__init__(name=name, jobs=children_list, output=output)


def flow(fn):
"""
Turn a function into a DecoratedFlow object.

Parameters
----------
fn (Callable): The function to be wrapped in a DecoratedFlow object.

Returns
-------
Callable: A wrapper function that, when called, creates and returns
an instance of DecoratedFlow initialized with the provided function
and its arguments.
"""
from functools import wraps

@wraps(fn)
def wrapper(*args, **kwargs):
return DecoratedFlow(fn, *args, **kwargs)

return wrapper


@contextmanager
def flow_build_context(children_list):
"""Provide a context manager for flows.

Provides a context manager for setting and resetting the `Job` and `Flow`
objects in the current flow context.

Parameters
----------
children_list: The `Job` or `Flow` objects that are part of the current
flow context.

Yields
------
None: Temporarily sets the provided `Job` or `Flow` objects as
belonging to the current flow context within the managed block.

Raises
------
None
"""
token = _current_flow_context.set(children_list)
try:
yield
finally:
_current_flow_context.reset(token)


_current_flow_context = ContextVar("current_flow_context", default=None)
7 changes: 7 additions & 0 deletions src/jobflow/core/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from monty.json import MSONable, jsanitize
from typing_extensions import Self

from jobflow.core.flow import _current_flow_context
from jobflow.core.reference import OnMissing, OutputReference
from jobflow.utils.uid import suid

Expand Down Expand Up @@ -384,6 +385,12 @@ def __init__(
stacklevel=2,
)

# If we're running inside a `DecoratedFlow`, add *this* Job to the
# context.
current_flow_children_list = _current_flow_context.get()
if current_flow_children_list is not None:
current_flow_children_list.append(self)

def __repr__(self):
"""Get a string representation of the job."""
name, uuid = self.name, self.uuid
Expand Down
Loading
Loading