Skip to content

Commit e509906

Browse files
committed
remove DFK's use of checkpoint mode configuration, moving it into the memoizer code
1 parent ae375dd commit e509906

File tree

2 files changed

+35
-27
lines changed

2 files changed

+35
-27
lines changed

parsl/dataflow/dflow.py

Lines changed: 6 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@
4949
from parsl.monitoring.remote import monitor_wrapper
5050
from parsl.process_loggers import wrap_with_logs
5151
from parsl.usage_tracking.usage import UsageTracker
52-
from parsl.utils import Timer, get_std_fname_mode, get_version
52+
from parsl.utils import get_std_fname_mode, get_version
5353

5454
logger = logging.getLogger(__name__)
5555

@@ -167,13 +167,13 @@ def __init__(self, config: Config) -> None:
167167

168168
# TODO: the parameters that remain here should be parameters that are going to be configured by
169169
# the user as part of checkpoint/memo configuration object.
170-
self.memoizer = Memoizer(memoize=config.app_cache, checkpoint_mode=config.checkpoint_mode, checkpoint_files=config.checkpoint_files)
170+
self.memoizer = Memoizer(memoize=config.app_cache,
171+
checkpoint_mode=config.checkpoint_mode,
172+
checkpoint_files=config.checkpoint_files,
173+
checkpoint_period=config.checkpoint_period)
171174
self.memoizer.run_dir = self.run_dir
172175
self.memoizer.start()
173176

174-
# TODO: this block needs to move too
175-
self._checkpoint_timer = None
176-
self.checkpoint_mode = config.checkpoint_mode
177177
self._modify_checkpointable_tasks_lock = threading.Lock()
178178

179179
# this must be set before executors are added since add_executors calls
@@ -190,17 +190,6 @@ def __init__(self, config: Config) -> None:
190190
self.add_executors(config.executors)
191191
self.add_executors([parsl_internal_executor])
192192

193-
if self.checkpoint_mode == "periodic":
194-
if config.checkpoint_period is None:
195-
raise ConfigurationError("Checkpoint period must be specified with periodic checkpoint mode")
196-
else:
197-
try:
198-
h, m, s = map(int, config.checkpoint_period.split(':'))
199-
except Exception:
200-
raise ConfigurationError("invalid checkpoint_period provided: {0} expected HH:MM:SS".format(config.checkpoint_period))
201-
checkpoint_period = (h * 3600) + (m * 60) + s
202-
self._checkpoint_timer = Timer(self.checkpoint, interval=checkpoint_period, name="Checkpoint")
203-
204193
self.task_count = 0
205194
self.tasks: Dict[int, TaskRecord] = {}
206195
self.submitter_lock = threading.Lock()
@@ -1187,16 +1176,7 @@ def cleanup(self) -> None:
11871176

11881177
self.log_task_states()
11891178

1190-
# checkpoint if any valid checkpoint method is specified
1191-
if self.checkpoint_mode is not None:
1192-
1193-
# TODO: accesses to self.checkpointable_tasks should happen
1194-
# under a lock?
1195-
self.memoizer.checkpoint()
1196-
1197-
if self._checkpoint_timer:
1198-
logger.info("Stopping checkpoint timer")
1199-
self._checkpoint_timer.close()
1179+
self.memoizer.close()
12001180

12011181
# Send final stats
12021182
self.usage_tracker.send_end_message()

parsl/dataflow/memoization.py

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,8 @@
1414

1515
from parsl.dataflow.errors import BadCheckpoint
1616
from parsl.dataflow.taskrecord import TaskRecord
17-
from parsl.utils import get_all_checkpoints
17+
from parsl.errors import ConfigurationError
18+
from parsl.utils import Timer, get_all_checkpoints
1819

1920
logger = logging.getLogger(__name__)
2021

@@ -153,6 +154,7 @@ class Memoizer:
153154
def __init__(self, *,
154155
memoize: bool = True,
155156
checkpoint_files: Sequence[str] | None,
157+
checkpoint_period: Optional[str],
156158
checkpoint_mode: Literal['task_exit', 'periodic', 'dfk_exit', 'manual'] | None):
157159
"""Initialize the memoizer.
158160
@@ -168,9 +170,12 @@ def __init__(self, *,
168170

169171
self.checkpoint_files = checkpoint_files
170172
self.checkpoint_mode = checkpoint_mode
173+
self.checkpoint_period = checkpoint_period
171174

172175
self.checkpointable_tasks: List[TaskRecord] = []
173176

177+
self._checkpoint_timer: Timer | None = None
178+
174179
def start(self) -> None:
175180
if self.checkpoint_files is not None:
176181
checkpoint_files = self.checkpoint_files
@@ -190,6 +195,29 @@ def start(self) -> None:
190195
logger.info("App caching disabled for all apps")
191196
self.memo_lookup_table = {}
192197

198+
if self.checkpoint_mode == "periodic":
199+
if self.checkpoint_period is None:
200+
raise ConfigurationError("Checkpoint period must be specified with periodic checkpoint mode")
201+
else:
202+
try:
203+
h, m, s = map(int, self.checkpoint_period.split(':'))
204+
except Exception:
205+
raise ConfigurationError("invalid checkpoint_period provided: {0} expected HH:MM:SS".format(self.checkpoint_period))
206+
checkpoint_period = (h * 3600) + (m * 60) + s
207+
self._checkpoint_timer = Timer(self.checkpoint, interval=checkpoint_period, name="Checkpoint")
208+
209+
def close(self) -> None:
210+
# checkpoint if any valid checkpoint method is specified
211+
if self.checkpoint_mode is not None:
212+
213+
# TODO: accesses to self.checkpointable_tasks should happen
214+
# under a lock?
215+
self.checkpoint()
216+
217+
if self._checkpoint_timer:
218+
logger.info("Stopping checkpoint timer")
219+
self._checkpoint_timer.close()
220+
193221
def make_hash(self, task: TaskRecord) -> str:
194222
"""Create a hash of the task inputs.
195223

0 commit comments

Comments
 (0)