Skip to content

Commit e531aca

Browse files
authored
Merge pull request #416 from alexhsamuel/feature/archive-chunks
Archive chunk size.
2 parents ed4ad21 + 655b854 commit e531aca

File tree

7 files changed

+159
-54
lines changed

7 files changed

+159
-54
lines changed

RELEASE.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
# v0.30
22

33
- Config `database.timeout` sets the sqlite lock timeout.
4+
- The run archive program takes `chunk_size` and `chunk_sleep` parameters, to
5+
divide the archive operation into chunks with pauses in between.
46

57

68
# v0.29

docs/programs.rst

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,8 @@ The archive program retires a run from Apsis's memory before archiving it. The
205205
run is no longer visible through any UI. A run that is not completed cannot be
206206
archived.
207207

208-
This job archives up to 10,000 runs older than 14 days (1,209,600 seconds):
208+
This job archives up to 10,000 runs older than 14 days (1,209,600 seconds), in
209+
chunks of 1,000 runs at a time, with a 10 second pause between chunks:
209210

210211
.. code:: yaml
211212
@@ -218,11 +219,16 @@ This job archives up to 10,000 runs older than 14 days (1,209,600 seconds):
218219
type: apsis.program.internal.archive.ArchiveProgram
219220
age: 1209600
220221
count: 10000
222+
chunk_size: 1000
223+
chunk_sleep: 10
221224
path: '/path/to/apsis/archive.db'
222225
223-
The archive program blocks Apsis from performing other tasks. Adjust the
224-
`count` parameter so that the archiving process does not take more than a few
225-
seconds, to avoid long delays in starting scheduled runs.
226+
The archive program blocks Apsis from performing other tasks for each chunk of
227+
archived runs. Adjust the `chunk_size`, `chunk_sleep`, and `count` parameters so
228+
that the archiving process pauses every few seconds, to avoid long delays in
229+
starting scheduled runs. If the `chunk_size` parameter is omitted, all runs are
230+
archived in one chunk. If the `chunk_sleep` parameter is omitted, Apsis does
231+
not pause between chunks.
226232

227233
The archive file is also an SQLite3 database file, and contains the subset of
228234
columns from the main database file that contains run data. The archive file

python/apsis/lib/json.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,10 @@ def expand_dotted_keys(mapping):
8585
return type(mapping)(result)
8686

8787

88+
def nkey(name, value):
89+
return {} if value is None else {name: value}
90+
91+
8892
#-------------------------------------------------------------------------------
8993

9094
class TypedJso:

python/apsis/program/internal/archive.py

Lines changed: 76 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
import ora
44

55
from ..base import _InternalProgram, ProgramRunning, ProgramSuccess
6-
from apsis.lib.json import check_schema
6+
from apsis.lib.json import check_schema, nkey
77
from apsis.lib.parse import parse_duration
88
from apsis.runs import template_expand
99

