Skip to content

Commit 61951bf

Browse files
authored
Check that synchronous_standby_names contains expected value (patroni#3370)
* Change interface of SyncHandler.current_state() Make it always return nodes from synchronous_standby_names and sync_confirmed instead of numsync_confirmed. * Check that synchronous_standby_names contains expected value The current mechanism implementing the state machine for non-quorum synchronous replication didn't check the actual value of synchronous_standby_names, which resulted in a stale value of synchronous_standby_names being used when pg_stat_replication is a subset of synchronous_standby_names. Such a situation is mainly possible when standby nodes with the replicatefrom tag connect to the primary because cascading replicas are temporarily not accessible. To prevent it from happening we implement the following measures: 1. Check the actual value of synchronous_standby_names when deciding whether we should continue to run the state machine 2. Take into account the `replicatefrom` tag on nodes that are streaming from the primary and don't consider them to be synchronous candidates if some other node in the chain is already streaming from the primary. Close patroni#3369
1 parent f26b106 commit 61951bf

File tree

4 files changed

+94
-51
lines changed

4 files changed

+94
-51
lines changed

patroni/ha.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -860,7 +860,7 @@ def _check_timeout(offset: float = 0) -> bool:
860860
voters=sync.voters,
861861
numsync=sync_state.numsync,
862862
sync=sync_state.sync,
863-
numsync_confirmed=sync_state.numsync_confirmed,
863+
numsync_confirmed=len(sync_state.sync_confirmed),
864864
active=sync_state.active,
865865
sync_wanted=sync_wanted,
866866
leader_wanted=self.state_handler.name):
@@ -906,7 +906,7 @@ def _process_multisync_replication(self) -> None:
906906

907907
current_state = self.state_handler.sync_handler.current_state(self.cluster)
908908
picked = current_state.active
909-
allow_promote = current_state.sync
909+
allow_promote = current_state.sync_confirmed
910910
voters = CaseInsensitiveSet(sync.voters)
911911

912912
if picked == voters and voters != allow_promote:
@@ -917,7 +917,7 @@ def _process_multisync_replication(self) -> None:
917917
return logger.warning("Updating sync state failed")
918918
voters = CaseInsensitiveSet(sync.voters)
919919

920-
if picked == voters:
920+
if picked == voters == current_state.sync and current_state.numsync == len(picked):
921921
return
922922

923923
# update synchronous standby list in dcs temporarily to point to common nodes in current and picked
@@ -941,7 +941,7 @@ def _process_multisync_replication(self) -> None:
941941
if picked and picked != CaseInsensitiveSet('*') and allow_promote != picked:
942942
# Wait for PostgreSQL to enable synchronous mode and see if we can immediately set sync_standby
943943
time.sleep(2)
944-
allow_promote = self.state_handler.sync_handler.current_state(self.cluster).sync
944+
allow_promote = self.state_handler.sync_handler.current_state(self.cluster).sync_confirmed
945945

946946
if allow_promote and allow_promote != sync_common:
947947
if self.dcs.write_sync_state(self.state_handler.name, allow_promote, 0, version=sync.version):

patroni/postgresql/sync.py

Lines changed: 40 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
from .. import global_config
99
from ..collections import CaseInsensitiveDict, CaseInsensitiveSet
10-
from ..dcs import Cluster
10+
from ..dcs import Cluster, Member
1111
from ..psycopg import quote_ident
1212
from .misc import PostgresqlState
1313

