@@ -72,15 +72,22 @@ def __init__(self, config, on_change=None):
7272 msconfig = config ['multisite' ]
7373
7474 from .dcs import get_dcs
75+
76+ # Multisite configuration inherits values from main configuration
7577 inherited_keys = ['name' , 'scope' , 'namespace' , 'loop_wait' , 'ttl' , 'retry_timeout' ]
7678 for key in inherited_keys :
7779 if key not in msconfig and key in config :
7880 msconfig [key ] = config [key ]
7981
82+ msconfig .setdefault ('observe_interval' , config .get ('loop_wait' ))
83+
8084 # TODO: fetch default host/port from postgresql section
8185 if 'host' not in msconfig or 'port' not in msconfig :
8286 raise Exception ("Missing host or port from multisite configuration" )
8387
88+ # Disable etcd3 lease ownership detection warning
89+ msconfig ['multisite' ] = True
90+
8491 self .config = msconfig
8592
8693 self .name = msconfig ['name' ]
@@ -118,7 +125,10 @@ def get_active_standby_config(self):
118125 def resolve_leader (self ):
119126 """Try to become leader, update active config correspondingly.
120127
121- Returns error message encountered when unable to resolve"""
128+ Must be called from Patroni main thread. After a successful return get_active_standby_config() will
129+ return a value corresponding to a multisite status that was active after start of the call.
130+
131+ Returns error message encountered when unable to resolve leader status."""
122132 self ._leader_resolved .clear ()
123133 self ._heartbeat .set ()
124134 self ._leader_resolved .wait ()
@@ -128,7 +138,7 @@ def heartbeat(self):
128138 """Notify multisite mechanism that this site has a properly operating cluster mechanism.
129139
130140 Need to send out an async lease update. If that fails to complete within safety margin of ttl running
131- out then we need to
141+ out then we need to demote.
132142 """
133143 logger .info ("Triggering multisite hearbeat" )
134144 self ._heartbeat .set ()
@@ -180,6 +190,7 @@ def _check_transition(self, leader, note=None):
180190 def _resolve_multisite_leader (self ):
181191 logger .info ("Running multisite consensus." )
182192 try :
193+ # Refresh the latest known state
183194 cluster = self .dcs .get_cluster ()
184195 self ._dcs_error = None
185196
@@ -245,7 +256,8 @@ def _resolve_multisite_leader(self):
245256 self ._failover_target = None
246257 self ._failover_timeout = None
247258 if self ._set_standby_config (cluster .leader .member ):
248- # Wake up anyway to notice that we need to replicate from new leader
259+ # Wake up anyway to notice that we need to replicate from new leader. For the other case
260+ # _check_transition() handles the wake.
249261 if not self ._has_leader :
250262 self .on_change ()
251263 note = f"Lost leader lock to { lock_owner } " if self ._has_leader else f"Current leader { lock_owner } "
@@ -267,6 +279,31 @@ def _resolve_multisite_leader(self):
267279 except DCSError as e :
268280 pass
269281
282+ def _observe_leader (self ):
283+ """
284+ Observe multisite state and make sure
285+
286+ """
287+ try :
288+ cluster = self .dcs .get_cluster ()
289+
290+ if cluster .is_unlocked ():
291+ logger .info ("Multisite has no leader" )
292+ self ._disconnected_operation ()
293+ else :
294+ # There is a leader cluster
295+ lock_owner = cluster .leader and cluster .leader .name
296+ # The leader is us
297+ if lock_owner == self .name :
298+ logger .info ("Multisite leader is us" )
299+ self ._standby_config = None
300+ else :
301+ logger .info (f"Multisite leader is { lock_owner } " )
302+ self ._set_standby_config (cluster .leader .member )
303+ except DCSError as e :
304+ # On replicas we need to know the multisite status only for rewinding.
305+ logger .warning (f"Error accessing multisite DCS: { e } " )
306+
270307 def _update_history (self , cluster ):
271308 if cluster .history and cluster .history .lines and isinstance (cluster .history .lines [0 ], dict ):
272309 self .site_switches = cluster .history .lines [0 ].get ('switches' )
@@ -306,14 +343,18 @@ def touch_member(self):
306343 self .dcs .touch_member (data )
307344
308345 def run (self ):
309- self ._heartbeat .wait ()
346+ self ._observe_leader ()
347+ while not self ._heartbeat .wait (self .config ['observe_interval' ]):
348+ # Keep track of who is the leader even when we are not the primary node to be able to rewind from them
349+ self ._observe_leader ()
310350 while not self .stop_requested :
311351 self ._resolve_multisite_leader ()
312352 self ._heartbeat .clear ()
313353 self ._leader_resolved .set ()
314354 if self ._state_updater :
315355 self ._state_updater .store_updates ()
316- self ._heartbeat .wait ()
356+ while not self ._heartbeat .wait (self .config ['observe_interval' ]):
357+ self ._observe_leader ()
317358
318359 def shutdown (self ):
319360 self .stop_requested = True
0 commit comments