1111import typeguard
1212
1313from parsl .dataflow .errors import BadCheckpoint
14+ from parsl .dataflow .futures import AppFuture
1415from parsl .dataflow .taskrecord import TaskRecord
1516
1617if TYPE_CHECKING :
@@ -336,8 +337,12 @@ def _load_checkpoints(self, checkpointDirs: Sequence[str]) -> Dict[str, Future[A
336337 data = pickle .load (f )
337338 # Copy and hash only the input attributes
338339 memo_fu : Future = Future ()
339- assert data ['exception' ] is None
340- memo_fu .set_result (data ['result' ])
340+
341+ if data ['exception' ] is None :
342+ memo_fu .set_result (data ['result' ])
343+ else :
344+ assert data ['result' ] is None
345+ memo_fu .set_exception (data ['exception' ])
341346 memo_lookup_table [data ['hash' ]] = memo_fu
342347
343348 except EOFError :
@@ -418,17 +423,22 @@ def checkpoint(self, tasks: Sequence[TaskRecord]) -> str:
418423
419424 app_fu = task_record ['app_fu' ]
420425
421- if app_fu .done () and app_fu .exception () is None :
426+ if app_fu .done () and self .filter_for_checkpoint (app_fu ):
427+
422428 hashsum = task_record ['hashsum' ]
423429 if not hashsum :
424430 continue
425- t = {'hash' : hashsum , 'exception' : None , 'result' : app_fu .result ()}
431+
432+ if app_fu .exception () is None :
433+ t = {'hash' : hashsum , 'exception' : None , 'result' : app_fu .result ()}
434+ else :
435+ t = {'hash' : hashsum , 'exception' : app_fu .exception (), 'result' : None }
426436
427437 # We are using pickle here since pickle dumps to a file in 'ab'
428438 # mode behave like a incremental log.
429439 pickle .dump (t , f )
430440 count += 1
431- logger .debug ("Task {} checkpointed" .format (task_id ))
441+ logger .debug ("Task {} checkpointed as result " .format (task_id ))
432442
433443 self .checkpointed_tasks += count
434444
@@ -441,3 +451,7 @@ def checkpoint(self, tasks: Sequence[TaskRecord]) -> str:
441451 logger .info ("Done checkpointing {} tasks" .format (count ))
442452
443453 return checkpoint_dir
454+
455+ def filter_for_checkpoint (self , app_fu : AppFuture ) -> bool :
456+ """Overridable hook deciding whether a task's AppFuture should be checkpointed; this default accepts only futures whose exception() is None."""
457+ return app_fu .exception () is None
0 commit comments