@@ -388,8 +388,12 @@ def _load_checkpoints(self, checkpointDirs: Sequence[str]) -> Dict[str, Future[A
388388 data = pickle .load (f )
389389 # Copy and hash only the input attributes
390390 memo_fu : Future = Future ()
391- assert data ['exception' ] is None
392- memo_fu .set_result (data ['result' ])
391+
392+ if data ['exception' ] is None :
393+ memo_fu .set_result (data ['result' ])
394+ else :
395+ assert data ['result' ] is None
396+ memo_fu .set_exception (data ['exception' ])
393397 memo_lookup_table [data ['hash' ]] = memo_fu
394398
395399 except EOFError :
@@ -472,20 +476,22 @@ def checkpoint(self, *, task: Optional[TaskRecord] = None, exception: Optional[B
472476 # TODO: refactor with below
473477
474478 task_id = task ['id' ]
475-
476- if exception is None :
477- hashsum = task ['hashsum' ]
478- if not hashsum :
479- pass # TODO: log an error? see below discussion
480- else :
479+ hashsum = task ['hashsum' ]
480+ if not hashsum :
481+ pass # TODO: log an error? see below discussion
482+ else :
483+ if exception is None and self .filter_result_for_checkpoint (result ):
481484 t = {'hash' : hashsum , 'exception' : None , 'result' : result }
482-
483- # We are using pickle here since pickle dumps to a file in 'ab'
484- # mode behave like a incremental log.
485485 pickle .dump (t , f )
486486 count += 1
487-
488- logger .debug ("Task {} checkpointed" .format (task_id ))
487+ logger .debug ("Task {} checkpointed result" .format (task_id ))
488+ elif exception is not None and self .filter_exception_for_checkpoint (exception ):
489+ t = {'hash' : hashsum , 'exception' : exception , 'result' : None }
490+ pickle .dump (t , f )
491+ count += 1
492+ logger .debug ("Task {} checkpointed exception" .format (task_id ))
493+ else :
494+ pass # no checkpoint - maybe debug log? TODO
489495 else :
490496 checkpoint_queue = self .checkpointable_tasks
491497
@@ -496,18 +502,22 @@ def checkpoint(self, *, task: Optional[TaskRecord] = None, exception: Optional[B
496502
497503 assert app_fu .done (), "trying to checkpoint a task that is not done"
498504
499- if app_fu .done () and app_fu .exception () is None :
500- hashsum = task_record ['hashsum' ]
501- if not hashsum :
502- continue # TODO: log an error? maybe some tasks don't have hashsums legitimately?
503- t = {'hash' : hashsum , 'exception' : None , 'result' : app_fu .result ()}
505+ hashsum = task_record ['hashsum' ]
506+ if not hashsum :
507+ continue # TODO: log an error? maybe some tasks don't have hashsums legitimately?
504508
505- # We are using pickle here since pickle dumps to a file in 'ab'
506- # mode behave like a incremental log.
509+ if app_fu . exception () is None and self . filter_result_for_checkpoint ( app_fu . result ()):
510+ t = { 'hash' : hashsum , 'exception' : None , 'result' : app_fu . result ()}
507511 pickle .dump (t , f )
508512 count += 1
509-
510- logger .debug ("Task {} checkpointed" .format (task_id ))
513+ logger .debug ("Task {} checkpointed result" .format (task_id ))
514+ elif (e := app_fu .exception ()) is not None and self .filter_exception_for_checkpoint (e ):
515+ t = {'hash' : hashsum , 'exception' : app_fu .exception (), 'result' : None }
516+ pickle .dump (t , f )
517+ count += 1
518+ logger .debug ("Task {} checkpointed exception" .format (task_id ))
519+ else :
520+ pass # TODO: maybe log at debug level
511521
512522 self .checkpointed_tasks += count
513523
@@ -525,3 +535,11 @@ def checkpoint(self, *, task: Optional[TaskRecord] = None, exception: Optional[B
525535 # Or maybe a failure of iteration if the list is appended to while checkpointing is happening?
526536 if not task :
527537 self .checkpointable_tasks = []
538+
539+ def filter_result_for_checkpoint (self , result : Any ) -> bool :
540+ """Overridable method to decide if an task that ended with a successful result should be checkpointed"""
541+ return True
542+
543+ def filter_exception_for_checkpoint (self , exception : BaseException ) -> bool :
544+ """Overridable method to decide if an entry that ended with an exception should be checkpointed"""
545+ return False
0 commit comments