Commits
25 commits
0f6a17d
testing against custom fork of jobflow that supports a flow decorator
vineetbansal Nov 14, 2025
32c2765
Remove pip caching from tests workflow
Andrew-S-Rosen Nov 14, 2025
bd1144a
Enable pip caching in tests workflow
Andrew-S-Rosen Nov 14, 2025
2fa1a55
Merge branch 'main' into vb/flow_decorator
vineetbansal Nov 14, 2025
d96c90b
simplifications; some more tests
vineetbansal Nov 14, 2025
7a7ec02
Fix (de)serialization of Phonopy
Andrew-S-Rosen Nov 17, 2025
4871b37
pre-commit auto-fixes
pre-commit-ci[bot] Nov 17, 2025
8b7220f
Update phonons.py
Andrew-S-Rosen Nov 17, 2025
61ff962
pre-commit auto-fixes
pre-commit-ci[bot] Nov 17, 2025
d9217a2
Update phonons.py
Andrew-S-Rosen Nov 17, 2025
910f3fa
Pass displaced_atoms to _thermo_job function
Andrew-S-Rosen Nov 17, 2025
0f4526e
Modify phonon job to use displaced atoms
Andrew-S-Rosen Nov 17, 2025
45b6d20
Update phonons.py
Andrew-S-Rosen Nov 17, 2025
74a48b3
Merge branch 'phonopy_fix' into vb/flow_decorator
vineetbansal Nov 17, 2025
ec84860
testing against branch vb/flow_decorator of jobflow fork
vineetbansal Nov 18, 2025
599df7d
Documentation examples; Job wrapper with getitem access (temporary ti…
vineetbansal Nov 19, 2025
67f3eb8
Merge branch 'main' into vb/flow_decorator
vineetbansal Nov 19, 2025
1d29471
a more robust __name__ assignment for monty serialization
vineetbansal Nov 19, 2025
18910b5
Merge branch 'main' into vb/flow_decorator
Andrew-S-Rosen Nov 20, 2025
16a5f6c
quacc subflow mapped to a jobflow job
vineetbansal Nov 21, 2025
6f5904e
Merge branch 'vb/flow_decorator' of github.com:vineetbansal/quacc int…
vineetbansal Nov 21, 2025
61b66e6
test tweaks now that subflow return jf.Job objects for Jobflow
vineetbansal Nov 21, 2025
7b66816
Merge branch 'main' into vb/flow_decorator
Andrew-S-Rosen Nov 30, 2025
3fc0871
Merge branch 'main' into vb/flow_decorator
vineetbansal Dec 8, 2025
09536ca
Merge branch 'vb/flow_decorator' of github.com:vineetbansal/quacc int…
vineetbansal Dec 16, 2025
16 changes: 6 additions & 10 deletions docs/user/basics/wflow_decorators.md
@@ -94,20 +94,16 @@ A `#!Python @subflow` in quacc is any workflow that returns a list of job output

=== "Jobflow"

Take a moment to read the Jobflow documentation's [Quick Start](https://materialsproject.github.io/jobflow/tutorials/1-quickstart.html) to get a sense of how Jobflow works. Namely, you should understand the `Job` and `Flow` definitions, which describe individual compute tasks and workflows, respectively.
Take a moment to read the Jobflow documentation's [Quick Start](https://materialsproject.github.io/jobflow/tutorials/1-quickstart.html) to get a sense of how Jobflow works. Namely, you should understand the concept of a `#!Python @job` and a `#!Python @flow`, which describe individual compute tasks and workflows, respectively.

<center>

| Quacc | Jobflow |
| ------------------- | --------------- |
| `#!Python @job` | `#!Python @job` |
| `#!Python @flow` | N/A |
| `#!Python @subflow` | N/A |
| Quacc | Jobflow |
| ------------------- | ---------------- |
| `#!Python @job` | `#!Python @job` |
| `#!Python @flow` | `#!Python @flow` |
| `#!Python @subflow` | `#!Python @job` |

</center>

!!! Warning

Due to the difference in how Jobflow handles workflows compared to other supported workflow engines, any quacc recipes that have been pre-defined with a `#!Python @flow` or `#!Python @subflow` decorator (i.e. have `_flow` in the name) cannot be run directly with Jobflow.

The quacc decorators are drop-in replacements for their workflow engine analogues, and we will use them for the remainder of the tutorials. Based on the value of the `WORKFLOW_ENGINE` global variable in your [quacc settings](../settings/settings.md), the appropriate decorator will be selected automatically. If the `WORKFLOW_ENGINE` setting is `None` (i.e. `quacc set WORKFLOW_ENGINE None`), the decorators have no effect on the underlying function.
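The engine-selection behavior described above can be sketched in plain Python. This is an illustrative mock, not quacc's actual implementation; `WORKFLOW_ENGINE` below is a module-level stand-in for the real quacc setting, and only the "no engine" case is modeled.

```python
# Hypothetical sketch of engine-dependent decorator selection. The real quacc
# decorators read the WORKFLOW_ENGINE setting and return an engine-specific
# wrapper; this mock only models the case where no engine is configured.
WORKFLOW_ENGINE = None  # e.g. None, "jobflow", "parsl", ...


def job(func):
    """Return the bare function when no workflow engine is configured."""
    if WORKFLOW_ENGINE is None:
        return func  # the decorator is a no-op
    raise NotImplementedError(f"engine {WORKFLOW_ENGINE!r} not modeled here")


@job
def add(a, b):
    return a + b


# With WORKFLOW_ENGINE set to None, calling the decorated function behaves
# exactly like calling the undecorated one
print(add(1, 2))
```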
4 changes: 0 additions & 4 deletions docs/user/basics/wflow_overview.md
@@ -110,10 +110,6 @@ Everyone's computing needs are different, so we ensured that quacc is interopera

[Jobflow](https://github.com/materialsproject/jobflow) is developed and maintained by the Materials Project team at Lawrence Berkeley National Laboratory and serves as a seamless interface to [FireWorks](https://github.com/materialsproject/fireworks) or [Jobflow Remote](https://github.com/Matgenix/jobflow-remote) for dispatching and monitoring compute jobs.

!!! Warning

Jobflow is not yet compatible with the `#!Python @flow` or `#!Python @subflow` decorators used in many quacc recipes and so should only be used if necessary. See [this issue](https://github.com/Quantum-Accelerators/quacc/issues/1061) to track the progress of this enhancement.

Pros:

- Native support for a variety of databases
18 changes: 16 additions & 2 deletions docs/user/wflow_engine/wflow_engines1.md
@@ -335,6 +335,20 @@ graph LR

=== "Jobflow"

    !!! Warning

        Due to the difference in how Jobflow handles workflows (particularly dynamic ones) compared to other supported workflow engines, any quacc recipes that have been pre-defined with a `#!Python @flow` decorator (i.e. have `_flow` in the name) cannot be run directly with Jobflow. Rather, a Jobflow-specific `Flow` needs to be constructed by the user.

    ```python
    import jobflow as jf
    from ase.build import bulk
    from quacc.recipes.emt.slabs import bulk_to_slabs_flow

    # Define the Atoms object
    atoms = bulk("Cu")

    # Create the workflow with arguments
    workflow = bulk_to_slabs_flow(atoms)

    # Dispatch the workflow and get results
    results = jf.run_locally(workflow)

    # Print the results
    print(results)
    ```
30 changes: 28 additions & 2 deletions docs/user/wflow_engine/wflow_engines2.md
@@ -795,6 +795,32 @@ graph LR

=== "Jobflow"

    !!! Warning "Limitations"

        Due to the difference in how Jobflow handles workflows (particularly dynamic ones) compared to other supported workflow engines, any quacc recipes that have been pre-defined with a `#!Python @flow` decorator (i.e. have `_flow` in the name) cannot be run directly with Jobflow. Rather, a Jobflow-specific `Flow` needs to be constructed by the user.

    ```python
    import jobflow as jf
    from ase.build import bulk
    from quacc import flow
    from quacc.recipes.emt.core import relax_job
    from quacc.recipes.emt.slabs import bulk_to_slabs_flow


    # Define the workflow
    @flow
    def relaxed_slabs_workflow(atoms):
        relaxed_bulk = relax_job(atoms)
        relaxed_slabs = bulk_to_slabs_flow(relaxed_bulk["atoms"], run_static=False)
        return relaxed_slabs


    # Define the Atoms object
    atoms = bulk("Cu")

    # Create the workflow with arguments
    workflow = relaxed_slabs_workflow(atoms)

    # Dispatch the workflow and get results
    results = jf.run_locally(workflow)

    # Print the results
    print(results)
    ```
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -46,7 +46,7 @@ covalent = ["covalent>=0.234.1-rc.0; platform_system!='Windows'", "covalent-clou
dask = ["dask[distributed]>=2023.12.1", "dask-jobqueue>=0.8.2"]
defects = ["pymatgen-analysis-defects>=2024.10.22", "shakenbreak>=3.2.0"]
fairchem = ["fairchem-data-omat>=0.2", "fairchem-data-oc>=1.0.2", "fairchem-core>=2.2.0"]
jobflow = ["jobflow>=0.1.14", "jobflow-remote>=0.1.0"]
jobflow = ["jobflow @ git+https://github.com/vineetbansal/jobflow.git@vb/flow_decorator", "jobflow-remote>=0.1.0"]
orb = ["orb-models>=0.4.1"]
mace = ["mace-torch>=0.3.3", "mace-models>=0.1.6"]
matgl = ["matgl>=2.0.2"]
12 changes: 11 additions & 1 deletion src/quacc/wflow_tools/customizers.py
@@ -142,7 +142,17 @@ def update_parameters(
func = strip_decorator(func)
return decorator_func(partial(func, **params))

return partial(func, **params)
partial_fn = partial(func, **params)
# Assigning a __name__ allows monty's jsanitize function to work correctly
# with this partial function.
if hasattr(func, "name"):
partial_fn.__name__ = func.name
elif hasattr(func, "__name__"):
partial_fn.__name__ = func.__name__
else:
partial_fn.__name__ = ""

return partial_fn
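The fallback chain above exists because `functools.partial` objects do not carry a `__name__` of their own, which serialization helpers such as monty's `jsanitize` expect on callables. A standalone sketch of the problem and the fix, independent of monty (the `relax` function here is illustrative):

```python
from functools import partial


def relax(atoms, fmax=0.05):
    return {"atoms": atoms, "fmax": fmax}


p = partial(relax, fmax=0.1)

# partial objects expose .func and .keywords, but no __name__ of their own,
# which trips up serializers that expect one on every callable
assert not hasattr(p, "__name__")

# mirroring the fallback chain above: prefer a .name attribute, then
# __name__, else an empty string
p.__name__ = getattr(relax, "name", None) or getattr(relax, "__name__", "")
print(p.__name__)
```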


def customize_funcs(
89 changes: 86 additions & 3 deletions src/quacc/wflow_tools/decorators.py
@@ -163,9 +163,7 @@ def wrapper(*f_args, **f_kwargs):

return Delayed_(delayed(wrapper, **kwargs))
elif settings.WORKFLOW_ENGINE == "jobflow":
from jobflow import job as jf_job

return jf_job(_func, **kwargs)
return _get_jobflow_wrapped_func(_func, **kwargs)
elif settings.WORKFLOW_ENGINE == "parsl":
from parsl import python_app

@@ -352,6 +350,8 @@ def workflow(a, b, c):
return task(_func, namespace=_func.__module__, **kwargs)
elif settings.WORKFLOW_ENGINE == "prefect":
return _get_prefect_wrapped_flow(_func, settings, **kwargs)
elif settings.WORKFLOW_ENGINE == "jobflow":
return _get_jobflow_wrapped_flow(_func)
else:
return _func

@@ -585,6 +585,8 @@ def wrapper(*f_args, **f_kwargs):
from redun import task

return task(_func, namespace=_func.__module__, **kwargs)
elif settings.WORKFLOW_ENGINE == "jobflow":
return _get_jobflow_wrapped_func(_func, **kwargs)
else:
return _func

@@ -662,6 +664,87 @@ def sync_wrapper(*f_args, **f_kwargs):
return prefect_flow(_func, validate_parameters=False, **kwargs)


def _get_jobflow_wrapped_func(method=None, **job_kwargs):
"""
Custom wrapper for `@job` decorated functions for `jobflow`.

We need this to emulate `@job`-like behavior while having the decorated
function return a `JobflowJobWrapper` instance whose `__getitem__` we can
intercept.

This wrapper only needs to exist until this functionality is available
in `jobflow.Job` directly, after which it can simply be implemented as:

from jobflow import job as jf_job
return jf_job(method, **job_kwargs)

"""
from jobflow import Job
from jobflow import job as jf_job

class JobflowJobWrapper(Job):
"""A small Jobflow wrapper that holds a reference to a `jobflow.Job`
object, and relays all calls to it, except for `__getitem__` calls that
it relays to the `Job`'s `.output` attribute.

This is to make the `@flow` recipes that index directly inside a `@job`
work correctly. For example:

@job
def greetings(s):
return {"hello": f"Hello {s}", "bye": f"Goodbye {s}"}

@job
def upper(s):
return s.upper()

@flow
def greet(s):
job1 = greetings(s)
job2 = upper(job1["hello"])
return job2.output

This wrapper only needs to exist until this functionality is available
in `jobflow.Job` directly.
"""

def __init__(self, job):
self._job = job

def __getitem__(self, key):
return self._job.output[key]

def __getattr__(self, item):
return getattr(self._job, item)

def decorator(func):
jobflow_wrapped = jf_job(func, **job_kwargs)

@wraps(func)
def wrapper(*args, **kw):
job = jobflow_wrapped(*args, **kw)
return JobflowJobWrapper(job)

wrapper.original = func

return wrapper

if method is None:
return decorator
return decorator(method)
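The delegation pattern used by `JobflowJobWrapper` can be demonstrated in plain Python without jobflow installed; the `FakeJob`/`FakeOutput` names below are illustrative stand-ins, not jobflow classes:

```python
class FakeOutput:
    """Stand-in for a jobflow output reference supporting item access."""

    def __getitem__(self, key):
        return f"ref[{key}]"


class FakeJob:
    """Stand-in for jobflow.Job, with an .output attribute and a name."""

    name = "greetings"
    output = FakeOutput()


class Wrapper:
    """Relay attribute access to the wrapped job, but route __getitem__
    through the job's .output so wrapped["hello"] yields an output reference.
    """

    def __init__(self, job):
        self._job = job

    def __getitem__(self, key):
        return self._job.output[key]

    def __getattr__(self, item):
        # __getattr__ is only called when normal lookup fails, so the
        # instance attribute self._job is found without recursion
        return getattr(self._job, item)


w = Wrapper(FakeJob())
print(w["hello"])  # item access is routed through .output
print(w.name)      # everything else is relayed to the wrapped job
```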


def _get_jobflow_wrapped_flow(_func: Callable) -> Callable:
# Relies on the `@flow` decorator from the custom jobflow fork pinned in
# pyproject.toml, which is not yet available in a released jobflow version.
from jobflow import flow as jf_flow

jobflow_flow = jf_flow(_func)

def wrapper(*args, **kwargs):
return jobflow_flow(*args, **kwargs)

return wrapper


class Delayed_:
"""A small Dask-compatible, serializable object to wrap delayed functions that we
don't want to execute.
62 changes: 62 additions & 0 deletions tests/jobflow/test_emt_recipes.py
@@ -0,0 +1,62 @@
from __future__ import annotations

import pytest

jobflow = pytest.importorskip("jobflow")

from ase.build import bulk

from quacc import flow, job
from quacc.recipes.emt.core import relax_job
from quacc.recipes.emt.slabs import bulk_to_slabs_flow # skipcq: PYL-C0412


@pytest.mark.parametrize("job_decorators", [None, {"relax_job": job()}])
def test_functools(tmp_path, monkeypatch, job_decorators):
monkeypatch.chdir(tmp_path)
atoms = bulk("Cu")
flow = bulk_to_slabs_flow(
atoms,
run_static=False,
job_params={"relax_job": {"opt_params": {"fmax": 0.1}}},
job_decorators=job_decorators,
)
jobflow.run_locally(flow, ensure_success=True)


def test_copy_files(tmp_path, monkeypatch):
monkeypatch.chdir(tmp_path)
atoms = bulk("Cu")

@flow
def myflow(atoms):
result1 = relax_job(atoms)
return relax_job(result1["atoms"], copy_files={result1["dir_name"]: "opt.*"})

output = jobflow.run_locally(myflow(atoms))
first_output = next(iter(output.values()))[1].output
assert "atoms" in first_output


def test_relax_flow(tmp_path, monkeypatch):
monkeypatch.chdir(tmp_path)
atoms = bulk("Cu")

@flow
def relax_flow(atoms):
result1 = relax_job(atoms)
return relax_job(result1["atoms"])

jobflow.run_locally(relax_flow(atoms), ensure_success=True)


def test_relaxed_slabs(tmp_path, monkeypatch):
monkeypatch.chdir(tmp_path)
atoms = bulk("Cu")

@flow
def workflow(atoms):
relaxed_bulk = relax_job(atoms)
return bulk_to_slabs_flow(relaxed_bulk["atoms"], run_static=False)

jobflow.run_locally(workflow(atoms), ensure_success=True)
8 changes: 4 additions & 4 deletions tests/jobflow/test_syntax.py
@@ -32,8 +32,8 @@ def workflow(a, b, c):
assert hasattr(mult, "original")
assert isinstance(add(1, 2), jf.Job)
assert isinstance(mult(1, 2), jf.Job)
assert isinstance(workflow(1, 2, 3), jf.Job)
assert isinstance(add_distributed([1, 2, 3], 4)[0], jf.Job)
assert isinstance(workflow(1, 2, 3), jf.Flow)
assert isinstance(add_distributed([1, 2, 3], 4), jf.Job)


def test_jobflow_decorators_args(tmp_path, monkeypatch):
@@ -61,5 +61,5 @@ def workflow(a, b, c):
assert hasattr(mult, "original")
assert isinstance(add(1, 2), jf.Job)
assert isinstance(mult(1, 2), jf.Job)
assert isinstance(workflow(1, 2, 3), jf.Job)
assert isinstance(add_distributed([1, 2, 3], 4)[0], jf.Job)
assert isinstance(workflow(1, 2, 3), jf.Flow)
assert isinstance(add_distributed([1, 2, 3], 4), jf.Job)