Skip to content

Commit d5d6a51

Browse files
Make sure inactive hot physical replication slots don't hold xmin (patroni#3148)
Since `3.2.0` Patroni is able to create physical replication slots on replica nodes just for the case if this node at some moment will become the primary. There are two potential problems with having such slots: 1. They prevent recycling of WAL files. 2. They may affect vacuum on the primary if hot_standby_feedback is enabled. The first class of issues is already addressed by periodically calling pg_replication_slot_advance() function. However the second class of issues doesn't happen instantly, but only when the old primary is switched to a replica. In this case physical replication slots that were at some moment active will hold a NOT NULL value of `xmin`, which will be propagated to the primary via the hot_standby_feedback mechanism. To address the second problem we will detect that a physical replication slot is not supposed to be active, but has a NOT NULL `xmin`, and drop/recreate it. Close patroni#3146 Close patroni#3153 Co-authored-by: Polina Bungina <[email protected]>
1 parent 2f80017 commit d5d6a51

File tree

7 files changed

+122
-30
lines changed

7 files changed

+122
-30
lines changed

features/permanent_slots.feature

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,3 +81,7 @@ Feature: permanent slots
8181
Then postgres1 has a physical replication slot named test_physical after 10 seconds
8282
And postgres1 has a physical replication slot named postgres0 after 10 seconds
8383
And postgres1 has a physical replication slot named postgres3 after 10 seconds
84+
When I start postgres0
85+
Then postgres0 role is the replica after 20 seconds
86+
And physical replication slot named postgres1 on postgres0 has no xmin value after 10 seconds
87+
And physical replication slot named postgres2 on postgres0 has no xmin value after 10 seconds

features/steps/slots.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,26 @@ def has_physical_replication_slot(context, pg_name, slot_name, time_limit):
9494
assert False, f"Physical slot {slot_name} doesn't exist after {time_limit} seconds"
9595

9696

97+
@step('physical replication slot named {slot_name} on {pg_name:w} has no xmin value after {time_limit:d} seconds')
98+
def physical_slot_no_xmin(context, pg_name, slot_name, time_limit):
99+
time_limit *= context.timeout_multiplier
100+
max_time = time.time() + int(time_limit)
101+
query = "SELECT xmin FROM pg_catalog.pg_replication_slots WHERE slot_type = 'physical'"
102+
f" AND slot_name = '{slot_name}'"
103+
exists = False
104+
while time.time() < max_time:
105+
try:
106+
row = context.pctl.query(pg_name, query).fetchone()
107+
exists = bool(row)
108+
if exists and row[0] is None:
109+
return
110+
except Exception:
111+
pass
112+
time.sleep(1)
113+
assert False, f"Physical slot {slot_name} doesn't exist after {time_limit} seconds" if not exists \
114+
else f"Physical slot {slot_name} has xmin value after {time_limit} seconds"
115+
116+
97117
@step('"{name}" key in DCS has {subkey:w} in {key:w}')
98118
def dcs_key_contains(context, name, subkey, key):
99119
response = json.loads(context.dcs_ctl.query(name))

patroni/dcs/__init__.py

