
Commit 76be21a

checkpoint exceptions

1 parent 1c114ac

File tree

2 files changed: +89 −22 lines changed


parsl/dataflow/memoization.py

Lines changed: 40 additions & 22 deletions

@@ -388,8 +388,12 @@ def _load_checkpoints(self, checkpointDirs: Sequence[str]) -> Dict[str, Future[A
                             data = pickle.load(f)
                             # Copy and hash only the input attributes
                             memo_fu: Future = Future()
-                            assert data['exception'] is None
-                            memo_fu.set_result(data['result'])
+
+                            if data['exception'] is None:
+                                memo_fu.set_result(data['result'])
+                            else:
+                                assert data['result'] is None
+                                memo_fu.set_exception(data['exception'])
                             memo_lookup_table[data['hash']] = memo_fu
 
                         except EOFError:
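
The two sides of checkpointing form a simple round trip: checkpoint() appends pickled records to a file opened in 'ab' mode (so successive dumps behave like an incremental log), and _load_checkpoints() replays that log, completing one Future per record with either the stored result or, after this commit, the stored exception. A minimal standalone sketch of that round trip; the file name and record values are illustrative, not taken from the commit:

import pickle
from concurrent.futures import Future

# Append two records the way checkpoint() does: pickle.dump to a file
# opened in 'ab' mode behaves like an incremental log.
with open("checkpoint.pkl", "ab") as f:
    pickle.dump({'hash': "h1", 'exception': None, 'result': 42}, f)
    pickle.dump({'hash': "h2", 'exception': ValueError("boom"), 'result': None}, f)

# Replay the log the way _load_checkpoints now does: each record carries
# either a result or an exception, and the Future is completed accordingly.
memo_lookup_table = {}
with open("checkpoint.pkl", "rb") as f:
    while True:
        try:
            data = pickle.load(f)
        except EOFError:
            break  # end of the incremental log
        memo_fu: Future = Future()
        if data['exception'] is None:
            memo_fu.set_result(data['result'])
        else:
            assert data['result'] is None
            memo_fu.set_exception(data['exception'])
        memo_lookup_table[data['hash']] = memo_fu

assert memo_lookup_table["h1"].result() == 42
assert isinstance(memo_lookup_table["h2"].exception(), ValueError)
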
@@ -472,20 +476,22 @@ def checkpoint(self, *, task: Optional[TaskRecord] = None, exception: Optional[B
             # TODO: refactor with below
 
             task_id = task['id']
-
-            if exception is None:
-                hashsum = task['hashsum']
-                if not hashsum:
-                    pass # TODO: log an error? see below discussion
-                else:
+            hashsum = task['hashsum']
+            if not hashsum:
+                pass # TODO: log an error? see below discussion
+            else:
+                if exception is None and self.filter_result_for_checkpoint(result):
                     t = {'hash': hashsum, 'exception': None, 'result': result}
-
-                    # We are using pickle here since pickle dumps to a file in 'ab'
-                    # mode behave like a incremental log.
                     pickle.dump(t, f)
                     count += 1
-
-                    logger.debug("Task {} checkpointed".format(task_id))
+                    logger.debug("Task {} checkpointed result".format(task_id))
+                elif exception is not None and self.filter_exception_for_checkpoint(exception):
+                    t = {'hash': hashsum, 'exception': exception, 'result': None}
+                    pickle.dump(t, f)
+                    count += 1
+                    logger.debug("Task {} checkpointed exception".format(task_id))
+                else:
+                    pass # no checkpoint - maybe debug log? TODO
         else:
             checkpoint_queue = self.checkpointable_tasks
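
Both the task_exit path above and the bulk path below reduce to the same decision: a task with a hashsum is checkpointed when the filter hook for its outcome category accepts it. A distilled restatement of that predicate; the helper name should_checkpoint is illustrative, not part of the commit:

from typing import Any, Optional

def should_checkpoint(memoizer, hashsum: Optional[str],
                      exception: Optional[BaseException], result: Any) -> bool:
    # Mirrors the branch structure in checkpoint(): no hashsum means no
    # checkpoint; otherwise results and exceptions go through separate
    # overridable filter hooks.
    if not hashsum:
        return False
    if exception is None:
        return memoizer.filter_result_for_checkpoint(result)
    return memoizer.filter_exception_for_checkpoint(exception)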

@@ -496,18 +502,22 @@ def checkpoint(self, *, task: Optional[TaskRecord] = None, exception: Optional[B
 
                 assert app_fu.done(), "trying to checkpoint a task that is not done"
 
-                if app_fu.done() and app_fu.exception() is None:
-                    hashsum = task_record['hashsum']
-                    if not hashsum:
-                        continue # TODO: log an error? maybe some tasks don't have hashsums legitimately?
-                    t = {'hash': hashsum, 'exception': None, 'result': app_fu.result()}
+                hashsum = task_record['hashsum']
+                if not hashsum:
+                    continue # TODO: log an error? maybe some tasks don't have hashsums legitimately?
 
-                    # We are using pickle here since pickle dumps to a file in 'ab'
-                    # mode behave like a incremental log.
+                if app_fu.exception() is None and self.filter_result_for_checkpoint(app_fu.result()):
+                    t = {'hash': hashsum, 'exception': None, 'result': app_fu.result()}
                     pickle.dump(t, f)
                     count += 1
-
-                    logger.debug("Task {} checkpointed".format(task_id))
+                    logger.debug("Task {} checkpointed result".format(task_id))
+                elif (e := app_fu.exception()) is not None and self.filter_exception_for_checkpoint(e):
+                    t = {'hash': hashsum, 'exception': app_fu.exception(), 'result': None}
+                    pickle.dump(t, f)
+                    count += 1
+                    logger.debug("Task {} checkpointed exception".format(task_id))
+                else:
+                    pass # TODO: maybe log at debug level
 
         self.checkpointed_tasks += count
 
@@ -525,3 +535,11 @@ def checkpoint(self, *, task: Optional[TaskRecord] = None, exception: Optional[B
         # Or maybe a failure of iteration if the list is appended to while checkpointing is happening?
         if not task:
             self.checkpointable_tasks = []
+
+    def filter_result_for_checkpoint(self, result: Any) -> bool:
+        """Overridable method to decide if a task that ended with a successful result should be checkpointed"""
+        return True
+
+    def filter_exception_for_checkpoint(self, exception: BaseException) -> bool:
+        """Overridable method to decide if an entry that ended with an exception should be checkpointed"""
+        return False
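
The two new hooks preserve the default behaviour (results always checkpointed, exceptions never) while letting subclasses opt in per category. A hedged sketch of a subclass that also checkpoints exceptions it treats as deterministic; the class name and the choice of exception types are illustrative assumptions, not part of this commit:

from parsl.dataflow.memoization import BasicMemoizer

class DeterministicErrorMemoizer(BasicMemoizer):
    # Illustrative subclass: replay programming errors from the checkpoint
    # (they will presumably recur on re-execution) but skip everything
    # else, such as transient infrastructure failures.
    def filter_exception_for_checkpoint(self, exception: BaseException) -> bool:
        return isinstance(exception, (TypeError, ValueError))
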
Lines changed: 49 additions & 0 deletions

@@ -0,0 +1,49 @@
+import contextlib
+import os
+
+import pytest
+
+import parsl
+from parsl import python_app
+from parsl.config import Config
+from parsl.dataflow.memoization import BasicMemoizer
+from parsl.executors.threads import ThreadPoolExecutor
+
+
+class CheckpointExceptionsMemoizer(BasicMemoizer):
+    def filter_exception_for_checkpoint(self, ex):
+        # TODO: this used to be the case, but in moving to results-only mode,
+        # the task record is lost. Maybe it's useful to pass it in? What
+        # are the use cases for this deciding function?
+        # task record is available from app_fu.task_record
+        # assert app_fu.task_record is not None
+
+        # override the default always-False, to be always-True
+        return True
+
+
+def fresh_config(run_dir, memoizer):
+    return Config(
+        memoizer=memoizer,
+        run_dir=str(run_dir)
+    )
+
+
+@python_app(cache=True)
+def uuid_app():
+    import uuid
+    raise RuntimeError(str(uuid.uuid4()))
+
+
+@pytest.mark.local
+def test_loading_checkpoint(tmpd_cwd):
+    """Load memoization table from previous checkpoint
+    """
+    with parsl.load(fresh_config(tmpd_cwd, CheckpointExceptionsMemoizer(checkpoint_mode="task_exit"))):
+        checkpoint_files = [os.path.join(parsl.dfk().run_dir, "checkpoint")]
+        result = uuid_app().exception()
+
+    with parsl.load(fresh_config(tmpd_cwd, CheckpointExceptionsMemoizer(checkpoint_files=checkpoint_files))):
+        relaunched = uuid_app().exception()
+
+    assert result.args == relaunched.args, "Expected following call to uuid_app to return cached uuid in exception"
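
The test drives the exception path end to end: the first DFK run checkpoints the RuntimeError raised by uuid_app, and the second run replays it from the checkpoint file, so both calls must observe the same UUID in the exception arguments. The result-side hook can be customised the same way; a minimal sketch of a memoizer that skips checkpointing oversized results, where the size bound is an illustrative assumption:

import sys
from typing import Any

from parsl.dataflow.memoization import BasicMemoizer

class SmallResultMemoizer(BasicMemoizer):
    # Illustrative subclass: keep the checkpoint log compact by only
    # recording results whose shallow in-memory size stays under a bound.
    def filter_result_for_checkpoint(self, result: Any) -> bool:
        return sys.getsizeof(result) < 1024  # assumed bound for this sketch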
