Commit bb5e882 ("checkpoint exceptions")
1 parent e5d1a60

2 files changed: +89, -22 lines

parsl/dataflow/memoization.py (40 additions, 22 deletions)
@@ -386,8 +386,12 @@ def _load_checkpoints(self, checkpointDirs: Sequence[str]) -> Dict[str, Future[A
                             data = pickle.load(f)
                             # Copy and hash only the input attributes
                             memo_fu: Future = Future()
-                            assert data['exception'] is None
-                            memo_fu.set_result(data['result'])
+
+                            if data['exception'] is None:
+                                memo_fu.set_result(data['result'])
+                            else:
+                                assert data['result'] is None
+                                memo_fu.set_exception(data['exception'])
                             memo_lookup_table[data['hash']] = memo_fu
 
                         except EOFError:
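
To make the new load behaviour concrete, here is a minimal standalone sketch of the record round-trip (the file name and record values are illustrative, not from parsl): checkpoint entries are {'hash', 'exception', 'result'} dicts appended with pickle.dump, and loading replays them into Futures until EOFError.

import pickle
from concurrent.futures import Future

# Illustrative records in the shape that checkpoint() writes.
records = [
    {'hash': 'abc123', 'exception': None, 'result': 42},
    {'hash': 'def456', 'exception': RuntimeError('boom'), 'result': None},
]

# Successive pickle.dump calls on an append-mode file behave like an
# incremental log...
with open('checkpoint.pkl', 'ab') as f:
    for r in records:
        pickle.dump(r, f)

# ...which is read back with repeated pickle.load until EOFError,
# mirroring the loop in _load_checkpoints.
memo_lookup_table = {}
with open('checkpoint.pkl', 'rb') as f:
    try:
        while True:
            data = pickle.load(f)
            memo_fu = Future()
            if data['exception'] is None:
                memo_fu.set_result(data['result'])
            else:
                memo_fu.set_exception(data['exception'])
            memo_lookup_table[data['hash']] = memo_fu
    except EOFError:
        pass  # end of the checkpoint log

assert memo_lookup_table['abc123'].result() == 42
assert isinstance(memo_lookup_table['def456'].exception(), RuntimeError)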
@@ -470,20 +474,22 @@ def checkpoint(self, *, task: Optional[TaskRecord] = None, exception: Optional[B
                 # TODO: refactor with below
 
                 task_id = task['id']
-
-                if exception is None:
-                    hashsum = task['hashsum']
-                    if not hashsum:
-                        pass # TODO: log an error? see below discussion
-                    else:
+                hashsum = task['hashsum']
+                if not hashsum:
+                    pass # TODO: log an error? see below discussion
+                else:
+                    if exception is None and self.filter_result_for_checkpoint(result):
                         t = {'hash': hashsum, 'exception': None, 'result': result}
-
-                        # We are using pickle here since pickle dumps to a file in 'ab'
-                        # mode behave like a incremental log.
                         pickle.dump(t, f)
                         count += 1
-
-                        logger.debug("Task {} checkpointed".format(task_id))
+                        logger.debug("Task {} checkpointed result".format(task_id))
+                    elif exception is not None and self.filter_exception_for_checkpoint(exception):
+                        t = {'hash': hashsum, 'exception': exception, 'result': None}
+                        pickle.dump(t, f)
+                        count += 1
+                        logger.debug("Task {} checkpointed exception".format(task_id))
+                    else:
+                        pass # no checkpoint - maybe debug log? TODO
             else:
                 checkpoint_queue = self.checkpointable_tasks
 
@@ -494,18 +500,22 @@ def checkpoint(self, *, task: Optional[TaskRecord] = None, exception: Optional[B
 
                 assert app_fu.done(), "trying to checkpoint a task that is not done"
 
-                if app_fu.done() and app_fu.exception() is None:
-                    hashsum = task_record['hashsum']
-                    if not hashsum:
-                        continue # TODO: log an error? maybe some tasks don't have hashsums legitimately?
-                    t = {'hash': hashsum, 'exception': None, 'result': app_fu.result()}
+                hashsum = task_record['hashsum']
+                if not hashsum:
+                    continue # TODO: log an error? maybe some tasks don't have hashsums legitimately?
 
-                    # We are using pickle here since pickle dumps to a file in 'ab'
-                    # mode behave like a incremental log.
+                if app_fu.exception() is None and self.filter_result_for_checkpoint(app_fu.result()):
+                    t = {'hash': hashsum, 'exception': None, 'result': app_fu.result()}
                     pickle.dump(t, f)
                     count += 1
-
-                    logger.debug("Task {} checkpointed".format(task_id))
+                    logger.debug("Task {} checkpointed result".format(task_id))
+                elif (e := app_fu.exception()) is not None and self.filter_exception_for_checkpoint(e):
+                    t = {'hash': hashsum, 'exception': app_fu.exception(), 'result': None}
+                    pickle.dump(t, f)
+                    count += 1
+                    logger.debug("Task {} checkpointed exception".format(task_id))
+                else:
+                    pass # TODO: maybe log at debug level
 
         self.checkpointed_tasks += count
 
@@ -523,3 +533,11 @@ def checkpoint(self, *, task: Optional[TaskRecord] = None, exception: Optional[B
         # Or maybe a failure of iteration if the list is appended to while checkpointing is happening?
         if not task:
             self.checkpointable_tasks = []
+
+    def filter_result_for_checkpoint(self, result: Any) -> bool:
+        """Overridable method to decide if a task that ended with a successful result should be checkpointed"""
+        return True
+
+    def filter_exception_for_checkpoint(self, exception: BaseException) -> bool:
+        """Overridable method to decide if an entry that ended with an exception should be checkpointed"""
+        return False
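
These defaults preserve the previous behaviour: every successful result is checkpointed and no exception is. A subclass can narrow either policy; SelectiveMemoizer below is a hypothetical illustration, not part of this commit.

from parsl.dataflow.memoization import BasicMemoizer


class SelectiveMemoizer(BasicMemoizer):
    """Hypothetical policy: checkpoint only failures that look
    deterministic, and skip checkpointing oversized results."""

    def filter_exception_for_checkpoint(self, exception: BaseException) -> bool:
        # Opt in only for error types we expect to recur on re-execution.
        return isinstance(exception, (ValueError, ZeroDivisionError))

    def filter_result_for_checkpoint(self, result) -> bool:
        # Illustrative size cap; sys.getsizeof is shallow, so this is a
        # heuristic rather than a true payload measurement.
        import sys
        return sys.getsizeof(result) < 1_000_000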
New test file (49 additions, 0 deletions)
@@ -0,0 +1,49 @@
+import contextlib
+import os
+
+import pytest
+
+import parsl
+from parsl import python_app
+from parsl.config import Config
+from parsl.dataflow.memoization import BasicMemoizer
+from parsl.executors.threads import ThreadPoolExecutor
+
+
+class CheckpointExceptionsMemoizer(BasicMemoizer):
+    def filter_exception_for_checkpoint(self, ex):
+        # TODO: this used to be the case, but in moving to results-only mode,
+        # the task record is lost. Maybe it's useful to pass it in? What
+        # are the use cases for this deciding function?
+        # task record is available from app_fu.task_record
+        # assert app_fu.task_record is not None
+
+        # override the default always-False, to be always-True
+        return True
+
+
+def fresh_config(run_dir, memoizer):
+    return Config(
+        memoizer=memoizer,
+        run_dir=str(run_dir)
+    )
+
+
+@python_app(cache=True)
+def uuid_app():
+    import uuid
+    raise RuntimeError(str(uuid.uuid4()))
+
+
+@pytest.mark.local
+def test_loading_checkpoint(tmpd_cwd):
+    """Load memoization table from previous checkpoint
+    """
+    with parsl.load(fresh_config(tmpd_cwd, CheckpointExceptionsMemoizer(checkpoint_mode="task_exit"))):
+        checkpoint_files = [os.path.join(parsl.dfk().run_dir, "checkpoint")]
+        result = uuid_app().exception()
+
+    with parsl.load(fresh_config(tmpd_cwd, CheckpointExceptionsMemoizer(checkpoint_files=checkpoint_files))):
+        relaunched = uuid_app().exception()
+
+    assert result.args == relaunched.args, "Expected following call to uuid_app to return cached uuid in exception"
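
The uuid payload is what makes the final assertion meaningful: re-executing uuid_app would raise a RuntimeError carrying a fresh uuid, so result.args matching relaunched.args shows that the second DFK served the exception from the loaded checkpoint rather than running the task again.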