@@ -167,17 +167,16 @@ class _SyncState(NamedTuple):
167167
:ivar sync_type: possible values: ``off``, ``priority``, ``quorum``
168168
:ivar numsync: how many nodes are required to be synchronous (according to ``synchronous_standby_names``).
169169
Is ``0`` if ``synchronous_standby_names`` value is invalid or contains ``*``.
170-
:ivar numsync_confirmed: how many nodes are known to be synchronous according to the ``pg_stat_replication`` view.
171-
Only nodes that caught up with the :attr:`SyncHandler._primary_flush_lsn` are counted.
172-
:ivar sync: collection of synchronous node names. In case of quorum commit all nodes listed
173-
in ``synchronous_standby_names``, otherwise nodes that are confirmed to be synchronous according
174-
to the ``pg_stat_replication`` view.
170+
:ivar sync: collection of synchronous node names from ``synchronous_standby_names``.
171+
:ivar sync_confirmed: collection of synchronous node names from ``synchronous_standby_names`` that are
172+
confirmed to be synchronous according to the ``pg_stat_replication`` view.
173+
Only nodes that caught up with the :attr:`SyncHandler._primary_flush_lsn` are counted.
175174
:ivar active: collection of node names that are streaming and have no restrictions to become synchronous.
176175
"""
177176
sync_type: str
178177
numsync: int
179-
numsync_confirmed: int
180178
sync: CaseInsensitiveSet
179+
sync_confirmed: CaseInsensitiveSet
181180
active: CaseInsensitiveSet
182181

183182

@@ -229,16 +228,20 @@ def __init__(self, postgresql: 'Postgresql', cluster: Cluster) -> None:
229228
'remote_write': 'write'
230229
}.get(postgresql.synchronous_commit(), 'flush') + '_lsn'
231230

232-
members = CaseInsensitiveDict({m.name: m for m in cluster.members if m.name.lower() != postgresql.name.lower()})
233-
for row in postgresql.pg_stat_replication():
231+
members = CaseInsensitiveDict({m.name: m for m in cluster.members
232+
if m.is_running and not m.nosync and m.name.lower() != postgresql.name.lower()})
233+
replication = CaseInsensitiveDict({row['application_name']: row for row in postgresql.pg_stat_replication()
234+
if row[sort_col] is not None and row['application_name'] in members})
235+
for row in replication.values():
234236
member = members.get(row['application_name'])
235237

236238
# We want to consider only rows from ``pg_stat_replication`` that:
237239
# 1. are known to be streaming (write/flush/replay LSN are not NULL).
238240
# 2. can be mapped to a ``Member`` of the ``Cluster``:
239241
# a. ``Member`` doesn't have ``nosync`` tag set;
240242
# b. PostgreSQL on the member is known to be running and accepting client connections.
241-
if member and row[sort_col] is not None and member.is_running and not member.nosync:
243+
# c. ``Member`` isn't supposed to stream from another standby (``replicatefrom`` tag).
244+
if member and row[sort_col] is not None and not self._should_cascade(members, replication, member):
242245
self.append(_Replica(row['pid'], row['application_name'],
243246
row['sync_state'], row[sort_col],
244247
bool(member.nofailover), member.sync_priority))
@@ -251,6 +254,27 @@ def __init__(self, postgresql: 'Postgresql', cluster: Cluster) -> None:
251254
# up-to-date replica otherwise with cluster LSN if there is only one replica.
252255
self.max_lsn = max(self, key=lambda x: x.lsn).lsn if len(self) > 1 else postgresql.last_operation()
253256

257+
@staticmethod
258+
def _should_cascade(members: CaseInsensitiveDict, replication: CaseInsensitiveDict, member: Member) -> bool:
259+
"""Check whether *member* is supposed to cascade from another standby node.
260+
261+
:param members: members that are eligible to stream (state=running and don't have nosync tag)
262+
:param replication: state of ``pg_stat_replication``, already filtered by member names from *members*
263+
:param member: member that we want to check
264+
265+
:returns: ``True`` if provided member should stream from other standby node in
266+
the cluster (according to ``replicatefrom`` tag), because some standbys
267+
in a chain already streaming from the primary, otherwise ``False``
268+
"""
269+
if not member.replicatefrom or member.replicatefrom not in members:
270+
return False
271+
272+
member = members[member.replicatefrom]
273+
if not member.replicatefrom:
274+
return member.name in replication
275+
276+
return _ReplicaList._should_cascade(members, replication, member)
277+
254278

255279
class SyncHandler(object):
256280
"""Class responsible for working with the `synchronous_standby_names`.
@@ -350,8 +374,7 @@ def current_state(self, cluster: Cluster) -> _SyncState:
350374
self._process_replica_readiness(cluster, replica_list)
351375

352376
active = CaseInsensitiveSet()
353-
sync_nodes = CaseInsensitiveSet()
354-
numsync_confirmed = 0
377+
sync_confirmed = CaseInsensitiveSet()
355378

356379
sync_node_count = global_config.synchronous_node_count if self._postgresql.supports_multiple_sync else 1
357380
sync_node_maxlag = global_config.maximum_lag_on_syncnode
@@ -365,24 +388,20 @@ def current_state(self, cluster: Cluster) -> _SyncState:
365388
# there is a chance that a non-promotable node is ahead of a promotable one.
366389
if not replica.nofailover or len(active) < sync_node_count:
367390
if replica.application_name in self._ready_replicas:
368-
numsync_confirmed += 1
391+
sync_confirmed.add(replica.application_name)
369392
active.add(replica.application_name)
370393
else:
371394
active.add(replica.application_name)
372395
if replica.sync_state == 'sync' and replica.application_name in self._ready_replicas:
373-
sync_nodes.add(replica.application_name)
374-
numsync_confirmed += 1
396+
sync_confirmed.add(replica.application_name)
375397
if len(active) >= sync_node_count:
376398
break
377399

378-
if global_config.is_quorum_commit_mode:
379-
sync_nodes = CaseInsensitiveSet() if self._ssn_data.has_star else self._ssn_data.members
380-
381400
return _SyncState(
382401
self._ssn_data.sync_type,
383-
0 if self._ssn_data.has_star else self._ssn_data.num,
384-
numsync_confirmed,
385-
sync_nodes,
402+
self._ssn_data.num,
403+
CaseInsensitiveSet() if self._ssn_data.has_star else self._ssn_data.members,
404+
sync_confirmed,
386405
active)
387406

388407
def set_synchronous_standby_names(self, sync: Collection[str], num: Optional[int] = None) -> None:

tests/test_ha.py

Lines changed: 20 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -979,7 +979,6 @@ def test_manual_failover_process_no_leader_in_synchronous_mode(self):
979979
with patch('patroni.ha.logger.warning') as mock_warning:
980980
self.ha.cluster = get_cluster_initialized_without_leader(failover=Failover(0, '', 'other', None),
981981
sync=('leader1', 'postgresql0'))
982-
self.p.sync_handler.current_state = Mock(return_value=(CaseInsensitiveSet(), CaseInsensitiveSet()))
983982
self.ha.dcs.write_sync_state = Mock(return_value=SyncState.empty())
984983
self.assertEqual(self.ha.run_cycle(), 'promoted self to leader by acquiring session lock')
985984
self.assertEqual(mock_warning.call_args_list[0][0],
@@ -990,8 +989,6 @@ def test_manual_failover_process_no_leader_in_synchronous_mode(self):
990989
self.p.set_role(PostgresqlRole.REPLICA)
991990
self.ha.cluster = get_cluster_initialized_without_leader(failover=Failover(0, '', 'postgresql0', None),
992991
sync=('leader1', 'other'))
993-
self.p.sync_handler.current_state = Mock(return_value=(CaseInsensitiveSet(['leader1']),
994-
CaseInsensitiveSet(['leader1'])))
995992
self.assertEqual(self.ha.run_cycle(), 'promoted self to leader by acquiring session lock')
996993

997994
def test_manual_switchover_process_no_leader_in_synchronous_mode(self):
@@ -1366,7 +1363,8 @@ def test__process_multisync_replication(self):
13661363
self.ha.is_synchronous_mode = true
13671364

13681365
# Test sync standby not touched when picking the same node
1369-
self.p.sync_handler.current_state = Mock(return_value=_SyncState('priority', 1, 1,
1366+
self.p.sync_handler.current_state = Mock(return_value=_SyncState('priority', 1,
1367+
CaseInsensitiveSet(['other']),
13701368
CaseInsensitiveSet(['other']),
13711369
CaseInsensitiveSet(['other'])))
13721370
self.ha.cluster = get_cluster_initialized_with_leader(sync=('leader', 'other'))
@@ -1377,15 +1375,17 @@ def test__process_multisync_replication(self):
13771375
mock_cfg_set_sync.reset_mock()
13781376

13791377
# Test sync standby is replaced when switching standbys
1380-
self.p.sync_handler.current_state = Mock(return_value=_SyncState('priority', 0, 0, CaseInsensitiveSet(),
1378+
self.p.sync_handler.current_state = Mock(return_value=_SyncState('priority', 0, CaseInsensitiveSet(),
1379+
CaseInsensitiveSet(),
13811380
CaseInsensitiveSet(['other2'])))
13821381
self.ha.dcs.write_sync_state = Mock(return_value=SyncState.empty())
13831382
self.ha.run_cycle()
13841383
mock_set_sync.assert_called_once_with(CaseInsensitiveSet(['other2']))
13851384
mock_cfg_set_sync.assert_not_called()
13861385

13871386
# Test sync standby is replaced when new standby is joined
1388-
self.p.sync_handler.current_state = Mock(return_value=_SyncState('priority', 1, 1,
1387+
self.p.sync_handler.current_state = Mock(return_value=_SyncState('priority', 1,
1388+
CaseInsensitiveSet(['other2']),
13891389
CaseInsensitiveSet(['other2']),
13901390
CaseInsensitiveSet(['other2', 'other3'])))
13911391
self.ha.dcs.write_sync_state = Mock(return_value=SyncState.empty())
@@ -1408,7 +1408,8 @@ def test__process_multisync_replication(self):
14081408
self.ha.dcs.write_sync_state = Mock(return_value=SyncState.empty())
14091409
self.ha.dcs.get_cluster = Mock(return_value=get_cluster_initialized_with_leader(sync=('leader', 'other')))
14101410
# self.ha.cluster = get_cluster_initialized_with_leader(sync=('leader', 'other'))
1411-
self.p.sync_handler.current_state = Mock(return_value=_SyncState('priority', 1, 1,
1411+
self.p.sync_handler.current_state = Mock(return_value=_SyncState('priority', 1,
1412+
CaseInsensitiveSet(['other2']),
14121413
CaseInsensitiveSet(['other2']),
14131414
CaseInsensitiveSet(['other2'])))
14141415
self.ha.run_cycle()
@@ -1433,8 +1434,8 @@ def test__process_multisync_replication(self):
14331434
# Test sync set to '*' when synchronous_mode_strict is enabled
14341435
mock_set_sync.reset_mock()
14351436
mock_cfg_set_sync.reset_mock()
1436-
self.p.sync_handler.current_state = Mock(return_value=_SyncState('priority', 0, 0, CaseInsensitiveSet(),
1437-
CaseInsensitiveSet()))
1437+
self.p.sync_handler.current_state = Mock(return_value=_SyncState('priority', 0, CaseInsensitiveSet(),
1438+
CaseInsensitiveSet(), CaseInsensitiveSet()))
14381439
with patch.object(global_config.__class__, 'is_synchronous_mode_strict', PropertyMock(return_value=True)):
14391440
self.ha.run_cycle()
14401441
mock_set_sync.assert_called_once_with(CaseInsensitiveSet('*'))
@@ -1545,7 +1546,7 @@ def test_enable_synchronous_mode(self):
15451546
self.ha.is_synchronous_mode = true
15461547
self.ha.has_lock = true
15471548
self.p.name = 'leader'
1548-
self.p.sync_handler.current_state = Mock(return_value=_SyncState('priority', 0, 0,
1549+
self.p.sync_handler.current_state = Mock(return_value=_SyncState('priority', 0, CaseInsensitiveSet(),
15491550
CaseInsensitiveSet(), CaseInsensitiveSet()))
15501551
self.ha.dcs.write_sync_state = Mock(return_value=SyncState.empty())
15511552
with patch('patroni.ha.logger.info') as mock_logger:
@@ -1562,7 +1563,7 @@ def test_inconsistent_synchronous_state(self):
15621563
self.ha.has_lock = true
15631564
self.p.name = 'leader'
15641565
self.ha.cluster = get_cluster_initialized_without_leader(sync=('leader', 'a'))
1565-
self.p.sync_handler.current_state = Mock(return_value=_SyncState('priority', 0, 0,
1566+
self.p.sync_handler.current_state = Mock(return_value=_SyncState('priority', 0, CaseInsensitiveSet(),
15661567
CaseInsensitiveSet(), CaseInsensitiveSet('a')))
15671568
self.ha.dcs.write_sync_state = Mock(return_value=SyncState.empty())
15681569
mock_set_sync = self.p.sync_handler.set_synchronous_standby_names = Mock()
@@ -1790,7 +1791,8 @@ def test__process_quorum_replication(self):
17901791

17911792
mock_write_sync = self.ha.dcs.write_sync_state = Mock(return_value=None)
17921793
# Test /sync key is attempted to set and failed when missing or invalid
1793-
self.p.sync_handler.current_state = Mock(return_value=_SyncState('quorum', 1, 1, CaseInsensitiveSet(['other']),
1794+
self.p.sync_handler.current_state = Mock(return_value=_SyncState('quorum', 1, CaseInsensitiveSet(['other']),
1795+
CaseInsensitiveSet(['other']),
17941796
CaseInsensitiveSet(['other'])))
17951797
self.ha.run_cycle()
17961798
self.assertEqual(mock_write_sync.call_count, 1)
@@ -1810,9 +1812,11 @@ def test__process_quorum_replication(self):
18101812
self.assertEqual(mock_write_sync.call_args_list[1][1], {'version': None})
18111813
self.assertEqual(mock_set_sync.call_count, 0)
18121814

1813-
self.p.sync_handler.current_state = Mock(side_effect=[_SyncState('quorum', 1, 0, CaseInsensitiveSet(['foo']),
1815+
self.p.sync_handler.current_state = Mock(side_effect=[_SyncState('quorum', 1, CaseInsensitiveSet(['foo']),
1816+
CaseInsensitiveSet(),
18141817
CaseInsensitiveSet(['other'])),
1815-
_SyncState('quorum', 1, 1, CaseInsensitiveSet(['foo']),
1818+
_SyncState('quorum', 1, CaseInsensitiveSet(['foo']),
1819+
CaseInsensitiveSet(['foo']),
18161820
CaseInsensitiveSet(['foo']))])
18171821
mock_write_sync = self.ha.dcs.write_sync_state = Mock(return_value=SyncState(1, 'leader', 'foo', 0))
18181822
self.ha.cluster = get_cluster_initialized_with_leader(sync=('leader', 'foo'))
@@ -1827,8 +1831,9 @@ def test__process_quorum_replication(self):
18271831
self.assertEqual(mock_set_sync.call_args_list[0][0], ('ANY 1 (other)',))
18281832

18291833
# Test ANY 1 (*) when synchronous_mode_strict and no nodes available
1830-
self.p.sync_handler.current_state = Mock(return_value=_SyncState('quorum', 1, 0,
1834+
self.p.sync_handler.current_state = Mock(return_value=_SyncState('quorum', 1,
18311835
CaseInsensitiveSet(['other', 'foo']),
1836+
CaseInsensitiveSet(),
18321837
CaseInsensitiveSet()))
18331838
mock_write_sync.reset_mock()
18341839
mock_set_sync.reset_mock()

0 commit comments

Comments
 (0)