Skip to content

Commit e5d1a60

Browse files
committed
configurable memoizer instance
TODO: rewrite documentation that refers to any example checkpointing or app caching or memoization configuration with the documentation ready to receive documentation for the SQL plugin this PR is the biggest user facing change because it changes the configuration interface for anyone configuring anything to do with app caching/memoization/checkpointing.
1 parent e07945e commit e5d1a60

File tree

11 files changed

+91
-61
lines changed

11 files changed

+91
-61
lines changed

parsl/config.py

Lines changed: 3 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
from typing_extensions import Literal
66

77
from parsl.dataflow.dependency_resolvers import DependencyResolver
8+
from parsl.dataflow.memoization import Memoizer
89
from parsl.dataflow.taskrecord import TaskRecord
910
from parsl.errors import ConfigurationError
1011
from parsl.executors.base import ParslExecutor
@@ -27,17 +28,6 @@ class Config(RepresentationMixin, UsageInformation):
2728
executors : sequence of ParslExecutor, optional
2829
List (or other iterable) of `ParslExecutor` instances to use for executing tasks.
2930
Default is (:class:`~parsl.executors.threads.ThreadPoolExecutor()`,).
30-
app_cache : bool, optional
31-
Enable app caching. Default is True.
32-
checkpoint_files : sequence of str, optional
33-
List of paths to checkpoint files. See :func:`parsl.utils.get_all_checkpoints` and
34-
:func:`parsl.utils.get_last_checkpoint` for helpers. Default is None.
35-
checkpoint_mode : str, optional
36-
Checkpoint mode to use, can be ``'dfk_exit'``, ``'task_exit'``, ``'periodic'`` or ``'manual'``.
37-
If set to `None`, checkpointing will be disabled. Default is None.
38-
checkpoint_period : str, optional
39-
Time interval (in "HH:MM:SS") at which to checkpoint completed tasks. Only has an effect if
40-
``checkpoint_mode='periodic'``.
4131
dependency_resolver: plugin point for custom dependency resolvers. Default: only resolve Futures,
4232
using the `SHALLOW_DEPENDENCY_RESOLVER`.
4333
exit_mode: str, optional
@@ -100,14 +90,7 @@ class Config(RepresentationMixin, UsageInformation):
10090
@typeguard.typechecked
10191
def __init__(self,
10292
executors: Optional[Iterable[ParslExecutor]] = None,
103-
app_cache: bool = True,
104-
checkpoint_files: Optional[Sequence[str]] = None,
105-
checkpoint_mode: Union[None,
106-
Literal['task_exit'],
107-
Literal['periodic'],
108-
Literal['dfk_exit'],
109-
Literal['manual']] = None,
110-
checkpoint_period: Optional[str] = None,
93+
memoizer: Optional[Memoizer] = None,
11194
dependency_resolver: Optional[DependencyResolver] = None,
11295
exit_mode: Literal['cleanup', 'skip', 'wait'] = 'cleanup',
11396
garbage_collect: bool = True,
@@ -131,21 +114,7 @@ def __init__(self,
131114
self._executors: Sequence[ParslExecutor] = executors
132115
self._validate_executors()
133116

134-
self.app_cache = app_cache
135-
self.checkpoint_files = checkpoint_files
136-
self.checkpoint_mode = checkpoint_mode
137-
if checkpoint_period is not None:
138-
if checkpoint_mode is None:
139-
logger.debug('The requested `checkpoint_period={}` will have no effect because `checkpoint_mode=None`'.format(
140-
checkpoint_period)
141-
)
142-
elif checkpoint_mode != 'periodic':
143-
logger.debug("Requested checkpoint period of {} only has an effect with checkpoint_mode='periodic'".format(
144-
checkpoint_period)
145-
)
146-
if checkpoint_mode == 'periodic' and checkpoint_period is None:
147-
checkpoint_period = "00:30:00"
148-
self.checkpoint_period = checkpoint_period
117+
self.memoizer = memoizer
149118
self.dependency_resolver = dependency_resolver
150119
self.exit_mode = exit_mode
151120
self.garbage_collect = garbage_collect

parsl/configs/ASPIRE1.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from parsl.addresses import address_by_interface
22
from parsl.config import Config
3+
from parsl.dataflow.memoization import BasicMemoizer
34
from parsl.executors import HighThroughputExecutor
45
from parsl.launchers import MpiRunLauncher
56
from parsl.monitoring.monitoring import MonitoringHub
@@ -38,7 +39,6 @@
3839
),
3940
strategy='simple',
4041
retries=3,
41-
app_cache=True,
42-
checkpoint_mode='task_exit',
42+
memoizer=BasicMemoizer(checkpoint_mode='task_exit'),
4343
usage_tracking=LEVEL_1,
4444
)

