Skip to content

Commit 4856e98

Browse files
committed
move checkpointable_tasks list a bit more
1 parent 6562fd3 commit 4856e98

File tree

2 files changed

+17
-13
lines changed

2 files changed

+17
-13
lines changed

parsl/dataflow/dflow.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -564,7 +564,7 @@ def handle_app_update(self, task_record: TaskRecord, future: AppFuture) -> None:
564564
# Do we need to checkpoint now, or queue for later,
565565
# or do nothing?
566566
if self.checkpoint_mode == 'task_exit':
567-
self.memoizer.checkpoint(tasks=[task_record])
567+
self.memoizer.checkpoint(task=task_record)
568568
elif self.checkpoint_mode in ('manual', 'periodic', 'dfk_exit'):
569569
with self._modify_checkpointable_tasks_lock:
570570
self.memoizer.checkpointable_tasks.append(task_record)
@@ -1205,7 +1205,7 @@ def cleanup(self) -> None:
12051205

12061206
# TODO: accesses to self.checkpointable_tasks should happen
12071207
# under a lock?
1208-
self.memoizer.checkpoint(self.memoizer.checkpointable_tasks)
1208+
self.memoizer.checkpoint()
12091209

12101210
if self._checkpoint_timer:
12111211
logger.info("Stopping checkpoint timer")
@@ -1269,7 +1269,7 @@ def cleanup(self) -> None:
12691269

12701270
def checkpoint(self) -> None:
12711271
with self._modify_checkpointable_tasks_lock:
1272-
self.memoizer.checkpoint(self.memoizer.checkpointable_tasks)
1272+
self.memoizer.checkpoint()
12731273
self.memoizer.checkpointable_tasks = []
12741274

12751275
@staticmethod

parsl/dataflow/memoization.py

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -351,26 +351,30 @@ def load_checkpoints(self, checkpointDirs: Optional[Sequence[str]]) -> Dict[str,
351351
else:
352352
return {}
353353

354-
def checkpoint(self, tasks: Sequence[TaskRecord]) -> None:
354+
def checkpoint(self, *, task: Optional[TaskRecord] = None) -> None:
355355
"""Checkpoint the dfk incrementally to a checkpoint file.
356356
357-
When called, every task that has been completed yet not
358-
checkpointed is checkpointed to a file.
357+
When called with no argument, all tasks registered in self.checkpointable_tasks
358+
will be checkpointed. When called with a single TaskRecord argument, that task will be
359+
checkpointed.
360+
361+
By default the checkpoints are written to the RUNDIR of the current
362+
run under RUNDIR/checkpoints/tasks.pkl
359363
360364
Kwargs:
361-
- tasks (List of task records) : List of task ids to checkpoint. Default=None
362-
if set to None, we iterate over all tasks held by the DFK.
365+
- task (Optional task records) : A task to checkpoint. Default=None, meaning all
366+
tasks registered for checkpointing.
363367
364368
.. note::
365369
Checkpointing only works if memoization is enabled
366370
367-
Returns:
368-
Checkpoint dir if checkpoints were written successfully.
369-
By default the checkpoints are written to the RUNDIR of the current
370-
run under RUNDIR/checkpoints/tasks.pkl
371371
"""
372372
with self.checkpoint_lock:
373-
checkpoint_queue = tasks
373+
374+
if task:
375+
checkpoint_queue = [task]
376+
else:
377+
checkpoint_queue = self.checkpointable_tasks
374378

375379
checkpoint_dir = '{0}/checkpoint'.format(self.run_dir)
376380
checkpoint_tasks = checkpoint_dir + '/tasks.pkl'

0 commit comments

Comments
 (0)