1111import typeguard
1212
1313from parsl .dataflow .errors import BadCheckpoint
14+ from parsl .dataflow .futures import AppFuture
1415from parsl .dataflow .taskrecord import TaskRecord
1516
1617if TYPE_CHECKING :
@@ -339,8 +340,12 @@ def _load_checkpoints(self, checkpointDirs: Sequence[str]) -> Dict[str, Future[A
339340 data = pickle .load (f )
340341 # Copy and hash only the input attributes
341342 memo_fu : Future = Future ()
342- assert data ['exception' ] is None
343- memo_fu .set_result (data ['result' ])
343+
344+ if data ['exception' ] is None :
345+ memo_fu .set_result (data ['result' ])
346+ else :
347+ assert data ['result' ] is None
348+ memo_fu .set_exception (data ['exception' ])
344349 memo_lookup_table [data ['hash' ]] = memo_fu
345350
346351 except EOFError :
@@ -414,17 +419,22 @@ def checkpoint(self, tasks: Sequence[TaskRecord]) -> None:
414419
415420 app_fu = task_record ['app_fu' ]
416421
417- if app_fu .done () and app_fu .exception () is None :
422+ if app_fu .done () and self .filter_for_checkpoint (app_fu ):
423+
418424 hashsum = task_record ['hashsum' ]
419425 if not hashsum :
420426 continue
421- t = {'hash' : hashsum , 'exception' : None , 'result' : app_fu .result ()}
427+
428+ if app_fu .exception () is None :
429+ t = {'hash' : hashsum , 'exception' : None , 'result' : app_fu .result ()}
430+ else :
431+ t = {'hash' : hashsum , 'exception' : app_fu .exception (), 'result' : None }
422432
423433 # We are using pickle here since pickle dumps to a file in 'ab'
424434 # mode behave like a incremental log.
425435 pickle .dump (t , f )
426436 count += 1
427- logger .debug ("Task {} checkpointed" .format (task_id ))
437+ logger .debug ("Task {} checkpointed as result " .format (task_id ))
428438
429439 self .checkpointed_tasks += count
430440
@@ -435,3 +445,7 @@ def checkpoint(self, tasks: Sequence[TaskRecord]) -> None:
435445 logger .debug ("No tasks checkpointed in this pass." )
436446 else :
437447 logger .info ("Done checkpointing {} tasks" .format (count ))
448+
449+ def filter_for_checkpoint (self , app_fu : AppFuture ) -> bool :
450+ """Overridable method to decide if an entry should be checkpointed"""
451+ return app_fu .exception () is None
0 commit comments