parsl/dataflow/dflow.py

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -165,15 +165,8 @@ def __init__(self, config: Config) -> None:
165165
self.monitoring_radio.send((MessageType.WORKFLOW_INFO,
166166
workflow_info))
167167

168-
# TODO: the parameters that remain here should be parameters that are going to be configured by
169-
# the user as part of checkpoint/memo configuration object.
170-
self.memoizer: Memoizer = BasicMemoizer(memoize=config.app_cache,
171-
checkpoint_mode=config.checkpoint_mode,
172-
checkpoint_files=config.checkpoint_files,
173-
checkpoint_period=config.checkpoint_period)
174-
self.memoizer.run_dir = self.run_dir
175-
176-
self.memoizer.start()
168+
self.memoizer: Memoizer = config.memoizer if config.memoizer is not None else BasicMemoizer()
169+
self.memoizer.start(run_dir=self.run_dir)
177170

178171
# this must be set before executors are added since add_executors calls
179172
# job_status_poller.add_executors.

parsl/dataflow/memoization.py

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,9 @@ def update_memo_exception(self, task: TaskRecord, e: BaseException) -> None:
125125
def update_memo_result(self, task: TaskRecord, r: Any) -> None:
126126
raise NotImplementedError
127127

128+
def start(self, *, run_dir: str) -> None:
129+
raise NotImplementedError
130+
128131
def checkpoint(self, *, task: Optional[TaskRecord] = None) -> None:
129132
raise NotImplementedError
130133

@@ -169,18 +172,17 @@ class BasicMemoizer(Memoizer):
169172
run_dir: str
170173

171174
def __init__(self, *,
172-
memoize: bool = True,
173-
checkpoint_files: Sequence[str] | None,
174-
checkpoint_period: Optional[str],
175-
checkpoint_mode: Literal['task_exit', 'periodic', 'dfk_exit', 'manual'] | None):
175+
checkpoint_files: Sequence[str] | None = None,
176+
checkpoint_period: Optional[str] = None,
177+
checkpoint_mode: Literal['task_exit', 'periodic', 'dfk_exit', 'manual'] | None = None,
178+
memoize: bool = True): # TODO: unlikely to need to set this to false, but it was in config API before...
176179
"""Initialize the memoizer.
177180
178181
KWargs:
179182
- memoize (Bool): enable memoization or not.
180183
- checkpoint (Dict): A checkpoint loaded as a dict.
184+
TODO: update
181185
"""
182-
self.memoize = memoize
183-
184186
self.checkpointed_tasks = 0
185187

