Skip to content

Commit a2ddfe7

Browse files
authored
Merge pull request #409 from djarecka/mnt/wf_run
[mnt] changes in the run methods
2 parents 512fca6 + 480fd58 commit a2ddfe7

File tree

2 files changed

+17
-15
lines changed

2 files changed

+17
-15
lines changed

pydra/engine/core.py

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -433,15 +433,15 @@ def _run(self, rerun=False, **kwargs):
433433
lockfile = self.cache_dir / (checksum + ".lock")
434434
# Eagerly retrieve cached - see scenarios in __init__()
435435
self.hooks.pre_run(self)
436-
# adding info file with the checksum in case the task was cancelled
437-
# and the lockfile has to be removed
438-
with open(self.cache_dir / f"{self.uid}_info.json", "w") as jsonfile:
439-
json.dump({"checksum": self.checksum}, jsonfile)
440436
with SoftFileLock(lockfile):
441437
if not (rerun or self.task_rerun):
442438
result = self.result()
443439
if result is not None:
444440
return result
441+
# adding info file with the checksum in case the task was cancelled
442+
# and the lockfile has to be removed
443+
with open(self.cache_dir / f"{self.uid}_info.json", "w") as jsonfile:
444+
json.dump({"checksum": self.checksum}, jsonfile)
445445
# Let only one equivalent process run
446446
odir = self.output_dir
447447
if not self.can_resume and odir.exists():
@@ -965,11 +965,6 @@ async def _run(self, submitter=None, rerun=False, **kwargs):
965965
"Workflow output cannot be None, use set_output to define output(s)"
966966
)
967967
checksum = self.checksum
968-
# Eagerly retrieve cached
969-
if not (rerun or self.task_rerun):
970-
result = self.result()
971-
if result is not None:
972-
return result
973968
# creating connections that were defined after adding tasks to the wf
974969
for task in self.graph.nodes:
975970
# if workflow has task_rerun=True and propagate_rerun=True,
@@ -981,15 +976,18 @@ async def _run(self, submitter=None, rerun=False, **kwargs):
981976
task.propagate_rerun = self.propagate_rerun
982977
task.cache_locations = task._cache_locations + self.cache_locations
983978
self.create_connections(task)
984-
# TODO add signal handler for processes killed after lock acquisition
985-
# adding info file with the checksum in case the task was cancelled
986-
# and the lockfile has to be removed
987-
with open(self.cache_dir / f"{self.uid}_info.json", "w") as jsonfile:
988-
json.dump({"checksum": checksum}, jsonfile)
989979
lockfile = self.cache_dir / (checksum + ".lock")
990980
self.hooks.pre_run(self)
991981
with SoftFileLock(lockfile):
992-
# # Let only one equivalent process run
982+
# retrieve cached results
983+
if not (rerun or self.task_rerun):
984+
result = self.result()
985+
if result is not None:
986+
return result
987+
# adding info file with the checksum in case the task was cancelled
988+
# and the lockfile has to be removed
989+
with open(self.cache_dir / f"{self.uid}_info.json", "w") as jsonfile:
990+
json.dump({"checksum": checksum}, jsonfile)
993991
odir = self.output_dir
994992
if not self.can_resume and odir.exists():
995993
shutil.rmtree(odir)
@@ -1015,6 +1013,8 @@ async def _run(self, submitter=None, rerun=False, **kwargs):
10151013
(self.cache_dir / f"{self.uid}_info.json").unlink()
10161014
os.chdir(cwd)
10171015
self.hooks.post_run(self, result)
1016+
if result is None:
1017+
raise Exception("This should never happen, please open new issue")
10181018
return result
10191019

10201020
async def _run_task(self, submitter, rerun=False):

pydra/engine/helpers.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,8 @@ def load_result(checksum, cache_locations):
109109
"""
110110
if not cache_locations:
111111
return None
112+
# TODO: if there are issues with loading, we might need to
113+
# TODO: sleep and repeat loads (after checkin that there are no lock files!)
112114
for location in cache_locations:
113115
if (location / checksum).exists():
114116
result_file = location / checksum / "_result.pklz"

0 commit comments

Comments
 (0)