Commit c5e8d4e

committed to branch dev
1 parent 67e6a52 commit c5e8d4e

2 files changed: +78 additions, -73 deletions

parsl/dataflow/dflow.py

Lines changed: 5 additions & 73 deletions
@@ -7,7 +7,6 @@
 import logging
 import os
 import pathlib
-import pickle
 import random
 import sys
 import threading
@@ -99,8 +98,6 @@ def __init__(self, config: Config) -> None:
 
         logger.info("Parsl version: {}".format(get_version()))
 
-        self.checkpoint_lock = threading.Lock()
-
         self.usage_tracker = UsageTracker(self)
         self.usage_tracker.send_start_message()
 
@@ -173,7 +170,8 @@ def __init__(self, config: Config) -> None:
             checkpoint_files = []
 
         self.memoizer = Memoizer(self, memoize=config.app_cache, checkpoint_files=checkpoint_files)
-        self.checkpointed_tasks = 0
+        self.memoizer.run_dir = self.run_dir
+
         self._checkpoint_timer = None
         self.checkpoint_mode = config.checkpoint_mode
 
@@ -571,7 +569,7 @@ def handle_app_update(self, task_record: TaskRecord, future: AppFuture) -> None:
         # Do we need to checkpoint now, or queue for later,
         # or do nothing?
         if self.checkpoint_mode == 'task_exit':
-            self.checkpoint(tasks=[task_record])
+            self.memoizer.checkpoint(tasks=[task_record])
         elif self.checkpoint_mode in ('manual', 'periodic', 'dfk_exit'):
             with self._modify_checkpointable_tasks_lock:
                 self.checkpointable_tasks.append(task_record)
@@ -1255,7 +1253,7 @@ def cleanup(self) -> None:
 
         # TODO: accesses to self.checkpointable_tasks should happen
         # under a lock?
-        self.checkpoint(self.checkpointable_tasks)
+        self.memoizer.checkpoint(self.checkpointable_tasks)
 
         if self._checkpoint_timer:
             logger.info("Stopping checkpoint timer")
@@ -1330,76 +1328,10 @@ def cleanup(self) -> None:
 
     def invoke_checkpoint(self):
         with self._modify_checkpointable_tasks_lock:
-            r = self.checkpoint(self.checkpointable_tasks)
+            r = self.memoizer.checkpoint(self.checkpointable_tasks)
             self.checkpointable_tasks = []
             return r
 
-    def checkpoint(self, tasks: Sequence[TaskRecord]) -> str:
-        """Checkpoint the dfk incrementally to a checkpoint file.
-
-        When called, every task that has been completed yet not
-        checkpointed is checkpointed to a file.
-
-        Kwargs:
-            - tasks (List of task records) : List of task ids to checkpoint. Default=None
-              if set to None, we iterate over all tasks held by the DFK.
-
-        .. note::
-            Checkpointing only works if memoization is enabled
-
-        Returns:
-            Checkpoint dir if checkpoints were written successfully.
-            By default the checkpoints are written to the RUNDIR of the current
-            run under RUNDIR/checkpoints/{tasks.pkl, dfk.pkl}
-        """
-        with self.checkpoint_lock:
-            checkpoint_queue = tasks
-
-            checkpoint_dir = '{0}/checkpoint'.format(self.run_dir)
-            checkpoint_dfk = checkpoint_dir + '/dfk.pkl'
-            checkpoint_tasks = checkpoint_dir + '/tasks.pkl'
-
-            if not os.path.exists(checkpoint_dir):
-                os.makedirs(checkpoint_dir, exist_ok=True)
-
-            with open(checkpoint_dfk, 'wb') as f:
-                state = {'rundir': self.run_dir,
-                         'task_count': self.task_count
-                         }
-                pickle.dump(state, f)
-
-            count = 0
-
-            with open(checkpoint_tasks, 'ab') as f:
-                for task_record in checkpoint_queue:
-                    task_id = task_record['id']
-
-                    app_fu = task_record['app_fu']
-
-                    if app_fu.done() and app_fu.exception() is None:
-                        hashsum = task_record['hashsum']
-                        if not hashsum:
-                            continue
-                        t = {'hash': hashsum, 'exception': None, 'result': app_fu.result()}
-
-                        # We are using pickle here since pickle dumps to a file in 'ab'
-                        # mode behave like a incremental log.
-                        pickle.dump(t, f)
-                        count += 1
-                        logger.debug("Task {} checkpointed".format(task_id))
-
-            self.checkpointed_tasks += count
-
-            if count == 0:
-                if self.checkpointed_tasks == 0:
-                    logger.warning("No tasks checkpointed so far in this run. Please ensure caching is enabled")
-                else:
-                    logger.debug("No tasks checkpointed in this pass.")
-            else:
-                logger.info("Done checkpointing {} tasks".format(count))
-
-            return checkpoint_dir
-
     @staticmethod
     def _log_std_streams(task_record: TaskRecord) -> None:
         tid = task_record['id']
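
The caller-facing effect of this file's changes: the checkpoint lock, the checkpointed-task counter, and checkpoint() itself move off the DataFlowKernel, so anything that called dfk.checkpoint(...) must now go through dfk.memoizer.checkpoint(...). A minimal sketch of the pattern on this dev branch follows; it is not a definitive recipe, and assumes only documented Parsl API (parsl.load returns the DFK) plus the memoizer attribute shown in this diff.

    # Sketch only: assumes this dev branch, where checkpoint() lives on Memoizer.
    import parsl
    from parsl.config import Config

    # With checkpoint_mode='manual', completed tasks accumulate in
    # dfk.checkpointable_tasks until a checkpoint is requested explicitly.
    dfk = parsl.load(Config(app_cache=True, checkpoint_mode='manual'))

    # ... define @python_app functions, launch them, wait on their futures ...

    # invoke_checkpoint() keeps its signature, but now delegates internally:
    #     r = self.memoizer.checkpoint(self.checkpointable_tasks)
    checkpoint_dir = dfk.invoke_checkpoint()

    # Direct calls move from the DFK to the memoizer:
    #     before: dfk.checkpoint(tasks=[task_record])
    #     after:  dfk.memoizer.checkpoint(tasks=[task_record])

    dfk.cleanup()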

parsl/dataflow/memoization.py

Lines changed: 73 additions & 0 deletions
@@ -4,6 +4,7 @@
 import logging
 import os
 import pickle
+import threading
 from functools import lru_cache, singledispatch
 from typing import TYPE_CHECKING, Any, Dict, List, Optional, Sequence
 
@@ -150,6 +151,8 @@ class Memoizer:
 
     """
 
+    run_dir: str
+
     def __init__(self, dfk: DataFlowKernel, *, memoize: bool = True, checkpoint_files: Sequence[str]):
        """Initialize the memoizer.
 
@@ -163,6 +166,10 @@ def __init__(self, dfk: DataFlowKernel, *, memoize: bool = True, checkpoint_file
         self.dfk = dfk
         self.memoize = memoize
 
+        self.checkpointed_tasks = 0
+
+        self.checkpoint_lock = threading.Lock()
+
         # TODO: we always load checkpoints even if we then discard them...
         # this is more obvious here, less obvious in previous Parsl...
         checkpoint = self.load_checkpoints(checkpoint_files)
@@ -350,3 +357,69 @@ def load_checkpoints(self, checkpointDirs: Optional[Sequence[str]]) -> Dict[str,
             return self._load_checkpoints(checkpointDirs)
         else:
             return {}
+
+    def checkpoint(self, tasks: Sequence[TaskRecord]) -> str:
+        """Checkpoint the dfk incrementally to a checkpoint file.
+
+        When called, every task that has been completed yet not
+        checkpointed is checkpointed to a file.
+
+        Kwargs:
+            - tasks (List of task records) : List of task ids to checkpoint. Default=None
+              if set to None, we iterate over all tasks held by the DFK.
+
+        .. note::
+            Checkpointing only works if memoization is enabled
+
+        Returns:
+            Checkpoint dir if checkpoints were written successfully.
+            By default the checkpoints are written to the RUNDIR of the current
+            run under RUNDIR/checkpoints/{tasks.pkl, dfk.pkl}
+        """
+        with self.checkpoint_lock:
+            checkpoint_queue = tasks
+
+            checkpoint_dir = '{0}/checkpoint'.format(self.run_dir)
+            checkpoint_dfk = checkpoint_dir + '/dfk.pkl'
+            checkpoint_tasks = checkpoint_dir + '/tasks.pkl'
+
+            if not os.path.exists(checkpoint_dir):
+                os.makedirs(checkpoint_dir, exist_ok=True)
+
+            with open(checkpoint_dfk, 'wb') as f:
+                state = {'rundir': self.run_dir,
+                         # TODO: this isn't relevant to checkpointing? 'task_count': self.task_count
+                         }
+                pickle.dump(state, f)
+
+            count = 0
+
+            with open(checkpoint_tasks, 'ab') as f:
+                for task_record in checkpoint_queue:
+                    task_id = task_record['id']
+
+                    app_fu = task_record['app_fu']
+
+                    if app_fu.done() and app_fu.exception() is None:
+                        hashsum = task_record['hashsum']
+                        if not hashsum:
+                            continue
+                        t = {'hash': hashsum, 'exception': None, 'result': app_fu.result()}
+
+                        # We are using pickle here since pickle dumps to a file in 'ab'
+                        # mode behave like a incremental log.
+                        pickle.dump(t, f)
+                        count += 1
+                        logger.debug("Task {} checkpointed".format(task_id))
+
+            self.checkpointed_tasks += count
+
+            if count == 0:
+                if self.checkpointed_tasks == 0:
+                    logger.warning("No tasks checkpointed so far in this run. Please ensure caching is enabled")
+                else:
+                    logger.debug("No tasks checkpointed in this pass.")
+            else:
+                logger.info("Done checkpointing {} tasks".format(count))
+
+            return checkpoint_dir
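
Because checkpoint() opens tasks.pkl in 'ab' mode and calls pickle.dump once per task, the file is a stream of independently pickled dicts rather than a single object, and each checkpoint pass appends to whatever earlier passes wrote. A reader recovers the records by calling pickle.load repeatedly until EOFError. The standalone sketch below illustrates this; it is independent of this codebase, and the path in the usage comment is illustrative only.

    # Standalone sketch: reading an append-mode pickle log such as tasks.pkl.
    # Each pickle.dump(t, f) above appended one record; pickle.load consumes
    # exactly one record per call, so loop until the file is exhausted.
    import pickle
    from typing import Any, Dict

    def read_checkpoint_log(path: str) -> Dict[str, Any]:
        """Return a {hash: result} mapping from an incremental checkpoint file."""
        memo: Dict[str, Any] = {}
        with open(path, 'rb') as f:
            while True:
                try:
                    record = pickle.load(f)  # one {'hash', 'exception', 'result'} dict
                except EOFError:
                    break  # clean end of the log
                # A later record for the same hash supersedes an earlier one,
                # which is the usual semantics of replaying a log.
                memo[record['hash']] = record['result']
        return memo

    # Illustrative path; checkpoint() above writes to <run_dir>/checkpoint/tasks.pkl.
    # memo = read_checkpoint_log('runinfo/000/checkpoint/tasks.pkl')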
