Skip to content

Commit 6263b06

Browse files
committed
adding propagate_rerun to wf, so we can have wf that is trigerred, but doesn't rerun every single task, and wf that reruns everything (default)
1 parent 519e29c commit 6263b06

File tree

2 files changed

+38
-30
lines changed

2 files changed

+38
-30
lines changed

pydra/engine/core.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -614,6 +614,7 @@ def __init__(
614614
messengers=None,
615615
output_spec: ty.Optional[BaseSpec] = None,
616616
rerun=False,
617+
propagate_rerun=True,
617618
**kwargs,
618619
):
619620
"""
@@ -683,6 +684,8 @@ def __init__(
683684

684685
# store output connections
685686
self._connections = None
687+
# propagating rerun if task_rerun=True
688+
self.propagate_rerun = propagate_rerun
686689

687690
def __getattr__(self, name):
688691
if name == "lzin":
@@ -800,8 +803,13 @@ async def _run(self, submitter=None, rerun=False, **kwargs):
800803
return result
801804
# creating connections that were defined after adding tasks to the wf
802805
for task in self.graph.nodes:
803-
if self.task_rerun:
806+
# if workflow has task_rerun=True and propagate_rerun=True,
807+
# it should be passed to the tasks
808+
if self.task_rerun and self.propagate_rerun:
804809
task.task_rerun = self.task_rerun
810+
# if the task is a wf, than the propagate_rerun should be also set
811+
if is_workflow(task):
812+
task.propagate_rerun = self.propagate_rerun
805813
task.cache_locations = task._cache_locations + self.cache_locations
806814
self.create_connections(task)
807815
# TODO add signal handler for processes killed after lock acquisition

pydra/engine/tests/test_workflow.py

Lines changed: 29 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1989,11 +1989,11 @@ def test_wf_nostate_cachelocations_forcererun(plugin, tmpdir):
19891989

19901990

19911991
@pytest.mark.parametrize("plugin", Plugins)
1992-
def test_wf_nostate_cachelocations_wftaskrerun(plugin, tmpdir):
1992+
def test_wf_nostate_cachelocations_wftaskrerun_propagateTrue(plugin, tmpdir):
19931993
"""
1994-
Two identical wfs with provided cache_dir;
1995-
the second wf has cache_locations and rerun=True,
1996-
so everything should be rerun even if Submitter doesn't have rerun
1994+
Two identical wfs with provided cache_dir and cache_locations for the second one;
1995+
submitter doesn't have rerun, but the second wf has rerun=True,
1996+
propagate_rerun is True as default, so everything should be rerun
19971997
"""
19981998
cache_dir1 = tmpdir.mkdir("test_wf_cache3")
19991999
cache_dir2 = tmpdir.mkdir("test_wf_cache4")
@@ -2019,7 +2019,7 @@ def test_wf_nostate_cachelocations_wftaskrerun(plugin, tmpdir):
20192019
input_spec=["x", "y"],
20202020
cache_dir=cache_dir2,
20212021
cache_locations=cache_dir1,
2022-
rerun=True, # wh has to be rerun
2022+
rerun=True, # wh has to be rerun (default for propagate_rerun is True)
20232023
)
20242024
wf2.add(multiply(name="mult", x=wf2.lzin.x, y=wf2.lzin.y))
20252025
wf2.add(add2_wait(name="add2", x=wf2.mult.lzout.out))
@@ -2048,11 +2048,12 @@ def test_wf_nostate_cachelocations_wftaskrerun(plugin, tmpdir):
20482048

20492049

20502050
@pytest.mark.parametrize("plugin", Plugins)
2051-
def test_wf_nostate_cachelocations_taskrerun_noeffect(plugin, tmpdir):
2051+
def test_wf_nostate_cachelocations_wftaskrerun_propagateFalse(plugin, tmpdir):
20522052
"""
2053-
Two identical wfs with provided cache_dir;
2054-
submitter doesn't have rerun and wf doesn't have rerun,
2055-
so even if the task has rerun=True, nothing will be rerun (wf will read the result)
2053+
Two identical wfs with provided cache_dir and cache_locations for the second one;
2054+
submitter doesn't have rerun, but the second wf has rerun=True,
2055+
propagate_rerun is set to False, so wf will be triggered,
2056+
but tasks will not have rerun, so will use the previous results
20562057
"""
20572058
cache_dir1 = tmpdir.mkdir("test_wf_cache3")
20582059
cache_dir2 = tmpdir.mkdir("test_wf_cache4")
@@ -2078,10 +2079,11 @@ def test_wf_nostate_cachelocations_taskrerun_noeffect(plugin, tmpdir):
20782079
input_spec=["x", "y"],
20792080
cache_dir=cache_dir2,
20802081
cache_locations=cache_dir1,
2082+
rerun=True, # wh has to be rerun
2083+
propagate_rerun=False, # but rerun doesn't propagate to the tasks
20812084
)
20822085
wf2.add(multiply(name="mult", x=wf2.lzin.x, y=wf2.lzin.y))
2083-
# rerun on the task level will not have any effect if the wf doesn't start rerun
2084-
wf2.add(add2_wait(name="add2", x=wf2.mult.lzout.out, rerun=True))
2086+
wf2.add(add2_wait(name="add2", x=wf2.mult.lzout.out))
20852087
wf2.set_output([("out", wf2.add2.lzout.out)])
20862088
wf2.inputs.x = 2
20872089
wf2.inputs.y = 3
@@ -2095,24 +2097,23 @@ def test_wf_nostate_cachelocations_taskrerun_noeffect(plugin, tmpdir):
20952097
results2 = wf2.result()
20962098
assert 8 == results2.output.out
20972099

2098-
# checking if the second wf doesn't runs again
2100+
# checking if the second wf runs again
20992101
assert wf1.output_dir.exists()
2100-
assert not wf2.output_dir.exists()
2101-
# should be fast
2102+
assert wf2.output_dir.exists()
2103+
2104+
# tasks should not be recomputed
2105+
assert len(list(Path(cache_dir1).glob("F*"))) == 2
2106+
assert len(list(Path(cache_dir2).glob("F*"))) == 0
21022107
assert t1 > 3
21032108
assert t2 < 1
21042109

21052110

21062111
@pytest.mark.parametrize("plugin", Plugins)
2107-
def test_wf_nostate_cachelocations_taskrerun_wfmodif(plugin, tmpdir):
2112+
def test_wf_nostate_cachelocations_taskrerun_wfrerun_propagateFalse(plugin, tmpdir):
21082113
"""
2109-
Two identical wfs with provided cache_dir;
2110-
the second wf has cache_locations,
2111-
the second task has rerun=True, but because wf and submitter doesn't have rerun
2112-
it wouldn't rerun anyway (see previous test),
2113-
so wf inputs has to be modified to force wf to start rerunning,
2114-
the first task will read the result (doesn't have rerun),
2115-
but the second will be rerun
2114+
Two identical wfs with provided cache_dir, and cache_locations for teh second wf;
2115+
submitter doesn't have rerun, but wf has rerun=True,
2116+
since propagate_rerun=False, only tasks that have rerun=True will be rerun
21162117
"""
21172118
cache_dir1 = tmpdir.mkdir("test_wf_cache3")
21182119
cache_dir2 = tmpdir.mkdir("test_wf_cache4")
@@ -2135,18 +2136,18 @@ def test_wf_nostate_cachelocations_taskrerun_wfmodif(plugin, tmpdir):
21352136

21362137
wf2 = Workflow(
21372138
name="wf",
2138-
input_spec=["x", "y", "fake"],
2139+
input_spec=["x", "y"],
21392140
cache_dir=cache_dir2,
21402141
cache_locations=cache_dir1,
2142+
rerun=True,
2143+
propagate_rerun=False, # rerun will not be propagated to each task
21412144
)
21422145
wf2.add(multiply(name="mult", x=wf2.lzin.x, y=wf2.lzin.y))
2143-
# when wf starts rerun, this task will be rerun
2146+
# rerun on the task level needed (wf.propagate_rerun is False)
21442147
wf2.add(add2_wait(name="add2", x=wf2.mult.lzout.out, rerun=True))
21452148
wf2.set_output([("out", wf2.add2.lzout.out)])
21462149
wf2.inputs.x = 2
21472150
wf2.inputs.y = 3
2148-
# adding a fake input, just to start wf rerun
2149-
wf2.inputs.fake = "whatever"
21502151
wf2.plugin = plugin
21512152

21522153
t0 = time.time()
@@ -2157,11 +2158,10 @@ def test_wf_nostate_cachelocations_taskrerun_wfmodif(plugin, tmpdir):
21572158
results2 = wf2.result()
21582159
assert 8 == results2.output.out
21592160

2160-
# checking if the second wf runs again
2161+
# checking if the second wf doesn't runs again
21612162
assert wf1.output_dir.exists()
21622163
assert wf2.output_dir.exists()
2163-
2164-
# the first task shouldn't be recomputed, but the second one should be
2164+
# the second task should be recomputed
21652165
assert len(list(Path(cache_dir1).glob("F*"))) == 2
21662166
assert len(list(Path(cache_dir2).glob("F*"))) == 1
21672167
assert t1 > 3

0 commit comments

Comments
 (0)