Skip to content

Commit 5ee7baf

Browse files
authored
Merge pull request #427 from alexhsamuel/feature/archive-with-rowid
Select rows by runs.rowid in archive.
2 parents 1094a25 + 6b55fa4 commit 5ee7baf

File tree

1 file changed

+36
-17
lines changed

1 file changed

+36
-17
lines changed

python/apsis/sqlite.py

Lines changed: 36 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,19 @@ def disposing(engine):
5050
engine.dispose()
5151

5252

53+
def _make_run_id(rowid):
54+
return f"r{rowid}"
55+
56+
57+
def _parse_run_id(run_id):
58+
assert run_id[0] == "r"
59+
return int(run_id[1 :])
60+
61+
62+
# SQLite implicitly includes a 'rowid' column in each table (other than WITHOUT ROWID tables), which SQLAlchemy (SA) doesn't
63+
# automatically include in the schema.
64+
COL_ROWID = sa.literal_column("rowid")
65+
5366
METADATA = sa.MetaData()
5467

5568
#-------------------------------------------------------------------------------
@@ -172,7 +185,7 @@ def __init__(self, engine):
172185

173186

174187
def get_next_run_id(self):
175-
run_id = f"r{self.__next}"
188+
run_id = _make_run_id(self.__next)
176189
self.__next += 1
177190
if self.__db_next < self.__next:
178191
# Increment by more than one, to decrease database writes. If we
@@ -338,9 +351,8 @@ def __query_runs(conn, expr):
338351

339352

340353
def upsert(self, run):
341-
assert run.run_id.startswith("r")
342354
assert not run.expected
343-
rowid = int(run.run_id[1 :])
355+
rowid = _parse_run_id(run.run_id)
344356

345357
program = (
346358
None if run.program is None
@@ -773,14 +785,16 @@ def get_archive_run_ids(self, *, before, count):
773785
Timer() as timer,
774786
self.__engine.begin() as tx,
775787
):
776-
run_ids = [
777-
r for r, in tx.execute(
778-
sa.select([TBL_RUNS.c.run_id])
779-
.where(TBL_RUNS.c.timestamp < dump_time(before))
780-
.where(TBL_RUNS.c.state.in_(FINISHED_STATES))
781-
.limit(count)
782-
)
783-
]
788+
res = list(tx.execute(
789+
sa.select([TBL_RUNS.c.run_id, COL_ROWID])
790+
.where(TBL_RUNS.c.timestamp < dump_time(before))
791+
.where(TBL_RUNS.c.state.in_(FINISHED_STATES))
792+
.limit(count)
793+
))
794+
795+
# Make sure rowids and run_ids correspond.
796+
assert all( rowid == _parse_run_id(run_id) for run_id, rowid in res )
797+
run_ids = [ r for r, _ in res ]
784798

785799
log.info(
786800
f"obtained {len(run_ids)} runs to archive in {timer.elapsed:.3f} s")
@@ -798,6 +812,8 @@ def archive(self, path, run_ids):
798812
:param run_ids:
799813
Sequence of run IDs to archive.
800814
"""
815+
rowids = [ _parse_run_id(i) for i in run_ids ]
816+
801817
# Open the archive file, creating if necessary.
802818
archive_engine = self.__get_engine(path)
803819
# Create tables if necessary.
@@ -832,9 +848,15 @@ def archive(self, path, run_ids):
832848
)
833849

834850
for table in ARCHIVE_TABLES:
851+
# Predicate for rows to select. In the runs table, query
852+
# by rowid instead of run_id, for performance.
853+
row_pred = (
854+
COL_ROWID.in_(rowids) if table is TBL_RUNS
855+
else table.c.run_id.in_(run_ids)
856+
)
857+
835858
# Extract the rows to archive.
836-
sel = sa.select(table).where(table.c.run_id.in_(run_ids))
837-
res = src_tx.execute(sel)
859+
res = src_tx.execute(sa.select(table).where(row_pred))
838860
rows = tuple(res.mappings())
839861

840862
# Write the rows to the archive.
@@ -852,10 +874,7 @@ def archive(self, path, run_ids):
852874
f"archive {table} contains {count} rows"
853875

854876
# Remove the rows from the source table.
855-
res = src_tx.execute(
856-
sa.delete(table)
857-
.where(table.c.run_id.in_(run_ids))
858-
)
877+
res = src_tx.execute(sa.delete(table).where(row_pred))
859878
assert res.rowcount == len(rows)
860879

861880
# Keep count of how many rows we archived from each table.

0 commit comments

Comments
 (0)