@@ -23,7 +23,7 @@ class ArchiveProgram(_InternalProgram):
2323
skipped for archiving.
2424
"""
2525

26-
def __init__(self, *, age, path, count):
26+
def __init__(self, *, age, path, count, chunk_size=None, chunk_sleep=None):
2727
"""
2828
If this archive file doesn't exist, it is created automatically on
2929
first use; the containing directory must exist.
@@ -35,38 +35,59 @@ def __init__(self, *, age, path, count):
3535
Apsis database file.
3636
:param count:
3737
Maximum number of runs to archive per run of this program.
38+
:param chunk_size:
39+
Number of runs to archive in one chunk. Each chunk is blocking.
40+
:param chunk_sleep:
41+
Time in seconds to wait between chunks.
3842
"""
39-
self.__age = age
40-
self.__path = path
41-
self.__count = count
43+
self.__age = age
44+
self.__path = path
45+
self.__count = count
46+
self.__chunk_size = chunk_size
47+
self.__chunk_sleep = chunk_sleep
4248

4349

4450
def __str__(self):
4551
return f"archive age {self.__age}{self.__path}"
4652

4753

4854
def bind(self, args):
49-
age = parse_duration(template_expand(self.__age, args))
50-
path = template_expand(self.__path, args)
51-
count = int(template_expand(self.__count, args))
52-
return type(self)(age=age, path=path, count=count)
55+
return type(self)(
56+
age = parse_duration(template_expand(self.__age, args)),
57+
path = template_expand(self.__path, args),
58+
count = int(template_expand(self.__count, args)),
59+
chunk_size = None if self.__chunk_size is None
60+
else int(template_expand(self.__chunk_size, args)),
61+
chunk_sleep = None if self.__chunk_sleep is None
62+
else float(template_expand(self.__chunk_sleep, args)),
63+
)
5364

5465

5566
@classmethod
5667
def from_jso(cls, jso):
5768
with check_schema(jso) as pop:
58-
age = pop("age")
59-
path = pop("path", str)
60-
count = pop("count", int)
61-
return cls(age=age, path=path, count=count)
69+
age = pop("age")
70+
path = pop("path", str)
71+
count = pop("count", int)
72+
chunk_size = pop("chunk_size", int, None)
73+
chunk_sleep = pop("chunk_sleep", float, None)
74+
return cls(
75+
age =age,
76+
path =path,
77+
count =count,
78+
chunk_size =chunk_size,
79+
chunk_sleep =chunk_sleep,
80+
)
6281

6382

6483
def to_jso(self):
6584
return {
6685
**super().to_jso(),
67-
"age": self.__age,
68-
"path": self.__path,
69-
"count": self.__count,
86+
"age" : self.__age,
87+
"path" : self.__path,
88+
"count" : self.__count,
89+
**nkey("chunk_size", self.__chunk_size),
90+
**nkey("chunk_sleep", self.__chunk_sleep),
7091
}
7192

7293

@@ -78,28 +99,47 @@ async def wait(self, apsis):
7899
# FIXME: Private attributes.
79100
db = apsis._Apsis__db
80101

81-
run_ids = db.get_archive_run_ids(
82-
before =ora.now() - self.__age,
83-
count =self.__count,
84-
)
85-
86-
# Make sure all runs are retired; else skip them.
87-
run_ids = [ r for r in run_ids if apsis.run_store.retire(r) ]
88-
89-
if len(run_ids) > 0:
90-
# Archive these runs.
91-
row_counts = db.archive(self.__path, run_ids)
92-
# Also vacuum to free space.
93-
db.vacuum()
102+
if not (self.__chunk_size is None or 0 < self.__chunk_size):
103+
raise ValueError("nonpositive chunk size")
94104

95-
else:
96-
row_counts = {}
105+
row_counts = {}
106+
meta = {
107+
"run count" : 0,
108+
"run_ids" : [],
109+
"row counts": row_counts
110+
}
97111

98-
return ProgramSuccess(meta={
99-
"run count" : len(run_ids),
100-
"run_ids" : run_ids,
101-
"row counts": row_counts,
102-
})
112+
count = self.__count
113+
while count > 0:
114+
chunk = (
115+
count if self.__chunk_size is None
116+
else min(count, self.__chunk_size)
117+
)
118+
run_ids = db.get_archive_run_ids(
119+
before =ora.now() - self.__age,
120+
count =chunk,
121+
)
122+
count -= chunk
123+
124+
# Make sure all runs are retired; else skip them.
125+
run_ids = [ r for r in run_ids if apsis.run_store.retire(r) ]
126+
127+
if len(run_ids) > 0:
128+
# Archive these runs.
129+
chunk_row_counts = db.archive(self.__path, run_ids)
130+
# Accumulate metadata.
131+
meta["run count"] += len(run_ids)
132+
meta["run_ids"].append(run_ids)
133+
for key, value in chunk_row_counts.items():
134+
row_counts[key] = row_counts.get(key, 0) + value
135+
# Also vacuum to free space.
136+
db.vacuum()
137+
138+
if count > 0 and self.__chunk_sleep is not None:
139+
# Yield to the event loop.
140+
await asyncio.sleep(self.__chunk_sleep)
141+
142+
return ProgramSuccess(meta=meta)
103143

104144

105145
def reconnect(self, run_id, run_state, apsis):

python/apsis/service/client.py

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -332,23 +332,24 @@ def schedule(self, job_id, args, time="now", *, count=None):
332332
return next(iter(runs.values())) if count is None else runs.values()
333333

334334

335-
def __schedule(self, time, job):
335+
def __schedule(self, time, job, count):
336336
time = "now" if time == "now" else str(Time(time))
337337
data = {
338338
"job": job,
339339
"times": {
340340
"schedule": time,
341341
},
342342
}
343-
runs = self.__post("/api/v1/runs", data=data)["runs"]
344-
return next(iter(runs.values()))
343+
runs = self.__post("/api/v1/runs", data=data, count=count)["runs"]
344+
# FIXME: Hacky.
345+
return next(iter(runs.values())) if count is None else runs.values()
345346

346347

347-
def schedule_adhoc(self, time, job):
348-
return self.__schedule(time, job)
348+
def schedule_adhoc(self, time, job, *, count=None):
349+
return self.__schedule(time, job, count)
349350

350351

351-
def schedule_program(self, time, args):
352+
def schedule_program(self, time, args, *, count=None):
352353
"""
353354
:param time:
354355
The schedule time, or "now" for immediate.
@@ -357,17 +358,17 @@ def schedule_program(self, time, args):
357358
to run.
358359
"""
359360
args = [ str(a) for a in args ]
360-
return self.__schedule(time, {"program": args})
361+
return self.__schedule(time, {"program": args}, count)
361362

362363

363-
def schedule_shell_program(self, time, command):
364+
def schedule_shell_program(self, time, command, *, count=None):
364365
"""
365366
:param time:
366367
The schedule time, or "now" for immediate.
367368
:param command:
368369
The shell command to run.
369370
"""
370-
return self.__schedule(time, {"program": str(command)})
371+
return self.__schedule(time, {"program": str(command)}, count)
371372

372373

373374
def reload_jobs(self, *, dry_run=False):

python/apsis/sqlite.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -761,7 +761,6 @@ def get_archive_run_ids(self, *, before, count):
761761
sa.select([TBL_RUNS.c.run_id])
762762
.where(TBL_RUNS.c.timestamp < dump_time(before))
763763
.where(TBL_RUNS.c.state.in_(FINISHED_STATES))
764-
.order_by(TBL_RUNS.c.timestamp)
765764
.limit(count)
766765
)
767766
]

test/int/test_archive.py

Lines changed: 57 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ def test_archive(tmp_path):
5656
res = inst.wait_run(res["run_id"])
5757
# The first run has been archived.
5858
assert res["meta"]["program"]["run count"] == 1
59-
assert res["meta"]["program"]["run_ids"] == [run_id0]
59+
assert res["meta"]["program"]["run_ids"] == [[run_id0]]
6060

6161
# The first run is no longer available; the other two are.
6262
with pytest.raises(APIError):
@@ -77,7 +77,7 @@ def test_archive(tmp_path):
7777
# The second run was archived, but the third isn't old enough yet.
7878
res = inst.wait_run(res["run_id"])
7979
assert res["meta"]["program"]["run count"] == 1
80-
assert res["meta"]["program"]["run_ids"] == [run_id1]
80+
assert res["meta"]["program"]["run_ids"] == [[run_id1]]
8181

8282
# The second run is no longer available.
8383
with pytest.raises(APIError):
@@ -103,6 +103,57 @@ def test_archive(tmp_path):
103103
assert rows[0] == (run_id1, "combined stdout & stderr", 14)
104104

105105

106+
def test_archive_chunks(tmp_path):
107+
path = tmp_path / "archive.db"
108+
job_dir = tmp_path / "jobs"
109+
job_dir.mkdir()
110+
111+
with closing(ApsisService(
112+
cfg={"schedule": {"horizon": 1}},
113+
job_dir=job_dir,
114+
)) as inst:
115+
inst.create_db()
116+
inst.write_cfg()
117+
inst.start_serve()
118+
inst.wait_for_serve()
119+
120+
client = inst.client
121+
122+
# Run 100 runs.
123+
res = client.schedule_adhoc(
124+
"now", {"program": {"type": "no-op"}}, count=100)
125+
run_ids = { r["run_id"] for r in res }
126+
for run_id in run_ids:
127+
inst.wait_run(run_id)
128+
129+
time.sleep(1)
130+
131+
# Archive, with a max age of 1 s and up to 80 runs, chunked by 10.
132+
res = client.schedule_adhoc("now", {
133+
"program": {
134+
"type": "apsis.program.internal.archive.ArchiveProgram",
135+
"age": 1,
136+
"count": 80,
137+
"chunk_size": 10,
138+
"chunk_sleep": 0.1,
139+
"path": str(path),
140+
},
141+
})
142+
res = inst.wait_run(res["run_id"])
143+
# Runs have been archived.
144+
meta = res["meta"]["program"]
145+
assert meta["run count"] == 80
146+
assert len(meta["run_ids"]) == 8
147+
assert all( len(c) == 10 for c in meta["run_ids"] )
148+
assert all( r in run_ids for c in meta["run_ids"] for r in c )
149+
150+
# Check the archive file.
151+
with closing(sqlite3.connect(path)) as db:
152+
rows = set(db.execute("SELECT run_id, state FROM runs"))
153+
assert len(rows) == 80
154+
assert all( r[0] in run_ids and r[1] == "success" for r in rows )
155+
156+
106157
def test_clean_up_jobs(tmp_path):
107158
path = tmp_path / "archive.db"
108159
job_dir = tmp_path / "jobs"
@@ -156,8 +207,10 @@ def test_clean_up_jobs(tmp_path):
156207
res = inst.wait_run(res["run_id"])
157208
archive_job_id = res["job_id"]
158209
# The first two runs have been archived.
159-
assert res["meta"]["program"]["run count"] == 2
160-
assert set(res["meta"]["program"]["run_ids"]) == {run_id0, run_id1}
210+
meta = res["meta"]["program"]
211+
assert meta["run count"] == 2
212+
assert len(meta["run_ids"]) == 1 # one chunk
213+
assert set(meta["run_ids"][0]) == {run_id0, run_id1}
161214

162215
# Check the DB. Only the third job ID should remain, plus the job ID from
163216
# the archive job.

0 commit comments

Comments
 (0)