@@ -195,7 +195,7 @@ def interface(self):
195
195
def result (self ):
196
196
"""Get result from result file (do not hold it in memory)"""
197
197
return _load_resultfile (
198
- op .join (self .output_dir (), 'result_%s.pklz' % self .name ))[ 0 ]
198
+ op .join (self .output_dir (), 'result_%s.pklz' % self .name ))
199
199
200
200
@property
201
201
def inputs (self ):
@@ -518,7 +518,7 @@ def _get_inputs(self):
518
518
logger .debug ('input: %s' , key )
519
519
results_file = info [0 ]
520
520
logger .debug ('results file: %s' , results_file )
521
- outputs = _load_resultfile (results_file )[ 0 ] .outputs
521
+ outputs = _load_resultfile (results_file ).outputs
522
522
if outputs is None :
523
523
raise RuntimeError ("""\
524
524
Error populating the input "%s" of node "%s": the results file of the source node \
@@ -565,34 +565,42 @@ def _run_interface(self, execute=True, updatehash=False):
565
565
566
566
def _load_results (self ):
567
567
cwd = self .output_dir ()
568
- result , aggregate , attribute_error = _load_resultfile (
569
- op .join (cwd , 'result_%s.pklz' % self .name ))
568
+
569
+ try :
570
+ result = _load_resultfile (
571
+ op .join (cwd , 'result_%s.pklz' % self .name ))
572
+ except (traits .TraitError , EOFError ):
573
+ logger .debug (
574
+ 'Error populating inputs/outputs, (re)aggregating results...' )
575
+ except (AttributeError , ImportError ) as err :
576
+ logger .debug ('attribute error: %s probably using '
577
+ 'different trait pickled file' , str (err ))
578
+ old_inputs = loadpkl (op .join (cwd , '_inputs.pklz' ))
579
+ self .inputs .trait_set (** old_inputs )
580
+ else :
581
+ return result
582
+
570
583
# try aggregating first
571
- if aggregate :
572
- logger .debug ('aggregating results' )
573
- if attribute_error :
574
- old_inputs = loadpkl (op .join (cwd , '_inputs.pklz' ))
575
- self .inputs .trait_set (** old_inputs )
576
- if not isinstance (self , MapNode ):
577
- self ._copyfiles_to_wd (linksonly = True )
578
- aggouts = self ._interface .aggregate_outputs (
579
- needed_outputs = self .needed_outputs )
580
- runtime = Bunch (
581
- cwd = cwd ,
582
- returncode = 0 ,
583
- environ = dict (os .environ ),
584
- hostname = socket .gethostname ())
585
- result = InterfaceResult (
586
- interface = self ._interface .__class__ ,
587
- runtime = runtime ,
588
- inputs = self ._interface .inputs .get_traitsfree (),
589
- outputs = aggouts )
590
- _save_resultfile (
591
- result , cwd , self .name ,
592
- rebase = str2bool (self .config ['execution' ]['use_relative_paths' ]))
593
- else :
594
- logger .debug ('aggregating mapnode results' )
595
- result = self ._run_interface ()
584
+ if not isinstance (self , MapNode ):
585
+ self ._copyfiles_to_wd (linksonly = True )
586
+ aggouts = self ._interface .aggregate_outputs (
587
+ needed_outputs = self .needed_outputs )
588
+ runtime = Bunch (
589
+ cwd = cwd ,
590
+ returncode = 0 ,
591
+ environ = dict (os .environ ),
592
+ hostname = socket .gethostname ())
593
+ result = InterfaceResult (
594
+ interface = self ._interface .__class__ ,
595
+ runtime = runtime ,
596
+ inputs = self ._interface .inputs .get_traitsfree (),
597
+ outputs = aggouts )
598
+ _save_resultfile (
599
+ result , cwd , self .name ,
600
+ rebase = str2bool (self .config ['execution' ]['use_relative_paths' ]))
601
+ else :
602
+ logger .debug ('aggregating mapnode results' )
603
+ result = self ._run_interface ()
596
604
return result
597
605
598
606
def _run_command (self , execute , copyfiles = True ):
0 commit comments