Skip to content

Commit 58c3e5c

Browse files
committed
make memoizer into an interface class and impls
by this time, all the checkpointing and memoization code needs to be moved out of the DFK
1 parent e509906 commit 58c3e5c

File tree

2 files changed

+27
-5
lines changed

2 files changed

+27
-5
lines changed

parsl/dataflow/dflow.py

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
from parsl.dataflow.dependency_resolvers import SHALLOW_DEPENDENCY_RESOLVER
3030
from parsl.dataflow.errors import DependencyError, JoinError
3131
from parsl.dataflow.futures import AppFuture
32-
from parsl.dataflow.memoization import Memoizer
32+
from parsl.dataflow.memoization import BasicMemoizer, Memoizer
3333
from parsl.dataflow.rundirs import make_rundir
3434
from parsl.dataflow.states import FINAL_FAILURE_STATES, FINAL_STATES, States
3535
from parsl.dataflow.taskrecord import TaskRecord
@@ -167,11 +167,12 @@ def __init__(self, config: Config) -> None:
167167

168168
# TODO: the parameters that remain here should be parameters that are going to be configured by
169169
# the user as part of checkpoint/memo configuration object.
170-
self.memoizer = Memoizer(memoize=config.app_cache,
171-
checkpoint_mode=config.checkpoint_mode,
172-
checkpoint_files=config.checkpoint_files,
173-
checkpoint_period=config.checkpoint_period)
170+
self.memoizer: Memoizer = BasicMemoizer(memoize=config.app_cache,
171+
checkpoint_mode=config.checkpoint_mode,
172+
checkpoint_files=config.checkpoint_files,
173+
checkpoint_period=config.checkpoint_period)
174174
self.memoizer.run_dir = self.run_dir
175+
175176
self.memoizer.start()
176177

177178
self._modify_checkpointable_tasks_lock = threading.Lock()
@@ -1234,6 +1235,10 @@ def cleanup(self) -> None:
12341235
# should still see it.
12351236
logger.info("DFK cleanup complete")
12361237

1238+
# TODO: this should maybe go away: manual explicit checkpointing is
1239+
# a property of the (upcoming) BasicMemoizer, not of a memoization
1240+
# plugin in general -- configure a BasicMemoizer separately from the
1241+
# DFK and call checkpoint on that...
12371242
def checkpoint(self) -> None:
12381243
with self._modify_checkpointable_tasks_lock:
12391244
self.memoizer.checkpoint()

parsl/dataflow/memoization.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,23 @@ def id_for_memo_function(f: types.FunctionType, output_ref: bool = False) -> byt
119119

120120

121121
class Memoizer:
    """Interface that memoization/checkpointing implementations provide.

    Every method on this base class raises ``NotImplementedError``.
    Concrete implementations (a subclass such as ``BasicMemoizer``) are
    expected to override them.

    The DataFlowKernel holds its memoizer as a ``Memoizer``, assigns
    ``run_dir`` on it and then calls ``start()``. Both are therefore
    declared here so that they are part of the interface.
    """

    # Assigned by the DataFlowKernel before it calls ``start()``.
    # NOTE(review): assumed to be a path string -- confirm against the DFK.
    run_dir: str

    def start(self) -> None:
        """Begin operation; called by the DataFlowKernel after ``run_dir`` is set.

        Raises:
            NotImplementedError: always, unless overridden by a subclass.
        """
        raise NotImplementedError

    def update_memo_exception(self, task: TaskRecord, e: BaseException) -> None:
        """Hook intended for recording that ``task`` ended with exception ``e``.

        Raises:
            NotImplementedError: always, unless overridden by a subclass.
        """
        raise NotImplementedError

    def update_memo_result(self, task: TaskRecord, r: Any) -> None:
        """Hook intended for recording that ``task`` produced result ``r``.

        Raises:
            NotImplementedError: always, unless overridden by a subclass.
        """
        raise NotImplementedError

    def checkpoint(self, *, task: Optional[TaskRecord] = None) -> None:
        """Perform a checkpoint.

        ``task`` is keyword-only and optional.
        NOTE(review): presumably ``None`` means "not restricted to a single
        task" -- confirm against the concrete implementation.

        Raises:
            NotImplementedError: always, unless overridden by a subclass.
        """
        raise NotImplementedError

    def check_memo(self, task: TaskRecord) -> Optional[Future[Any]]:
        """Look up ``task`` and return a future for it, or ``None``.

        NOTE(review): presumably ``None`` means no memoized entry exists for
        the task -- confirm against the concrete implementation.

        Raises:
            NotImplementedError: always, unless overridden by a subclass.
        """
        raise NotImplementedError

    def close(self) -> None:
        """Shut the memoizer down.

        Raises:
            NotImplementedError: always, unless overridden by a subclass.
        """
        raise NotImplementedError
136+
137+
138+
class BasicMemoizer(Memoizer):
122139
"""Memoizer is responsible for ensuring that identical work is not repeated.
123140
124141
When a task is repeated, i.e., the same function is called with the same exact arguments, the

0 commit comments

Comments
 (0)