Skip to content

Commit 591a2dd

Browse files
committed
passing wf cache_locations to each node
1 parent c57d830 commit 591a2dd

File tree

3 files changed

+65
-0
lines changed

3 files changed

+65
-0
lines changed

pydra/engine/core.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -795,6 +795,7 @@ async def _run(self, submitter=None, rerun=False, **kwargs):
795795
return result
796796
# creating connections that were defined after adding tasks to the wf
797797
for task in self.graph.nodes:
798+
task.cache_locations = task._cache_locations + self.cache_locations
798799
self.create_connections(task)
799800
# TODO add signal handler for processes killed after lock acquisition
800801
self.hooks.pre_run(self)

pydra/engine/tests/test_workflow.py

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,14 @@
22
import shutil, os
33
import time
44
import attr
5+
from pathlib import Path
56

67
from .utils import (
78
add2,
89
add2_wait,
910
multiply,
1011
power,
12+
ten,
1113
identity,
1214
list_output,
1315
fun_addvar3,
@@ -1986,6 +1988,63 @@ def test_wf_nostate_cachelocations_forcererun(plugin, tmpdir):
19861988
assert wf2.output_dir.exists()
19871989

19881990

1991+
@pytest.mark.parametrize("plugin", Plugins)
def test_wf_nostate_nodecachelocations(plugin, tmpdir):
    """
    Node-level reuse of cached results across two workflows.

    Both workflows chain ``ten`` -> ``add2_wait``.  They receive different
    values of ``x``, but ``ten`` always outputs 10, so the input of the
    second node is the same in both.  The second workflow is given the
    first workflow's cache directory via ``cache_locations``: the workflow
    itself has to be recomputed, while the slow second node is expected to
    be taken from the cache instead of being run again.
    """
    first_cache = tmpdir.mkdir("test_wf_cache3")
    second_cache = tmpdir.mkdir("test_wf_cache4")

    def _submit_and_time(workflow):
        # run the workflow through a fresh Submitter, return elapsed seconds
        started = time.time()
        with Submitter(plugin=plugin) as sub:
            sub(workflow)
        return time.time() - started

    # first workflow: nothing cached yet, so every node is executed
    wf1 = Workflow(name="wf", input_spec=["x"], cache_dir=first_cache)
    wf1.add(ten(name="ten", x=wf1.lzin.x))
    wf1.add(add2_wait(name="add2", x=wf1.ten.lzout.out))
    wf1.set_output([("out", wf1.add2.lzout.out)])
    wf1.inputs.x = 3
    wf1.plugin = plugin

    elapsed_first = _submit_and_time(wf1)

    first_result = wf1.result()
    assert 12 == first_result.output.out

    # second workflow: different input, own cache dir, and the first
    # cache dir provided as an additional location to look up results
    wf2 = Workflow(
        name="wf",
        input_spec=["x", "y"],
        cache_dir=second_cache,
        cache_locations=first_cache,
    )
    wf2.add(ten(name="ten", x=wf2.lzin.x))
    wf2.add(add2_wait(name="add2", x=wf2.ten.lzout.out))
    wf2.set_output([("out", wf2.add2.lzout.out)])
    wf2.inputs.x = 2
    wf2.plugin = plugin

    elapsed_second = _submit_and_time(wf2)

    second_result = wf2.result()
    assert 12 == second_result.output.out

    # the first run includes the 3 s sleep of add2_wait, the second must not
    assert elapsed_first > 3
    assert elapsed_second < 0.5

    # both workflows were actually run and have their own output dirs
    assert wf1.output_dir.exists()
    assert wf2.output_dir.exists()

    # two "F*" entries in the first cache dir, only one in the second
    first_entries = list(Path(first_cache).glob("F*"))
    second_entries = list(Path(second_cache).glob("F*"))
    assert len(first_entries) == 2
    assert len(second_entries) == 1
2046+
2047+
19892048
@pytest.mark.parametrize("plugin", Plugins)
19902049
def test_wf_state_cachelocations(plugin, tmpdir):
19912050
"""

pydra/engine/tests/utils.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,11 @@ def identity(x):
8787
return x
8888

8989

90+
@mark.task
def ten(x):
    """Return the constant 10, regardless of the value of ``x``."""
    return 10
93+
94+
9095
@mark.task
9196
def add2_wait(x):
9297
time.sleep(3)

0 commit comments

Comments
 (0)