186188
self.checkpoint_lock = threading.Lock()
@@ -192,8 +194,12 @@ def __init__(self, *,
192194
self.checkpointable_tasks: List[TaskRecord] = []
193195

194196
self._checkpoint_timer: Timer | None = None
197+
self.memoize = memoize
198+
199+
def start(self, *, run_dir: str) -> None:
200+
201+
self.run_dir = run_dir
195202

196-
def start(self) -> None:
197203
if self.checkpoint_files is not None:
198204
checkpoint_files = self.checkpoint_files
199205
elif self.checkpoint_files is None and self.checkpoint_mode is not None:

parsl/tests/configs/htex_local_alternate.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
from parsl.data_provider.ftp import FTPInTaskStaging
2323
from parsl.data_provider.http import HTTPInTaskStaging
2424
from parsl.data_provider.zip import ZipFileStaging
25+
from parsl.dataflow.memoization import BasicMemoizer
2526
from parsl.executors import HighThroughputExecutor
2627
from parsl.launchers import SingleNodeLauncher
2728

@@ -56,7 +57,7 @@ def fresh_config():
5657
)
5758
],
5859
strategy='simple',
59-
app_cache=True, checkpoint_mode='task_exit',
60+
memoizer=BasicMemoizer(memoize=True, checkpoint_mode='task_exit'),
6061
retries=2,
6162
monitoring=MonitoringHub(
6263
monitoring_debug=False,
Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
from parsl.config import Config
2+
from parsl.dataflow.memoization import BasicMemoizer
23
from parsl.executors.threads import ThreadPoolExecutor
34

45
config = Config(
@@ -7,5 +8,5 @@
78
label='local_threads_checkpoint_dfk_exit',
89
)
910
],
10-
checkpoint_mode='dfk_exit'
11+
memoizer=BasicMemoizer(checkpoint_mode='dfk_exit')
1112
)
Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
from parsl.config import Config
2+
from parsl.dataflow.memoization import BasicMemoizer
23
from parsl.executors.threads import ThreadPoolExecutor
34

45
config = Config(
@@ -7,5 +8,5 @@
78
label='local_threads_checkpoint_task_exit',
89
)
910
],
10-
checkpoint_mode='task_exit'
11+
memoizer=BasicMemoizer(checkpoint_mode='task_exit')
1112
)

parsl/tests/test_checkpointing/test_periodic.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,18 @@
33
import parsl
44
from parsl.app.app import python_app
55
from parsl.config import Config
6+
from parsl.dataflow.memoization import BasicMemoizer
67
from parsl.executors.threads import ThreadPoolExecutor
78

89

910
def fresh_config():
1011
tpe = ThreadPoolExecutor(label='local_threads_checkpoint_periodic', max_threads=1)
1112
return Config(
1213
executors=[tpe],
13-
checkpoint_mode='periodic',
14-
checkpoint_period='00:00:02'
14+
memoizer=BasicMemoizer(
15+
checkpoint_mode='periodic',
16+
checkpoint_period='00:00:02'
17+
)
1518
)
1619

1720

@@ -32,7 +35,8 @@ def test_periodic():
3235
"""Test checkpointing with task_periodic behavior
3336
"""
3437
with parsl.load(fresh_config()):
35-
h, m, s = map(int, parsl.dfk().config.checkpoint_period.split(":"))
38+
memoizer = parsl.dfk().memoizer
39+
h, m, s = map(int, memoizer.checkpoint_period.split(":"))
3640
assert h == 0, "Verify test setup"
3741
assert m == 0, "Verify test setup"
3842
assert s > 0, "Verify test setup"

parsl/tests/test_checkpointing/test_regression_233.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
import pytest
22

33
from parsl.app.app import python_app
4+
from parsl.config import Config
45
from parsl.dataflow.dflow import DataFlowKernel
6+
from parsl.dataflow.memoization import BasicMemoizer
57

68

79
def run_checkpointed(checkpoints):
8-
from parsl.tests.configs.local_threads_checkpoint_task_exit import config
9-
config.checkpoint_files = checkpoints
10+
config = Config(memoizer=BasicMemoizer(checkpoint_files=checkpoints, checkpoint_mode='task_exit'))
1011
dfk = DataFlowKernel(config=config)
1112

1213
@python_app(data_flow_kernel=dfk, cache=True)

parsl/tests/test_python_apps/test_memoize_2.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import parsl
66
from parsl.app.app import python_app
77
from parsl.config import Config
8+
from parsl.dataflow.memoization import BasicMemoizer
89
from parsl.executors.threads import ThreadPoolExecutor
910

1011

@@ -13,7 +14,7 @@ def local_config():
1314
executors=[
1415
ThreadPoolExecutor(max_threads=4),
1516
],
16-
app_cache=False
17+
memoizer=BasicMemoizer(memoize=False) # TODO: this should be a better do-nothing impl?
1718
)
1819

1920

0 commit comments

Comments
 (0)