Skip to content

Commit 6562fd3

Browse files
committed
naively move checkpointable tasks list into memoizer, with accesses still happening from the DFK
1 parent 31236dc commit 6562fd3

File tree

2 files changed

+6
-5
lines changed

2 files changed

+6
-5
lines changed

parsl/dataflow/dflow.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,6 @@ def __init__(self, config: Config) -> None:
175175
self._checkpoint_timer = None
176176
self.checkpoint_mode = config.checkpoint_mode
177177
self._modify_checkpointable_tasks_lock = threading.Lock()
178-
self.checkpointable_tasks: List[TaskRecord] = []
179178

180179
# this must be set before executors are added since add_executors calls
181180
# job_status_poller.add_executors.
@@ -568,7 +567,7 @@ def handle_app_update(self, task_record: TaskRecord, future: AppFuture) -> None:
568567
self.memoizer.checkpoint(tasks=[task_record])
569568
elif self.checkpoint_mode in ('manual', 'periodic', 'dfk_exit'):
570569
with self._modify_checkpointable_tasks_lock:
571-
self.checkpointable_tasks.append(task_record)
570+
self.memoizer.checkpointable_tasks.append(task_record)
572571
elif self.checkpoint_mode is None:
573572
pass
574573
else:
@@ -1206,7 +1205,7 @@ def cleanup(self) -> None:
12061205

12071206
# TODO: accesses to self.checkpointable_tasks should happen
12081207
# under a lock?
1209-
self.memoizer.checkpoint(self.checkpointable_tasks)
1208+
self.memoizer.checkpoint(self.memoizer.checkpointable_tasks)
12101209

12111210
if self._checkpoint_timer:
12121211
logger.info("Stopping checkpoint timer")
@@ -1270,8 +1269,8 @@ def cleanup(self) -> None:
12701269

12711270
def checkpoint(self) -> None:
12721271
with self._modify_checkpointable_tasks_lock:
1273-
self.memoizer.checkpoint(self.checkpointable_tasks)
1274-
self.checkpointable_tasks = []
1272+
self.memoizer.checkpoint(self.memoizer.checkpointable_tasks)
1273+
self.memoizer.checkpointable_tasks = []
12751274

12761275
@staticmethod
12771276
def _log_std_streams(task_record: TaskRecord) -> None:

parsl/dataflow/memoization.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,8 @@ def __init__(self, *,
169169
self.checkpoint_files = checkpoint_files
170170
self.checkpoint_mode = checkpoint_mode
171171

172+
self.checkpointable_tasks: List[TaskRecord] = []
173+
172174
def start(self) -> None:
173175
if self.checkpoint_files is not None:
174176
checkpoint_files = self.checkpoint_files

0 commit comments

Comments
 (0)