@@ -180,6 +180,8 @@ def __init__(self, config: Config) -> None:
180180 self .checkpointed_tasks = 0
181181 self ._checkpoint_timer = None
182182 self .checkpoint_mode = config .checkpoint_mode
183+
184+ self ._modify_checkpointable_tasks_lock = threading .Lock ()
183185 self .checkpointable_tasks : List [TaskRecord ] = []
184186
185187 # this must be set before executors are added since add_executors calls
@@ -204,7 +206,7 @@ def __init__(self, config: Config) -> None:
204206 except Exception :
205207 raise ConfigurationError ("invalid checkpoint_period provided: {0} expected HH:MM:SS" .format (config .checkpoint_period ))
206208 checkpoint_period = (h * 3600 ) + (m * 60 ) + s
207- self ._checkpoint_timer = Timer (self .checkpoint , interval = checkpoint_period , name = "Checkpoint" )
209+ self ._checkpoint_timer = Timer (self .invoke_checkpoint , interval = checkpoint_period , name = "Checkpoint" )
208210
209211 self .task_count = 0
210212 self .tasks : Dict [int , TaskRecord ] = {}
@@ -575,7 +577,7 @@ def handle_app_update(self, task_record: TaskRecord, future: AppFuture) -> None:
575577 if self .checkpoint_mode == 'task_exit' :
576578 self .checkpoint (tasks = [task_record ])
577579 elif self .checkpoint_mode in ('manual' , 'periodic' , 'dfk_exit' ):
578- with self .checkpoint_lock :
580+ with self ._modify_checkpointable_tasks_lock :
579581 self .checkpointable_tasks .append (task_record )
580582 elif self .checkpoint_mode is None :
581583 pass
@@ -1254,7 +1256,10 @@ def cleanup(self) -> None:
12541256 # Checkpointing takes priority over the rest of the tasks
12551257 # checkpoint if any valid checkpoint method is specified
12561258 if self .checkpoint_mode is not None :
1257- self .checkpoint ()
1259+
1260+ # TODO: accesses to self.checkpointable_tasks should happen
1261+ # under a lock?
1262+ self .checkpoint (self .checkpointable_tasks )
12581263
12591264 if self ._checkpoint_timer :
12601265 logger .info ("Stopping checkpoint timer" )
@@ -1327,7 +1332,13 @@ def cleanup(self) -> None:
13271332
13281333 logger .info ("DFK cleanup complete" )
13291334
1330- def checkpoint (self , tasks : Optional [Sequence [TaskRecord ]] = None ) -> str :
def invoke_checkpoint(self) -> str:
    """Checkpoint every task queued in ``self.checkpointable_tasks``.

    This is the entry point used by the periodic checkpoint timer. The
    queued tasks are detached from the DFK while holding
    ``_modify_checkpointable_tasks_lock`` and are then written out by
    ``self.checkpoint`` *after* that lock has been released, so that
    task-completion callbacks appending to the queue are not blocked
    for the duration of the checkpoint.

    Returns:
        Whatever ``self.checkpoint`` returns for the detached batch.

    Raises:
        Anything raised by ``self.checkpoint``. In that case the detached
        tasks are put back at the front of the queue so that a later
        checkpoint attempt still covers them.
    """
    # Swap the queue out under the lock; this is the only part that
    # needs to exclude concurrent appends.
    with self._modify_checkpointable_tasks_lock:
        pending = self.checkpointable_tasks
        self.checkpointable_tasks = []

    try:
        return self.checkpoint(pending)
    except BaseException:
        # Keep the tasks queued on failure: restore them ahead of
        # anything that was appended while the checkpoint was being
        # attempted, then propagate the error.
        with self._modify_checkpointable_tasks_lock:
            self.checkpointable_tasks[:0] = pending
        raise
1340+
1341+ def checkpoint (self , tasks : Sequence [TaskRecord ]) -> str :
13311342 """Checkpoint the dfk incrementally to a checkpoint file.
13321343
13331344 When called, every task that has been completed yet not
@@ -1346,11 +1357,7 @@ def checkpoint(self, tasks: Optional[Sequence[TaskRecord]] = None) -> str:
13461357 run under RUNDIR/checkpoints/{tasks.pkl, dfk.pkl}
13471358 """
13481359 with self .checkpoint_lock :
1349- if tasks :
1350- checkpoint_queue = tasks
1351- else :
1352- checkpoint_queue = self .checkpointable_tasks
1353- self .checkpointable_tasks = []
1360+ checkpoint_queue = tasks
13541361
13551362 checkpoint_dir = '{0}/checkpoint' .format (self .run_dir )
13561363 checkpoint_dfk = checkpoint_dir + '/dfk.pkl'
0 commit comments