Skip to content

Commit 102f286

Browse files
Add Sentinel +switch-master listener option to monitor failovers.
1 parent dfcf08c commit 102f286

File tree

1 file changed

+139
-11
lines changed

1 file changed

+139
-11
lines changed

redis/sentinel.py

Lines changed: 139 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
import random
2+
import time
23
import weakref
4+
from collections import defaultdict
5+
from threading import Lock, Thread, Event
36
from typing import Optional
47

58
from redis.client import Redis
@@ -104,21 +107,18 @@ def __init__(
104107
self.check_connection = check_connection
105108
self.service_name = service_name
106109
self.sentinel_manager = sentinel_manager
110+
111+
self._lock = Lock()
107112
self.reset()
108113

109114
def reset(self):
110-
self.master_address = None
111115
self.slave_rr_counter = None
116+
with self._lock:
117+
self.master_address = None
112118

113119
def get_master_address(self):
114120
master_address = self.sentinel_manager.discover_master(self.service_name)
115-
if self.is_master and self.master_address != master_address:
116-
self.master_address = master_address
117-
# disconnect any idle connections so that they reconnect
118-
# to the new master the next time that they are used.
119-
connection_pool = self.connection_pool_ref()
120-
if connection_pool is not None:
121-
connection_pool.disconnect(inuse_connections=False)
121+
self.update_master_address(master_address)
122122
return master_address
123123

124124
def rotate_slaves(self):
@@ -137,6 +137,24 @@ def rotate_slaves(self):
137137
pass
138138
raise SlaveNotFoundError(f"No slave found for {self.service_name!r}")
139139

140+
def update_master_address(self, master_address):
    """
    Remember ``master_address`` and recycle idle connections if it changed.

    Does nothing when this proxy is not serving a master. The stored
    address is compared and replaced while holding ``self._lock``; the
    pool disconnect happens after the lock has been released.
    """
    if not self.is_master:
        return

    with self._lock:
        address_changed = self.master_address != master_address
        if address_changed:
            self.master_address = master_address

    if not address_changed:
        return

    # Disconnect any idle connections so that they reconnect to the new
    # master the next time that they are used.
    pool = self.connection_pool_ref()
    if pool is None:
        return
    pool.disconnect(inuse_connections=False)
156+
157+
140158

141159
class SentinelConnectionPool(ConnectionPool):
142160
"""
@@ -224,6 +242,11 @@ class Sentinel(SentinelCommands):
224242
not specified, any socket_timeout and socket_keepalive options specified
225243
in ``connection_kwargs`` will be used.
226244
245+
``monitor_failover`` indicates whether the Sentinel client should monitor
246+
for master failover events. If set to True, the client will subscribe to
247+
Sentinel's "+switch-master" notifications and update any registered
248+
connection pools automatically when a failover occurs.
249+
227250
``connection_kwargs`` are keyword arguments that will be used when
228251
establishing a connection to a Redis server.
229252
"""
@@ -234,6 +257,7 @@ def __init__(
234257
min_other_sentinels=0,
235258
sentinel_kwargs=None,
236259
force_master_ip=None,
260+
monitor_failover: bool = False,
237261
**connection_kwargs,
238262
):
239263
# if sentinel_kwargs isn't defined, use the socket_* options from
@@ -252,6 +276,11 @@ def __init__(
252276
self.connection_kwargs = connection_kwargs
253277
self._force_master_ip = force_master_ip
254278

279+
self._monitor_failover = monitor_failover
280+
self._listener_lock = Lock()
281+
self._switch_master_listener = None
282+
self._proxies_by_service = defaultdict(weakref.WeakSet)
283+
255284
def execute_command(self, *args, **kwargs):
256285
"""
257286
Execute Sentinel command in sentinel nodes.
@@ -355,6 +384,29 @@ def discover_slaves(self, service_name):
355384
return slaves
356385
return []
357386

387+
def close(self):
    """
    Stop the switch-master listener thread, if one is running.

    Under ``self._listener_lock`` the listener is asked to stop, joined
    for at most two seconds, and then forgotten. Calling this when no
    listener exists is a no-op.
    """
    with self._listener_lock:
        listener = self._switch_master_listener
        if listener is None:
            return
        listener.stop()
        listener.join(timeout=2)
        self._switch_master_listener = None
393+
394+
def _register_proxy(self, service_name: str, proxy: SentinelConnectionPoolProxy):
    """
    Track ``proxy`` under ``service_name`` and start the listener if needed.

    The proxy is always added to ``self._proxies_by_service``. When
    failover monitoring is enabled, a single ``_SwitchMasterListener`` is
    created and started the first time a proxy is registered; creation is
    guarded by ``self._listener_lock``.
    """
    self._proxies_by_service[service_name].add(proxy)

    if self._monitor_failover:
        with self._listener_lock:
            # Start the listener lazily, and only once.
            if self._switch_master_listener is None:
                self._switch_master_listener = _SwitchMasterListener(self)
                self._switch_master_listener.start()
402+
403+
def _on_switch_master(self, service_name: str, new_master_address: tuple[str, int]):
404+
proxies = self._proxies_by_service.get(service_name)
405+
if not proxies:
406+
return
407+
for proxy in list(proxies):
408+
proxy.update_master_address(new_master_address)
409+
358410
def master_for(
359411
self,
360412
service_name,
@@ -389,9 +441,11 @@ def master_for(
389441
kwargs["is_master"] = True
390442
connection_kwargs = dict(self.connection_kwargs)
391443
connection_kwargs.update(kwargs)
392-
return redis_class.from_pool(
393-
connection_pool_class(service_name, self, **connection_kwargs)
394-
)
444+
445+
pool = connection_pool_class(service_name, self, **connection_kwargs)
446+
self._register_proxy(service_name, pool.proxy)
447+
448+
return redis_class.from_pool(pool)
395449

396450
def slave_for(
397451
self,
@@ -423,3 +477,77 @@ def slave_for(
423477
return redis_class.from_pool(
424478
connection_pool_class(service_name, self, **connection_kwargs)
425479
)
480+
481+
482+
class _SwitchMasterListener(Thread):
483+
def __init__(self, sentinel: Sentinel):
484+
super().__init__(daemon=True)
485+
self._sentinel = sentinel
486+
self._stop = Event()
487+
488+
def stop(self):
489+
self._stop.set()
490+
491+
def run(self):
492+
493+
while not self._stop.is_set():
494+
pubsub = None
495+
496+
try:
497+
sentinel = self._get_working_sentinel()
498+
if sentinel is None:
499+
time.sleep(1)
500+
continue
501+
502+
pubsub = sentinel.pubsub(ignore_subscribe_messages=True)
503+
pubsub.subscribe("+switch-master")
504+
505+
while not self._stop.is_set():
506+
msg = pubsub.get_message(timeout=1)
507+
if msg is None:
508+
continue
509+
if msg.get("type") != "message":
510+
continue
511+
512+
513+
if (data := msg.get("data")) is None:
514+
continue
515+
if isinstance(data, bytes):
516+
data = data.decode("utf-8", errors="replace")
517+
518+
parts = str(data).split()
519+
if len(parts) < 5:
520+
continue
521+
522+
service_name = parts[0]
523+
new_master_ip = parts[3]
524+
try:
525+
new_master_port = int(parts[4])
526+
except ValueError:
527+
continue
528+
529+
self._sentinel._on_switch_master(
530+
service_name, (new_master_ip, new_master_port)
531+
)
532+
533+
except (ConnectionError, TimeoutError):
534+
time.sleep(0.5)
535+
finally:
536+
try:
537+
if pubsub is not None:
538+
pubsub.close()
539+
except Exception:
540+
pass
541+
542+
543+
544+
def _get_working_sentinel(self):
545+
sentinels = list(self._sentinel.sentinels)
546+
for sentinel in sentinels:
547+
try:
548+
sentinel.ping()
549+
return sentinel
550+
except (ConnectionError, TimeoutError):
551+
continue
552+
553+
return None

0 commit comments

Comments
 (0)