Skip to content

Commit 8f53bff

Browse files
committed
fix(redis): add sentinel_fanout_compat transport option for mixed-version clusters
Commit 06a3b23 changed SentinelChannel's fanout PUB/SUB topic from the literal '/{db}.exchange' to the formatted '/0.exchange'. This breaks control commands (ping, shutdown, etc.) between workers running different kombu versions behind Redis Sentinel. Add a `sentinel_fanout_compat` transport option (default **False**) that, when explicitly enabled: - publishes fanout messages to *both* the new and legacy topics - subscribes to *both* patterns - deduplicates received messages via a bounded hash set This lets operators perform rolling upgrades without losing control messages. Once every worker runs kombu >= 5.4.0 the option can be removed (or left at its default False) to eliminate the overhead. Closes #2152
1 parent 044462a commit 8f53bff

File tree

2 files changed

+417
-1
lines changed

2 files changed

+417
-1
lines changed

kombu/transport/redis.py

Lines changed: 151 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,11 @@
5050
* ``retry_on_timeout``
5151
* ``priority_steps``
5252
* ``client_name``: (str) The name to use when connecting to Redis server.
53+
* ``sentinel_fanout_compat``: (bool) Sentinel-only. When True,
54+
publishes and subscribes to both the new formatted fanout topic (e.g.
55+
``/0.exchange``) and the legacy literal topic (``/{db}.exchange``) used
56+
by kombu < 5.4.0. This allows mixed-version clusters to exchange
57+
control commands during a rolling upgrade. Defaults to False.
5358
"""
5459

5560
from __future__ import annotations
@@ -1440,11 +1445,35 @@ class SentinelChannel(Channel):
14401445
from_transport_options = Channel.from_transport_options + (
14411446
'master_name',
14421447
'min_other_sentinels',
1443-
'sentinel_kwargs')
1448+
'sentinel_kwargs',
1449+
'sentinel_fanout_compat')
14441450

14451451
connection_class = sentinel.SentinelManagedConnection if sentinel else None
14461452
connection_class_ssl = SentinelManagedSSLConnection if sentinel else None
14471453

1454+
#: Transport option controlling backward-compatible fanout for
1455+
#: mixed-version sentinel clusters.
1456+
#:
1457+
#: When True, the channel publishes fanout messages to both the new
1458+
#: formatted topic (``/0.exchange``) and the legacy literal topic
1459+
#: (``/{db}.exchange``), and subscribes to both. This allows
1460+
#: clusters with kombu < 5.4.0 and >= 5.4.0 workers to exchange
1461+
#: control commands (ping, shutdown, etc.) during a rolling upgrade.
1462+
#:
1463+
#: Defaults to False. Set to True in ``transport_options`` if you
1464+
#: have a mixed-version cluster.
1465+
sentinel_fanout_compat = False
1466+
1467+
_legacy_keyprefix_fanout = None
1468+
_seen_fanout_payloads = None
1469+
_seen_fanout_payload_order = None
1470+
_SEEN_FANOUT_MAX = 1000
1471+
1472+
def __init__(self, *args, **kwargs):
1473+
self._seen_fanout_payloads = set()
1474+
self._seen_fanout_payload_order = []
1475+
super().__init__(*args, **kwargs)
1476+
14481477
def _sentinel_managed_pool(self, asynchronous=False):
14491478
connparams = self._connparams(asynchronous)
14501479

@@ -1490,9 +1519,130 @@ def _sentinel_managed_pool(self, asynchronous=False):
14901519

14911520
def _get_pool(self, asynchronous=False):
14921521
params = self._connparams(asynchronous=asynchronous)
1522+
if self._legacy_keyprefix_fanout is None:
1523+
self._legacy_keyprefix_fanout = self.keyprefix_fanout
14931524
self.keyprefix_fanout = self.keyprefix_fanout.format(db=params['db'])
14941525
return self._sentinel_managed_pool(asynchronous)
14951526

1527+
# -- Backward-compatible fanout helpers (gated on sentinel_fanout_compat)
1528+
1529+
def _get_legacy_publish_topic(self, exchange, routing_key):
1530+
"""Build the PUB/SUB topic using the legacy (unformatted) prefix."""
1531+
if routing_key and self.fanout_patterns:
1532+
return ''.join([
1533+
self._legacy_keyprefix_fanout,
1534+
exchange, '/', routing_key,
1535+
])
1536+
return ''.join([self._legacy_keyprefix_fanout, exchange])
1537+
1538+
def _get_legacy_subscribe_topic(self, queue):
1539+
"""Build the subscribe topic using the legacy prefix."""
1540+
exchange, routing_key = self._fanout_queues[queue]
1541+
return self._get_legacy_publish_topic(exchange, routing_key)
1542+
1543+
@property
1544+
def _compat_enabled(self):
1545+
"""True when dual-publish/dual-subscribe is active."""
1546+
return (self.sentinel_fanout_compat
1547+
and self._legacy_keyprefix_fanout is not None
1548+
and self._legacy_keyprefix_fanout != self.keyprefix_fanout)
1549+
1550+
def _put_fanout(self, exchange, message, routing_key, **kwargs):
1551+
"""Deliver fanout message.
1552+
1553+
When ``sentinel_fanout_compat`` is enabled, publishes to *both*
1554+
the new formatted topic and the legacy literal topic so that
1555+
workers on kombu < 5.4.0 still receive broadcast messages.
1556+
"""
1557+
super()._put_fanout(exchange, message, routing_key, **kwargs)
1558+
if self._compat_enabled:
1559+
legacy_topic = self._get_legacy_publish_topic(
1560+
exchange, routing_key)
1561+
with self.conn_or_acquire() as client:
1562+
client.publish(legacy_topic, dumps(message))
1563+
1564+
def _subscribe(self):
1565+
"""Subscribe to fanout PUB/SUB patterns.
1566+
1567+
When ``sentinel_fanout_compat`` is enabled, subscribes to both
1568+
the new formatted pattern and the legacy literal pattern.
1569+
"""
1570+
keys = [self._get_subscribe_topic(queue)
1571+
for queue in self.active_fanout_queues]
1572+
if self._compat_enabled:
1573+
for queue in self.active_fanout_queues:
1574+
legacy_key = self._get_legacy_subscribe_topic(queue)
1575+
if legacy_key not in keys:
1576+
keys.append(legacy_key)
1577+
if not keys:
1578+
return
1579+
c = self.subclient
1580+
if c.connection._sock is None:
1581+
c.connection.connect()
1582+
self._in_listen = c.connection
1583+
c.psubscribe(keys)
1584+
1585+
def _unsubscribe_from(self, queue):
1586+
"""Unsubscribe from fanout topics for *queue*."""
1587+
topic = self._get_subscribe_topic(queue)
1588+
c = self.subclient
1589+
topics = [topic]
1590+
if self._compat_enabled:
1591+
legacy_topic = self._get_legacy_subscribe_topic(queue)
1592+
if legacy_topic != topic:
1593+
topics.append(legacy_topic)
1594+
if c.connection and c.connection._sock:
1595+
c.unsubscribe(topics)
1596+
1597+
def _receive_one(self, c):
1598+
"""Receive one fanout message, with deduplication when compat is on.
1599+
1600+
When ``sentinel_fanout_compat`` is enabled we subscribe to both
1601+
the new and legacy topics, so the same message may arrive twice.
1602+
A bounded set of recent payload hashes is used to deduplicate.
1603+
"""
1604+
if not self.sentinel_fanout_compat:
1605+
return super()._receive_one(c)
1606+
1607+
response = None
1608+
try:
1609+
response = c.parse_response()
1610+
except self.connection_errors:
1611+
self._in_listen = None
1612+
raise
1613+
if isinstance(response, (list, tuple)):
1614+
payload = self._handle_message(c, response)
1615+
if bytes_to_str(payload['type']).endswith('message'):
1616+
channel = bytes_to_str(payload['channel'])
1617+
if payload['data']:
1618+
# -- deduplicate
1619+
raw = payload['data']
1620+
if isinstance(raw, str):
1621+
raw = raw.encode()
1622+
h = hash(raw)
1623+
if h in self._seen_fanout_payloads:
1624+
return False
1625+
self._seen_fanout_payloads.add(h)
1626+
self._seen_fanout_payload_order.append(h)
1627+
if len(self._seen_fanout_payload_order) > \
1628+
self._SEEN_FANOUT_MAX:
1629+
old = self._seen_fanout_payload_order.pop(0)
1630+
self._seen_fanout_payloads.discard(old)
1631+
# -- deliver
1632+
if channel[0] == '/':
1633+
_, _, channel = channel.partition('.')
1634+
try:
1635+
message = loads(bytes_to_str(payload['data']))
1636+
except (TypeError, ValueError):
1637+
warning(
1638+
'Cannot process event on channel %r: %s',
1639+
channel, repr(payload)[:4096], exc_info=1)
1640+
raise Empty()
1641+
exchange = channel.split('/', 1)[0]
1642+
self.connection._deliver(
1643+
message, self._fanout_to_queue[exchange])
1644+
return True
1645+
14961646

14971647
class SentinelTransport(Transport):
14981648
"""Redis Sentinel Transport."""

0 commit comments

Comments
 (0)