Skip to content

Commit 519e29c

Browse files
committed
wf.task_rerun is now propagating to all tasks, so if we want any task to rerun, some changes in wf.inputs has to be added
1 parent e88db6f commit 519e29c

File tree

2 files changed

+127
-12
lines changed

2 files changed

+127
-12
lines changed

pydra/engine/core.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -800,6 +800,8 @@ async def _run(self, submitter=None, rerun=False, **kwargs):
800800
return result
801801
# creating connections that were defined after adding tasks to the wf
802802
for task in self.graph.nodes:
803+
if self.task_rerun:
804+
task.task_rerun = self.task_rerun
803805
task.cache_locations = task._cache_locations + self.cache_locations
804806
self.create_connections(task)
805807
# TODO add signal handler for processes killed after lock acquisition

pydra/engine/tests/test_workflow.py

Lines changed: 125 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1993,8 +1993,7 @@ def test_wf_nostate_cachelocations_wftaskrerun(plugin, tmpdir):
19931993
"""
19941994
Two identical wfs with provided cache_dir;
19951995
the second wf has cache_locations and rerun=True,
1996-
so the workflow is rerun, but it doesn't propagate to nodes,
1997-
so none of the node has to be recalculated
1996+
so everything should be rerun even if Submitter doesn't have rerun
19981997
"""
19991998
cache_dir1 = tmpdir.mkdir("test_wf_cache3")
20001999
cache_dir2 = tmpdir.mkdir("test_wf_cache4")
@@ -2020,7 +2019,7 @@ def test_wf_nostate_cachelocations_wftaskrerun(plugin, tmpdir):
20202019
input_spec=["x", "y"],
20212020
cache_dir=cache_dir2,
20222021
cache_locations=cache_dir1,
2023-
rerun=True,
2022+
rerun=True, # wh has to be rerun
20242023
)
20252024
wf2.add(multiply(name="mult", x=wf2.lzin.x, y=wf2.lzin.y))
20262025
wf2.add(add2_wait(name="add2", x=wf2.mult.lzout.out))
@@ -2041,20 +2040,19 @@ def test_wf_nostate_cachelocations_wftaskrerun(plugin, tmpdir):
20412040
assert wf1.output_dir.exists()
20422041
assert wf2.output_dir.exists()
20432042

2044-
# even if the second wf is recomputed the nodes are not, so it's fast
2043+
# everything has to be recomputed
20452044
assert len(list(Path(cache_dir1).glob("F*"))) == 2
2046-
assert len(list(Path(cache_dir2).glob("F*"))) == 0
2045+
assert len(list(Path(cache_dir2).glob("F*"))) == 2
20472046
assert t1 > 3
2048-
assert t2 < 1
2047+
assert t2 > 3
20492048

20502049

20512050
@pytest.mark.parametrize("plugin", Plugins)
2052-
def test_wf_nostate_cachelocations_taskrerun(plugin, tmpdir):
2051+
def test_wf_nostate_cachelocations_taskrerun_noeffect(plugin, tmpdir):
20532052
"""
20542053
Two identical wfs with provided cache_dir;
2055-
the second wf has cache_locations and rerun=True
2056-
and one of the task also has rerun=True;
2057-
so the workflow is rerun, and one of the node also has to be rerun,
2054+
submitter doesn't have rerun and wf doesn't have rerun,
2055+
so even if the task has rerun=True, nothing will be rerun (wf will read the result)
20582056
"""
20592057
cache_dir1 = tmpdir.mkdir("test_wf_cache3")
20602058
cache_dir2 = tmpdir.mkdir("test_wf_cache4")
@@ -2080,13 +2078,75 @@ def test_wf_nostate_cachelocations_taskrerun(plugin, tmpdir):
20802078
input_spec=["x", "y"],
20812079
cache_dir=cache_dir2,
20822080
cache_locations=cache_dir1,
2083-
rerun=True,
20842081
)
20852082
wf2.add(multiply(name="mult", x=wf2.lzin.x, y=wf2.lzin.y))
2083+
# rerun on the task level will not have any effect if the wf doesn't start rerun
2084+
wf2.add(add2_wait(name="add2", x=wf2.mult.lzout.out, rerun=True))
2085+
wf2.set_output([("out", wf2.add2.lzout.out)])
2086+
wf2.inputs.x = 2
2087+
wf2.inputs.y = 3
2088+
wf2.plugin = plugin
2089+
2090+
t0 = time.time()
2091+
with Submitter(plugin=plugin) as sub:
2092+
sub(wf2)
2093+
t2 = time.time() - t0
2094+
2095+
results2 = wf2.result()
2096+
assert 8 == results2.output.out
2097+
2098+
# checking if the second wf doesn't runs again
2099+
assert wf1.output_dir.exists()
2100+
assert not wf2.output_dir.exists()
2101+
# should be fast
2102+
assert t1 > 3
2103+
assert t2 < 1
2104+
2105+
2106+
@pytest.mark.parametrize("plugin", Plugins)
2107+
def test_wf_nostate_cachelocations_taskrerun_wfmodif(plugin, tmpdir):
2108+
"""
2109+
Two identical wfs with provided cache_dir;
2110+
the second wf has cache_locations,
2111+
the second task has rerun=True, but because wf and submitter doesn't have rerun
2112+
it wouldn't rerun anyway (see previous test),
2113+
so wf inputs has to be modified to force wf to start rerunning,
2114+
the first task will read the result (doesn't have rerun),
2115+
but the second will be rerun
2116+
"""
2117+
cache_dir1 = tmpdir.mkdir("test_wf_cache3")
2118+
cache_dir2 = tmpdir.mkdir("test_wf_cache4")
2119+
2120+
wf1 = Workflow(name="wf", input_spec=["x", "y"], cache_dir=cache_dir1)
2121+
wf1.add(multiply(name="mult", x=wf1.lzin.x, y=wf1.lzin.y))
2122+
wf1.add(add2_wait(name="add2", x=wf1.mult.lzout.out))
2123+
wf1.set_output([("out", wf1.add2.lzout.out)])
2124+
wf1.inputs.x = 2
2125+
wf1.inputs.y = 3
2126+
wf1.plugin = plugin
2127+
2128+
t0 = time.time()
2129+
with Submitter(plugin=plugin) as sub:
2130+
sub(wf1)
2131+
t1 = time.time() - t0
2132+
2133+
results1 = wf1.result()
2134+
assert 8 == results1.output.out
2135+
2136+
wf2 = Workflow(
2137+
name="wf",
2138+
input_spec=["x", "y", "fake"],
2139+
cache_dir=cache_dir2,
2140+
cache_locations=cache_dir1,
2141+
)
2142+
wf2.add(multiply(name="mult", x=wf2.lzin.x, y=wf2.lzin.y))
2143+
# when wf starts rerun, this task will be rerun
20862144
wf2.add(add2_wait(name="add2", x=wf2.mult.lzout.out, rerun=True))
20872145
wf2.set_output([("out", wf2.add2.lzout.out)])
20882146
wf2.inputs.x = 2
20892147
wf2.inputs.y = 3
2148+
# adding a fake input, just to start wf rerun
2149+
wf2.inputs.fake = "whatever"
20902150
wf2.plugin = plugin
20912151

