@@ -463,7 +463,7 @@ def _modify_inputs(self):
463
463
self .inputs = attr .evolve (self .inputs , ** modified_inputs )
464
464
return orig_inputs
465
465
466
- def _populate_filesystem (self ):
466
+ def _populate_filesystem (self , checksum , output_dir ):
467
467
"""
468
468
Invoked immediately after the lockfile is generated, this function:
469
469
- Creates the cache file
@@ -475,16 +475,18 @@ def _populate_filesystem(self):
475
475
# adding info file with the checksum in case the task was cancelled
476
476
# and the lockfile has to be removed
477
477
with open (self .cache_dir / f"{ self .uid } _info.json" , "w" ) as jsonfile :
478
- json .dump ({"checksum" : self . checksum }, jsonfile )
479
- if not self .can_resume and self . output_dir .exists ():
480
- shutil .rmtree (self . output_dir )
481
- self . output_dir .mkdir (parents = False , exist_ok = self .can_resume )
478
+ json .dump ({"checksum" : checksum }, jsonfile )
479
+ if not self .can_resume and output_dir .exists ():
480
+ shutil .rmtree (output_dir )
481
+ output_dir .mkdir (parents = False , exist_ok = self .can_resume )
482
482
483
483
def _run (self , rerun = False , ** kwargs ):
484
484
self .inputs = attr .evolve (self .inputs , ** kwargs )
485
485
self .inputs .check_fields_input_spec ()
486
486
487
- lockfile = self .cache_dir / (self .checksum + ".lock" )
487
+ checksum = self .checksum
488
+ output_dir = self .output_dir
489
+ lockfile = self .cache_dir / (checksum + ".lock" )
488
490
# Eagerly retrieve cached - see scenarios in __init__()
489
491
self .hooks .pre_run (self )
490
492
with SoftFileLock (lockfile ):
@@ -493,27 +495,25 @@ def _run(self, rerun=False, **kwargs):
493
495
if result is not None and not result .errored :
494
496
return result
495
497
cwd = os .getcwd ()
496
- self ._populate_filesystem ()
498
+ self ._populate_filesystem (checksum , output_dir )
497
499
orig_inputs = self ._modify_inputs ()
498
- # the output dir can be changed by _run_task (but should it??)
499
- orig_outdir = self .output_dir
500
500
result = Result (output = None , runtime = None , errored = False )
501
501
self .hooks .pre_run_task (self )
502
- self .audit .start_audit (odir = self . output_dir )
502
+ self .audit .start_audit (odir = output_dir )
503
503
try :
504
504
self .audit .monitor ()
505
505
self ._run_task ()
506
- result .output = self ._collect_outputs (output_dir = orig_outdir )
506
+ result .output = self ._collect_outputs (output_dir = output_dir )
507
507
except Exception :
508
508
etype , eval , etr = sys .exc_info ()
509
509
traceback = format_exception (etype , eval , etr )
510
- record_error (self . output_dir , error = traceback )
510
+ record_error (output_dir , error = traceback )
511
511
result .errored = True
512
512
raise
513
513
finally :
514
514
self .hooks .post_run_task (self , result )
515
515
self .audit .finalize_audit (result )
516
- save (orig_outdir , result = result , task = self )
516
+ save (output_dir , result = result , task = self )
517
517
self .output_ = None
518
518
# removing the additional file with the chcksum
519
519
(self .cache_dir / f"{ self .uid } _info.json" ).unlink ()
@@ -1056,35 +1056,36 @@ async def _run(self, submitter=None, rerun=False, **kwargs):
1056
1056
)
1057
1057
# creating connections that were defined after adding tasks to the wf
1058
1058
self ._connect_and_propagate_to_tasks ()
1059
- lockfile = self .cache_dir / (self .checksum + ".lock" )
1059
+
1060
+ checksum = self .checksum
1061
+ output_dir = self .output_dir
1062
+ lockfile = self .cache_dir / (checksum + ".lock" )
1060
1063
self .hooks .pre_run (self )
1061
1064
async with PydraFileLock (lockfile ):
1062
1065
if not (rerun or self .task_rerun ):
1063
1066
result = self .result ()
1064
1067
if result is not None and not result .errored :
1065
1068
return result
1066
1069
cwd = os .getcwd ()
1067
- self ._populate_filesystem ()
1068
- # the output dir can be changed by _run_task (but should it??)
1069
- orig_outdir = self .output_dir
1070
+ self ._populate_filesystem (checksum , output_dir )
1070
1071
result = Result (output = None , runtime = None , errored = False )
1071
1072
self .hooks .pre_run_task (self )
1072
- self .audit .start_audit (odir = self . output_dir )
1073
+ self .audit .start_audit (odir = output_dir )
1073
1074
try :
1074
1075
self .audit .monitor ()
1075
1076
await self ._run_task (submitter , rerun = rerun )
1076
1077
result .output = self ._collect_outputs ()
1077
1078
except Exception :
1078
1079
etype , eval , etr = sys .exc_info ()
1079
1080
traceback = format_exception (etype , eval , etr )
1080
- record_error (self . output_dir , error = traceback )
1081
+ record_error (output_dir , error = traceback )
1081
1082
result .errored = True
1082
1083
self ._errored = True
1083
1084
raise
1084
1085
finally :
1085
1086
self .hooks .post_run_task (self , result )
1086
1087
self .audit .finalize_audit (result = result )
1087
- save (orig_outdir , result = result , task = self )
1088
+ save (output_dir , result = result , task = self )
1088
1089
# removing the additional file with the chcksum
1089
1090
(self .cache_dir / f"{ self .uid } _info.json" ).unlink ()
1090
1091
os .chdir (cwd )
0 commit comments