Skip to content

Commit 12b2168

Browse files
committed
configurable memoizer instance
1 parent b089733 commit 12b2168

File tree

4 files changed

+73
-3
lines changed

4 files changed

+73
-3
lines changed

parsl/config.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
from typing_extensions import Literal
66

77
from parsl.dataflow.dependency_resolvers import DependencyResolver
8+
from parsl.dataflow.memoization import Memoizer
89
from parsl.dataflow.taskrecord import TaskRecord
910
from parsl.errors import ConfigurationError
1011
from parsl.executors.base import ParslExecutor
@@ -98,6 +99,7 @@ class Config(RepresentationMixin, UsageInformation):
9899
def __init__(self,
99100
executors: Optional[Iterable[ParslExecutor]] = None,
100101
app_cache: bool = True,
102+
memoizer: Optional[Memoizer] = None,
101103
checkpoint_files: Optional[Sequence[str]] = None,
102104
checkpoint_mode: Union[None,
103105
Literal['task_exit'],
@@ -127,6 +129,7 @@ def __init__(self,
127129
self._executors: Sequence[ParslExecutor] = executors
128130
self._validate_executors()
129131

132+
self.memoizer = memoizer
130133
self.app_cache = app_cache
131134
self.checkpoint_files = checkpoint_files
132135
self.checkpoint_mode = checkpoint_mode

parsl/dataflow/dflow.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -173,9 +173,16 @@ def __init__(self, config: Config) -> None:
173173
else:
174174
checkpoint_files = []
175175

176-
self.memoizer: Memoizer = BasicMemoizer(self, memoize=config.app_cache, checkpoint_files=checkpoint_files)
177-
self.memoizer.run_dir = self.run_dir
176+
# self.memoizer: Memoizer = BasicMemoizer(self, memoize=config.app_cache, checkpoint_files=checkpoint_files)
177+
# TODO: the memoize flag might eventually be replaced by the user choosing
178+
# a different implementation of the Memoizer interface
179+
self.memoizer: Memoizer
180+
if config.memoizer is not None:
181+
self.memoizer = config.memoizer
182+
else:
183+
self.memoizer = BasicMemoizer()
178184

185+
self.memoizer.start(dfk=self, memoize=config.app_cache, checkpoint_files=checkpoint_files, run_dir=self.run_dir)
179186
self._checkpoint_timer = None
180187
self.checkpoint_mode = config.checkpoint_mode
181188

parsl/dataflow/memoization.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,9 @@ def id_for_memo_function(f: types.FunctionType, output_ref: bool = False) -> byt
121121

122122

123123
class Memoizer:
124+
def start(self, *, dfk: DataFlowKernel, memoize: bool = True, checkpoint_files: Sequence[str], run_dir: str) -> None:
125+
raise NotImplementedError
126+
124127
def update_memo(self, task: TaskRecord, r: Future[Any]) -> None:
125128
raise NotImplementedError
126129

@@ -164,7 +167,10 @@ class BasicMemoizer(Memoizer):
164167

165168
run_dir: str
166169

167-
def __init__(self, dfk: DataFlowKernel, *, memoize: bool = True, checkpoint_files: Sequence[str]):
170+
def __init__(self) -> None:
171+
pass
172+
173+
def start(self, *, dfk: DataFlowKernel, memoize: bool = True, checkpoint_files: Sequence[str], run_dir: str) -> None:
168174
"""Initialize the memoizer.
169175
170176
Args:
@@ -176,6 +182,7 @@ def __init__(self, dfk: DataFlowKernel, *, memoize: bool = True, checkpoint_file
176182
"""
177183
self.dfk = dfk
178184
self.memoize = memoize
185+
self.run_dir = run_dir
179186

180187
self.checkpointed_tasks = 0
181188

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
import argparse
2+
3+
import pytest
4+
5+
import parsl
6+
from parsl.app.app import python_app
7+
from parsl.config import Config
8+
from parsl.dataflow.memoization import BasicMemoizer
9+
from parsl.dataflow.taskrecord import TaskRecord
10+
11+
12+
class DontReuseSevenMemoizer(BasicMemoizer):
    """BasicMemoizer variant that never reports a memo hit for argument 7.

    Any task whose first positional argument equals 7 is treated as if no
    memo record exists; every other task is looked up by the parent class.
    """

    def check_memo(self, task_record: TaskRecord):
        """Return the parent's memo lookup, or None when the first arg is 7."""
        # Guard clause: pretend we didn't find a suitable memo record.
        if task_record['args'][0] == 7:
            return None
        return super().check_memo(task_record)
18+
19+
20+
def local_config():
    """Build a Config whose memoizer is a fresh DontReuseSevenMemoizer."""
    custom_memoizer = DontReuseSevenMemoizer()
    return Config(memoizer=custom_memoizer)
22+
23+
24+
@python_app(cache=True)
def random_uuid(x, cache=True):
    """Return a newly generated random UUID rendered as a string.

    Neither ``x`` nor ``cache`` is read by the body, so every real execution
    yields a different value.
    """
    # Imported inside the body, as in the original app definition.
    from uuid import uuid4
    return str(uuid4())
28+
29+
30+
@pytest.mark.local
def test_python_memoization(n=10):
    """Check which app invocations reuse a memoized result.

    Repeated calls with argument 0 must all return the first result, while
    repeated calls with argument 7 must each differ from the first result
    obtained for 7.
    """

    # TODO: this .result() has to happen before the loop rather than inside
    # it; otherwise the invocations race to complete and a memoized value
    # may or may not be available by the time the loop runs.
    baseline = random_uuid(0).result()

    for attempt in range(n):
        future = random_uuid(0)
        print(attempt)
        value = future.result()
        print(value)
        assert value == baseline, "Memoized results were incorrectly not used"

    first_seven = random_uuid(7).result()

    for attempt in range(n):
        future = random_uuid(7)
        print(attempt)
        value = future.result()
        print(value)
        assert value != first_seven, "Memoized results were incorrectly used"

0 commit comments

Comments
 (0)