Lines changed: 31 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1068,7 +1068,7 @@ def _merge_permanent_slots(self, slots: Dict[str, Dict[str, str]], permanent_slo
10681068

10691069
if value['type'] == 'physical':
10701070
# Don't try to create permanent physical replication slot for yourself
1071-
if slot_name != slot_name_from_member_name(name):
1071+
if slot_name not in slots and slot_name != slot_name_from_member_name(name):
10721072
slots[slot_name] = value
10731073
continue
10741074

@@ -1146,17 +1146,40 @@ def _get_members_slots(self, name: str, role: str, nofailover: bool,
11461146
# also exlude members with disabled WAL streaming
11471147
members = filter(lambda m: m.name != name and not m.nostream, self.members)
11481148

1149+
def leader_filter(member: Member) -> bool:
1150+
"""Check whether provided *member* should replicate from the current node when it is running as a leader.
1151+
1152+
:param member: a :class:`Member` object.
1153+
1154+
:returns: ``True`` if provided member should replicate from the current node, ``False`` otherwise.
1155+
"""
1156+
return member.replicatefrom is None or\
1157+
member.replicatefrom == name or\
1158+
not self.has_member(member.replicatefrom)
1159+
1160+
def replica_filter(member: Member) -> bool:
1161+
"""Check whether provided *member* should replicate from the current node when it is running as a replica.
1162+
1163+
..note::
1164+
We only consider members with ``replicatefrom`` tag that matches our name and always exclude the leader.
1165+
1166+
:param member: a :class:`Member` object.
1167+
1168+
:returns: ``True`` if provided member should replicate from the current node, ``False`` otherwise.
1169+
"""
1170+
return member.replicatefrom == name and member.name != self.leader_name
1171+
1172+
# In case when retention of replication slots is possible the `expected_active` function
1173+
# will be used to figure out whether the replication slot is expected to be active.
1174+
# Otherwise it will be used to find replication slots that should exist on a current node.
1175+
expected_active = leader_filter if role in ('primary', 'standby_leader') else replica_filter
1176+
11491177
if can_advance_slots and global_config.member_slots_ttl > 0:
11501178
# if the node does only cascading and can't become the leader, we
11511179
# want only to have slots for members that could connect to it.
11521180
members = [m for m in members if not nofailover or m.replicatefrom == name]
1153-
elif role in ('primary', 'standby_leader'): # PostgreSQL is older than 11
1154-
# on the leader want to have slots only for the nodes that are supposed to be replicating from it.
1155-
members = [m for m in members if m.replicatefrom is None
1156-
or m.replicatefrom == name or not self.has_member(m.replicatefrom)]
11571181
else:
1158-
# only manage slots for replicas that replicate from this one, except for the leader among them
1159-
members = [m for m in members if m.replicatefrom == name and m.name != self.leader_name]
1182+
members = [m for m in members if expected_active(m)]
11601183

11611184
slots: Dict[str, int] = self.slots
11621185
ret: Dict[str, Dict[str, Any]] = {}
@@ -1169,7 +1192,7 @@ def _get_members_slots(self, name: str, role: str, nofailover: bool,
11691192
# reported by the member to advance replication slot LSN.
11701193
# `max` is only a fallback so we take the LSN from the slot when there is no feedback from the member.
11711194
lsn = max(member.lsn or 0, lsn)
1172-
ret[slot_name] = {'type': 'physical', 'lsn': lsn}
1195+
ret[slot_name] = {'type': 'physical', 'lsn': lsn, 'expected_active': expected_active(member)}
11731196
slot_name = slot_name_from_member_name(name)
11741197
ret.update({slot: {'type': 'physical'} for slot in self.status.retain_slots
11751198
if slot not in ret and slot != slot_name})

patroni/postgresql/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -230,7 +230,7 @@ def cluster_info_query(self) -> str:
230230
", " + ("(SELECT pg_catalog.json_agg(s.*) FROM (SELECT slot_name, slot_type as type, datoid::bigint, "
231231
"plugin, catalog_xmin, pg_catalog.pg_wal_lsn_diff(confirmed_flush_lsn, '0/0')::bigint"
232232
" AS confirmed_flush_lsn, pg_catalog.pg_wal_lsn_diff(restart_lsn, '0/0')::bigint"
233-
" AS restart_lsn FROM pg_catalog.pg_get_replication_slots()) AS s)"
233+
" AS restart_lsn, xmin FROM pg_catalog.pg_get_replication_slots()) AS s)"
234234
if self._should_query_slots and self.can_advance_slots else "NULL") + extra
235235
extra = (", CASE WHEN latest_end_lsn IS NULL THEN NULL ELSE received_tli END,"
236236
" slot_name, conninfo, status, {0} FROM pg_catalog.pg_stat_get_wal_receiver()").format(extra)

patroni/postgresql/slots.py

Lines changed: 34 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -246,7 +246,8 @@ def process_permanent_slots(self, slots: List[Dict[str, Any]]) -> Dict[str, int]
246246
ret[name] = value['confirmed_flush_lsn']
247247
self._copy_items(value, self._replication_slots[name])
248248
else:
249-
self._replication_slots[name]['restart_lsn'] = ret[name] = value['restart_lsn']
249+
ret[name] = value['restart_lsn']
250+
self._copy_items(value, self._replication_slots[name], ('restart_lsn', 'xmin'))
250251
else:
251252
self._schedule_load_slots = True
252253

@@ -273,15 +274,16 @@ def load_replication_slots(self) -> None:
273274
extra = f", catalog_xmin, {pg_wal_lsn_diff}(confirmed_flush_lsn, '0/0')::bigint" \
274275
if self._postgresql.major_version >= 100000 else ""
275276
skip_temp_slots = ' WHERE NOT temporary' if self._postgresql.major_version >= 100000 else ''
276-
for r in self._query(f"SELECT slot_name, slot_type, {pg_wal_lsn_diff}(restart_lsn, '0/0')::bigint, plugin,"
277-
f" database, datoid{extra} FROM pg_catalog.pg_replication_slots{skip_temp_slots}"):
277+
for r in self._query("SELECT slot_name, slot_type, xmin, "
278+
f"{pg_wal_lsn_diff}(restart_lsn, '0/0')::bigint, plugin, database, datoid{extra}"
279+
f" FROM pg_catalog.pg_replication_slots{skip_temp_slots}"):
278280
value = {'type': r[1]}
279281
if r[1] == 'logical':
280-
value.update(plugin=r[3], database=r[4], datoid=r[5])
282+
value.update(plugin=r[4], database=r[5], datoid=r[6])
281283
if self._postgresql.major_version >= 100000:
282-
value.update(catalog_xmin=r[6], confirmed_flush_lsn=r[7])
284+
value.update(catalog_xmin=r[7], confirmed_flush_lsn=r[8])
283285
else:
284-
value['restart_lsn'] = r[2]
286+
value.update(xmin=r[2], restart_lsn=r[3])
285287
replication_slots[r[0]] = value
286288
self._replication_slots = replication_slots
287289
self._schedule_load_slots = False
@@ -375,6 +377,29 @@ def _ensure_physical_slots(self, slots: Dict[str, Any]) -> None:
375377
for name, value in slots.items():
376378
if value['type'] != 'physical':
377379
continue
380+
# First we want to detect physical replication slots that are not
381+
# expected to be active but have NOT NULL xmin value and drop them.
382+
# As the slot is not expected to be active, nothing would be consuming this
383+
# slot, consequently no hot-standby feedback messages would be received
384+
# by Postgres regarding this slot. In that case, the `xmin` value would never
385+
# change, which would prevent Postgres from advancing the xmin horizon.
386+
if self._postgresql.can_advance_slots and name in self._replication_slots and\
387+
self._replication_slots[name]['type'] == 'physical':
388+
self._copy_items(self._replication_slots[name], value, ('restart_lsn', 'xmin'))
389+
if value.get('expected_active') is False and value['xmin']:
390+
logger.warning('Dropping physical replication slot %s because of its xmin value %s',
391+
name, value['xmin'])
392+
active, dropped = self.drop_replication_slot(name)
393+
if dropped:
394+
self._replication_slots.pop(name)
395+
else:
396+
self._schedule_load_slots = True
397+
if active:
398+
logger.warning("Unable to drop replication slot '%s', slot is active", name)
399+
else:
400+
logger.error("Failed to drop replication slot '%s'", name)
401+
402+
# Now we will create physical replication slots that are missing.
378403
if name not in self._replication_slots:
379404
try:
380405
self._query(f"SELECT pg_catalog.pg_create_physical_replication_slot(%s{immediately_reserve})"
@@ -384,8 +409,9 @@ def _ensure_physical_slots(self, slots: Dict[str, Any]) -> None:
384409
except Exception:
385410
logger.exception("Failed to create physical replication slot '%s'", name)
386411
self._schedule_load_slots = True
387-
elif self._postgresql.can_advance_slots and self._replication_slots[name]['type'] == 'physical':
388-
value['restart_lsn'] = self._replication_slots[name]['restart_lsn']
412+
# And advance restart_lsn on physical replication slots that are not expected to be active.
413+
elif self._postgresql.can_advance_slots and self._replication_slots[name]['type'] == 'physical' and\
414+
value.get('expected_active') is not True and not value['xmin']:
389415
lsn = value.get('lsn')
390416
if lsn and lsn > value['restart_lsn']: # The slot has feedback in DCS and needs to be advanced
391417
try:

tests/__init__.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -129,9 +129,9 @@ def execute(self, sql, *params):
129129
elif sql.startswith('SELECT slot_name, slot_type, datname, plugin, catalog_xmin'):
130130
self.results = [('ls', 'logical', 'a', 'b', 100, 500, b'123456')]
131131
elif sql.startswith('SELECT slot_name'):
132-
self.results = [('blabla', 'physical', 12345),
133-
('foobar', 'physical', 12345),
134-
('ls', 'logical', 499, 'b', 'a', 5, 100, 500)]
132+
self.results = [('blabla', 'physical', 1, 12345),
133+
('foobar', 'physical', 1, 12345),
134+
('ls', 'logical', 1, 499, 'b', 'a', 5, 100, 500)]
135135
elif sql.startswith('WITH slots AS (SELECT slot_name, active'):
136136
self.results = [(False, True)] if self.rowcount == 1 else []
137137
elif sql.startswith('SELECT CASE WHEN pg_catalog.pg_is_in_recovery()'):

tests/test_slots.py

Lines changed: 29 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -110,16 +110,16 @@ def test_process_permanent_slots(self):
110110
self.p.reset_cluster_info_state(None)
111111
mock_query.return_value = [(
112112
1, 0, 0, 0, 0, 0, 0, 0, 0, None, None,
113-
[{"slot_name": "ls", "type": "logical", "datoid": 5, "plugin": "b",
113+
[{"slot_name": "ls", "type": "logical", "datoid": 5, "plugin": "b", "xmin": 105,
114114
"confirmed_flush_lsn": 12345, "catalog_xmin": 105, "restart_lsn": 12344},
115-
{"slot_name": "blabla", "type": "physical", "datoid": None, "plugin": None,
115+
{"slot_name": "blabla", "type": "physical", "datoid": None, "plugin": None, "xmin": 105,
116116
"confirmed_flush_lsn": None, "catalog_xmin": 105, "restart_lsn": 12344}])]
117117
self.assertEqual(self.p.slots(), {'ls': 12345, 'blabla': 12344, 'postgresql0': 0})
118118

119119
self.p.reset_cluster_info_state(None)
120120
mock_query.return_value = [(
121121
1, 0, 0, 0, 0, 0, 0, 0, 0, None, None,
122-
[{"slot_name": "ls", "type": "logical", "datoid": 6, "plugin": "b",
122+
[{"slot_name": "ls", "type": "logical", "datoid": 6, "plugin": "b", "xmin": 105,
123123
"confirmed_flush_lsn": 12345, "catalog_xmin": 105}])]
124124
self.assertEqual(self.p.slots(), {'postgresql0': 0})
125125

@@ -151,7 +151,8 @@ def test_nostream_slot_processing(self):
151151
{'foo': {'type': 'logical', 'database': 'a', 'plugin': 'b'}, 'bar': {'type': 'physical'}})
152152
self.assertEqual(
153153
cluster._get_members_slots(self.p.name, 'primary', False, True),
154-
{'test_3': {'type': 'physical', 'lsn': 98}, 'test_4': {'type': 'physical', 'lsn': 98}})
154+
{'test_3': {'type': 'physical', 'lsn': 98, 'expected_active': False},
155+
'test_4': {'type': 'physical', 'lsn': 98, 'expected_active': True}})
155156

156157
# nostream node must not have slot on primary
157158
self.p.name = nostream_node.name
@@ -166,9 +167,9 @@ def test_nostream_slot_processing(self):
166167
# check cascade member-slot existence on nostream node
167168
self.assertEqual(
168169
cluster._get_members_slots(nostream_node.name, 'replica', False, True),
169-
{'leader': {'type': 'physical', 'lsn': 99},
170-
'test_3': {'type': 'physical', 'lsn': 98},
171-
'test_4': {'type': 'physical', 'lsn': 98}})
170+
{'leader': {'type': 'physical', 'lsn': 99, 'expected_active': False},
171+
'test_3': {'type': 'physical', 'lsn': 98, 'expected_active': True},
172+
'test_4': {'type': 'physical', 'lsn': 98, 'expected_active': False}})
172173

173174
# cascade also does not entitled to have logical slot on itself ...
174175
self.p.name = cascade_node.name
@@ -222,7 +223,8 @@ def test__ensure_logical_slots_replica(self):
222223
self.cluster.status.slots['ls'] = 12346
223224
with patch.object(SlotsHandler, 'check_logical_slots_readiness', Mock(return_value=False)):
224225
self.assertEqual(self.s.sync_replication_slots(self.cluster, self.tags), [])
225-
with patch.object(SlotsHandler, '_query', Mock(return_value=[('ls', 'logical', 499, 'b', 'a', 5, 100, 500)])), \
226+
with patch.object(SlotsHandler, '_query', Mock(return_value=[('ls', 'logical', 1, 499, 'b',
227+
'a', 5, 100, 500)])), \
226228
patch.object(MockCursor, 'execute', Mock(side_effect=psycopg.OperationalError)), \
227229
patch.object(SlotsAdvanceThread, 'schedule', Mock(return_value=(True, ['ls']))), \
228230
patch.object(psycopg.OperationalError, 'diag') as mock_diag:
@@ -300,11 +302,28 @@ def test_advance_physical_slots(self):
300302
[self.me, self.other, self.leadermem], None, SyncState.empty(), None, None)
301303
global_config.update(cluster)
302304
self.s.sync_replication_slots(cluster, self.tags)
303-
with patch.object(SlotsHandler, '_query', Mock(side_effect=[[('blabla', 'physical', 12345, None, None, None,
304-
None, None)], Exception])) as mock_query, \
305+
with patch.object(SlotsHandler, '_query', Mock(side_effect=[[('blabla', 'physical', None, 12345, None, None,
306+
None, None, None)], Exception])) as mock_query, \
305307
patch('patroni.postgresql.slots.logger.error') as mock_error:
306308
self.s.sync_replication_slots(cluster, self.tags)
307309
self.assertEqual(mock_query.call_args[0],
308310
("SELECT pg_catalog.pg_replication_slot_advance(%s, %s)", "blabla", '0/303A'))
309311
self.assertEqual(mock_error.call_args[0][0],
310312
"Error while advancing replication slot %s to position '%s': %r")
313+
314+
with patch.object(SlotsHandler, '_query', Mock(side_effect=[[('test_1', 'physical', 1, 12345, None, None,
315+
None, None, None)], Exception])), \
316+
patch.object(SlotsHandler, 'drop_replication_slot', Mock(return_value=(False, True))):
317+
self.s.sync_replication_slots(cluster, self.tags)
318+
319+
with patch.object(SlotsHandler, '_query', Mock(side_effect=[[('test_1', 'physical', 1, 12345, None, None,
320+
None, None, None)], Exception])), \
321+
patch.object(SlotsHandler, 'drop_replication_slot', Mock(return_value=(True, False))), \
322+
patch('patroni.postgresql.slots.logger.warning') as mock_warning:
323+
self.s.sync_replication_slots(cluster, self.tags)
324+
self.assertEqual(mock_warning.call_args_list[-1][0], ("Unable to drop replication slot '%s', slot is active", 'test_1'))
325+
326+
with patch.object(SlotsHandler, '_query', Mock(side_effect=[[('test_1', 'physical', 1, 12345, None, None,
327+
None, None, None)], Exception])), \
328+
patch.object(SlotsHandler, 'drop_replication_slot', Mock(return_value=(False, False))):
329+
self.s.sync_replication_slots(cluster, self.tags)

0 commit comments

Comments
 (0)