@@ -171,6 +171,8 @@ def __init__(self, config: Config) -> None:
171171 self .checkpointed_tasks = 0
172172 self ._checkpoint_timer = None
173173 self .checkpoint_mode = config .checkpoint_mode
174+
175+ self ._modify_checkpointable_tasks_lock = threading .Lock ()
174176 self .checkpointable_tasks : List [TaskRecord ] = []
175177
176178 # this must be set before executors are added since add_executors calls
@@ -195,7 +197,7 @@ def __init__(self, config: Config) -> None:
195197 except Exception :
196198 raise ConfigurationError ("invalid checkpoint_period provided: {0} expected HH:MM:SS" .format (config .checkpoint_period ))
197199 checkpoint_period = (h * 3600 ) + (m * 60 ) + s
198- self ._checkpoint_timer = Timer (self .checkpoint , interval = checkpoint_period , name = "Checkpoint" )
200+ self ._checkpoint_timer = Timer (self .invoke_checkpoint , interval = checkpoint_period , name = "Checkpoint" )
199201
200202 self .task_count = 0
201203 self .tasks : Dict [int , TaskRecord ] = {}
@@ -560,7 +562,7 @@ def handle_app_update(self, task_record: TaskRecord, future: AppFuture) -> None:
560562 if self .checkpoint_mode == 'task_exit' :
561563 self .checkpoint (tasks = [task_record ])
562564 elif self .checkpoint_mode in ('manual' , 'periodic' , 'dfk_exit' ):
563- with self .checkpoint_lock :
565+ with self ._modify_checkpointable_tasks_lock :
564566 self .checkpointable_tasks .append (task_record )
565567 elif self .checkpoint_mode is None :
566568 pass
@@ -1193,7 +1195,10 @@ def cleanup(self) -> None:
11931195 # Checkpointing takes priority over the rest of the tasks
11941196 # checkpoint if any valid checkpoint method is specified
11951197 if self .checkpoint_mode is not None :
1196- self .checkpoint ()
1198+
1199+ # TODO: accesses to self.checkpointable_tasks should happen
1200+ # under a lock?
1201+ self .checkpoint (self .checkpointable_tasks )
11971202
11981203 if self ._checkpoint_timer :
11991204 logger .info ("Stopping checkpoint timer" )
@@ -1247,7 +1252,12 @@ def cleanup(self) -> None:
12471252
12481253 logger .info ("DFK cleanup complete" )
12491254
1250- def checkpoint (self , tasks : Optional [Sequence [TaskRecord ]] = None ) -> None :
1255+ def invoke_checkpoint (self ) -> None :
1256+ with self ._modify_checkpointable_tasks_lock :
1257+ self .checkpoint (self .checkpointable_tasks )
1258+ self .checkpointable_tasks = []
1259+
1260+ def checkpoint (self , tasks : Sequence [TaskRecord ]) -> None :
12511261 """Checkpoint the dfk incrementally to a checkpoint file.
12521262
12531263 When called, every task that has been completed yet not
@@ -1266,11 +1276,7 @@ def checkpoint(self, tasks: Optional[Sequence[TaskRecord]] = None) -> None:
12661276 run under RUNDIR/checkpoints/tasks.pkl
12671277 """
12681278 with self .checkpoint_lock :
1269- if tasks :
1270- checkpoint_queue = tasks
1271- else :
1272- checkpoint_queue = self .checkpointable_tasks
1273- self .checkpointable_tasks = []
1279+ checkpoint_queue = tasks
12741280
12751281 checkpoint_dir = '{0}/checkpoint' .format (self .run_dir )
12761282 checkpoint_tasks = checkpoint_dir + '/tasks.pkl'
0 commit comments