Skip to content

Commit 78a46b9

Browse files
authored
Follow up on patroni#3148 (patroni#3167)
the original fix didn't address same problem with with permanent slots, but only the part with member slots being retained due to `member_slots_ttl`.
1 parent d7e172c commit 78a46b9

File tree

2 files changed

+43
-6
lines changed

2 files changed

+43
-6
lines changed

patroni/dcs/__init__.py

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1028,16 +1028,16 @@ def get_replication_slots(self, postgresql: 'Postgresql', member: Tags, *,
10281028
permanent_slots: Dict[str, Any] = self._get_permanent_slots(postgresql, member, role)
10291029

10301030
disabled_permanent_logical_slots: List[str] = self._merge_permanent_slots(
1031-
slots, permanent_slots, name, postgresql.can_advance_slots)
1031+
slots, permanent_slots, name, role, postgresql.can_advance_slots)
10321032

10331033
if disabled_permanent_logical_slots and show_error:
10341034
logger.error("Permanent logical replication slots supported by Patroni only starting from PostgreSQL 11. "
10351035
"Following slots will not be created: %s.", disabled_permanent_logical_slots)
10361036

10371037
return slots
10381038

1039-
def _merge_permanent_slots(self, slots: Dict[str, Dict[str, str]], permanent_slots: Dict[str, Any], name: str,
1040-
can_advance_slots: bool) -> List[str]:
1039+
def _merge_permanent_slots(self, slots: Dict[str, Dict[str, Any]], permanent_slots: Dict[str, Any],
1040+
name: str, role: str, can_advance_slots: bool) -> List[str]:
10411041
"""Merge replication *slots* for members with *permanent_slots*.
10421042
10431043
Perform validation of configured permanent slot name, skipping invalid names.
@@ -1047,12 +1047,17 @@ def _merge_permanent_slots(self, slots: Dict[str, Dict[str, str]], permanent_slo
10471047
10481048
:param slots: Slot names with existing attributes if known.
10491049
:param name: name of this node.
1050+
:param role: role of the node -- ``primary``, ``standby_leader`` or ``replica``.
10501051
:param permanent_slots: dictionary containing slot name key and slot information values.
10511052
:param can_advance_slots: ``True`` if ``pg_replication_slot_advance()`` function is available,
10521053
``False`` otherwise.
10531054
10541055
:returns: List of disabled permanent, logical slot names, if postgresql version < 11.
10551056
"""
1057+
name = slot_name_from_member_name(name)
1058+
topology = {slot_name_from_member_name(m.name): m.replicatefrom and slot_name_from_member_name(m.replicatefrom)
1059+
for m in self.members}
1060+
10561061
disabled_permanent_logical_slots: List[str] = []
10571062

10581063
for slot_name, value in permanent_slots.items():
@@ -1068,8 +1073,14 @@ def _merge_permanent_slots(self, slots: Dict[str, Dict[str, str]], permanent_slo
10681073

10691074
if value['type'] == 'physical':
10701075
# Don't try to create permanent physical replication slot for yourself
1071-
if slot_name not in slots and slot_name != slot_name_from_member_name(name):
1072-
slots[slot_name] = value
1076+
if slot_name not in slots and slot_name != name:
1077+
# On the leader we expected to have permanent slots active, except the case when it is a slot
1078+
# for a cascading replica. Lets consider a configuration with C being a permanent slot. In this
1079+
# case we should have the following: A(B: active, C: inactive) <- B (C: active) <- C
1080+
# We don't consider the same situation on node B, because if node C doesn't exists, we will not
1081+
# be able to know its `replicatefrom` tag value.
1082+
expected_active = not topology.get(slot_name) and role in ('primary', 'standby_leader')
1083+
slots[slot_name] = {**value, 'expected_active': expected_active}
10731084
continue
10741085

10751086
if self.is_logical_slot(value):
@@ -1228,7 +1239,7 @@ def has_permanent_slots(self, postgresql: 'Postgresql', member: Tags) -> bool:
12281239
postgresql.can_advance_slots)
12291240
permanent_slots: Dict[str, Any] = self._get_permanent_slots(postgresql, member, role)
12301241
slots = deepcopy(members_slots)
1231-
self._merge_permanent_slots(slots, permanent_slots, postgresql.name, postgresql.can_advance_slots)
1242+
self._merge_permanent_slots(slots, permanent_slots, postgresql.name, role, postgresql.can_advance_slots)
12321243
return len(slots) > len(members_slots) or any(self.is_physical_slot(v) for v in permanent_slots.values())
12331244

12341245
def maybe_filter_permanent_slots(self, postgresql: 'Postgresql', slots: Dict[str, int]) -> Dict[str, int]:

tests/test_slots.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -295,7 +295,33 @@ def test_slots_advance_thread(self):
295295
self.s.schedule_advance_slots({'foo': {'bar': 100}})
296296
self.s._advance.sync_slots()
297297

298+
def test_advance_physical_primary(self):
299+
self.p.name = self.me.name
300+
config = ClusterConfig(1, {'member_slots_ttl': 0, 'slots': {'test_1': {'type': 'physical'}}}, 1)
301+
cluster = Cluster(True, config, self.leader, Status(0, {}, []),
302+
[self.me, self.other, self.leadermem], None, SyncState.empty(), None, None)
303+
self.other.data['xlog_location'] = 12346
304+
global_config.update(cluster)
305+
306+
# Should advance permanent physical slot on the primary for a node that is cascading from the other node
307+
with patch.object(SlotsHandler, '_query', Mock(side_effect=[[('test_1', 'physical', None, 12345, None, None,
308+
None, None, None)], Exception])) as mock_query, \
309+
patch('patroni.postgresql.slots.logger.error') as mock_error:
310+
self.s.sync_replication_slots(cluster, self.tags)
311+
self.assertEqual(mock_query.call_args[0],
312+
("SELECT pg_catalog.pg_replication_slot_advance(%s, %s)", "test_1", '0/303A'))
313+
self.assertEqual(mock_error.call_args[0][0],
314+
"Error while advancing replication slot %s to position '%s': %r")
315+
316+
# Should drop permanent physical slot on the primary for a node
317+
# that is cascading from the other node if given slot has xmin set
318+
with patch.object(SlotsHandler, '_query', Mock(side_effect=[[('test_1', 'physical', 1, 12345, None, None,
319+
None, None, None)], Exception])) as mock_query:
320+
self.s.sync_replication_slots(cluster, self.tags)
321+
self.assertTrue(mock_query.call_args[0][0].startswith('WITH slots AS (SELECT slot_name, active'))
322+
298323
@patch.object(Postgresql, 'is_primary', Mock(return_value=False))
324+
@patch.object(Postgresql, 'role', PropertyMock(return_value='replica'))
299325
def test_advance_physical_slots(self):
300326
config = ClusterConfig(1, {'slots': {'blabla': {'type': 'physical'}, 'leader': None}}, 1)
301327
cluster = Cluster(True, config, self.leader, Status(0, {'blabla': 12346}, []),

0 commit comments

Comments
 (0)