From 0f6a17d0f375241892b7a396468097dd22c27ed1 Mon Sep 17 00:00:00 2001 From: Vineet Bansal Date: Fri, 14 Nov 2025 10:50:30 -0500 Subject: [PATCH 01/17] testing against custom fork of jobflow that supports a flow decorator --- pyproject.toml | 2 +- src/quacc/settings.py | 13 +++++++++++ src/quacc/wflow_tools/decorators.py | 34 +++++++++++++++++++++++++++++ tests/jobflow/test_syntax.py | 4 ++-- tests/requirements-jobflow.txt | 2 +- 5 files changed, 51 insertions(+), 4 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 1a84b8f7c8..949f73c41d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -46,7 +46,7 @@ covalent = ["covalent>=0.234.1-rc.0; platform_system!='Windows'", "covalent-clou dask = ["dask[distributed]>=2023.12.1", "dask-jobqueue>=0.8.2"] db = ["maggma>=0.64.0"] defects = ["pymatgen-analysis-defects>=2024.10.22", "shakenbreak>=3.2.0"] -jobflow = ["jobflow>=0.1.14", "jobflow-remote>=0.1.0"] +jobflow = ["jobflow @ git+https://github.com/vineetbansal/jobflow.git@vb/flow_decorator_quacc", "jobflow-remote>=0.1.0"] mlp1 = ["torch-dftd>=0.4.0", "sevenn>=0.10.1", "orb-models>=0.4.1", "fairchem-core>=2.2.0"] mlp2 = ["mace-torch>=0.3.3", "matgl>=1.1.2"] mp = ["atomate2>=0.0.14"] diff --git a/src/quacc/settings.py b/src/quacc/settings.py index 95570dbeab..8fdf18f8fb 100644 --- a/src/quacc/settings.py +++ b/src/quacc/settings.py @@ -116,6 +116,19 @@ class QuaccSettings(BaseSettings): description="Whether to resolve all futures in flow results to data and fail if not possible", ) + # --------------------------- + # Jobflow Settings + # --------------------------- + # TODO: How is the user expected to supply the task runner in jobflow? + # For now, we assume `run_locally` is implied. + JOBFLOW_AUTO_SUBMIT: bool = Field( + False, description="Whether to auto-submit tasks to a local task runner." + ) + JOBFLOW_RESOLVE_FLOW_RESULTS: bool = Field( + False, + description="Whether to resolve all output references in flow results to data and fail if not possible", + ) + # --------------------------- # ORCA Settings # --------------------------- diff --git a/src/quacc/wflow_tools/decorators.py b/src/quacc/wflow_tools/decorators.py index f730b5038c..2b7ef5f29a 100644 --- a/src/quacc/wflow_tools/decorators.py +++ b/src/quacc/wflow_tools/decorators.py @@ -2,6 +2,8 @@ from __future__ import annotations +import asyncio +import inspect from collections.abc import Callable from functools import partial, wraps from typing import TYPE_CHECKING, Any @@ -352,6 +354,8 @@ def workflow(a, b, c): return task(_func, namespace=_func.__module__, **kwargs) elif settings.WORKFLOW_ENGINE == "prefect": return _get_prefect_wrapped_flow(_func, settings, **kwargs) + elif settings.WORKFLOW_ENGINE == "jobflow": + return _get_jobflow_wrapped_flow(_func, settings) else: return _func @@ -585,6 +589,8 @@ def wrapper(*f_args, **f_kwargs): from redun import task return task(_func, namespace=_func.__module__, **kwargs) + elif settings.WORKFLOW_ENGINE == "jobflow": + return _get_jobflow_wrapped_flow(_func, settings) else: return _func @@ -662,6 +668,34 @@ def sync_wrapper(*f_args, **f_kwargs): return prefect_flow(_func, validate_parameters=False, **kwargs) +def _get_jobflow_wrapped_flow(_func: Callable, settings: QuaccSettings) -> Callable: + from jobflow import flow as jf_flow + from jobflow.managers.local import run_locally + + def is_async_fn(func): + """ + Returns `True` if a function returns a coroutine. 
+ An exact reproduction of `refect.utilities.asyncutils.is_async_fn` + """ + func = inspect.unwrap(func) + return asyncio.iscoroutinefunction(func) + + if is_async_fn(_func): + raise NotImplementedError + # If the Jobflow flow decorator is instantiated with return_dict=False, + # it resolves the references in its output dict. + return_dict = not settings.JOBFLOW_RESOLVE_FLOW_RESULTS + jobflow_flow = jf_flow(_func, return_dict=return_dict) + + def wrapper(*args, **kwargs): + bound_jobflow_flow = jobflow_flow(*args, **kwargs) + if settings.JOBFLOW_AUTO_SUBMIT: + return run_locally(bound_jobflow_flow, ensure_success=True) + return bound_jobflow_flow + + return wrapper + + class Delayed_: """A small Dask-compatible, serializable object to wrap delayed functions that we don't want to execute. diff --git a/tests/jobflow/test_syntax.py b/tests/jobflow/test_syntax.py index 332f074570..3cf85ab063 100644 --- a/tests/jobflow/test_syntax.py +++ b/tests/jobflow/test_syntax.py @@ -32,7 +32,7 @@ def workflow(a, b, c): assert hasattr(mult, "original") assert isinstance(add(1, 2), jf.Job) assert isinstance(mult(1, 2), jf.Job) - assert isinstance(workflow(1, 2, 3), jf.Job) + assert isinstance(workflow(1, 2, 3), jf.Flow) assert isinstance(add_distributed([1, 2, 3], 4)[0], jf.Job) @@ -61,5 +61,5 @@ def workflow(a, b, c): assert hasattr(mult, "original") assert isinstance(add(1, 2), jf.Job) assert isinstance(mult(1, 2), jf.Job) - assert isinstance(workflow(1, 2, 3), jf.Job) + assert isinstance(workflow(1, 2, 3), jf.Flow) assert isinstance(add_distributed([1, 2, 3], 4)[0], jf.Job) diff --git a/tests/requirements-jobflow.txt b/tests/requirements-jobflow.txt index 9d4745ae8e..303455a9b7 100644 --- a/tests/requirements-jobflow.txt +++ b/tests/requirements-jobflow.txt @@ -1,2 +1,2 @@ -jobflow==0.2.1 +git+https://github.com/vineetbansal/jobflow.git@v0.2.1.post1 jobflow-remote[gui]==0.1.8 From 32c276551896a3b91a9472b4c4b99436e68b7a52 Mon Sep 17 00:00:00 2001 From: "Andrew S. Rosen" Date: Fri, 14 Nov 2025 14:09:12 -0500 Subject: [PATCH 02/17] Remove pip caching from tests workflow Removed caching for pip packages in workflow. --- .github/workflows/tests.yaml | 2 -- 1 file changed, 2 deletions(-) diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index 7ec27c31fa..daf06231cb 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -362,8 +362,6 @@ jobs: uses: actions/setup-python@v6 with: python-version: "3.12" - cache: pip - cache-dependency-path: tests/requirements**.txt - name: Install pip packages run: | From bd1144ac48156cc007d57ae84049930d9ae950e7 Mon Sep 17 00:00:00 2001 From: "Andrew S. 
Rosen" Date: Fri, 14 Nov 2025 14:39:36 -0500 Subject: [PATCH 03/17] Enable pip caching in tests workflow Add caching for pip dependencies in GitHub Actions --- .github/workflows/tests.yaml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index daf06231cb..7ec27c31fa 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -362,6 +362,8 @@ jobs: uses: actions/setup-python@v6 with: python-version: "3.12" + cache: pip + cache-dependency-path: tests/requirements**.txt - name: Install pip packages run: | From d96c90bb9b81601c17440a76e6343de5dfa59d5d Mon Sep 17 00:00:00 2001 From: Vineet Bansal Date: Fri, 14 Nov 2025 17:57:07 -0500 Subject: [PATCH 04/17] simplifications; some more tests --- docs/user/basics/wflow_overview.md | 4 -- src/quacc/settings.py | 13 ---- src/quacc/wflow_tools/decorators.py | 29 ++------ tests/jobflow/test_tutorials.py | 105 +++++++++++++++++++++++++++- 4 files changed, 109 insertions(+), 42 deletions(-) diff --git a/docs/user/basics/wflow_overview.md b/docs/user/basics/wflow_overview.md index 48cce36c70..4d3973e212 100644 --- a/docs/user/basics/wflow_overview.md +++ b/docs/user/basics/wflow_overview.md @@ -110,10 +110,6 @@ Everyone's computing needs are different, so we ensured that quacc is interopera [Jobflow](https://github.com/materialsproject/jobflow) is developed and maintained by the Materials Project team at Lawrence Berkeley National Laboratory and serves as a seamless interface to [FireWorks](https://github.com/materialsproject/fireworks) or [Jobflow Remote](https://github.com/Matgenix/jobflow-remote) for dispatching and monitoring compute jobs. - !!! Warning - - Jobflow is not yet compatible with the `#!Python @flow` or `#!Python @subflow` decorators used in many quacc recipes and so should only be used if necessary. See [this issue](https://github.com/Quantum-Accelerators/quacc/issues/1061) to track the progress of this enhancement. - Pros: - Native support for a variety of databases diff --git a/src/quacc/settings.py b/src/quacc/settings.py index 8fdf18f8fb..95570dbeab 100644 --- a/src/quacc/settings.py +++ b/src/quacc/settings.py @@ -116,19 +116,6 @@ class QuaccSettings(BaseSettings): description="Whether to resolve all futures in flow results to data and fail if not possible", ) - # --------------------------- - # Jobflow Settings - # --------------------------- - # TODO: How is the user expected to supply the task runner in jobflow? - # For now, we assume `run_locally` is implied. - JOBFLOW_AUTO_SUBMIT: bool = Field( - False, description="Whether to auto-submit tasks to a local task runner." 
- ) - JOBFLOW_RESOLVE_FLOW_RESULTS: bool = Field( - False, - description="Whether to resolve all output references in flow results to data and fail if not possible", - ) - # --------------------------- # ORCA Settings # --------------------------- diff --git a/src/quacc/wflow_tools/decorators.py b/src/quacc/wflow_tools/decorators.py index 2b7ef5f29a..ff4128a78b 100644 --- a/src/quacc/wflow_tools/decorators.py +++ b/src/quacc/wflow_tools/decorators.py @@ -2,8 +2,6 @@ from __future__ import annotations -import asyncio -import inspect from collections.abc import Callable from functools import partial, wraps from typing import TYPE_CHECKING, Any @@ -355,7 +353,7 @@ def workflow(a, b, c): elif settings.WORKFLOW_ENGINE == "prefect": return _get_prefect_wrapped_flow(_func, settings, **kwargs) elif settings.WORKFLOW_ENGINE == "jobflow": - return _get_jobflow_wrapped_flow(_func, settings) + return _get_jobflow_wrapped_flow(_func) else: return _func @@ -590,7 +588,7 @@ def wrapper(*f_args, **f_kwargs): return task(_func, namespace=_func.__module__, **kwargs) elif settings.WORKFLOW_ENGINE == "jobflow": - return _get_jobflow_wrapped_flow(_func, settings) + return _get_jobflow_wrapped_flow(_func) else: return _func @@ -668,30 +666,13 @@ def sync_wrapper(*f_args, **f_kwargs): return prefect_flow(_func, validate_parameters=False, **kwargs) -def _get_jobflow_wrapped_flow(_func: Callable, settings: QuaccSettings) -> Callable: +def _get_jobflow_wrapped_flow(_func: Callable) -> Callable: from jobflow import flow as jf_flow - from jobflow.managers.local import run_locally - def is_async_fn(func): - """ - Returns `True` if a function returns a coroutine. - An exact reproduction of `refect.utilities.asyncutils.is_async_fn` - """ - func = inspect.unwrap(func) - return asyncio.iscoroutinefunction(func) - - if is_async_fn(_func): - raise NotImplementedError - # If the Jobflow flow decorator is instantiated with return_dict=False, - # it resolves the references in its output dict. 
- return_dict = not settings.JOBFLOW_RESOLVE_FLOW_RESULTS - jobflow_flow = jf_flow(_func, return_dict=return_dict) + jobflow_flow = jf_flow(_func, return_dict=False) def wrapper(*args, **kwargs): - bound_jobflow_flow = jobflow_flow(*args, **kwargs) - if settings.JOBFLOW_AUTO_SUBMIT: - return run_locally(bound_jobflow_flow, ensure_success=True) - return bound_jobflow_flow + return jobflow_flow(*args, **kwargs) return wrapper diff --git a/tests/jobflow/test_tutorials.py b/tests/jobflow/test_tutorials.py index 311fad9321..66f2d8f131 100644 --- a/tests/jobflow/test_tutorials.py +++ b/tests/jobflow/test_tutorials.py @@ -6,7 +6,7 @@ from ase.build import bulk, molecule -from quacc import job +from quacc import flow, job from quacc.recipes.emt.core import relax_job, static_job # skipcq: PYL-C0412 @@ -42,6 +42,24 @@ def test_tutorial2a(tmp_path, monkeypatch): jf.run_locally(workflow, create_folders=True, ensure_success=True) +def test_tutorial2a_flow_decorator(tmp_path, monkeypatch): + monkeypatch.chdir(tmp_path) + + # Make an Atoms object of a bulk Cu structure + atoms = bulk("Cu") + + # Define the workflow + @flow + def workflow(atoms): + # Define Job 1 + job1 = relax_job(atoms) + # Define Job 2, which takes the output of Job 1 as input + static_job(job1.output["atoms"]) + + # Run the workflow locally + jf.run_locally(workflow(atoms), create_folders=True, ensure_success=True) + + def test_tutorial2b(tmp_path, monkeypatch): monkeypatch.chdir(tmp_path) @@ -60,6 +78,24 @@ def test_tutorial2b(tmp_path, monkeypatch): jf.run_locally(workflow, create_folders=True, ensure_success=True) +def test_tutorial2b_flow_decorator(tmp_path, monkeypatch): + monkeypatch.chdir(tmp_path) + + # Define two Atoms objects + atoms1 = bulk("Cu") + atoms2 = molecule("N2") + + # Define the workflow + @flow + def workflow(atoms1, atoms2): + # Define two independent relaxation jobs + relax_job(atoms1) + relax_job(atoms2) + + # Run the workflow locally + jf.run_locally(workflow(atoms1, atoms2), create_folders=True, ensure_success=True) + + def test_comparison1(tmp_path, monkeypatch): monkeypatch.chdir(tmp_path) @@ -79,6 +115,27 @@ def mult(a, b): assert responses[job2.uuid][1].output == 9 +def test_comparison1_flow_decorator(tmp_path, monkeypatch): + monkeypatch.chdir(tmp_path) + + @job # (1)! 
+ def add(a, b): + return a + b + + @job + def mult(a, b): + return a * b + + @flow + def workflow(): + job1 = add(1, 2) + job2 = mult(job1.output, 3) + return job2.output # or `return job` + + response = jf.run_locally(workflow(), ensure_success=True) + assert response == 9 + + def test_comparison2(tmp_path, monkeypatch): monkeypatch.chdir(tmp_path) @@ -106,6 +163,33 @@ def add_distributed(vals, c): jf.run_locally(flow, ensure_success=True) # [6, 6, 6] in final 3 jobs +def test_comparison2_flow_decorator(tmp_path, monkeypatch): + monkeypatch.chdir(tmp_path) + + @jf.job + def add(a, b): + return a + b + + @jf.job + def make_more(val): + return [val] * 3 + + @jf.job + def add_distributed(vals, c): + jobs = [add(val, c) for val in vals] + + flow = jf.Flow(jobs) + return jf.Response(replace=flow) + + @jf.flow + def workflow(): + job1 = add(1, 2) + job2 = make_more(job1.output) + add_distributed(job2.output, 3) + + jf.run_locally(workflow(), ensure_success=True) # [6, 6, 6] in final 3 jobs + + def test_comparison3(tmp_path, monkeypatch): monkeypatch.chdir(tmp_path) @@ -122,3 +206,22 @@ def mult(a, b): flow = jf.Flow([job1, job2]) jf.run_locally(flow, ensure_success=True) + + +def test_comparison3_flow_decorator(tmp_path, monkeypatch): + monkeypatch.chdir(tmp_path) + + @job # (1)! + def add(a, b): + return a + b + + @job + def mult(a, b): + return a * b + + @flow + def workflow(): + job1 = add(1, 2) + mult(job1.output, 3) + + jf.run_locally(workflow(), ensure_success=True) From 7a7ec02228cec6bb8367b2bdec1637d8d1dc8351 Mon Sep 17 00:00:00 2001 From: Andrew Rosen Date: Mon, 17 Nov 2025 14:34:45 -0500 Subject: [PATCH 05/17] Fix (de)serialization of Phonopy --- src/quacc/recipes/common/phonons.py | 27 +++++++++++++++++---------- 1 file changed, 17 insertions(+), 10 deletions(-) diff --git a/src/quacc/recipes/common/phonons.py b/src/quacc/recipes/common/phonons.py index c8899d2ee9..8cd96a2872 100644 --- a/src/quacc/recipes/common/phonons.py +++ b/src/quacc/recipes/common/phonons.py @@ -97,14 +97,14 @@ def phonon_subflow( displaced_atoms, non_displaced_atoms = atoms[~mask_to_fix], atoms[mask_to_fix] - phonopy = get_phonopy( - displaced_atoms, - min_lengths=min_lengths, - supercell_matrix=supercell_matrix, - symprec=symprec, - displacement=displacement, - phonopy_kwargs=phonopy_kwargs, - ) + get_phonopy_kwargs = { + "min_lengths": min_lengths, + "supercell_matrix": supercell_matrix, + "symprec": symprec, + "displacement": displacement, + "phonopy_kwargs": phonopy_kwargs, + } + phonopy = get_phonopy(displaced_atoms, **get_phonopy_kwargs) if non_displaced_atoms: non_displaced_atoms_supercell = get_atoms_supercell_by_phonopy( @@ -127,13 +127,14 @@ def _get_forces_subflow(supercells: list[Atoms]) -> list[dict]: @job def _thermo_job( atoms: Atoms, - phonopy: Phonopy, + get_phonopy_kwargs: dict[str, Any], force_job_results: list[dict], t_step: float, t_min: float, t_max: float, additional_fields: dict[str, Any] | None, ) -> PhononSchema: + phonopy = get_phonopy(**get_phonopy_kwargs) parameters = force_job_results[-1].get("parameters") forces = [ output["results"]["forces"][: len(phonopy.supercell)] @@ -167,5 +168,11 @@ def _thermo_job( force_job_results = _get_forces_subflow(supercells) return _thermo_job( - atoms, phonopy, force_job_results, t_step, t_min, t_max, additional_fields + atoms, + get_phonopy_kwargs, + force_job_results, + t_step, + t_min, + t_max, + additional_fields, ) From 4871b37d6575adf0948b1159e0e07fa136af76e2 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" 
<66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 17 Nov 2025 19:35:39 +0000 Subject: [PATCH 06/17] pre-commit auto-fixes --- src/quacc/recipes/common/phonons.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/quacc/recipes/common/phonons.py b/src/quacc/recipes/common/phonons.py index 8cd96a2872..a0329deaab 100644 --- a/src/quacc/recipes/common/phonons.py +++ b/src/quacc/recipes/common/phonons.py @@ -29,7 +29,7 @@ from quacc.types import PhononSchema if has_phonopy: - from phonopy import Phonopy + pass @subflow From 8b7220ff49ffeddcb7cb7a0099bc4c2ac59c6f11 Mon Sep 17 00:00:00 2001 From: "Andrew S. Rosen" Date: Mon, 17 Nov 2025 14:36:14 -0500 Subject: [PATCH 07/17] Update phonons.py --- src/quacc/recipes/common/phonons.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/quacc/recipes/common/phonons.py b/src/quacc/recipes/common/phonons.py index a0329deaab..0fb3b48818 100644 --- a/src/quacc/recipes/common/phonons.py +++ b/src/quacc/recipes/common/phonons.py @@ -28,8 +28,6 @@ from quacc import Job from quacc.types import PhononSchema - if has_phonopy: - pass @subflow From 61ff9627ac0cf7c0e486ad6c4405a24b69ae36cf Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 17 Nov 2025 19:36:23 +0000 Subject: [PATCH 08/17] pre-commit auto-fixes --- src/quacc/recipes/common/phonons.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/quacc/recipes/common/phonons.py b/src/quacc/recipes/common/phonons.py index 0fb3b48818..32438aa182 100644 --- a/src/quacc/recipes/common/phonons.py +++ b/src/quacc/recipes/common/phonons.py @@ -29,7 +29,6 @@ from quacc.types import PhononSchema - @subflow @requires(has_phonopy, "Phonopy must be installed. Run `pip install quacc[phonons]`") @requires(has_seekpath, "Seekpath must be installed. Run `pip install quacc[phonons]`") From d9217a2cdc4f63d01fec77f067777efc890d8ce8 Mon Sep 17 00:00:00 2001 From: "Andrew S. Rosen" Date: Mon, 17 Nov 2025 14:38:17 -0500 Subject: [PATCH 09/17] Update phonons.py --- src/quacc/recipes/common/phonons.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/quacc/recipes/common/phonons.py b/src/quacc/recipes/common/phonons.py index 32438aa182..668cefe4f2 100644 --- a/src/quacc/recipes/common/phonons.py +++ b/src/quacc/recipes/common/phonons.py @@ -131,7 +131,7 @@ def _thermo_job( t_max: float, additional_fields: dict[str, Any] | None, ) -> PhononSchema: - phonopy = get_phonopy(**get_phonopy_kwargs) + phonopy = get_phonopy(atoms, **get_phonopy_kwargs) parameters = force_job_results[-1].get("parameters") forces = [ output["results"]["forces"][: len(phonopy.supercell)] From 910f3fa2fea3ae4c1bf2a2151aa81349a42ea047 Mon Sep 17 00:00:00 2001 From: "Andrew S. Rosen" Date: Mon, 17 Nov 2025 14:53:04 -0500 Subject: [PATCH 10/17] Pass displaced_atoms to _thermo_job function --- src/quacc/recipes/common/phonons.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/quacc/recipes/common/phonons.py b/src/quacc/recipes/common/phonons.py index 668cefe4f2..875f95ebbf 100644 --- a/src/quacc/recipes/common/phonons.py +++ b/src/quacc/recipes/common/phonons.py @@ -165,7 +165,7 @@ def _thermo_job( force_job_results = _get_forces_subflow(supercells) return _thermo_job( - atoms, + displaced_atoms, get_phonopy_kwargs, force_job_results, t_step, From 0f4526e4b881ff0256b807fca613a4d6326cd0f2 Mon Sep 17 00:00:00 2001 From: "Andrew S. 
Rosen" Date: Mon, 17 Nov 2025 14:55:34 -0500 Subject: [PATCH 11/17] Modify phonon job to use displaced atoms --- src/quacc/recipes/common/phonons.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/quacc/recipes/common/phonons.py b/src/quacc/recipes/common/phonons.py index 875f95ebbf..c3c890f4ea 100644 --- a/src/quacc/recipes/common/phonons.py +++ b/src/quacc/recipes/common/phonons.py @@ -124,6 +124,8 @@ def _get_forces_subflow(supercells: list[Atoms]) -> list[dict]: @job def _thermo_job( atoms: Atoms, + displaced_atoms: Atoms, + non_displaced_atoms: Atoms, get_phonopy_kwargs: dict[str, Any], force_job_results: list[dict], t_step: float, @@ -131,7 +133,7 @@ def _thermo_job( t_max: float, additional_fields: dict[str, Any] | None, ) -> PhononSchema: - phonopy = get_phonopy(atoms, **get_phonopy_kwargs) + phonopy = get_phonopy(displaced_atoms, **get_phonopy_kwargs) parameters = force_job_results[-1].get("parameters") forces = [ output["results"]["forces"][: len(phonopy.supercell)] From 45b6d20112a51dce2efebeeae824f29721a7b56c Mon Sep 17 00:00:00 2001 From: "Andrew S. Rosen" Date: Mon, 17 Nov 2025 14:56:30 -0500 Subject: [PATCH 12/17] Update phonons.py --- src/quacc/recipes/common/phonons.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/quacc/recipes/common/phonons.py b/src/quacc/recipes/common/phonons.py index c3c890f4ea..a9a8197b06 100644 --- a/src/quacc/recipes/common/phonons.py +++ b/src/quacc/recipes/common/phonons.py @@ -167,7 +167,9 @@ def _thermo_job( force_job_results = _get_forces_subflow(supercells) return _thermo_job( + atoms, displaced_atoms, + non_displaced_atoms, get_phonopy_kwargs, force_job_results, t_step, From ec848603ad08668d5916522300f7ba5bdb2583f7 Mon Sep 17 00:00:00 2001 From: Vineet Bansal Date: Tue, 18 Nov 2025 11:19:08 -0500 Subject: [PATCH 13/17] testing against branch vb/flow_decorator of jobflow fork --- pyproject.toml | 2 +- src/quacc/wflow_tools/decorators.py | 2 +- tests/jobflow/test_tutorials.py | 5 +++-- tests/requirements-jobflow.txt | 2 +- 4 files changed, 6 insertions(+), 5 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 99da3bd3f1..36d5f0254e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -46,7 +46,7 @@ covalent = ["covalent>=0.234.1-rc.0; platform_system!='Windows'", "covalent-clou dask = ["dask[distributed]>=2023.12.1", "dask-jobqueue>=0.8.2"] db = ["maggma>=0.64.0"] defects = ["pymatgen-analysis-defects>=2024.10.22", "shakenbreak>=3.2.0"] -jobflow = ["jobflow @ git+https://github.com/vineetbansal/jobflow.git@vb/flow_decorator_quacc", "jobflow-remote>=0.1.0"] +jobflow = ["jobflow @ git+https://github.com/vineetbansal/jobflow.git@vb/flow_decorator", "jobflow-remote>=0.1.0"] mlp1 = ["torch-dftd>=0.4.0", "sevenn>=0.10.1", "orb-models>=0.4.1", "fairchem-core>=2.2.0"] mlp2 = ["mace-torch>=0.3.3", "matgl>=2.0.2"] mp = ["atomate2>=0.0.14"] diff --git a/src/quacc/wflow_tools/decorators.py b/src/quacc/wflow_tools/decorators.py index ff4128a78b..22c428ac78 100644 --- a/src/quacc/wflow_tools/decorators.py +++ b/src/quacc/wflow_tools/decorators.py @@ -669,7 +669,7 @@ def sync_wrapper(*f_args, **f_kwargs): def _get_jobflow_wrapped_flow(_func: Callable) -> Callable: from jobflow import flow as jf_flow - jobflow_flow = jf_flow(_func, return_dict=False) + jobflow_flow = jf_flow(_func) def wrapper(*args, **kwargs): return jobflow_flow(*args, **kwargs) diff --git a/tests/jobflow/test_tutorials.py b/tests/jobflow/test_tutorials.py index 66f2d8f131..79984a75b2 100644 --- a/tests/jobflow/test_tutorials.py +++ 
b/tests/jobflow/test_tutorials.py @@ -132,8 +132,9 @@ def workflow(): job2 = mult(job1.output, 3) return job2.output # or `return job` - response = jf.run_locally(workflow(), ensure_success=True) - assert response == 9 + f = workflow() + response = jf.run_locally(f, ensure_success=True) + assert response[f.output.uuid][1].output == 9 def test_comparison2(tmp_path, monkeypatch): diff --git a/tests/requirements-jobflow.txt b/tests/requirements-jobflow.txt index 303455a9b7..c65842ff74 100644 --- a/tests/requirements-jobflow.txt +++ b/tests/requirements-jobflow.txt @@ -1,2 +1,2 @@ -git+https://github.com/vineetbansal/jobflow.git@v0.2.1.post1 +git+https://github.com/vineetbansal/jobflow.git@v0.2.1.post2 jobflow-remote[gui]==0.1.8 From 599df7d223a5f3b968730d51457aa41238d084a7 Mon Sep 17 00:00:00 2001 From: Vineet Bansal Date: Wed, 19 Nov 2025 10:37:47 -0500 Subject: [PATCH 14/17] Documentation examples; Job wrapper with getitem access (temporary till we have it upstream); an xfail test --- docs/user/basics/wflow_decorators.md | 16 ++--- docs/user/wflow_engine/wflow_engines1.md | 18 +++++- docs/user/wflow_engine/wflow_engines2.md | 30 ++++++++- src/quacc/wflow_tools/customizers.py | 7 ++- src/quacc/wflow_tools/decorators.py | 74 ++++++++++++++++++++++- tests/jobflow/test_emt_recipes.py | 77 ++++++++++++++++++++++++ tests/jobflow/test_tutorials.py | 24 +++++++- 7 files changed, 226 insertions(+), 20 deletions(-) create mode 100644 tests/jobflow/test_emt_recipes.py diff --git a/docs/user/basics/wflow_decorators.md b/docs/user/basics/wflow_decorators.md index a93e57aeff..511efedd62 100644 --- a/docs/user/basics/wflow_decorators.md +++ b/docs/user/basics/wflow_decorators.md @@ -94,20 +94,16 @@ A `#!Python @subflow` in quacc is any workflow that returns a list of job output === "Jobflow" - Take a moment to read the Jobflow documentation's [Quick Start](https://materialsproject.github.io/jobflow/tutorials/1-quickstart.html) to get a sense of how Jobflow works. Namely, you should understand the `Job` and `Flow` definitions, which describe individual compute tasks and workflows, respectively. + Take a moment to read the Jobflow documentation's [Quick Start](https://materialsproject.github.io/jobflow/tutorials/1-quickstart.html) to get a sense of how Jobflow works. Namely, you should understand the concept of a `#!Python @job` and a `#!Python @flow` `Job`, which describe individual compute tasks and workflows, respectively.
- | Quacc | Jobflow | - | ------------------- | --------------- | - | `#!Python @job` | `#!Python @job` | - | `#!Python @flow` | N/A | - | `#!Python @subflow` | N/A | + | Quacc | Jobflow | + | ------------------- | ---------------- | + | `#!Python @job` | `#!Python @job` | + | `#!Python @flow` | `#!Python @flow` | + | `#!Python @subflow` | `#!Python @flow` |
- !!! Warning - - Due to the difference in how Jobflow handles workflows compared to other supported workflow engines, any quacc recipes that have been pre-defined with a `#!Python @flow` or `#!Python @subflow` decorator (i.e. have `_flow` in the name) cannot be run directly with Jobflow. - The quacc descriptors are drop-in replacements for the specified workflow engine analogue, which we will use for the remainder of the tutorials. Based on the value for the `WORKFLOW_ENGINE` global variable in your [quacc settings](../settings/settings.md), the appropriate decorator will be automatically selected. If the `WORKFLOW_ENGINE` setting is set to `None` (i.e. `quacc set WORKFLOW_ENGINE None`), the decorators will have no effect on the underlying function. diff --git a/docs/user/wflow_engine/wflow_engines1.md b/docs/user/wflow_engine/wflow_engines1.md index 053824a8bd..1392268b09 100644 --- a/docs/user/wflow_engine/wflow_engines1.md +++ b/docs/user/wflow_engine/wflow_engines1.md @@ -335,6 +335,20 @@ graph LR === "Jobflow" - !!! Warning + ```python + import jobflow as jf + from ase.build import bulk + from quacc.recipes.emt.slabs import bulk_to_slabs_flow + + # Define the Atoms object + atoms = bulk("Cu") - Due to the difference in how Jobflow handles workflows (particularly dynamic ones) compared to other supported workflow engines, any quacc recipes that have been pre-defined with a `#!Python @flow` decorator (i.e. have `_flow` in the name) cannot be run directly with Jobflow. Rather, a Jobflow-specific `Flow` needs to be constructed by the user. + # Create the workflow with arguments + workflow = bulk_to_slabs_flow(atoms) + + # Dispatch the workflow and get results + results = jf.run_locally(workflow) + + # Print the results + print(results) + ``` diff --git a/docs/user/wflow_engine/wflow_engines2.md b/docs/user/wflow_engine/wflow_engines2.md index 6a7bd57674..d430b57e73 100644 --- a/docs/user/wflow_engine/wflow_engines2.md +++ b/docs/user/wflow_engine/wflow_engines2.md @@ -795,6 +795,32 @@ graph LR === "Jobflow" - !!! Warning "Limitations" + ```python + import jobflow as jf + from ase.build import bulk + from quacc import flow + from quacc.recipes.emt.core import relax_job + from quacc.recipes.emt.slabs import bulk_to_slabs_flow + + + # Define the workflow + @flow + def relaxed_slabs_workflow(atoms): + relaxed_bulk = relax_job(atoms) + relaxed_slabs = bulk_to_slabs_flow(relaxed_bulk["atoms"], run_static=False) - Due to the difference in how Jobflow handles workflows (particularly dynamic ones) compared to other supported workflow engines, any quacc recipes that have been pre-defined with a `#!Python @flow` decorator (i.e. have `_flow` in the name) cannot be run directly with Jobflow. Rather, a Jobflow-specific `Flow` needs to be constructed by the user. 
+ return relaxed_slabs + + + # Define the Atoms object + atoms = bulk("Cu") + + # Create the workflow with arguments + workflow = relaxed_slabs_workflow(atoms) + + # Dispatch the workflow and get results + results = jf.run_locally(workflow) + + # print the results + print(results) + ``` diff --git a/src/quacc/wflow_tools/customizers.py b/src/quacc/wflow_tools/customizers.py index d7fcb8cdbc..dd25a57dd1 100644 --- a/src/quacc/wflow_tools/customizers.py +++ b/src/quacc/wflow_tools/customizers.py @@ -142,7 +142,12 @@ def update_parameters( func = strip_decorator(func) return decorator_func(partial(func, **params)) - return partial(func, **params) + partial_fn = partial(func, **params) + # Assigning a __name__ allows monty's jsanitize function to work correctly + # with this partial function. + partial_fn.__name__ = func.__name__ + + return partial_fn def customize_funcs( diff --git a/src/quacc/wflow_tools/decorators.py b/src/quacc/wflow_tools/decorators.py index 22c428ac78..96181a348c 100644 --- a/src/quacc/wflow_tools/decorators.py +++ b/src/quacc/wflow_tools/decorators.py @@ -163,9 +163,7 @@ def wrapper(*f_args, **f_kwargs): return Delayed_(delayed(wrapper, **kwargs)) elif settings.WORKFLOW_ENGINE == "jobflow": - from jobflow import job as jf_job - - return jf_job(_func, **kwargs) + return _get_jobflow_wrapped_func(_func, **kwargs) elif settings.WORKFLOW_ENGINE == "parsl": from parsl import python_app @@ -666,6 +664,76 @@ def sync_wrapper(*f_args, **f_kwargs): return prefect_flow(_func, validate_parameters=False, **kwargs) +def _get_jobflow_wrapped_func(method=None, **job_kwargs): + """ + Custom wrapper for `@job` decorated functions for `jobflow`. + + We need this to emulate `@job` like behavior but want the decorated + function to return a `JobflowJobWrapper` class whose __getitem__ we can + intercept. + + This wrapper only needs to exist till this functionality is available + in `jobflow.Job` directly, after which it can simply be implemented as: + + from jobflow import job as jf_job + return jf_job(method, **job_kwargs) + + """ + from jobflow import Job + from jobflow import job as jf_job + + class JobflowJobWrapper(Job): + """A small Jobflow wrapper that holds a reference to a `jobflow.Job` + object, and relays all calls to it, except for `__getitem__` calls that + it relays to the `Job`'s `.output` attribute. + + This is to make the `@flow` recipes that index directly inside a `@job` + work correctly. For example: + + @job + def greetings(s): + return {"hello": f"Hello {s}", "bye": f"Goodbye {s}"} + + @job + def upper(s): + return s.upper() + + @flow + def greet(s): + job1 = greetings(s) + job2 = upper(job1["hello"]) + return job2.output + + This wrapper only needs to exist till this functionality is available + in `jobflow.Job` directly. 
+ """ + + def __init__(self, job): + self._job = job + + def __getitem__(self, key): + return self._job.output[key] + + def __getattr__(self, item): + return getattr(self._job, item) + + def decorator(func): + jobflow_wrapped = jf_job(func, **job_kwargs) + + @wraps(func) + def wrapper(*args, **kw): + job = jobflow_wrapped(*args, **kw) + return JobflowJobWrapper(job) + + wrapper.original = func + + return wrapper + + if method is None: + return decorator + return decorator(method) + + def _get_jobflow_wrapped_flow(_func: Callable) -> Callable: from jobflow import flow as jf_flow diff --git a/tests/jobflow/test_emt_recipes.py b/tests/jobflow/test_emt_recipes.py new file mode 100644 index 0000000000..9bcbd36608 --- /dev/null +++ b/tests/jobflow/test_emt_recipes.py @@ -0,0 +1,77 @@ +from __future__ import annotations + +import pytest + +jobflow = pytest.importorskip("jobflow") + +from ase.build import bulk + +from quacc import flow, job +from quacc.recipes.emt.core import relax_job +from quacc.recipes.emt.slabs import bulk_to_slabs_flow # skipcq: PYL-C0412 + + +@pytest.mark.parametrize("job_decorators", [None, {"relax_job": job()}]) +def test_functools(tmp_path, monkeypatch, job_decorators): + monkeypatch.chdir(tmp_path) + atoms = bulk("Cu") + flow = bulk_to_slabs_flow( + atoms, + run_static=False, + job_params={"relax_job": {"opt_params": {"fmax": 0.1}}}, + job_decorators=job_decorators, + ) + output = jobflow.run_locally(flow, ensure_success=True) + assert len(output) == 4 + first_output = next(iter(output.values()))[1].output + assert "atoms" in first_output + assert first_output["parameters_opt"]["fmax"] == 0.1 + + +def test_copy_files(tmp_path, monkeypatch): + monkeypatch.chdir(tmp_path) + atoms = bulk("Cu") + + @flow + def myflow(atoms): + result1 = relax_job(atoms) + return relax_job(result1["atoms"], copy_files={result1["dir_name"]: "opt.*"}) + + output = jobflow.run_locally(myflow(atoms)) + first_output = next(iter(output.values()))[1].output + assert "atoms" in first_output + + +def test_relax_flow(tmp_path, monkeypatch): + monkeypatch.chdir(tmp_path) + atoms = bulk("Cu") + + @flow + def relax_flow(atoms): + result1 = relax_job(atoms) + return relax_job(result1["atoms"]) + + jobflow.run_locally(relax_flow(atoms), ensure_success=True) + + +@pytest.mark.xfail(reason="TBD", strict=True) +def test_relaxed_slabs(tmp_path, monkeypatch): + """ + The `recipes/common/slabs/bulk_to_slabs_subflow` subflow is currently not + equipped to work with the `atoms` argument being an `OutputReference`. + (since it passes that argument to external libraries that only work with + a realized `atoms` object). + + There are likely other cases of this pattern. This test should serve + as a placeholder till we find a general solution to this. + """ + + monkeypatch.chdir(tmp_path) + atoms = bulk("Cu") + + @flow + def workflow(atoms): + relaxed_bulk = relax_job(atoms) + return bulk_to_slabs_flow(relaxed_bulk["atoms"], run_static=False) + + jobflow.run_locally(workflow(atoms), ensure_success=True) diff --git a/tests/jobflow/test_tutorials.py b/tests/jobflow/test_tutorials.py index 79984a75b2..9a333d56be 100644 --- a/tests/jobflow/test_tutorials.py +++ b/tests/jobflow/test_tutorials.py @@ -33,7 +33,7 @@ def test_tutorial2a(tmp_path, monkeypatch): job1 = relax_job(atoms) # (1)! # Define Job 2, which takes the output of Job 1 as input - job2 = static_job(job1.output["atoms"]) # (2)! + job2 = static_job(job1["atoms"]) # (2)! 
# Define the workflow workflow = jf.Flow([job1, job2]) @@ -54,7 +54,7 @@ def workflow(atoms): # Define Job 1 job1 = relax_job(atoms) # Define Job 2, which takes the output of Job 1 as input - static_job(job1.output["atoms"]) + static_job(job1["atoms"]) # Run the workflow locally jf.run_locally(workflow(atoms), create_folders=True, ensure_success=True) @@ -96,6 +96,26 @@ def workflow(atoms1, atoms2): jf.run_locally(workflow(atoms1, atoms2), create_folders=True, ensure_success=True) +def test_job_getitem(): + @job + def greetings(s): + return {"hello": f"Hello {s}", "bye": f"Goodbye {s}"} + + @job + def upper(s): + return s.upper() + + @flow + def greet(s): + job1 = greetings(s) + job2 = upper(job1["hello"]) # No need for `job1.output["hello"]` + return job2.output + + workflow = greet("World") + response = jf.run_locally(workflow) + assert response[workflow.output.uuid][1].output == "HELLO WORLD" + + def test_comparison1(tmp_path, monkeypatch): monkeypatch.chdir(tmp_path) From 1d2947181b0a5dc1ad68dcbcd805cd88b6971039 Mon Sep 17 00:00:00 2001 From: Vineet Bansal Date: Wed, 19 Nov 2025 11:02:48 -0500 Subject: [PATCH 15/17] a more robust __name__ assignment for monty serialization --- src/quacc/wflow_tools/customizers.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/quacc/wflow_tools/customizers.py b/src/quacc/wflow_tools/customizers.py index dd25a57dd1..4ca0ed3b94 100644 --- a/src/quacc/wflow_tools/customizers.py +++ b/src/quacc/wflow_tools/customizers.py @@ -145,7 +145,12 @@ def update_parameters( partial_fn = partial(func, **params) # Assigning a __name__ allows monty's jsanitize function to work correctly # with this partial function. - partial_fn.__name__ = func.__name__ + if hasattr(func, "name"): + partial_fn.__name__ = func.name + elif hasattr(func, "__name__"): + partial_fn.__name__ = func.__name__ + else: + partial_fn.__name__ = "" return partial_fn From 16a5f6c41d9cedb2896a4f526708ef8a0cccb994 Mon Sep 17 00:00:00 2001 From: Vineet Bansal Date: Thu, 20 Nov 2025 21:04:32 -0500 Subject: [PATCH 16/17] quacc subflow mapped to a jobflow job --- docs/user/basics/wflow_decorators.md | 4 ++-- src/quacc/wflow_tools/decorators.py | 2 +- tests/jobflow/test_emt_recipes.py | 17 +---------------- 3 files changed, 4 insertions(+), 19 deletions(-) diff --git a/docs/user/basics/wflow_decorators.md b/docs/user/basics/wflow_decorators.md index 511efedd62..86986b36b1 100644 --- a/docs/user/basics/wflow_decorators.md +++ b/docs/user/basics/wflow_decorators.md @@ -94,7 +94,7 @@ A `#!Python @subflow` in quacc is any workflow that returns a list of job output === "Jobflow" - Take a moment to read the Jobflow documentation's [Quick Start](https://materialsproject.github.io/jobflow/tutorials/1-quickstart.html) to get a sense of how Jobflow works. Namely, you should understand the concept of a `#!Python @job` and a `#!Python @flow` `Job`, which describe individual compute tasks and workflows, respectively. + Take a moment to read the Jobflow documentation's [Quick Start](https://materialsproject.github.io/jobflow/tutorials/1-quickstart.html) to get a sense of how Jobflow works. Namely, you should understand the concept of a `#!Python @job` and a `#!Python @flow`, which describe individual compute tasks and workflows, respectively.
@@ -102,7 +102,7 @@ A `#!Python @subflow` in quacc is any workflow that returns a list of job output | ------------------- | ---------------- | | `#!Python @job` | `#!Python @job` | | `#!Python @flow` | `#!Python @flow` | - | `#!Python @subflow` | `#!Python @flow` | + | `#!Python @subflow` | `#!Python @job` |
diff --git a/src/quacc/wflow_tools/decorators.py b/src/quacc/wflow_tools/decorators.py index 96181a348c..3dd5811126 100644 --- a/src/quacc/wflow_tools/decorators.py +++ b/src/quacc/wflow_tools/decorators.py @@ -586,7 +586,7 @@ def wrapper(*f_args, **f_kwargs): return task(_func, namespace=_func.__module__, **kwargs) elif settings.WORKFLOW_ENGINE == "jobflow": - return _get_jobflow_wrapped_flow(_func) + return _get_jobflow_wrapped_func(_func, **kwargs) else: return _func diff --git a/tests/jobflow/test_emt_recipes.py b/tests/jobflow/test_emt_recipes.py index 9bcbd36608..23fc93a4d9 100644 --- a/tests/jobflow/test_emt_recipes.py +++ b/tests/jobflow/test_emt_recipes.py @@ -21,11 +21,7 @@ def test_functools(tmp_path, monkeypatch, job_decorators): job_params={"relax_job": {"opt_params": {"fmax": 0.1}}}, job_decorators=job_decorators, ) - output = jobflow.run_locally(flow, ensure_success=True) - assert len(output) == 4 - first_output = next(iter(output.values()))[1].output - assert "atoms" in first_output - assert first_output["parameters_opt"]["fmax"] == 0.1 + jobflow.run_locally(flow, ensure_success=True) def test_copy_files(tmp_path, monkeypatch): @@ -54,18 +50,7 @@ def relax_flow(atoms): jobflow.run_locally(relax_flow(atoms), ensure_success=True) -@pytest.mark.xfail(reason="TBD", strict=True) def test_relaxed_slabs(tmp_path, monkeypatch): - """ - The `recipes/common/slabs/bulk_to_slabs_subflow` subflow is currently not - equipped to work with the `atoms` argument being an `OutputReference`. - (since it passes that argument to external libraries that only work with - a realized `atoms` object). - - There are likely other cases of this pattern. This test should serve - as a placeholder till we find a general solution to this. - """ - monkeypatch.chdir(tmp_path) atoms = bulk("Cu") From 61b66e6c9ac1ca2059f7dd5abdda1a147dcfa666 Mon Sep 17 00:00:00 2001 From: Vineet Bansal Date: Thu, 20 Nov 2025 21:12:30 -0500 Subject: [PATCH 17/17] test tweaks now that subflow return jf.Job objects for Jobflow --- tests/jobflow/test_syntax.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/jobflow/test_syntax.py b/tests/jobflow/test_syntax.py index 3cf85ab063..87c0205f69 100644 --- a/tests/jobflow/test_syntax.py +++ b/tests/jobflow/test_syntax.py @@ -33,7 +33,7 @@ def workflow(a, b, c): assert isinstance(add(1, 2), jf.Job) assert isinstance(mult(1, 2), jf.Job) assert isinstance(workflow(1, 2, 3), jf.Flow) - assert isinstance(add_distributed([1, 2, 3], 4)[0], jf.Job) + assert isinstance(add_distributed([1, 2, 3], 4), jf.Job) def test_jobflow_decorators_args(tmp_path, monkeypatch): @@ -62,4 +62,4 @@ def workflow(a, b, c): assert isinstance(add(1, 2), jf.Job) assert isinstance(mult(1, 2), jf.Job) assert isinstance(workflow(1, 2, 3), jf.Flow) - assert isinstance(add_distributed([1, 2, 3], 4)[0], jf.Job) + assert isinstance(add_distributed([1, 2, 3], 4), jf.Job)