Skip to content

Commit 6b55fa4

Browse files
committed
Select rows by runs.rowid in archive.
1 parent 160d241 commit 6b55fa4

File tree

1 file changed

+36
-17
lines changed

1 file changed

+36
-17
lines changed

python/apsis/sqlite.py

Lines changed: 36 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,19 @@ def disposing(engine):
5050
engine.dispose()
5151

5252

53+
def _make_run_id(rowid):
54+
return f"r{rowid}"
55+
56+
57+
def _parse_run_id(run_id):
58+
assert run_id[0] == "r"
59+
return int(run_id[1 :])
60+
61+
62+
# SQLite implicitly includes a 'rowid' column in each table, which SQLAlchemy
63+
# doesn't automatically include in the schema.
64+
COL_ROWID = sa.literal_column("rowid")
65+
5366
METADATA = sa.MetaData()
5467

5568
#-------------------------------------------------------------------------------
@@ -172,7 +185,7 @@ def __init__(self, engine):
172185

173186

174187
def get_next_run_id(self):
175-
run_id = f"r{self.__next}"
188+
run_id = _make_run_id(self.__next)
176189
self.__next += 1
177190
if self.__db_next < self.__next:
178191
# Increment by more than one, to decrease database writes. If we
@@ -333,9 +346,8 @@ def __query_runs(conn, expr):
333346

334347

335348
def upsert(self, run):
336-
assert run.run_id.startswith("r")
337349
assert not run.expected
338-
rowid = int(run.run_id[1 :])
350+
rowid = _parse_run_id(run.run_id)
339351

340352
program = (
341353
None if run.program is None
@@ -768,14 +780,16 @@ def get_archive_run_ids(self, *, before, count):
768780
Timer() as timer,
769781
self.__engine.begin() as tx,
770782
):
771-
run_ids = [
772-
r for r, in tx.execute(
773-
sa.select([TBL_RUNS.c.run_id])
774-
.where(TBL_RUNS.c.timestamp < dump_time(before))
775-
.where(TBL_RUNS.c.state.in_(FINISHED_STATES))
776-
.limit(count)
777-
)
778-
]
783+
res = list(tx.execute(
784+
sa.select([TBL_RUNS.c.run_id, COL_ROWID])
785+
.where(TBL_RUNS.c.timestamp < dump_time(before))
786+
.where(TBL_RUNS.c.state.in_(FINISHED_STATES))
787+
.limit(count)
788+
))
789+
790+
# Make sure rowids and run_ids correspond.
791+
assert all( rowid == _parse_run_id(run_id) for run_id, rowid in res )
792+
run_ids = [ r for r, _ in res ]
779793

780794
log.info(
781795
f"obtained {len(run_ids)} runs to archive in {timer.elapsed:.3f} s")
@@ -793,6 +807,8 @@ def archive(self, path, run_ids):
793807
:param run_ids:
794808
Sequence of run IDs to archive.
795809
"""
810+
rowids = [ _parse_run_id(i) for i in run_ids ]
811+
796812
# Open the archive file, creating if necessary.
797813
archive_engine = self.__get_engine(path)
798814
# Create tables if necessary.
@@ -827,9 +843,15 @@ def archive(self, path, run_ids):
827843
)
828844

829845
for table in ARCHIVE_TABLES:
846+
# Predicate for rows to select. In the runs table, query
847+
# by rowid instead of run_id, for performance.
848+
row_pred = (
849+
COL_ROWID.in_(rowids) if table is TBL_RUNS
850+
else table.c.run_id.in_(run_ids)
851+
)
852+
830853
# Extract the rows to archive.
831-
sel = sa.select(table).where(table.c.run_id.in_(run_ids))
832-
res = src_tx.execute(sel)
854+
res = src_tx.execute(sa.select(table).where(row_pred))
833855
rows = tuple(res.mappings())
834856

835857
# Write the rows to the archive.
@@ -847,10 +869,7 @@ def archive(self, path, run_ids):
847869
f"archive {table} contains {count} rows"
848870

849871
# Remove the rows from the source table.
850-
res = src_tx.execute(
851-
sa.delete(table)
852-
.where(table.c.run_id.in_(run_ids))
853-
)
872+
res = src_tx.execute(sa.delete(table).where(row_pred))
854873
assert res.rowcount == len(rows)
855874

856875
# Keep count of how many rows we archived from each table.

0 commit comments

Comments
 (0)