Skip to content

Commit 03735ec

Browse files
authored
Recreate (permanent) physical slot when it doesn't reserve WAL (patroni#3388)
It could happen that (permanent) physical slot was created outside of Patroni without reserving WAL. For such slots pg_replication_slot_advance() function throws an error: ```sql ERROR: replication slot "foo" cannot be advanced DETAIL: This slot has never previously reserved WAL, or it has been invalidated. ``` To mitigate the problem we will drop (permanent) physical slots without `restart_lsn` and let state machine to recreate them on the next cycle.
1 parent 7f89ead commit 03735ec

File tree

2 files changed

+104
-25
lines changed

2 files changed

+104
-25
lines changed

patroni/postgresql/slots.py

Lines changed: 35 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -332,6 +332,26 @@ def drop_replication_slot(self, name: str) -> Tuple[bool, bool]:
332332
' FULL OUTER JOIN dropped ON true'), name)
333333
return (rows[0][0], rows[0][1]) if rows else (False, False)
334334

335+
def _drop_physical_slot(self, name: str) -> None:
336+
"""Drop a physical replication slot by name.
337+
338+
.. note::
339+
If not able to drop the slot, it will log a message and set the flag to reload slots.
340+
341+
:param name: name of the slot to be dropped.
342+
"""
343+
active, dropped = self.drop_replication_slot(name)
344+
if dropped:
345+
logger.info("Dropped physical replication slot '%s'", name)
346+
if name in self._replication_slots:
347+
del self._replication_slots[name]
348+
else:
349+
self._schedule_load_slots = True
350+
if active:
351+
logger.warning("Unable to drop replication slot '%s', slot is active", name)
352+
else:
353+
logger.error("Failed to drop replication slot '%s'", name)
354+
335355
def _drop_incorrect_slots(self, cluster: Cluster, slots: Dict[str, Any]) -> None:
336356
"""Compare required slots and configured as permanent slots with those found, dropping extraneous ones.
337357
@@ -347,15 +367,8 @@ def _drop_incorrect_slots(self, cluster: Cluster, slots: Dict[str, Any]) -> None
347367
# drop old replication slots which are not presented in desired slots.
348368
for name in set(self._replication_slots) - set(slots):
349369
if not global_config.is_paused and not self.ignore_replication_slot(cluster, name):
350-
active, dropped = self.drop_replication_slot(name)
351-
if dropped:
352-
logger.info("Dropped unknown replication slot '%s'", name)
353-
else:
354-
self._schedule_load_slots = True
355-
if active:
356-
logger.debug("Unable to drop unknown replication slot '%s', slot is still active", name)
357-
else:
358-
logger.error("Failed to drop replication slot '%s'", name)
370+
logger.info("Trying to drop unknown replication slot '%s'", name)
371+
self._drop_physical_slot(name)
359372

360373
# drop slots with matching names but attributes that do not match, e.g. `plugin` or `database`.
361374
for name, value in slots.items():
@@ -394,15 +407,7 @@ def _ensure_physical_slots(self, slots: Dict[str, Any], clean_inactive_physical_
394407
if clean_inactive_physical_slots and value.get('expected_active') is False and value['xmin']:
395408
logger.warning('Dropping physical replication slot %s because of its xmin value %s',
396409
name, value['xmin'])
397-
active, dropped = self.drop_replication_slot(name)
398-
if dropped:
399-
self._replication_slots.pop(name)
400-
else:
401-
self._schedule_load_slots = True
402-
if active:
403-
logger.warning("Unable to drop replication slot '%s', slot is active", name)
404-
else:
405-
logger.error("Failed to drop replication slot '%s'", name)
410+
self._drop_physical_slot(name)
406411

407412
# Now we will create physical replication slots that are missing.
408413
if name not in self._replication_slots:
@@ -417,8 +422,19 @@ def _ensure_physical_slots(self, slots: Dict[str, Any], clean_inactive_physical_
417422
# And advance restart_lsn on physical replication slots that are not expected to be active.
418423
elif self._postgresql.can_advance_slots and self._replication_slots[name]['type'] == 'physical' and\
419424
value.get('expected_active') is not True and not value['xmin']:
425+
restart_lsn = value.get('restart_lsn')
426+
if not restart_lsn:
427+
# This slot either belongs to a member or was configured as a permanent slot. However, for some
428+
# reason the slot was created by an external agent instead of by Patroni, and it was created without
429+
# reserving the LSN. We drop the slot here, as we cannot advance it, and let Patroni recreate and
430+
# manage it in the next cycle.
431+
logger.warning('Dropping physical replication slot %s because it has no restart_lsn. '
432+
'This slot was probably not created by Patroni, but by an external agent.',
433+
name)
434+
self._drop_physical_slot(name)
435+
continue
420436
lsn = value.get('lsn')
421-
if lsn and lsn > value['restart_lsn']: # The slot has feedback in DCS and needs to be advanced
437+
if lsn and lsn > restart_lsn: # The slot has feedback in DCS and needs to be advanced
422438
try:
423439
lsn = format_lsn(lsn)
424440
self._query("SELECT pg_catalog.pg_replication_slot_advance(%s, %s)", name, lsn)

tests/test_slots.py

Lines changed: 69 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -55,13 +55,13 @@ def test_sync_replication_slots(self):
5555
self.p.set_role(PostgresqlRole.STANDBY_LEADER)
5656
with patch.object(SlotsHandler, 'drop_replication_slot', Mock(return_value=(True, False))), \
5757
patch.object(global_config.__class__, 'is_standby_cluster', PropertyMock(return_value=True)), \
58-
patch('patroni.postgresql.slots.logger.debug') as mock_debug:
58+
patch('patroni.postgresql.slots.logger.warning') as mock_warning:
5959
self.s.sync_replication_slots(cluster, self.tags)
60-
mock_debug.assert_called_once()
60+
mock_warning.assert_called_once_with("Unable to drop replication slot '%s', slot is active", 'foobar')
6161
self.p.set_role(PostgresqlRole.REPLICA)
6262
with patch.object(Postgresql, 'is_primary', Mock(return_value=False)), \
6363
patch.object(global_config.__class__, 'is_paused', PropertyMock(return_value=True)), \
64-
patch.object(SlotsHandler, 'drop_replication_slot') as mock_drop:
64+
patch.object(SlotsHandler, '_drop_physical_slot') as mock_drop:
6565
config.data['slots'].pop('ls')
6666
self.s.sync_replication_slots(cluster, self.tags)
6767
mock_drop.assert_not_called()
@@ -343,6 +343,7 @@ def test_advance_physical_slots(self):
343343
[self.me, self.other, self.leadermem], None, SyncState.empty(), None, None)
344344
global_config.update(cluster)
345345
self.s.sync_replication_slots(cluster, self.tags)
346+
346347
with patch.object(SlotsHandler, '_query', Mock(side_effect=[[('blabla', 'physical', None, 12345, None, None,
347348
None, None, None)], Exception])) as mock_query, \
348349
patch('patroni.postgresql.slots.logger.error') as mock_error:
@@ -354,7 +355,7 @@ def test_advance_physical_slots(self):
354355

355356
with patch.object(SlotsHandler, '_query', Mock(side_effect=[[('test_1', 'physical', 1, 12345, None, None,
356357
None, None, None)], Exception])), \
357-
patch.object(SlotsHandler, 'drop_replication_slot', Mock(return_value=(False, True))):
358+
patch.object(SlotsHandler, '_drop_physical_slot', Mock(return_value=(True))):
358359
self.s.sync_replication_slots(cluster, self.tags)
359360

360361
with patch.object(SlotsHandler, '_query', Mock(side_effect=[[('test_1', 'physical', 1, 12345, None, None,
@@ -367,16 +368,31 @@ def test_advance_physical_slots(self):
367368

368369
with patch.object(SlotsHandler, '_query', Mock(side_effect=[[('test_1', 'physical', 1, 12345, None, None,
369370
None, None, None)], Exception])), \
370-
patch.object(SlotsHandler, 'drop_replication_slot', Mock(return_value=(False, False))):
371+
patch.object(SlotsHandler, '_drop_physical_slot', Mock(return_value=(False))):
371372
self.s.sync_replication_slots(cluster, self.tags)
372373

373374
with patch.object(SlotsHandler, '_query', Mock(side_effect=[[('test_1', 'physical', 1, 12345, None, None,
374375
None, None, None)], Exception])), \
375376
patch.object(Cluster, 'is_unlocked', Mock(return_value=True)), \
376-
patch.object(SlotsHandler, 'drop_replication_slot') as mock_drop:
377+
patch.object(SlotsHandler, '_drop_physical_slot') as mock_drop:
377378
self.s.sync_replication_slots(cluster, self.tags)
378379
mock_drop.assert_not_called()
379380

381+
# If the slot has no restart_lsn, we should not try to advance it, and only warn the user that this is not an
382+
# expected situation.
383+
with patch.object(SlotsHandler, '_query', Mock(side_effect=[[('blabla', 'physical', None, None, None, None,
384+
None, None, None)], Exception])) as mock_query, \
385+
patch('patroni.postgresql.slots.logger.warning') as mock_warning, \
386+
patch.object(SlotsHandler, '_drop_physical_slot') as mock_drop:
387+
self.s.sync_replication_slots(cluster, self.tags)
388+
for mock_call in mock_query.call_args_list:
389+
self.assertNotIn("pg_catalog.pg_replication_slot_advance", mock_call[0][0])
390+
self.assertEqual(mock_warning.call_args[0][0],
391+
'Dropping physical replication slot %s because it has no restart_lsn. '
392+
'This slot was probably not created by Patroni, but by an external agent.')
393+
self.assertEqual(mock_warning.call_args[0][1], 'blabla')
394+
mock_drop.assert_called_once_with('blabla')
395+
380396
@patch.object(Postgresql, 'is_primary', Mock(return_value=False))
381397
@patch.object(Postgresql, 'role', PropertyMock(return_value=PostgresqlRole.REPLICA))
382398
@patch.object(TestTags, 'tags', PropertyMock(return_value={'nofailover': True}))
@@ -390,3 +406,50 @@ def test_slots_nofailover_tag(self):
390406
None, None, None)], Exception])) as mock_query:
391407
self.s.sync_replication_slots(cluster, self.tags)
392408
self.assertTrue(mock_query.call_args[0][0].startswith('SELECT slot_name, slot_type, xmin, '))
409+
410+
def test__drop_physical_slot(self):
411+
"""Test the :meth:~SlotsHandler._drop_physical_slot` method."""
412+
# Should log info and remove the slot from the list when the slot is dropped
413+
self.s._replication_slots['testslot'] = {'type': 'physical'}
414+
self.s._schedule_load_slots = False
415+
with patch.object(self.s, 'drop_replication_slot', return_value=(False, True)) as mock_drop, \
416+
patch('patroni.postgresql.slots.logger.info') as mock_info, \
417+
patch('patroni.postgresql.slots.logger.warning') as mock_warning, \
418+
patch('patroni.postgresql.slots.logger.error') as mock_error:
419+
self.s._drop_physical_slot('testslot')
420+
mock_drop.assert_called_once_with('testslot')
421+
mock_info.assert_called_once_with("Dropped physical replication slot '%s'", 'testslot')
422+
mock_warning.assert_not_called()
423+
mock_error.assert_not_called()
424+
self.assertFalse(self.s._schedule_load_slots)
425+
self.assertNotIn('testslot', self.s._replication_slots)
426+
427+
# Should log warning and keep slot in the list when the slot is active and not dropped
428+
self.s._replication_slots['testslot'] = {'type': 'physical'}
429+
self.s._schedule_load_slots = False
430+
with patch.object(self.s, 'drop_replication_slot', return_value=(True, False)) as mock_drop, \
431+
patch('patroni.postgresql.slots.logger.info') as mock_info, \
432+
patch('patroni.postgresql.slots.logger.warning') as mock_warning, \
433+
patch('patroni.postgresql.slots.logger.error') as mock_error:
434+
self.s._drop_physical_slot('testslot')
435+
mock_drop.assert_called_once_with('testslot')
436+
mock_info.assert_not_called()
437+
mock_warning.assert_called_once_with("Unable to drop replication slot '%s', slot is active", 'testslot')
438+
mock_error.assert_not_called()
439+
self.assertTrue(self.s._schedule_load_slots)
440+
self.assertIn('testslot', self.s._replication_slots)
441+
442+
# Should log error and keep the slot in the list when the slot is not active and not dropped
443+
self.s._replication_slots['testslot'] = {'type': 'physical'}
444+
self.s._schedule_load_slots = False
445+
with patch.object(self.s, 'drop_replication_slot', return_value=(False, False)) as mock_drop, \
446+
patch('patroni.postgresql.slots.logger.info') as mock_info, \
447+
patch('patroni.postgresql.slots.logger.warning') as mock_warning, \
448+
patch('patroni.postgresql.slots.logger.error') as mock_error:
449+
self.s._drop_physical_slot('testslot')
450+
mock_drop.assert_called_once_with('testslot')
451+
mock_info.assert_not_called()
452+
mock_warning.assert_not_called()
453+
mock_error.assert_called_once_with("Failed to drop replication slot '%s'", 'testslot')
454+
self.assertTrue(self.s._schedule_load_slots)
455+
self.assertIn('testslot', self.s._replication_slots)

0 commit comments

Comments
 (0)