Skip to content

Commit f58f27b

Browse files
committed
make memoizer into an interface class and impls
by this time, all the checkpointing and memoization code bother needs to be moved out of the DFK
1 parent 507f778 commit f58f27b

File tree

2 files changed

+27
-5
lines changed

2 files changed

+27
-5
lines changed

parsl/dataflow/dflow.py

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
from parsl.dataflow.dependency_resolvers import SHALLOW_DEPENDENCY_RESOLVER
3030
from parsl.dataflow.errors import DependencyError, JoinError
3131
from parsl.dataflow.futures import AppFuture
32-
from parsl.dataflow.memoization import Memoizer
32+
from parsl.dataflow.memoization import BasicMemoizer, Memoizer
3333
from parsl.dataflow.rundirs import make_rundir
3434
from parsl.dataflow.states import FINAL_FAILURE_STATES, FINAL_STATES, States
3535
from parsl.dataflow.taskrecord import TaskRecord
@@ -165,11 +165,12 @@ def __init__(self, config: Config) -> None:
165165
self.monitoring_radio.send((MessageType.WORKFLOW_INFO,
166166
workflow_info))
167167

168-
self.memoizer = Memoizer(memoize=config.app_cache,
169-
checkpoint_mode=config.checkpoint_mode,
170-
checkpoint_files=config.checkpoint_files,
171-
checkpoint_period=config.checkpoint_period)
168+
self.memoizer: Memoizer = BasicMemoizer(memoize=config.app_cache,
169+
checkpoint_mode=config.checkpoint_mode,
170+
checkpoint_files=config.checkpoint_files,
171+
checkpoint_period=config.checkpoint_period)
172172
self.memoizer.run_dir = self.run_dir
173+
173174
self.memoizer.start()
174175

175176
# this must be set before executors are added since add_executors calls
@@ -1205,6 +1206,10 @@ def cleanup(self) -> None:
12051206
# should still see it.
12061207
logger.info("DFK cleanup complete")
12071208

1209+
# TODO: this should maybe go away: manual explicit checkponting is
1210+
# a property of the (upcoming) BasicMemoizer, not of a memoisation
1211+
# plugin in general -- configure a BasicMemoizer separately from the
1212+
# DFK and call checkpoint on that...
12081213
def checkpoint(self) -> None:
12091214
self.memoizer.checkpoint()
12101215

parsl/dataflow/memoization.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,23 @@ def id_for_memo_function(f: types.FunctionType, output_ref: bool = False) -> byt
119119

120120

121121
class Memoizer:
122+
def update_memo_exception(self, task: TaskRecord, e: BaseException) -> None:
123+
raise NotImplementedError
124+
125+
def update_memo_result(self, task: TaskRecord, r: Any) -> None:
126+
raise NotImplementedError
127+
128+
def checkpoint(self, *, task: Optional[TaskRecord] = None) -> None:
129+
raise NotImplementedError
130+
131+
def check_memo(self, task: TaskRecord) -> Optional[Future[Any]]:
132+
raise NotImplementedError
133+
134+
def close(self) -> None:
135+
raise NotImplementedError
136+
137+
138+
class BasicMemoizer(Memoizer):
122139
"""Memoizer is responsible for ensuring that identical work is not repeated.
123140
124141
When a task is repeated, i.e., the same function is called with the same exact arguments, the

0 commit comments

Comments
 (0)