Skip to content

Commit cdcd265

Browse files
authored
Merge pull request #41 from kraken-tech/backfill-positive-negative
Set up backfill log with separate positive and negative ranges
2 parents 37def79 + 801b0e2 commit cdcd265

File tree

4 files changed

+101
-7
lines changed

4 files changed

+101
-7
lines changed

src/psycopack/_introspect.py

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -123,16 +123,25 @@ def get_table_columns(self, *, table: str) -> list[str]:
123123
return [r[0] for r in self.cur.fetchall()]
124124

125125
def get_min_and_max_pk(
126-
self, *, table: str, pk_column: str
126+
self, *, table: str, pk_column: str, positive: bool
127127
) -> tuple[int, int] | None:
128+
if positive:
129+
# positive range
130+
where_sql = "WHERE {pk_column} >= 0;"
131+
else:
132+
# negative range
133+
where_sql = "WHERE {pk_column} < 0;"
128134
self.cur.execute(
129135
psycopg.sql.SQL(
130-
dedent("""
131-
SELECT
132-
MIN({pk_column}) AS min_pk,
133-
MAX({pk_column}) AS max_pk
134-
FROM {schema}.{table};
135-
""")
136+
dedent(
137+
"""
138+
SELECT
139+
MIN({pk_column}) AS min_pk,
140+
MAX({pk_column}) AS max_pk
141+
FROM {schema}.{table}
142+
"""
143+
+ where_sql
144+
)
136145
)
137146
.format(
138147
table=psycopg.sql.Identifier(table),

src/psycopack/_repack.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -627,9 +627,26 @@ def _create_backfill_log(self) -> None:
627627
self.command.create_backfill_log(table=self.backfill_log)
628628

629629
def _populate_backfill_log(self) -> None:
630+
# positive pk values
630631
min_and_max = self.introspector.get_min_and_max_pk(
631632
table=self.table,
632633
pk_column=self.pk_column,
634+
positive=True,
635+
)
636+
if min_and_max is not None:
637+
min_pk, max_pk = min_and_max
638+
self.command.populate_backfill_log(
639+
table=self.backfill_log,
640+
batch_size=self.batch_size,
641+
min_pk=min_pk,
642+
max_pk=max_pk,
643+
)
644+
645+
# negative pk values
646+
min_and_max = self.introspector.get_min_and_max_pk(
647+
table=self.table,
648+
pk_column=self.pk_column,
649+
positive=False,
633650
)
634651
if min_and_max is not None:
635652
min_pk, max_pk = min_and_max

tests/factories.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -216,3 +216,34 @@ def create_table_for_repacking(
216216
SELECT generate_series(1, {referring_table_rows});
217217
""")
218218
)
219+
220+
221+
def create_table_for_backfilling(
    cur: _cur.Cursor,
    table_name: str = "to_backfill",
    positive_rows: int = 100,
    negative_rows: int = 100,
    negative_base: int = -1000,
) -> None:
    """
    Create a table with a single ``id`` primary key and seed it with ids.

    Positive ids are generated as ``1..positive_rows``. Negative ids are
    generated as ``negative_base..negative_base + negative_rows - 1``. Either
    group is skipped entirely when its row count is not greater than zero.

    Args:
        cur: Cursor used to execute the statements.
        table_name: Name of the table to create. It is interpolated into the
            SQL text as-is, so it must be a trusted identifier.
        positive_rows: Number of rows to insert with ids starting at 1.
        negative_rows: Number of rows to insert with ids starting at
            ``negative_base``.
        negative_base: Lowest negative id to insert. Defaults to -1000, an
            arbitrary value.

    Raises:
        ValueError: If the requested negative range would reach id 0 or
            above, which would produce non-negative ids and could collide
            with the positive ids.
    """
    # Validate before touching the database so a bad request does not leave a
    # half-populated table behind.
    if negative_rows > 0 and negative_base + negative_rows > 0:
        raise ValueError(
            f"negative_rows={negative_rows} does not fit below zero when "
            f"starting at negative_base={negative_base}"
        )

    cur.execute(
        dedent(f"""
            CREATE TABLE {table_name} (
                id SERIAL PRIMARY KEY
            );
        """)
    )

    if positive_rows > 0:
        cur.execute(
            dedent(f"""
                INSERT INTO {table_name} (id) SELECT generate_series(1, {positive_rows});
            """)
        )

    if negative_rows > 0:
        cur.execute(
            dedent(f"""
                INSERT INTO {table_name} (id)
                SELECT generate_series({negative_base}, {negative_base} + {negative_rows} - 1);
            """)
        )

tests/test_repack.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1989,3 +1989,40 @@ def test_when_repack_is_reinstantiated_after_swapping(
19891989
repack=repack,
19901990
cur=cur,
19911991
)
1992+
1993+
1994+
@pytest.mark.parametrize(
    "positive_rows, negative_rows, expected_ranges",
    [
        # Only positive ids: ids 1..100 split into batches of 50.
        (100, 0, [(1, 50), (51, 100)]),
        # Only negative ids: ids -1000..-901 split into batches of 50.
        (0, 100, [(-1000, -951), (-950, -901)]),
        # Both: the positive ranges are logged before the negative ones.
        (100, 100, [(1, 50), (51, 100), (-1000, -951), (-950, -901)]),
    ],
)
def test_populate_backfill_log(
    connection: _psycopg.Connection,
    positive_rows: int,
    negative_rows: int,
    expected_ranges: list[tuple[int, int]],
) -> None:
    """
    Check the batch ranges written to the backfill log during setup.

    A table is seeded with the requested number of positive and negative ids,
    repacking is set up with a batch size of 50, and the (batch_start,
    batch_end) pairs stored in the backfill log are compared, in id order,
    against the expected ranges.
    """
    with _cur.get_cursor(connection, logged=True) as cur:
        factories.create_table_for_backfilling(
            cur=cur,
            positive_rows=positive_rows,
            negative_rows=negative_rows,
        )

        packer = Psycopack(
            table="to_backfill",
            batch_size=50,
            conn=connection,
            cur=cur,
        )
        packer.pre_validate()

        # Setting up repacking calls self._populate_backfill_log().
        packer.setup_repacking()

        query = f"SELECT batch_start, batch_end FROM {packer.backfill_log} ORDER BY id;"
        cur.execute(query)
        logged_ranges = cur.fetchall()

        assert logged_ranges == expected_ranges

0 commit comments

Comments
 (0)