
Commit 29456be

Commit message: debugged cache and rerunning in test_workflow
1 parent 7245524 commit 29456be

File tree: 4 files changed, +53 −26 lines


pydra/engine/core.py

Lines changed: 1 addition & 1 deletion
@@ -172,7 +172,7 @@ def cache_dir(self, path: os.PathLike):
     @property
     def cache_locations(self):
         """Get the list of cache sources."""
-        return self._cache_locations + ensure_list(self.cache_dir)
+        return ensure_list(self.cache_dir) + self._cache_locations
 
     @cache_locations.setter
     def cache_locations(self, locations):
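
Note: cache sources are consulted in list order, so returning ensure_list(self.cache_dir) first means a task's own cache directory is checked before any external cache_locations. A minimal sketch of that lookup order, assuming checksum-named result directories (find_cached_result is illustrative, not a pydra API):

from pathlib import Path

def find_cached_result(checksum: str, cache_locations: list[Path]) -> Path | None:
    # Locations are scanned left to right, so whichever directory is
    # first in the list wins; placing the task's own cache_dir first
    # lets freshly written results shadow older external caches.
    for location in cache_locations:
        candidate = location / checksum
        if candidate.exists():
            return candidate
    return None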

pydra/engine/specs.py

Lines changed: 19 additions & 0 deletions
@@ -19,6 +19,7 @@
 from fileformats.generic import FileSet, File
 from pydra.utils.messenger import AuditFlag, Messenger
 from pydra.utils.typing import is_optional, optional_type
+from pydra.utils.hash import register_serializer, hash_single
 from .helpers import (
     attrs_fields,
     attrs_values,
@@ -1350,3 +1351,21 @@ def argstr_formatting(argstr: str, values: dict[str, ty.Any]):
         .strip()
     )
     return argstr_formatted
+
+
+@register_serializer
+def bytes_repr_task_def(obj: TaskDef, cache: Cache) -> ty.Iterator[bytes]:
+    yield f"task_def[{obj._task_type}]:(".encode()
+    for field in list_fields(obj):
+        yield f"{field.name}=".encode()
+        yield hash_single(getattr(obj, field.name), cache)
+        yield b","
+    yield b"_splitter="
+    yield hash_single(obj._splitter, cache)
+    yield b",_combiner="
+    yield hash_single(obj._combiner, cache)
+    yield b",_cont_dim="
+    yield hash_single(obj._cont_dim, cache)
+    yield b",_xor="
+    yield hash_single(obj._xor, cache)
+    yield b")"

pydra/engine/submitter.py

Lines changed: 7 additions & 3 deletions
@@ -321,7 +321,7 @@ def expand_workflow(self, workflow_task: "Task[WorkflowDef]", rerun: bool) -> None:
         tasks = self.get_runnable_tasks(exec_graph)
         while tasks or any(not n.done for n in exec_graph.nodes):
             for task in tasks:
-                self.worker.run(task, rerun=rerun)
+                self.worker.run(task, rerun=rerun and self.propagate_rerun)
             tasks = self.get_runnable_tasks(exec_graph)
 
     async def expand_workflow_async(
@@ -418,9 +418,13 @@ async def expand_workflow_async(
                 raise RuntimeError(msg)
             for task in tasks:
                 if task.is_async:
-                    await self.worker.run_async(task, rerun=rerun)
+                    await self.worker.run_async(
+                        task, rerun=rerun and self.propagate_rerun
+                    )
                 else:
-                    task_futures.add(self.worker.run(task, rerun=rerun))
+                    task_futures.add(
+                        self.worker.run(task, rerun=rerun and self.propagate_rerun)
+                    )
             task_futures = await self.worker.fetch_finished(task_futures)
             tasks = self.get_runnable_tasks(exec_graph)
 
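
The effect of gating with `rerun and self.propagate_rerun` is that a workflow-level rerun only forces its child tasks to re-execute when the submitter is configured to propagate it. A condensed truth table of that expression, as plain Python independent of the Submitter class:

def effective_rerun(rerun: bool, propagate_rerun: bool) -> bool:
    # The flag passed to worker.run for each child task in the diff above.
    return rerun and propagate_rerun

assert effective_rerun(True, True) is True    # rerun workflow and all tasks
assert effective_rerun(True, False) is False  # rerun workflow, reuse cached tasks
assert effective_rerun(False, True) is False  # no rerun requested at all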

pydra/engine/tests/test_workflow.py

Lines changed: 26 additions & 22 deletions
@@ -2509,9 +2509,9 @@ def test_wf_nostate_cachelocations_wftaskrerun_propagateTrue(
     submitter doesn't have rerun, but the second worky has rerun=True,
     propagate_rerun is True as default, so everything should be rerun
     """
-    cache_dir1 = tmp_path / "test_wf_cache3"
+    cache_dir1 = tmp_path / "test_wf_cache1"
     cache_dir1.mkdir()
-    cache_dir2 = tmp_path / "test_wf_cache4"
+    cache_dir2 = tmp_path / "test_wf_cache2"
     cache_dir2.mkdir()
 
     @workflow.define
@@ -2631,6 +2631,12 @@ def Worky2(x, y):
     assert len(list(Path(cache_dir2).glob("python-*"))) == 0
 
 
+@pytest.mark.xfail(
+    reason=(
+        "Cannot specify tasks within a workflow to be rerun, maybe rerun could take a "
+        "list of task names instead"
+    )
+)
 @pytest.mark.flaky(reruns=3)
 def test_wf_nostate_cachelocations_taskrerun_wfrerun_propagateFalse(
     plugin: str, tmp_path: Path
@@ -2640,9 +2646,9 @@ def test_wf_nostate_cachelocations_taskrerun_wfrerun_propagateFalse(
     submitter doesn't have rerun, but worky has rerun=True,
     since propagate_rerun=False, only tasks that have rerun=True will be rerun
     """
-    cache_dir1 = tmp_path / "test_wf_cache3"
+    cache_dir1 = tmp_path / "cache1"
     cache_dir1.mkdir()
-    cache_dir2 = tmp_path / "test_wf_cache4"
+    cache_dir2 = tmp_path / "cache2"
     cache_dir2.mkdir()
 
     @workflow.define
@@ -2666,7 +2672,7 @@ def Worky1(x, y):
     def Worky2(x, y):
         mult = workflow.add(Multiply(x=x, y=y), name="mult")
         # rerun on the task level needed (wf["propagate_rerun"] is False)
-        add2 = workflow.add(Add2Wait(x=mult.out, rerun=True), name="add2")
+        add2 = workflow.add(Add2Wait(x=mult.out), name="add2")
         return add2.out
 
     worky2 = Worky2(x=2, y=3)
@@ -2723,7 +2729,7 @@ def Worky1(x):
     assert 12 == results1.outputs.out
 
     @workflow.define
-    def Worky2(x, y):
+    def Worky2(x, y=None):
 
         ten = workflow.add(Ten(x=x))
         add2 = workflow.add(Add2(x=ten.out), name="add2")
@@ -2743,8 +2749,8 @@ def Worky2(x, y):
     # checking if the second worky runs again, but runs only one task
     assert results1.output_dir != results2.output_dir
     # the second worky should rerun one task
-    assert len(list(Path(cache_dir1).glob("F*"))) == 2
-    assert len(list(Path(cache_dir2).glob("F*"))) == 1
+    assert len(list(Path(cache_dir1).glob("python-*"))) == 2
+    assert len(list(Path(cache_dir2).glob("python-*"))) == 1
 
 
 @pytest.mark.flaky(reruns=3)
@@ -2775,17 +2781,16 @@ def Worky1(x):
     assert 12 == results1.outputs.out
 
     @workflow.define
-    def Worky2(x, y):
+    def Worky2(x, y=None):
         ten = workflow.add(Ten(x=x))
         add2 = workflow.add(Add2(x=ten.out), name="add2")
         return add2.out
 
     worky2 = Worky2(x=2)
 
-    # updating cache_locations after adding the tasks
-    worky2.cache_locations = cache_dir1
-
-    with Submitter(worker=plugin, cache_dir=cache_dir2) as sub:
+    with Submitter(
+        worker=plugin, cache_dir=cache_dir2, cache_locations=cache_dir1
+    ) as sub:
         results2 = sub(worky2)
 
     assert not results2.errored, "\n".join(results2.errors["error message"])
@@ -2795,8 +2800,8 @@ def Worky2(x, y):
     # checking if the second worky runs again, but runs only one task
     assert results1.output_dir != results2.output_dir
     # the second worky should have only one task run
-    assert len(list(Path(cache_dir1).glob("F*"))) == 2
-    assert len(list(Path(cache_dir2).glob("F*"))) == 1
+    assert len(list(Path(cache_dir1).glob("python-*"))) == 2
+    assert len(list(Path(cache_dir2).glob("python-*"))) == 1
 
 
 @pytest.mark.flaky(reruns=3)
@@ -2846,8 +2851,7 @@ def Worky2(x, y):
     assert not results2.errored, "\n".join(results2.errors["error message"])
     t2 = time.time() - t0
 
-    assert results2.outputs.out[0] == 8
-    assert results2.outputs.out[1] == 82
+    assert results2.outputs.out == [8, 82]
 
     # for win and dask/slurm the time for dir creation etc. might take much longer
     if not sys.platform.startswith("win") and plugin == "cf":
@@ -3410,7 +3414,7 @@ def Worky1(x, y):
     # checking output_dir after the first run
 
     # saving the content of the cache dir after the first run
-    cache_dir_content = os.listdir(worky1.cache_dir)
+    cache_dir_content = os.listdir(cache_dir1)
 
     # running workflow the second time
     t0 = time.time()
@@ -3422,7 +3426,7 @@ def Worky1(x, y):
 
     assert 8 == results1.outputs.out
     # checking if no new directory is created
-    assert cache_dir_content == os.listdir(worky1.cache_dir)
+    assert cache_dir_content == os.listdir(cache_dir1)
 
     # for win and dask/slurm the time for dir creation etc. might take much longer
     if not sys.platform.startswith("win") and plugin == "cf":
@@ -3458,10 +3462,10 @@ def Worky1(x, y):
     assert 602 == results1.outputs.out[1]
 
     # checking output_dir after the first run
-    assert [odir.exists() for odir in worky1.output_dir]
+    assert results1.output_dir.exists()
 
     # saving the content of the cache dir after the first run
-    cache_dir_content = os.listdir(worky1.cache_dir)
+    cache_dir_content = os.listdir(results1.job.cache_dir)
 
     # running workflow the second time
     t0 = time.time()
@@ -3474,7 +3478,7 @@ def Worky1(x, y):
     assert 8 == results1.outputs.out[0]
     assert 602 == results1.outputs.out[1]
    # checking if no new directory is created
-    assert cache_dir_content == os.listdir(worky1.cache_dir)
+    assert cache_dir_content == os.listdir(results1.job.cache_dir)
     # for win and dask/slurm the time for dir creation etc. might take much longer
     if not sys.platform.startswith("win") and plugin == "cf":
         # checking the execution time
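
The glob updates from "F*" to "python-*" suggest that cached python-task results are written to directories with a python- prefix; the tests count those directories to check how many tasks actually ran versus hit the cache. A small helper expressing that check (count_cached_tasks is hypothetical, and the prefix follows the updated assertions; actual pydra cache directory names may differ by version):

from pathlib import Path

def count_cached_tasks(cache_dir: Path, prefix: str = "python-") -> int:
    # One result directory per executed task; a cache hit leaves
    # the count unchanged on a second run.
    return len(list(cache_dir.glob(f"{prefix}*")))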
