
Commit 1641c96

Merge pull request #518 from nipype/rf/run-tasks
TST: Decrease divergence between Task/Workflow _run_task
2 parents: 74957ca + 5cbca66

1 file changed: +63 −46 lines

pydra/engine/core.py

Lines changed: 63 additions & 46 deletions
@@ -452,10 +452,40 @@ def __call__(
             res = self._run(rerun=rerun, **kwargs)
         return res
 
+    def _modify_inputs(self):
+        """Update and preserve a Task's original inputs"""
+        orig_inputs = attr.asdict(self.inputs, recurse=False)
+        map_copyfiles = copyfile_input(self.inputs, self.output_dir)
+        modified_inputs = template_update(
+            self.inputs, self.output_dir, map_copyfiles=map_copyfiles
+        )
+        if modified_inputs:
+            self.inputs = attr.evolve(self.inputs, **modified_inputs)
+        return orig_inputs
+
+    def _populate_filesystem(self, checksum, output_dir):
+        """
+        Invoked immediately after the lockfile is generated, this function:
+        - Creates the cache file
+        - Clears existing outputs if `can_resume` is False
+        - Generates a fresh output directory
+
+        Created as an attempt to simplify overlapping `Task`|`Workflow` behaviors.
+        """
+        # adding info file with the checksum in case the task was cancelled
+        # and the lockfile has to be removed
+        with open(self.cache_dir / f"{self.uid}_info.json", "w") as jsonfile:
+            json.dump({"checksum": checksum}, jsonfile)
+        if not self.can_resume and output_dir.exists():
+            shutil.rmtree(output_dir)
+        output_dir.mkdir(parents=False, exist_ok=self.can_resume)
+
     def _run(self, rerun=False, **kwargs):
         self.inputs = attr.evolve(self.inputs, **kwargs)
         self.inputs.check_fields_input_spec()
+
         checksum = self.checksum
+        output_dir = self.output_dir
         lockfile = self.cache_dir / (checksum + ".lock")
         # Eagerly retrieve cached - see scenarios in __init__()
         self.hooks.pre_run(self)
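
Note: `_populate_filesystem` gathers the cache bookkeeping that `Task._run` and `Workflow._run` used to duplicate. Below is a minimal standalone sketch of the same pattern; `cache_dir`, `uid`, `checksum`, and `can_resume` are illustrative stand-ins rather than real pydra attributes.

import json
import shutil
from pathlib import Path


def populate_filesystem(cache_dir: Path, uid: str, checksum: str,
                        output_dir: Path, can_resume: bool) -> None:
    """Sketch: record the checksum, then prepare a clean output directory."""
    # Sidecar info file: if the run is cancelled, the leftover lockfile can be
    # matched to this checksum and removed.
    with open(cache_dir / f"{uid}_info.json", "w") as jsonfile:
        json.dump({"checksum": checksum}, jsonfile)
    # Wipe previous outputs unless resuming is allowed.
    if not can_resume and output_dir.exists():
        shutil.rmtree(output_dir)
    # exist_ok=can_resume is equivalent to the old `True if can_resume else False`.
    output_dir.mkdir(parents=False, exist_ok=can_resume)
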
@@ -464,47 +494,33 @@ def _run(self, rerun=False, **kwargs):
                 result = self.result()
                 if result is not None and not result.errored:
                     return result
-            # adding info file with the checksum in case the task was cancelled
-            # and the lockfile has to be removed
-            with open(self.cache_dir / f"{self.uid}_info.json", "w") as jsonfile:
-                json.dump({"checksum": self.checksum}, jsonfile)
-            # Let only one equivalent process run
-            odir = self.output_dir
-            if not self.can_resume and odir.exists():
-                shutil.rmtree(odir)
             cwd = os.getcwd()
-            odir.mkdir(parents=False, exist_ok=True if self.can_resume else False)
-            orig_inputs = attr.asdict(self.inputs, recurse=False)
-            map_copyfiles = copyfile_input(self.inputs, self.output_dir)
-            modified_inputs = template_update(
-                self.inputs, self.output_dir, map_copyfiles=map_copyfiles
-            )
-            if modified_inputs:
-                self.inputs = attr.evolve(self.inputs, **modified_inputs)
-            self.audit.start_audit(odir)
+            self._populate_filesystem(checksum, output_dir)
+            orig_inputs = self._modify_inputs()
             result = Result(output=None, runtime=None, errored=False)
             self.hooks.pre_run_task(self)
+            self.audit.start_audit(odir=output_dir)
             try:
                 self.audit.monitor()
                 self._run_task()
-                result.output = self._collect_outputs(output_dir=odir)
+                result.output = self._collect_outputs(output_dir=output_dir)
             except Exception:
                 etype, eval, etr = sys.exc_info()
                 traceback = format_exception(etype, eval, etr)
-                record_error(self.output_dir, error=traceback)
+                record_error(output_dir, error=traceback)
                 result.errored = True
                 raise
             finally:
                 self.hooks.post_run_task(self, result)
                 self.audit.finalize_audit(result)
-                save(odir, result=result, task=self)
+                save(output_dir, result=result, task=self)
                 self.output_ = None
                 # removing the additional file with the chcksum
                 (self.cache_dir / f"{self.uid}_info.json").unlink()
                 # # function etc. shouldn't change anyway, so removing
-                orig_inputs = dict(
-                    (k, v) for (k, v) in orig_inputs.items() if not k.startswith("_")
-                )
+                orig_inputs = {
+                    k: v for k, v in orig_inputs.items() if not k.startswith("_")
+                }
                 self.inputs = attr.evolve(self.inputs, **orig_inputs)
                 os.chdir(cwd)
         self.hooks.post_run(self, result)
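
For orientation, the refactored `_run` above is the path exercised whenever a task object is called. A hedged usage sketch follows (assuming the function-task decorator API of the pydra release this commit targets; the `add` task and its inputs are illustrative).

import pydra


@pydra.mark.task
def add(a: int, b: int) -> int:
    return a + b


# Calling the task goes through Task.__call__ -> Task._run: the lockfile,
# info file, output directory, and audit steps shown in the diff all happen
# behind this call.
task = add(a=1, b=2)
result = task()
print(result.output.out)  # 3
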
@@ -1038,53 +1054,38 @@ async def _run(self, submitter=None, rerun=False, **kwargs):
             raise ValueError(
                 "Workflow output cannot be None, use set_output to define output(s)"
             )
-        checksum = self.checksum
         # creating connections that were defined after adding tasks to the wf
-        for task in self.graph.nodes:
-            # if workflow has task_rerun=True and propagate_rerun=True,
-            # it should be passed to the tasks
-            if self.task_rerun and self.propagate_rerun:
-                task.task_rerun = self.task_rerun
-                # if the task is a wf, than the propagate_rerun should be also set
-                if is_workflow(task):
-                    task.propagate_rerun = self.propagate_rerun
-            task.cache_locations = task._cache_locations + self.cache_locations
-            self.create_connections(task)
+        self._connect_and_propagate_to_tasks()
+
+        checksum = self.checksum
+        output_dir = self.output_dir
         lockfile = self.cache_dir / (checksum + ".lock")
         self.hooks.pre_run(self)
         async with PydraFileLock(lockfile):
-            # retrieve cached results
             if not (rerun or self.task_rerun):
                 result = self.result()
                 if result is not None and not result.errored:
                     return result
-            # adding info file with the checksum in case the task was cancelled
-            # and the lockfile has to be removed
-            with open(self.cache_dir / f"{self.uid}_info.json", "w") as jsonfile:
-                json.dump({"checksum": checksum}, jsonfile)
-            odir = self.output_dir
-            if not self.can_resume and odir.exists():
-                shutil.rmtree(odir)
             cwd = os.getcwd()
-            odir.mkdir(parents=False, exist_ok=True if self.can_resume else False)
-            self.audit.start_audit(odir=odir)
+            self._populate_filesystem(checksum, output_dir)
             result = Result(output=None, runtime=None, errored=False)
             self.hooks.pre_run_task(self)
+            self.audit.start_audit(odir=output_dir)
             try:
                 self.audit.monitor()
                 await self._run_task(submitter, rerun=rerun)
                 result.output = self._collect_outputs()
             except Exception:
                 etype, eval, etr = sys.exc_info()
                 traceback = format_exception(etype, eval, etr)
-                record_error(self.output_dir, error=traceback)
+                record_error(output_dir, error=traceback)
                 result.errored = True
                 self._errored = True
                 raise
             finally:
                 self.hooks.post_run_task(self, result)
                 self.audit.finalize_audit(result=result)
-                save(odir, result=result, task=self)
+                save(output_dir, result=result, task=self)
                 # removing the additional file with the chcksum
                 (self.cache_dir / f"{self.uid}_info.json").unlink()
                 os.chdir(cwd)
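
The workflow branch now mirrors the task branch, including the guard that `set_output` must have been called before running. A hedged sketch of the calling side (workflow construction and Submitter usage per pydra's documented API; names such as `add_two` are illustrative).

import pydra


@pydra.mark.task
def add(a: int, b: int) -> int:
    return a + b


wf = pydra.Workflow(name="wf", input_spec=["x"])
wf.add(add(name="add_two", a=wf.lzin.x, b=2))
# Without set_output, the async _run above raises the ValueError shown in the hunk.
wf.set_output([("out", wf.add_two.lzout.out)])
wf.inputs.x = 1

with pydra.Submitter(plugin="cf") as sub:
    sub(wf)

print(wf.result().output.out)  # 3
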
@@ -1226,6 +1227,22 @@ def create_dotfile(self, type="simple", export=None, name=None):
             formatted_dot.append(self.graph.export_graph(dotfile=dotfile, ext=ext))
         return dotfile, formatted_dot
 
+    def _connect_and_propagate_to_tasks(self):
+        """
+        Visit each node in the graph and create the connections.
+        Additionally checks if all tasks should be rerun.
+        """
+        for task in self.graph.nodes:
+            # if workflow has task_rerun=True and propagate_rerun=True,
+            # it should be passed to the tasks
+            if self.task_rerun and self.propagate_rerun:
+                task.task_rerun = self.task_rerun
+                # if the task is a wf, than the propagate_rerun should be also set
+                if is_workflow(task):
+                    task.propagate_rerun = self.propagate_rerun
+            task.cache_locations = task._cache_locations + self.cache_locations
+            self.create_connections(task)
+
 
 def is_task(obj):
     """Check whether an object looks like a task."""
