Skip to content

Commit 36ad2df

Browse files
committed
FIX: Group logical parts together
1 parent a456a6b commit 36ad2df

File tree

1 file changed

+31
-27
lines changed

1 file changed

+31
-27
lines changed

pydra/engine/core.py

Lines changed: 31 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -452,36 +452,33 @@ def __call__(
452452
res = self._run(rerun=rerun, **kwargs)
453453
return res
454454

455-
def prepare_run_task(self, rerun):
455+
def _modify_inputs(self):
456+
"""Update and preserve a Task's original inputs"""
457+
orig_inputs = attr.asdict(self.inputs, recurse=False)
458+
map_copyfiles = copyfile_input(self.inputs, self.output_dir)
459+
modified_inputs = template_update(
460+
self.inputs, self.output_dir, map_copyfiles=map_copyfiles
461+
)
462+
if modified_inputs:
463+
self.inputs = attr.evolve(self.inputs, **modified_inputs)
464+
return orig_inputs
465+
466+
def _populate_filesystem(self):
456467
"""
457468
Invoked immediately after the lockfile is generated, this function:
458-
- does a lot of things... (TODO)
459-
- Creates an empty Result and passes it along to be populated.
469+
- Creates the cache file
470+
- Clears existing outputs if `can_resume` is False
471+
- Generates a fresh output directory
460472
461473
Created as an attempt to simplify overlapping `Task`|`Workflow` behaviors.
462474
"""
463-
# retrieve cached results
464-
if not (rerun or self.task_rerun):
465-
result = self.result()
466475
# adding info file with the checksum in case the task was cancelled
467476
# and the lockfile has to be removed
468477
with open(self.cache_dir / f"{self.uid}_info.json", "w") as jsonfile:
469478
json.dump({"checksum": self.checksum}, jsonfile)
470479
if not self.can_resume and self.output_dir.exists():
471480
shutil.rmtree(self.output_dir)
472481
self.output_dir.mkdir(parents=False, exist_ok=self.can_resume)
473-
if not is_workflow(self):
474-
self._orig_inputs = attr.asdict(self.inputs, recurse=False)
475-
map_copyfiles = copyfile_input(self.inputs, self.output_dir)
476-
modified_inputs = template_update(
477-
self.inputs, self.output_dir, map_copyfiles=map_copyfiles
478-
)
479-
if modified_inputs:
480-
self.inputs = attr.evolve(self.inputs, **modified_inputs)
481-
self.audit.start_audit(odir=self.output_dir)
482-
result = Result(output=None, runtime=None, errored=False)
483-
self.hooks.pre_run_task(self)
484-
return result
485482

486483
def _run(self, rerun=False, **kwargs):
487484
self.inputs = attr.evolve(self.inputs, **kwargs)
@@ -496,8 +493,13 @@ def _run(self, rerun=False, **kwargs):
496493
if result is not None and not result.errored:
497494
return result
498495
cwd = os.getcwd()
499-
result = self.prepare_run_task(rerun)
496+
self._populate_filesystem()
497+
orig_inputs = self._modify_inputs()
498+
# the output dir can be changed by _run_task (but should it??)
500499
orig_outdir = self.output_dir
500+
result = Result(output=None, runtime=None, errored=False)
501+
self.hooks.pre_run_task(self)
502+
self.audit.start_audit(odir=self.output_dir)
501503
try:
502504
self.audit.monitor()
503505
self._run_task()
@@ -516,12 +518,10 @@ def _run(self, rerun=False, **kwargs):
516518
# removing the additional file with the chcksum
517519
(self.cache_dir / f"{self.uid}_info.json").unlink()
518520
# # function etc. shouldn't change anyway, so removing
519-
self._orig_inputs = {
520-
k: v for k, v in self._orig_inputs.items() if not k.startswith("_")
521+
orig_inputs = {
522+
k: v for k, v in orig_inputs.items() if not k.startswith("_")
521523
}
522-
self.inputs = attr.evolve(self.inputs, **self._orig_inputs)
523-
# no need to propagate this
524-
del self._orig_inputs
524+
self.inputs = attr.evolve(self.inputs, **orig_inputs)
525525
os.chdir(cwd)
526526
self.hooks.post_run(self, result)
527527
return result
@@ -1055,7 +1055,7 @@ async def _run(self, submitter=None, rerun=False, **kwargs):
10551055
"Workflow output cannot be None, use set_output to define output(s)"
10561056
)
10571057
# creating connections that were defined after adding tasks to the wf
1058-
self.connect_and_propagate_to_tasks()
1058+
self._connect_and_propagate_to_tasks()
10591059
lockfile = self.cache_dir / (self.checksum + ".lock")
10601060
self.hooks.pre_run(self)
10611061
async with PydraFileLock(lockfile):
@@ -1064,8 +1064,12 @@ async def _run(self, submitter=None, rerun=False, **kwargs):
10641064
if result is not None and not result.errored:
10651065
return result
10661066
cwd = os.getcwd()
1067-
result = self.prepare_run_task(rerun)
1067+
self._populate_filesystem()
1068+
# the output dir can be changed by _run_task (but should it??)
10681069
orig_outdir = self.output_dir
1070+
result = Result(output=None, runtime=None, errored=False)
1071+
self.hooks.pre_run_task(self)
1072+
self.audit.start_audit(odir=self.output_dir)
10691073
try:
10701074
self.audit.monitor()
10711075
await self._run_task(submitter, rerun=rerun)
@@ -1222,7 +1226,7 @@ def create_dotfile(self, type="simple", export=None, name=None):
12221226
formatted_dot.append(self.graph.export_graph(dotfile=dotfile, ext=ext))
12231227
return dotfile, formatted_dot
12241228

1225-
def connect_and_propagate_to_tasks(self):
1229+
def _connect_and_propagate_to_tasks(self):
12261230
"""
12271231
Visit each node in the graph and create the connections.
12281232
Additionally checks if all tasks should be rerun.

0 commit comments

Comments (0)