20922152
t0 = time.time()
@@ -2101,7 +2161,7 @@ def test_wf_nostate_cachelocations_taskrerun(plugin, tmpdir):
21012161
assert wf1.output_dir.exists()
21022162
assert wf2.output_dir.exists()
21032163

2104-
# the second task also has to be recomputed this time
2164+
# the first task shouldn't be recomputed, but the second one should be
21052165
assert len(list(Path(cache_dir1).glob("F*"))) == 2
21062166
assert len(list(Path(cache_dir2).glob("F*"))) == 1
21072167
assert t1 > 3
@@ -2165,6 +2225,59 @@ def test_wf_nostate_nodecachelocations(plugin, tmpdir):
21652225
assert len(list(Path(cache_dir2).glob("F*"))) == 1
21662226

21672227

2228+
@pytest.mark.parametrize("plugin", Plugins)
2229+
def test_wf_nostate_nodecachelocations_upd(plugin, tmpdir):
2230+
"""
2231+
Two wfs with different input, but the second node has the same input;
2232+
the second wf has cache_locations (set after adding tasks) and should recompute,
2233+
but without recomputing the second node
2234+
"""
2235+
cache_dir1 = tmpdir.mkdir("test_wf_cache3")
2236+
cache_dir2 = tmpdir.mkdir("test_wf_cache4")
2237+
2238+
wf1 = Workflow(name="wf", input_spec=["x"], cache_dir=cache_dir1)
2239+
wf1.add(ten(name="ten", x=wf1.lzin.x))
2240+
wf1.add(add2_wait(name="add2", x=wf1.ten.lzout.out))
2241+
wf1.set_output([("out", wf1.add2.lzout.out)])
2242+
wf1.inputs.x = 3
2243+
wf1.plugin = plugin
2244+
2245+
t0 = time.time()
2246+
with Submitter(plugin=plugin) as sub:
2247+
sub(wf1)
2248+
t1 = time.time() - t0
2249+
2250+
results1 = wf1.result()
2251+
assert 12 == results1.output.out
2252+
2253+
wf2 = Workflow(name="wf", input_spec=["x", "y"], cache_dir=cache_dir2)
2254+
wf2.add(ten(name="ten", x=wf2.lzin.x))
2255+
wf2.add(add2_wait(name="add2", x=wf2.ten.lzout.out))
2256+
wf2.set_output([("out", wf2.add2.lzout.out)])
2257+
wf2.inputs.x = 2
2258+
wf2.plugin = plugin
2259+
# updating cache_locations after adding the tasks
2260+
wf2.cache_locations = cache_dir1
2261+
2262+
t0 = time.time()
2263+
with Submitter(plugin=plugin) as sub:
2264+
sub(wf2)
2265+
t2 = time.time() - t0
2266+
2267+
results2 = wf2.result()
2268+
assert 12 == results2.output.out
2269+
2270+
# checking if the second wf runs again, but runs only one task
2271+
assert wf1.output_dir.exists()
2272+
assert wf2.output_dir.exists()
2273+
2274+
assert len(list(Path(cache_dir1).glob("F*"))) == 2
2275+
assert len(list(Path(cache_dir2).glob("F*"))) == 1
2276+
# checking execution time
2277+
assert t1 > 3
2278+
assert t2 < 0.5
2279+
2280+
21682281
@pytest.mark.parametrize("plugin", Plugins)
21692282
def test_wf_state_cachelocations(plugin, tmpdir):
21702283
"""

0 commit comments

Comments
 (0)