@@ -176,6 +176,8 @@ def __init__(self, config: Config) -> None:
176176 self .checkpointed_tasks = 0
177177 self ._checkpoint_timer = None
178178 self .checkpoint_mode = config .checkpoint_mode
179+
180+ self ._modify_checkpointable_tasks_lock = threading .Lock ()
179181 self .checkpointable_tasks : List [TaskRecord ] = []
180182
181183 # this must be set before executors are added since add_executors calls
@@ -200,7 +202,7 @@ def __init__(self, config: Config) -> None:
200202 except Exception :
201203 raise ConfigurationError ("invalid checkpoint_period provided: {0} expected HH:MM:SS" .format (config .checkpoint_period ))
202204 checkpoint_period = (h * 3600 ) + (m * 60 ) + s
203- self ._checkpoint_timer = Timer (self .checkpoint , interval = checkpoint_period , name = "Checkpoint" )
205+ self ._checkpoint_timer = Timer (self .invoke_checkpoint , interval = checkpoint_period , name = "Checkpoint" )
204206
205207 self .task_count = 0
206208 self .tasks : Dict [int , TaskRecord ] = {}
@@ -571,7 +573,7 @@ def handle_app_update(self, task_record: TaskRecord, future: AppFuture) -> None:
571573 if self .checkpoint_mode == 'task_exit' :
572574 self .checkpoint (tasks = [task_record ])
573575 elif self .checkpoint_mode in ('manual' , 'periodic' , 'dfk_exit' ):
574- with self .checkpoint_lock :
576+ with self ._modify_checkpointable_tasks_lock :
575577 self .checkpointable_tasks .append (task_record )
576578 elif self .checkpoint_mode is None :
577579 pass
@@ -1250,7 +1252,10 @@ def cleanup(self) -> None:
12501252 # Checkpointing takes priority over the rest of the tasks
12511253 # checkpoint if any valid checkpoint method is specified
12521254 if self .checkpoint_mode is not None :
1253- self .checkpoint ()
1255+
1256+ # TODO: accesses to self.checkpointable_tasks should happen
1257+ # under a lock?
1258+ self .checkpoint (self .checkpointable_tasks )
12541259
12551260 if self ._checkpoint_timer :
12561261 logger .info ("Stopping checkpoint timer" )
@@ -1323,7 +1328,13 @@ def cleanup(self) -> None:
13231328
13241329 logger .info ("DFK cleanup complete" )
13251330
1326- def checkpoint (self , tasks : Optional [Sequence [TaskRecord ]] = None ) -> str :
1331+ def invoke_checkpoint (self ):
1332+ with self ._modify_checkpointable_tasks_lock :
1333+ r = self .checkpoint (self .checkpointable_tasks )
1334+ self .checkpointable_tasks = []
1335+ return r
1336+
1337+ def checkpoint (self , tasks : Sequence [TaskRecord ]) -> str :
13271338 """Checkpoint the dfk incrementally to a checkpoint file.
13281339
13291340 When called, every task that has been completed yet not
@@ -1342,11 +1353,7 @@ def checkpoint(self, tasks: Optional[Sequence[TaskRecord]] = None) -> str:
13421353 run under RUNDIR/checkpoints/{tasks.pkl, dfk.pkl}
13431354 """
13441355 with self .checkpoint_lock :
1345- if tasks :
1346- checkpoint_queue = tasks
1347- else :
1348- checkpoint_queue = self .checkpointable_tasks
1349- self .checkpointable_tasks = []
1356+ checkpoint_queue = tasks
13501357
13511358 checkpoint_dir = '{0}/checkpoint' .format (self .run_dir )
13521359 checkpoint_dfk = checkpoint_dir + '/dfk.pkl'
0 commit comments