diff --git a/docs/user/basics/wflow_decorators.md b/docs/user/basics/wflow_decorators.md index a93e57aeff..86986b36b1 100644 --- a/docs/user/basics/wflow_decorators.md +++ b/docs/user/basics/wflow_decorators.md @@ -94,20 +94,16 @@ A `#!Python @subflow` in quacc is any workflow that returns a list of job output === "Jobflow" - Take a moment to read the Jobflow documentation's [Quick Start](https://materialsproject.github.io/jobflow/tutorials/1-quickstart.html) to get a sense of how Jobflow works. Namely, you should understand the `Job` and `Flow` definitions, which describe individual compute tasks and workflows, respectively. + Take a moment to read the Jobflow documentation's [Quick Start](https://materialsproject.github.io/jobflow/tutorials/1-quickstart.html) to get a sense of how Jobflow works. Namely, you should understand the concept of a `#!Python @job` and a `#!Python @flow`, which describe individual compute tasks and workflows, respectively.
- | Quacc | Jobflow | - | ------------------- | --------------- | - | `#!Python @job` | `#!Python @job` | - | `#!Python @flow` | N/A | - | `#!Python @subflow` | N/A | + | Quacc | Jobflow | + | ------------------- | ---------------- | + | `#!Python @job` | `#!Python @job` | + | `#!Python @flow` | `#!Python @flow` | + | `#!Python @subflow` | `#!Python @job` |
- !!! Warning - - Due to the difference in how Jobflow handles workflows compared to other supported workflow engines, any quacc recipes that have been pre-defined with a `#!Python @flow` or `#!Python @subflow` decorator (i.e. have `_flow` in the name) cannot be run directly with Jobflow. - The quacc descriptors are drop-in replacements for the specified workflow engine analogue, which we will use for the remainder of the tutorials. Based on the value for the `WORKFLOW_ENGINE` global variable in your [quacc settings](../settings/settings.md), the appropriate decorator will be automatically selected. If the `WORKFLOW_ENGINE` setting is set to `None` (i.e. `quacc set WORKFLOW_ENGINE None`), the decorators will have no effect on the underlying function. diff --git a/docs/user/basics/wflow_overview.md b/docs/user/basics/wflow_overview.md index 48cce36c70..4d3973e212 100644 --- a/docs/user/basics/wflow_overview.md +++ b/docs/user/basics/wflow_overview.md @@ -110,10 +110,6 @@ Everyone's computing needs are different, so we ensured that quacc is interopera [Jobflow](https://github.com/materialsproject/jobflow) is developed and maintained by the Materials Project team at Lawrence Berkeley National Laboratory and serves as a seamless interface to [FireWorks](https://github.com/materialsproject/fireworks) or [Jobflow Remote](https://github.com/Matgenix/jobflow-remote) for dispatching and monitoring compute jobs. - !!! Warning - - Jobflow is not yet compatible with the `#!Python @flow` or `#!Python @subflow` decorators used in many quacc recipes and so should only be used if necessary. See [this issue](https://github.com/Quantum-Accelerators/quacc/issues/1061) to track the progress of this enhancement. 
- Pros: - Native support for a variety of databases diff --git a/docs/user/wflow_engine/wflow_engines1.md b/docs/user/wflow_engine/wflow_engines1.md index 053824a8bd..1392268b09 100644 --- a/docs/user/wflow_engine/wflow_engines1.md +++ b/docs/user/wflow_engine/wflow_engines1.md @@ -335,6 +335,20 @@ graph LR === "Jobflow" - !!! Warning + ```python + import jobflow as jf + from ase.build import bulk + from quacc.recipes.emt.slabs import bulk_to_slabs_flow + + # Define the Atoms object + atoms = bulk("Cu") - Due to the difference in how Jobflow handles workflows (particularly dynamic ones) compared to other supported workflow engines, any quacc recipes that have been pre-defined with a `#!Python @flow` decorator (i.e. have `_flow` in the name) cannot be run directly with Jobflow. Rather, a Jobflow-specific `Flow` needs to be constructed by the user. + # Create the workflow with arguments + workflow = bulk_to_slabs_flow(atoms) + + # Dispatch the workflow and get results + results = jf.run_locally(workflow) + + # Print the results + print(results) + ``` diff --git a/docs/user/wflow_engine/wflow_engines2.md b/docs/user/wflow_engine/wflow_engines2.md index 6a7bd57674..d430b57e73 100644 --- a/docs/user/wflow_engine/wflow_engines2.md +++ b/docs/user/wflow_engine/wflow_engines2.md @@ -795,6 +795,32 @@ graph LR === "Jobflow" - !!! Warning "Limitations" + ```python + import jobflow as jf + from ase.build import bulk + from quacc import flow + from quacc.recipes.emt.core import relax_job + from quacc.recipes.emt.slabs import bulk_to_slabs_flow + + + # Define the workflow + @flow + def relaxed_slabs_workflow(atoms): + relaxed_bulk = relax_job(atoms) + relaxed_slabs = bulk_to_slabs_flow(relaxed_bulk["atoms"], run_static=False) - Due to the difference in how Jobflow handles workflows (particularly dynamic ones) compared to other supported workflow engines, any quacc recipes that have been pre-defined with a `#!Python @flow` decorator (i.e. 
have `_flow` in the name) cannot be run directly with Jobflow. Rather, a Jobflow-specific `Flow` needs to be constructed by the user. + return relaxed_slabs + + + # Define the Atoms object + atoms = bulk("Cu") + + # Create the workflow with arguments + workflow = relaxed_slabs_workflow(atoms) + + # Dispatch the workflow and get results + results = jf.run_locally(workflow) + + # Print the results + print(results) + ``` diff --git a/pyproject.toml b/pyproject.toml index bd5bc99da9..64cf17975b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -46,7 +46,7 @@ covalent = ["covalent>=0.234.1-rc.0; platform_system!='Windows'", "covalent-clou dask = ["dask[distributed]>=2023.12.1", "dask-jobqueue>=0.8.2"] defects = ["pymatgen-analysis-defects>=2024.10.22", "shakenbreak>=3.2.0"] fairchem = ["fairchem-data-omat>=0.2", "fairchem-data-oc>=1.0.2", "fairchem-core>=2.2.0"] -jobflow = ["jobflow>=0.1.14", "jobflow-remote>=0.1.0"] +jobflow = ["jobflow @ git+https://github.com/vineetbansal/jobflow.git@vb/flow_decorator", "jobflow-remote>=0.1.0"] orb = ["orb-models>=0.4.1"] mace = ["mace-torch>=0.3.3", "mace-models>=0.1.6"] matgl = ["matgl>=2.0.2"] diff --git a/src/quacc/wflow_tools/customizers.py b/src/quacc/wflow_tools/customizers.py index d7fcb8cdbc..4ca0ed3b94 100644 --- a/src/quacc/wflow_tools/customizers.py +++ b/src/quacc/wflow_tools/customizers.py @@ -142,7 +142,17 @@ def update_parameters( func = strip_decorator(func) return decorator_func(partial(func, **params)) - return partial(func, **params) + partial_fn = partial(func, **params) + # Assigning a __name__ allows monty's jsanitize function to work correctly + # with this partial function.
+ if hasattr(func, "name"): + partial_fn.__name__ = func.name + elif hasattr(func, "__name__"): + partial_fn.__name__ = func.__name__ + else: + partial_fn.__name__ = "" + + return partial_fn def customize_funcs( diff --git a/src/quacc/wflow_tools/decorators.py b/src/quacc/wflow_tools/decorators.py index f730b5038c..3dd5811126 100644 --- a/src/quacc/wflow_tools/decorators.py +++ b/src/quacc/wflow_tools/decorators.py @@ -163,9 +163,7 @@ def wrapper(*f_args, **f_kwargs): return Delayed_(delayed(wrapper, **kwargs)) elif settings.WORKFLOW_ENGINE == "jobflow": - from jobflow import job as jf_job - - return jf_job(_func, **kwargs) + return _get_jobflow_wrapped_func(_func, **kwargs) elif settings.WORKFLOW_ENGINE == "parsl": from parsl import python_app @@ -352,6 +350,8 @@ def workflow(a, b, c): return task(_func, namespace=_func.__module__, **kwargs) elif settings.WORKFLOW_ENGINE == "prefect": return _get_prefect_wrapped_flow(_func, settings, **kwargs) + elif settings.WORKFLOW_ENGINE == "jobflow": + return _get_jobflow_wrapped_flow(_func) else: return _func @@ -585,6 +585,8 @@ def wrapper(*f_args, **f_kwargs): from redun import task return task(_func, namespace=_func.__module__, **kwargs) + elif settings.WORKFLOW_ENGINE == "jobflow": + return _get_jobflow_wrapped_func(_func, **kwargs) else: return _func @@ -662,6 +664,87 @@ def sync_wrapper(*f_args, **f_kwargs): return prefect_flow(_func, validate_parameters=False, **kwargs) +def _get_jobflow_wrapped_func(method=None, **job_kwargs): + """ + Custom wrapper for `@job` decorated functions for `jobflow`. + + We need this to emulate `@job`-like behavior but want the decorated + function to return a `JobflowJobWrapper` instance whose `__getitem__` we can + intercept.
+ + This wrapper only needs to exist till this functionality is available + in `jobflow.Job` directly, after which it can simply be implemented as: + + from jobflow import job as jf_job + return jf_job(method, **job_kwargs) + + """ + from jobflow import Job + from jobflow import job as jf_job + + class JobflowJobWrapper(Job): + """A small Jobflow wrapper that holds a reference to a `jobflow.Job` + object, and relays all calls to it, except for `__getitem__` calls that + it relays to the `Job`'s `.output` attribute. + + This is to make the `@flow` recipes that index directly inside a `@job` + work correctly. For example: + + @job + def greetings(s): + return {"hello": f"Hello {s}", "bye": f"Goodbye {s}"} + + @job + def upper(s): + return s.upper() + + @flow + def greet(s): + job1 = greetings(s) + job2 = upper(job1["hello"]) + return job2.output + + This wrapper only needs to exist till this functionality is available + in `jobflow.Job` directly. + """ + + def __init__(self, job): + self._job = job + + def __getitem__(self, key): + return self._job.output[key] + + def __getattr__(self, item): + return getattr(self._job, item) + + def decorator(func): + jobflow_wrapped = jf_job(func, **job_kwargs) + + @wraps(func) + def wrapper(*args, **kw): + job = jobflow_wrapped(*args, **kw) + return JobflowJobWrapper(job) + + wrapper.original = func + + return wrapper + + if method is None: + return decorator + return decorator(method) + + +def _get_jobflow_wrapped_flow(_func: Callable) -> Callable: + from jobflow import flow as jf_flow + + jobflow_flow = jf_flow(_func) + + def wrapper(*args, **kwargs): + return jobflow_flow(*args, **kwargs) + + return wrapper + + class Delayed_: """A small Dask-compatible, serializable object to wrap delayed functions that we don't want to execute. 
diff --git a/tests/jobflow/test_emt_recipes.py b/tests/jobflow/test_emt_recipes.py new file mode 100644 index 0000000000..23fc93a4d9 --- /dev/null +++ b/tests/jobflow/test_emt_recipes.py @@ -0,0 +1,62 @@ +from __future__ import annotations + +import pytest + +jobflow = pytest.importorskip("jobflow") + +from ase.build import bulk + +from quacc import flow, job +from quacc.recipes.emt.core import relax_job +from quacc.recipes.emt.slabs import bulk_to_slabs_flow # skipcq: PYL-C0412 + + +@pytest.mark.parametrize("job_decorators", [None, {"relax_job": job()}]) +def test_functools(tmp_path, monkeypatch, job_decorators): + monkeypatch.chdir(tmp_path) + atoms = bulk("Cu") + flow = bulk_to_slabs_flow( + atoms, + run_static=False, + job_params={"relax_job": {"opt_params": {"fmax": 0.1}}}, + job_decorators=job_decorators, + ) + jobflow.run_locally(flow, ensure_success=True) + + +def test_copy_files(tmp_path, monkeypatch): + monkeypatch.chdir(tmp_path) + atoms = bulk("Cu") + + @flow + def myflow(atoms): + result1 = relax_job(atoms) + return relax_job(result1["atoms"], copy_files={result1["dir_name"]: "opt.*"}) + + output = jobflow.run_locally(myflow(atoms)) + first_output = next(iter(output.values()))[1].output + assert "atoms" in first_output + + +def test_relax_flow(tmp_path, monkeypatch): + monkeypatch.chdir(tmp_path) + atoms = bulk("Cu") + + @flow + def relax_flow(atoms): + result1 = relax_job(atoms) + return relax_job(result1["atoms"]) + + jobflow.run_locally(relax_flow(atoms), ensure_success=True) + + +def test_relaxed_slabs(tmp_path, monkeypatch): + monkeypatch.chdir(tmp_path) + atoms = bulk("Cu") + + @flow + def workflow(atoms): + relaxed_bulk = relax_job(atoms) + return bulk_to_slabs_flow(relaxed_bulk["atoms"], run_static=False) + + jobflow.run_locally(workflow(atoms), ensure_success=True) diff --git a/tests/jobflow/test_syntax.py b/tests/jobflow/test_syntax.py index 332f074570..87c0205f69 100644 --- a/tests/jobflow/test_syntax.py +++ b/tests/jobflow/test_syntax.py @@ 
-32,8 +32,8 @@ def workflow(a, b, c): assert hasattr(mult, "original") assert isinstance(add(1, 2), jf.Job) assert isinstance(mult(1, 2), jf.Job) - assert isinstance(workflow(1, 2, 3), jf.Job) - assert isinstance(add_distributed([1, 2, 3], 4)[0], jf.Job) + assert isinstance(workflow(1, 2, 3), jf.Flow) + assert isinstance(add_distributed([1, 2, 3], 4), jf.Job) def test_jobflow_decorators_args(tmp_path, monkeypatch): @@ -61,5 +61,5 @@ def workflow(a, b, c): assert hasattr(mult, "original") assert isinstance(add(1, 2), jf.Job) assert isinstance(mult(1, 2), jf.Job) - assert isinstance(workflow(1, 2, 3), jf.Job) - assert isinstance(add_distributed([1, 2, 3], 4)[0], jf.Job) + assert isinstance(workflow(1, 2, 3), jf.Flow) + assert isinstance(add_distributed([1, 2, 3], 4), jf.Job) diff --git a/tests/jobflow/test_tutorials.py b/tests/jobflow/test_tutorials.py index 311fad9321..9a333d56be 100644 --- a/tests/jobflow/test_tutorials.py +++ b/tests/jobflow/test_tutorials.py @@ -6,7 +6,7 @@ from ase.build import bulk, molecule -from quacc import job +from quacc import flow, job from quacc.recipes.emt.core import relax_job, static_job # skipcq: PYL-C0412 @@ -33,7 +33,7 @@ def test_tutorial2a(tmp_path, monkeypatch): job1 = relax_job(atoms) # (1)! # Define Job 2, which takes the output of Job 1 as input - job2 = static_job(job1.output["atoms"]) # (2)! + job2 = static_job(job1["atoms"]) # (2)! 
# Define the workflow workflow = jf.Flow([job1, job2]) @@ -42,6 +42,24 @@ def test_tutorial2a(tmp_path, monkeypatch): jf.run_locally(workflow, create_folders=True, ensure_success=True) +def test_tutorial2a_flow_decorator(tmp_path, monkeypatch): + monkeypatch.chdir(tmp_path) + + # Make an Atoms object of a bulk Cu structure + atoms = bulk("Cu") + + # Define the workflow + @flow + def workflow(atoms): + # Define Job 1 + job1 = relax_job(atoms) + # Define Job 2, which takes the output of Job 1 as input + static_job(job1["atoms"]) + + # Run the workflow locally + jf.run_locally(workflow(atoms), create_folders=True, ensure_success=True) + + def test_tutorial2b(tmp_path, monkeypatch): monkeypatch.chdir(tmp_path) @@ -60,6 +78,44 @@ def test_tutorial2b(tmp_path, monkeypatch): jf.run_locally(workflow, create_folders=True, ensure_success=True) +def test_tutorial2b_flow_decorator(tmp_path, monkeypatch): + monkeypatch.chdir(tmp_path) + + # Define two Atoms objects + atoms1 = bulk("Cu") + atoms2 = molecule("N2") + + # Define the workflow + @flow + def workflow(atoms1, atoms2): + # Define two independent relaxation jobs + relax_job(atoms1) + relax_job(atoms2) + + # Run the workflow locally + jf.run_locally(workflow(atoms1, atoms2), create_folders=True, ensure_success=True) + + +def test_job_getitem(): + @job + def greetings(s): + return {"hello": f"Hello {s}", "bye": f"Goodbye {s}"} + + @job + def upper(s): + return s.upper() + + @flow + def greet(s): + job1 = greetings(s) + job2 = upper(job1["hello"]) # No need for `job1.output["hello"]` + return job2.output + + workflow = greet("World") + response = jf.run_locally(workflow) + assert response[workflow.output.uuid][1].output == "HELLO WORLD" + + def test_comparison1(tmp_path, monkeypatch): monkeypatch.chdir(tmp_path) @@ -79,6 +135,28 @@ def mult(a, b): assert responses[job2.uuid][1].output == 9 +def test_comparison1_flow_decorator(tmp_path, monkeypatch): + monkeypatch.chdir(tmp_path) + + @job # (1)! 
+ def add(a, b): + return a + b + + @job + def mult(a, b): + return a * b + + @flow + def workflow(): + job1 = add(1, 2) + job2 = mult(job1.output, 3) + return job2.output # or `return job2` + + f = workflow() + response = jf.run_locally(f, ensure_success=True) + assert response[f.output.uuid][1].output == 9 + + def test_comparison2(tmp_path, monkeypatch): monkeypatch.chdir(tmp_path) @@ -106,6 +184,33 @@ def add_distributed(vals, c): jf.run_locally(flow, ensure_success=True) # [6, 6, 6] in final 3 jobs +def test_comparison2_flow_decorator(tmp_path, monkeypatch): + monkeypatch.chdir(tmp_path) + + @jf.job + def add(a, b): + return a + b + + @jf.job + def make_more(val): + return [val] * 3 + + @jf.job + def add_distributed(vals, c): + jobs = [add(val, c) for val in vals] + + flow = jf.Flow(jobs) + return jf.Response(replace=flow) + + @jf.flow + def workflow(): + job1 = add(1, 2) + job2 = make_more(job1.output) + add_distributed(job2.output, 3) + + jf.run_locally(workflow(), ensure_success=True) # [6, 6, 6] in final 3 jobs + + def test_comparison3(tmp_path, monkeypatch): monkeypatch.chdir(tmp_path) @@ -122,3 +227,22 @@ def mult(a, b): flow = jf.Flow([job1, job2]) jf.run_locally(flow, ensure_success=True) + + +def test_comparison3_flow_decorator(tmp_path, monkeypatch): + monkeypatch.chdir(tmp_path) + + @job # (1)! + def add(a, b): + return a + b + + @job + def mult(a, b): + return a * b + + @flow + def workflow(): + job1 = add(1, 2) + mult(job1.output, 3) + + jf.run_locally(workflow(), ensure_success=True) diff --git a/tests/requirements-jobflow.txt b/tests/requirements-jobflow.txt index 9d4745ae8e..c65842ff74 100644 --- a/tests/requirements-jobflow.txt +++ b/tests/requirements-jobflow.txt @@ -1,2 +1,2 @@ -jobflow==0.2.1 +git+https://github.com/vineetbansal/jobflow.git@v0.2.1.post2 jobflow-remote[gui]==0.1.8