Skip to content

Commit feb83ec

Browse files
committed
Ensure reset drops change log artefacts
1 parent b176068 commit feb83ec

File tree

2 files changed

+57
-4
lines changed

2 files changed

+57
-4
lines changed

src/psycopack/_repack.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -667,6 +667,19 @@ def reset(self) -> None:
667667
self.command.drop_sequence_if_exists(seq=self.id_seq)
668668
self.command.drop_table_if_exists(table=self.tracker.tracker_table)
669669

670+
if self.sync_strategy == _sync_strategy.SyncStrategy.CHANGE_LOG:
671+
# The change log trigger and function have already been
672+
# dropped during the schema sync stage. The table is the
673+
# only artefact remaining.
674+
assert self.change_log is not None
675+
assert self.change_log_trigger is not None
676+
assert self.change_log_function is not None
677+
self.command.drop_table_if_exists(table=self.change_log)
678+
self.command.drop_trigger_if_exists(
679+
table=self.table, trigger=self.change_log_trigger
680+
)
681+
self.command.drop_function_if_exists(function=self.change_log_function)
682+
670683
def _create_copy_table(self) -> None:
671684
# Checks if other relating objects have FKs pointing to the copy table
672685
# first. Deletes them (if any) as they might have been created by a

tests/test_repack.py

Lines changed: 44 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -84,32 +84,53 @@ def _collect_table_info(
8484
class _TriggerInfo:
8585
trigger_exists: bool
8686
repacked_trigger_exists: bool
87+
change_log_trigger_exists: bool
8788

8889

8990
def _get_trigger_info(repack: Psycopack, cur: _cur.Cursor) -> _TriggerInfo:
9091
cur.execute(f"SELECT 1 FROM pg_trigger WHERE tgname = '{repack.trigger}'")
9192
trigger_exists = cur.fetchone() is not None
9293
cur.execute(f"SELECT 1 FROM pg_trigger WHERE tgname = '{repack.repacked_trigger}'")
94+
repacked_trigger_exists = cur.fetchone() is not None
95+
if repack.change_log is not None:
96+
cur.execute(
97+
f"SELECT 1 FROM pg_trigger WHERE tgname = '{repack.change_log_trigger}'"
98+
)
99+
change_log_trigger_exists = cur.fetchone() is not None
100+
else:
101+
change_log_trigger_exists = False
102+
93103
repacked_trigger_exists = cur.fetchone() is not None
94104
return _TriggerInfo(
95-
trigger_exists=trigger_exists, repacked_trigger_exists=repacked_trigger_exists
105+
trigger_exists=trigger_exists,
106+
repacked_trigger_exists=repacked_trigger_exists,
107+
change_log_trigger_exists=change_log_trigger_exists,
96108
)
97109

98110

99111
@dataclasses.dataclass
100112
class _FunctionInfo:
101113
function_exists: bool
102114
repacked_function_exists: bool
115+
change_log_function_exists: bool
103116

104117

105118
def _get_function_info(repack: Psycopack, cur: _cur.Cursor) -> _FunctionInfo:
106119
cur.execute(f"SELECT 1 FROM pg_proc WHERE proname = '{repack.function}'")
107120
function_exists = cur.fetchone() is not None
108121
cur.execute(f"SELECT 1 FROM pg_proc WHERE proname = '{repack.repacked_function}'")
109122
repacked_function_exists = cur.fetchone() is not None
123+
if repack.change_log_function is not None:
124+
cur.execute(
125+
f"SELECT 1 FROM pg_proc WHERE proname = '{repack.change_log_function}'"
126+
)
127+
change_log_function_exists = cur.fetchone() is not None
128+
else:
129+
change_log_function_exists = False
110130
return _FunctionInfo(
111131
function_exists=function_exists,
112132
repacked_function_exists=repacked_function_exists,
133+
change_log_function_exists=change_log_function_exists,
113134
)
114135

115136

@@ -162,12 +183,21 @@ def _assert_repack(
162183

163184

164185
def _assert_reset(repack: Psycopack, cur: _cur.Cursor) -> None:
165-
assert _get_trigger_info(repack, cur).trigger_exists is False
166-
assert _get_function_info(repack, cur).function_exists is False
186+
trigger_info = _get_trigger_info(repack, cur)
187+
assert trigger_info.trigger_exists is False
188+
189+
function_info = _get_function_info(repack, cur)
190+
assert function_info.function_exists is False
167191
assert _get_sequence_info(repack, cur).sequence_exists is False
168192
assert repack.introspector.get_table_oid(table=repack.copy_table) is None
169193
assert repack.introspector.get_table_oid(table=repack.tracker.tracker_table) is None
170194

195+
if repack.sync_strategy == SyncStrategy.CHANGE_LOG:
196+
assert trigger_info.change_log_trigger_exists is False
197+
assert function_info.change_log_function_exists is False
198+
assert repack.change_log is not None
199+
assert repack.introspector.get_table_oid(table=repack.change_log) is None
200+
171201

172202
def _do_writes(
173203
table: str,
@@ -1785,7 +1815,11 @@ def side_effect(*args: object, **kwargs: object) -> None:
17851815
)
17861816

17871817

1788-
def test_reset(connection: _psycopg.Connection) -> None:
1818+
@pytest.mark.parametrize(
1819+
"sync_strategy",
1820+
[SyncStrategy.DIRECT_TRIGGER, SyncStrategy.CHANGE_LOG],
1821+
)
1822+
def test_reset(connection: _psycopg.Connection, sync_strategy: SyncStrategy) -> None:
17891823
with _cur.get_cursor(connection, logged=True) as cur:
17901824
factories.create_table_for_repacking(
17911825
connection=connection,
@@ -1794,12 +1828,18 @@ def test_reset(connection: _psycopg.Connection) -> None:
17941828
rows=100,
17951829
)
17961830
table_before = _collect_table_info(table="to_repack", connection=connection)
1831+
17971832
repack = Psycopack(
17981833
table="to_repack",
17991834
batch_size=1,
18001835
conn=connection,
18011836
cur=cur,
1837+
sync_strategy=sync_strategy,
1838+
change_log_batch_size=10
1839+
if sync_strategy == SyncStrategy.CHANGE_LOG
1840+
else None,
18021841
)
1842+
18031843
# Psycopack hasn't run yet, no reason to reset.
18041844
with pytest.raises(InvalidStageForReset, match="Psycopack hasn't run yet"):
18051845
repack.reset()

0 commit comments

Comments
 (0)