Commit 3e74939

out-of-memory checkpointing
Goal: results should not be stored in an in-memory memo table (never? or only in a weak, small cache?), so the memo table is absent from this implementation. Instead, all memo queries go to the sqlite3 database. This blurs the line between in-memory caching and disk-based checkpointing: the previous disk-based checkpoint model relied on repopulating the in-memory memo table cache.

I hit thread problems when using one sqlite3 connection across threads, and the docs are unclear about what I can and cannot do, so this implementation opens the sqlite3 database on every access. That probably carries quite a performance hit, but it is probably enough for basic validation of the idea.

This hangs right now because update_memo has moved to a different place and now has different semantics (compared to when I initially implemented this patch): the Future is now not populated, so we can't ask the future for the result, which means we need to inject the result a different way. The three use cases — sqlite3, in-memory, and traditional on-disk checkpointing — may need some thought about completion semantics and the data structures to go with them.
1 parent 65295b1
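On the threading point: Python's sqlite3 module by default refuses to use a connection object from any thread other than the one that created it, which is consistent with the behaviour described above. Below is a minimal sketch contrasting the two options — per-access connections, which is what this commit does, versus one shared connection opened with check_same_thread=False and guarded by an explicit lock (the latter is a hypothetical alternative, not what the commit implements):

import sqlite3
import threading

DB_PATH = "checkpoint.sqlite3"  # illustrative path, matching SQLiteMemoizer's choice

# Ensure the table exists for this sketch (the real code does this in start()).
with sqlite3.connect(DB_PATH) as _setup:
    _setup.execute("CREATE TABLE IF NOT EXISTS checkpoints(key, result)")

# Pattern used by this commit: open a fresh connection for every access, so no
# connection object ever crosses a thread boundary.
def lookup_per_access(key):
    connection = sqlite3.connect(DB_PATH)
    try:
        row = connection.execute(
            "SELECT result FROM checkpoints WHERE key = ?", (key,)).fetchone()
    finally:
        connection.close()
    return row

# Hypothetical alternative: one shared connection with sqlite3's thread-ownership
# check disabled; the caller then takes responsibility for serialising access.
_shared = sqlite3.connect(DB_PATH, check_same_thread=False)
_lock = threading.Lock()

def lookup_shared(key):
    with _lock:
        return _shared.execute(
            "SELECT result FROM checkpoints WHERE key = ?", (key,)).fetchone()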

File tree

4 files changed: +191 −5 lines

parsl/benchmark/perf.py

Lines changed: 8 additions & 2 deletions

@@ -53,7 +53,7 @@ def performance(*, resources: dict, target_t: float, args_extra_size: int) -> No
 
     iteration = 1
 
-    args_extra_payload = "x" * args_extra_size
+    # args_extra_payload = "x" * args_extra_size
 
     while delta_t < threshold_t or iteration <= min_iterations:
         print(f"==== Iteration {iteration} ====")
@@ -62,7 +62,13 @@ def performance(*, resources: dict, target_t: float, args_extra_size: int) -> No
 
         fs = []
         print("Submitting tasks / invoking apps")
-        for _ in range(n):
+        for index in range(n):
+            # this means there is a different argument for each iteration,
+            # which will make checkpointing/memo behave differently
+            # so this could be switchable in parsl-perf dev branch
+            # args_extra_payload = index  # always a new one (except for run repeats)
+
+            args_extra_payload = index % 10
             fs.append(app(args_extra_payload, parsl_resource_specification=resources))
 
         submitted_t = time.time()
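A note on what the index % 10 change does to the benchmark (an illustration, not part of the commit): only ten distinct argument values are ever submitted, so once the checkpoint database holds those ten entries, every further task in a run — and every task in a repeated run — should be a memo hit.

# Illustration only: with payload = index % 10, n submitted tasks cover
# at most 10 distinct memoization keys.
n = 1000
payloads = {index % 10 for index in range(n)}
assert payloads == set(range(10))  # so at most 10 cache misses per cold run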

parsl/dataflow/memosql.py

Lines changed: 145 additions & 0 deletions

New file:

import logging
import pickle
import sqlite3
from concurrent.futures import Future
from pathlib import Path
from typing import Any, Optional

from parsl.dataflow.memoization import Memoizer, make_hash
from parsl.dataflow.taskrecord import TaskRecord

logger = logging.getLogger(__name__)


class SQLiteMemoizer(Memoizer):
    """Memoize out of memory into an sqlite3 database."""

    def __init__(self, *, checkpoint_dir: str | None = None):
        self.checkpoint_dir = checkpoint_dir

    def start(self, *, run_dir: str) -> None:
        """TODO: run_dir is the per-workflow run dir, but we need a broader
        checkpoint context... one level up by default... get_all_checkpoints
        uses "runinfo/" as a relative path for that by default, so replicating
        that choice would do here. Likewise, I think, for monitoring."""

        self.run_dir = run_dir

        dir = self.checkpoint_dir if self.checkpoint_dir is not None else self.run_dir

        self.db_path = Path(dir) / "checkpoint.sqlite3"
        logger.debug("starting with db_path %r", self.db_path)

        connection = sqlite3.connect(self.db_path)
        cursor = connection.cursor()

        cursor.execute("CREATE TABLE IF NOT EXISTS checkpoints(key, result)")
        # probably want some index on key, because that's the column all the
        # access goes via.

        connection.commit()
        connection.close()
        logger.debug("checkpoint table created")

    def close(self):
        """TODO: probably going to need some kind of shutdown now, to close
        the sqlite3 connection."""
        pass

    def checkpoint(self, *, task: TaskRecord | None = None) -> None:
        """All the behaviour for this memoizer is in check_memo and update_memo."""
        logger.debug("Explicit checkpoint call is a no-op with this memoizer")

    def check_memo(self, task: TaskRecord) -> Optional[Future]:
        """TODO: document this: check_memo is required to set the task hashsum,
        if that's how we're going to key checkpoints in update_memo. (That's not
        a requirement, though: other equalities are available.)"""
        task_id = task['id']

        if not task['memoize']:
            task['hashsum'] = None
            logger.debug("Task %s will not be memoized", task_id)
            return None

        hashsum = make_hash(task)
        logger.debug("Task %s has memoization hash %s", task_id, hashsum)
        task['hashsum'] = hashsum

        connection = sqlite3.connect(self.db_path)
        cursor = connection.cursor()
        cursor.execute("SELECT result FROM checkpoints WHERE key = ?", (hashsum,))
        r = cursor.fetchone()

        if r is None:
            connection.close()
            return None
        else:
            data = pickle.loads(r[0])
            connection.close()

            memo_fu: Future = Future()

            if data['exception'] is None:
                memo_fu.set_result(data['result'])
            else:
                assert data['result'] is None
                memo_fu.set_exception(data['exception'])

            return memo_fu

    def update_memo_result(self, task: TaskRecord, result: Any) -> None:
        logger.debug("updating memo")

        if not task['memoize'] or 'hashsum' not in task:
            logger.debug("preconditions for memo not satisfied")
            return

        if not isinstance(task['hashsum'], str):
            logger.error(f"Attempting to update app cache entry but hashsum is not a string key: {task['hashsum']}")
            return

        hashsum = task['hashsum']

        # this comes from the original concatenation-based checkpoint code:
        # assert app_fu.done(), "assumption: update_memo is called after future has a result"
        t = {'hash': hashsum, 'exception': None, 'result': result}
        # else:
        #     t = {'hash': hashsum, 'exception': app_fu.exception(), 'result': None}

        value = pickle.dumps(t)

        connection = sqlite3.connect(self.db_path)
        cursor = connection.cursor()

        cursor.execute("INSERT INTO checkpoints VALUES(?, ?)", (hashsum, value))

        connection.commit()
        connection.close()

    def update_memo_exception(self, task: TaskRecord, exception: BaseException) -> None:
        logger.debug("updating memo")

        if not task['memoize'] or 'hashsum' not in task:
            logger.debug("preconditions for memo not satisfied")
            return

        if not isinstance(task['hashsum'], str):
            logger.error(f"Attempting to update app cache entry but hashsum is not a string key: {task['hashsum']}")
            return

        hashsum = task['hashsum']

        # this comes from the original concatenation-based checkpoint code:
        # assert app_fu.done(), "assumption: update_memo is called after future has a result"
        # t = {'hash': hashsum, 'exception': None, 'result': app_fu.result()}
        # else:
        t = {'hash': hashsum, 'exception': exception, 'result': None}

        value = pickle.dumps(t)

        connection = sqlite3.connect(self.db_path)
        cursor = connection.cursor()

        cursor.execute("INSERT INTO checkpoints VALUES(?, ?)", (hashsum, value))

        connection.commit()
        connection.close()
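The in-code comment about wanting an index on key could be addressed at table-creation time. A minimal sketch (a suggestion, not part of the commit): declaring key as the primary key gives a unique index for free, and combined with INSERT OR REPLACE it would also deduplicate repeated writes for the same hashsum.

import sqlite3

connection = sqlite3.connect("checkpoint.sqlite3")  # path as chosen in start()
# PRIMARY KEY implies a unique index on key, the only access path used above.
connection.execute(
    "CREATE TABLE IF NOT EXISTS checkpoints(key TEXT PRIMARY KEY, result BLOB)"
)
# With a primary key in place, the write side would need to tolerate
# re-insertion of the same hashsum, e.g.:
#   INSERT OR REPLACE INTO checkpoints VALUES (?, ?)
connection.commit()
connection.close()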

parsl/tests/configs/htex_local_alternate.py

Lines changed: 3 additions & 3 deletions

@@ -22,7 +22,7 @@
 from parsl.data_provider.ftp import FTPInTaskStaging
 from parsl.data_provider.http import HTTPInTaskStaging
 from parsl.data_provider.zip import ZipFileStaging
-from parsl.dataflow.memoization import BasicMemoizer
+from parsl.dataflow.memosql import SQLiteMemoizer
 from parsl.executors import HighThroughputExecutor
 from parsl.launchers import SingleNodeLauncher
 
@@ -57,14 +57,14 @@ def fresh_config():
             )
         ],
         strategy='simple',
-        memoizer=BasicMemoizer(memoize=True, checkpoint_mode='task_exit'),
         retries=2,
         monitoring=MonitoringHub(
            monitoring_debug=False,
            resource_monitoring_interval=1,
        ),
        usage_tracking=3,
-       project_name="parsl htex_local_alternate test configuration"
+       project_name="parsl htex_local_alternate test configuration",
+       memoizer=SQLiteMemoizer()
    )
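With this change, the htex_local_alternate test configuration exercises the sqlite3-backed memoizer in place of BasicMemoizer's task_exit checkpointing, so ordinary runs of that test configuration now drive the new code path.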

New test file (filename not shown in the diff view)

Lines changed: 35 additions & 0 deletions

New file:

import contextlib
import os

import pytest

import parsl
from parsl import python_app
from parsl.config import Config
from parsl.dataflow.memosql import SQLiteMemoizer


def parsl_configured(run_dir, memoizer):
    return parsl.load(Config(
        run_dir=str(run_dir),
        memoizer=memoizer
    ))


@python_app(cache=True)
def uuid_app():
    import uuid
    return uuid.uuid4()


@pytest.mark.local
def test_loading_checkpoint(tmpd_cwd):
    """Load memoization table from previous checkpoint
    """
    with parsl_configured(tmpd_cwd, SQLiteMemoizer(checkpoint_dir=tmpd_cwd)):
        result = uuid_app().result()

    with parsl_configured(tmpd_cwd, SQLiteMemoizer(checkpoint_dir=tmpd_cwd)):
        relaunched = uuid_app().result()

    assert result == relaunched, "Expected following call to uuid_app to return cached uuid"
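The test works because uuid.uuid4() returns a fresh value on every genuine execution, so getting an equal value back in the second run proves it came from the checkpoint database rather than a re-execution. It also depends on the memoized result surviving a pickle round-trip through sqlite3, which UUIDs do:

# Minimal check (illustration, not part of the commit) of the round-trip
# property the checkpoint relies on: results are pickled into the result
# column by update_memo_result and unpickled by check_memo on the next run.
import pickle
import uuid

value = uuid.uuid4()
assert pickle.loads(pickle.dumps(value)) == value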
