Skip to content

Commit c674a4a

Browse files
committed
configurable memoizer instance
1 parent ea8fee9 commit c674a4a

File tree

4 files changed

+73
-3
lines changed

4 files changed

+73
-3
lines changed

parsl/config.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
from typing_extensions import Literal
66

77
from parsl.dataflow.dependency_resolvers import DependencyResolver
8+
from parsl.dataflow.memoization import Memoizer
89
from parsl.dataflow.taskrecord import TaskRecord
910
from parsl.errors import ConfigurationError
1011
from parsl.executors.base import ParslExecutor
@@ -101,6 +102,7 @@ class Config(RepresentationMixin, UsageInformation):
101102
def __init__(self,
102103
executors: Optional[Iterable[ParslExecutor]] = None,
103104
app_cache: bool = True,
105+
memoizer: Optional[Memoizer] = None,
104106
checkpoint_files: Optional[Sequence[str]] = None,
105107
checkpoint_mode: Union[None,
106108
Literal['task_exit'],
@@ -131,6 +133,7 @@ def __init__(self,
131133
self._executors: Sequence[ParslExecutor] = executors
132134
self._validate_executors()
133135

136+
self.memoizer = memoizer
134137
self.app_cache = app_cache
135138
self.checkpoint_files = checkpoint_files
136139
self.checkpoint_mode = checkpoint_mode

parsl/dataflow/dflow.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -164,9 +164,16 @@ def __init__(self, config: Config) -> None:
164164
else:
165165
checkpoint_files = []
166166

167-
self.memoizer: Memoizer = BasicMemoizer(self, memoize=config.app_cache, checkpoint_files=checkpoint_files)
168-
self.memoizer.run_dir = self.run_dir
167+
# self.memoizer: Memoizer = BasicMemoizer(self, memoize=config.app_cache, checkpoint_files=checkpoint_files)
168+
# the memoize flag might turn into the user choosing different instances
169+
# of the Memoizer interface
170+
self.memoizer: Memoizer
171+
if config.memoizer is not None:
172+
self.memoizer = config.memoizer
173+
else:
174+
self.memoizer = BasicMemoizer()
169175

176+
self.memoizer.start(dfk=self, memoize=config.app_cache, checkpoint_files=checkpoint_files, run_dir=self.run_dir)
170177
self._checkpoint_timer = None
171178
self.checkpoint_mode = config.checkpoint_mode
172179

parsl/dataflow/memoization.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,9 @@ def id_for_memo_function(f: types.FunctionType, output_ref: bool = False) -> byt
121121

122122

123123
class Memoizer:
124+
def start(self, *, dfk: DataFlowKernel, memoize: bool = True, checkpoint_files: Sequence[str], run_dir: str) -> None:
125+
raise NotImplementedError
126+
124127
def update_memo(self, task: TaskRecord, r: Future[Any]) -> None:
125128
raise NotImplementedError
126129

@@ -167,7 +170,10 @@ class BasicMemoizer(Memoizer):
167170

168171
run_dir: str
169172

170-
def __init__(self, dfk: DataFlowKernel, *, memoize: bool = True, checkpoint_files: Sequence[str]):
173+
def __init__(self) -> None:
174+
pass
175+
176+
def start(self, *, dfk: DataFlowKernel, memoize: bool = True, checkpoint_files: Sequence[str], run_dir: str) -> None:
171177
"""Initialize the memoizer.
172178
173179
Args:
@@ -179,6 +185,7 @@ def __init__(self, dfk: DataFlowKernel, *, memoize: bool = True, checkpoint_file
179185
"""
180186
self.dfk = dfk
181187
self.memoize = memoize
188+
self.run_dir = run_dir
182189

183190
self.checkpointed_tasks = 0
184191

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
import argparse
2+
3+
import pytest
4+
5+
import parsl
6+
from parsl.app.app import python_app
7+
from parsl.config import Config
8+
from parsl.dataflow.memoization import BasicMemoizer
9+
from parsl.dataflow.taskrecord import TaskRecord
10+
11+
12+
class DontReuseSevenMemoizer(BasicMemoizer):
13+
def check_memo(self, task_record: TaskRecord):
14+
if task_record['args'][0] == 7:
15+
return None # we didn't find a suitable memo record...
16+
else:
17+
return super().check_memo(task_record)
18+
19+
20+
def local_config():
21+
return Config(memoizer=DontReuseSevenMemoizer())
22+
23+
24+
@python_app(cache=True)
25+
def random_uuid(x, cache=True):
26+
import uuid
27+
return str(uuid.uuid4())
28+
29+
30+
@pytest.mark.local
31+
def test_python_memoization(n=10):
32+
"""Testing python memoization disable
33+
"""
34+
35+
# TODO: this .result() needs to be here, not in the loop
36+
# because otherwise we race to complete... and then
37+
# we might sometimes get a memoization before the loop
38+
# and sometimes not...
39+
x = random_uuid(0).result()
40+
41+
for i in range(0, n):
42+
foo = random_uuid(0)
43+
print(i)
44+
print(foo.result())
45+
assert foo.result() == x, "Memoized results were incorrectly not used"
46+
47+
y = random_uuid(7).result()
48+
49+
for i in range(0, n):
50+
foo = random_uuid(7)
51+
print(i)
52+
print(foo.result())
53+
assert foo.result() != y, "Memoized results were incorrectly used"

0 commit comments

Comments
 (0)