@@ -175,7 +175,6 @@ def __init__(self, config: Config) -> None:
175175 self ._checkpoint_timer = None
176176 self .checkpoint_mode = config .checkpoint_mode
177177 self ._modify_checkpointable_tasks_lock = threading .Lock ()
178- self .checkpointable_tasks : List [TaskRecord ] = []
179178
180179 # this must be set before executors are added since add_executors calls
181180 # job_status_poller.add_executors.
@@ -568,7 +567,7 @@ def handle_app_update(self, task_record: TaskRecord, future: AppFuture) -> None:
568567 self .memoizer .checkpoint (tasks = [task_record ])
569568 elif self .checkpoint_mode in ('manual' , 'periodic' , 'dfk_exit' ):
570569 with self ._modify_checkpointable_tasks_lock :
571- self .checkpointable_tasks .append (task_record )
570+ self .memoizer . checkpointable_tasks .append (task_record )
572571 elif self .checkpoint_mode is None :
573572 pass
574573 else :
@@ -1206,7 +1205,7 @@ def cleanup(self) -> None:
12061205
12071206 # TODO: accesses to self.checkpointable_tasks should happen
12081207 # under a lock?
1209- self .memoizer .checkpoint (self .checkpointable_tasks )
1208+ self .memoizer .checkpoint (self .memoizer . checkpointable_tasks )
12101209
12111210 if self ._checkpoint_timer :
12121211 logger .info ("Stopping checkpoint timer" )
@@ -1270,8 +1269,8 @@ def cleanup(self) -> None:
12701269
12711270 def checkpoint (self ) -> None :
12721271 with self ._modify_checkpointable_tasks_lock :
1273- self .memoizer .checkpoint (self .checkpointable_tasks )
1274- self .checkpointable_tasks = []
1272+ self .memoizer .checkpoint (self .memoizer . checkpointable_tasks )
1273+ self .memoizer . checkpointable_tasks = []
12751274
12761275 @staticmethod
12771276 def _log_std_streams (task_record : TaskRecord ) -> None :
0 commit comments