Skip to content

Commit 0ca8f81

Browse files
committed
Fix handling of PK sequence value
Prior to this change, the current value of the table's PK sequence wasn't transferred to the repacked table's PK sequence. Without that, new rows would be inserted into the repacked table starting with a PK value of 1 (which would likely conflict with existing rows). This change adds a command during swap to transfer the sequence value from one table to the other. There is also special handling for the edge case of a bigint conversion where the PK sequence has been set to negative (presumably after positive PK values were exhausted); in that case, the sequence is reset to the first positive bigint value not within the integer range, which is 2**31. Note that this edge case won't be handled correctly during a revert_swap(); addressing that is left as a TODO for later.
1 parent db7a419 commit 0ca8f81

File tree

5 files changed

+96
-3
lines changed

5 files changed

+96
-3
lines changed

src/psycopack/_commands.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -486,6 +486,30 @@ def swap_pk_sequence_name(self, *, first_table: str, second_table: str) -> None:
486486
self.rename_sequence(seq_from=second_seq, seq_to=first_seq)
487487
self.rename_sequence(seq_from=temp_seq, seq_to=second_seq)
488488

489+
def transfer_pk_sequence_value(
490+
self, *, source_table: str, dest_table: str, convert_pk_to_bigint: bool
491+
) -> None:
492+
source_seq = self.introspector.get_pk_sequence_name(table=source_table)
493+
dest_seq = self.introspector.get_pk_sequence_name(table=dest_table)
494+
value = self.introspector.get_pk_sequence_value(seq=source_seq)
495+
496+
if convert_pk_to_bigint and value < 0:
497+
# special case handling where negative PK values were used before bigint conversion
498+
value = 2**31 # reset to positive, specifically the first bigint value
499+
500+
# TODO: try to correctly restore a negative PK sequence value if we revert swap
501+
# while doing a bigint conversion
502+
503+
self.cur.execute(
504+
psycopg.sql.SQL("SELECT setval('{schema}.{sequence}', {value});")
505+
.format(
506+
schema=psycopg.sql.Identifier(self.schema),
507+
sequence=psycopg.sql.Identifier(dest_seq),
508+
value=psycopg.sql.SQL(str(value)),
509+
)
510+
.as_string(self.conn)
511+
)
512+
489513
def acquire_access_exclusive_lock(self, *, table: str) -> None:
490514
self.cur.execute(
491515
psycopg.sql.SQL("LOCK TABLE {schema}.{table} IN ACCESS EXCLUSIVE MODE;")

src/psycopack/_introspect.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -613,6 +613,21 @@ def get_pk_sequence_name(self, *, table: str) -> str:
613613
assert isinstance(seq, str)
614614
return seq
615615

616+
def get_pk_sequence_value(self, *, seq: str) -> int:
617+
self.cur.execute(
618+
psycopg.sql.SQL("SELECT last_value FROM {schema}.{sequence};")
619+
.format(
620+
schema=psycopg.sql.Identifier(self.schema),
621+
sequence=psycopg.sql.Identifier(seq),
622+
)
623+
.as_string(self.conn)
624+
)
625+
result = self.cur.fetchone()
626+
assert result is not None
627+
value = result[0]
628+
assert isinstance(value, int)
629+
return value
630+
616631
def get_backfill_batch(self, *, table: str) -> BackfillBatch | None:
617632
self.cur.execute(
618633
psycopg.sql.SQL(

src/psycopack/_repack.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -381,6 +381,11 @@ def swap(self) -> None:
381381
self.command.swap_pk_sequence_name(
382382
first_table=self.table, second_table=self.copy_table
383383
)
384+
self.command.transfer_pk_sequence_value(
385+
source_table=self.table,
386+
dest_table=self.copy_table,
387+
convert_pk_to_bigint=self.convert_pk_to_bigint,
388+
)
384389
self.command.rename_table(
385390
table_from=self.table, table_to=self.repacked_name
386391
)
@@ -434,6 +439,12 @@ def revert_swap(self) -> None:
434439
self.command.swap_pk_sequence_name(
435440
first_table=self.table, second_table=self.repacked_name
436441
)
442+
self.command.transfer_pk_sequence_value(
443+
source_table=self.table,
444+
dest_table=self.repacked_name,
445+
convert_pk_to_bigint=self.convert_pk_to_bigint,
446+
)
447+
437448
self.command.rename_table(table_from=self.table, table_to=self.copy_table)
438449
self.command.rename_table(
439450
table_from=self.repacked_name, table_to=self.table

tests/factories.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ def create_table_for_repacking(
1313
with_exclusion_constraint: bool = False,
1414
pk_type: str = "SERIAL",
1515
pk_name: str = "id",
16+
pk_start: int = 1,
1617
ommit_sequence: bool = False,
1718
schema: str = "public",
1819
) -> None:
@@ -45,7 +46,9 @@ def create_table_for_repacking(
4546
):
4647
# Create a sequence manually.
4748
seq = f"{table_name}_seq"
48-
cur.execute(f"CREATE SEQUENCE {schema}.{seq};")
49+
cur.execute(
50+
f"CREATE SEQUENCE {schema}.{seq} MINVALUE {pk_start} START WITH {pk_start};"
51+
)
4952
pk_type = f"{pk_type} DEFAULT NEXTVAL('{schema}.{seq}')"
5053

5154
cur.execute(
@@ -191,7 +194,7 @@ def create_table_for_repacking(
191194
cur.execute(
192195
dedent(f"""
193196
INSERT INTO {schema}.referring_table ({table_name}_{pk_name})
194-
SELECT generate_series(1, {referring_table_rows});
197+
SELECT generate_series({pk_start}, {pk_start + referring_table_rows - 1});
195198
""")
196199
)
197200
cur.execute(
@@ -213,7 +216,7 @@ def create_table_for_repacking(
213216
cur.execute(
214217
dedent(f"""
215218
INSERT INTO {schema}.not_valid_referring_table ({table_name}_{pk_name})
216-
SELECT generate_series(1, {referring_table_rows});
219+
SELECT generate_series({pk_start}, {pk_start + referring_table_rows - 1});
217220
""")
218221
)
219222

tests/test_repack.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ class _TableInfo:
3939
referring_fks: list[_introspect.ReferringForeignKey]
4040
constraints: list[_introspect.Constraint]
4141
pk_seq: str
42+
pk_seq_val: int | None
4243

4344

4445
def _collect_table_info(
@@ -60,13 +61,18 @@ def _collect_table_info(
6061
table=table, types=["c", "f", "n", "p", "u", "t", "x"]
6162
)
6263
pk_seq = introspector.get_pk_sequence_name(table=table)
64+
if pk_seq:
65+
pk_seq_val = introspector.get_pk_sequence_value(seq=pk_seq)
66+
else:
67+
pk_seq_val = None
6368

6469
return _TableInfo(
6570
oid=oid,
6671
indexes=indexes,
6772
referring_fks=referring_fks,
6873
constraints=constraints,
6974
pk_seq=pk_seq,
75+
pk_seq_val=pk_seq_val,
7076
)
7177

7278

@@ -127,6 +133,10 @@ def _assert_repack(
127133
assert table_before.referring_fks == table_after.referring_fks
128134
assert table_before.constraints == table_after.constraints
129135
assert table_before.pk_seq == table_after.pk_seq
136+
if table_before.pk_seq_val is None or table_before.pk_seq_val > 0:
137+
assert table_before.pk_seq_val == table_after.pk_seq_val
138+
else:
139+
assert table_after.pk_seq_val == 2**31
130140

131141
# All functions and triggers are removed.
132142
trigger_info = _get_trigger_info(repack, cur)
@@ -1283,6 +1293,36 @@ def test_repack_full_with_serial_pk(
12831293
)
12841294

12851295

1296+
def test_when_table_has_negative_pk_values(
1297+
connection: _psycopg.Connection,
1298+
) -> None:
1299+
with _cur.get_cursor(connection, logged=True) as cur:
1300+
factories.create_table_for_repacking(
1301+
connection=connection,
1302+
cur=cur,
1303+
table_name="to_repack",
1304+
rows=100,
1305+
pk_type="integer",
1306+
pk_start=-200,
1307+
)
1308+
table_before = _collect_table_info(table="to_repack", connection=connection)
1309+
repack = Psycopack(
1310+
table="to_repack",
1311+
batch_size=1,
1312+
conn=connection,
1313+
cur=cur,
1314+
convert_pk_to_bigint=True,
1315+
)
1316+
repack.full()
1317+
table_after = _collect_table_info(table="to_repack", connection=connection)
1318+
_assert_repack(
1319+
table_before=table_before,
1320+
table_after=table_after,
1321+
repack=repack,
1322+
cur=cur,
1323+
)
1324+
1325+
12861326
def test_when_table_has_large_value_being_inserted(
12871327
connection: _psycopg.Connection,
12881328
) -> None:

0 commit comments

Comments
 (0)