Skip to content

Commit 081992d

Browse files
authored
Consider replay_lsn when advancing slots (patroni#3356)
Do not try to advance slots past replay_lsn Advance slot to replay_lsn position if it is already past confirmed_flush_lsn of this slot on the replica but the replica has still not replayed the actual LSN which this slot is at on the primary (the position stored in the DCS).
1 parent 9644a83 commit 081992d

File tree

2 files changed

+28
-9
lines changed

2 files changed

+28
-9
lines changed

patroni/postgresql/slots.py

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -491,8 +491,8 @@ def _ensure_logical_slots_replica(self, slots: Dict[str, Any]) -> List[str]:
491491
"""Update logical *slots* on replicas.
492492
493493
If the logical slot already exists, copy state information into the replication slots structure stored in the
494-
class instance. Slots that exist are also advanced if their ``confirmed_flush_lsn`` is greater than the stored
495-
state of the slot.
494+
class instance. Slots that exist are also advanced if their ``confirmed_flush_lsn`` is smaller than the state
495+
of the slot stored in DCS or at least the ``replay_lsn`` of this replica.
496496
497497
As logical slots can only be created when the primary is available, pass the list of slots that need to be
498498
copied back to the caller. They will be created on replicas with :meth:`SlotsHandler.copy_logical_slots`.
@@ -513,9 +513,14 @@ class instance. Slots that exist are also advanced if their ``confirmed_flush_ls
513513
# If the logical already exists, copy some information about it into the original structure
514514
if name in self._replication_slots and compare_slots(value, self._replication_slots[name]):
515515
self._copy_items(self._replication_slots[name], value)
516-
if 'lsn' in value and value['confirmed_flush_lsn'] < value['lsn']: # The slot has feedback in DCS
516+
517+
# The slot has feedback in DCS
518+
if 'lsn' in value:
519+
# we can not advance past replay_lsn
520+
advance_value = min(value['lsn'], self._postgresql.replay_lsn())
517521
# Skip slots that don't need to be advanced
518-
advance_slots[value['database']][name] = value['lsn']
522+
if value['confirmed_flush_lsn'] < advance_value:
523+
advance_slots[value['database']][name] = advance_value
519524
elif name not in self._replication_slots and 'lsn' in value:
520525
# We want to copy only slots with feedback in a DCS
521526
create_slots.append(name)

tests/test_slots.py

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -221,18 +221,32 @@ def test_should_enforce_hot_standby_feedback(self):
221221
@patch.object(Postgresql, 'is_primary', Mock(return_value=False))
222222
def test__ensure_logical_slots_replica(self):
223223
self.p.set_role(PostgresqlRole.REPLICA)
224-
self.cluster.status.slots['ls'] = 12346
224+
225+
self.cluster.status.slots['ls'] = 800
225226
with patch.object(SlotsHandler, 'check_logical_slots_readiness', Mock(return_value=False)):
226227
self.assertEqual(self.s.sync_replication_slots(self.cluster, self.tags), [])
228+
227229
with patch.object(SlotsHandler, '_query', Mock(return_value=[('ls', 'logical', 1, 499, 'b',
228230
'a', 5, 100, 500)])), \
229231
patch.object(MockCursor, 'execute', Mock(side_effect=psycopg.OperationalError)), \
230-
patch.object(SlotsAdvanceThread, 'schedule', Mock(return_value=(True, ['ls']))), \
231-
patch.object(psycopg.OperationalError, 'diag') as mock_diag:
232-
type(mock_diag).sqlstate = PropertyMock(return_value='58P01')
233-
self.assertEqual(self.s.sync_replication_slots(self.cluster, self.tags), ['ls'])
232+
patch.object(SlotsAdvanceThread, 'schedule', Mock(return_value=(True, ['ls']))):
233+
# copy invalidated slot
234+
with patch.object(psycopg.OperationalError, 'diag') as mock_diag:
235+
type(mock_diag).sqlstate = PropertyMock(return_value='58P01')
236+
self.assertEqual(self.s.sync_replication_slots(self.cluster, self.tags), ['ls'])
237+
# advance slots based on the replay lsn value
238+
with patch.object(Postgresql, 'replay_lsn', Mock(side_effect=[200, 700, 900])), \
239+
patch.object(SlotsHandler, 'schedule_advance_slots') as advance_mock:
240+
self.s.sync_replication_slots(self.cluster, self.tags)
241+
advance_mock.assert_called_with(dict())
242+
self.s.sync_replication_slots(self.cluster, self.tags)
243+
advance_mock.assert_called_with({'a': {'ls': 700}})
244+
self.s.sync_replication_slots(self.cluster, self.tags)
245+
advance_mock.assert_called_with({'a': {'ls': 800}})
246+
234247
self.cluster.status.slots['ls'] = 'a'
235248
self.assertEqual(self.s.sync_replication_slots(self.cluster, self.tags), [])
249+
236250
self.cluster.config.data['slots']['ls']['database'] = 'b'
237251
self.cluster.status.slots['ls'] = '500'
238252
with patch.object(MockCursor, 'rowcount', PropertyMock(return_value=1), create=True):

0 commit comments

Comments
 (0)