Skip to content

Commit 4cdf082

Browse files
Support for maintenance push notifications handling during server upgrade or maintenance procedures. (#3756)
* Hitless upgrade: Support initial implementation for synchronous Redis client - no handshake, no failing over notifications support. (#3713) * Adding handling of FAILING_OVER and FAILED_OVER events/push notifications (#3716) * Hitless upgrade: Adding handshake command to enable the notifications after connection is established (#3735) * Hitless-Upgrade: Add handling of MOVING push notification with "null" host:port info. (#3751) --------- Co-authored-by: Elena Kolevska <[email protected]>
1 parent b5a8ef0 commit 4cdf082

File tree

9 files changed

+4926
-94
lines changed

9 files changed

+4926
-94
lines changed

redis/_parsers/base.py

Lines changed: 173 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,17 @@
1+
import logging
12
import sys
23
from abc import ABC
34
from asyncio import IncompleteReadError, StreamReader, TimeoutError
4-
from typing import Callable, List, Optional, Protocol, Union
5+
from typing import Awaitable, Callable, List, Optional, Protocol, Union
6+
7+
from redis.maintenance_events import (
8+
MaintenanceEvent,
9+
NodeFailedOverEvent,
10+
NodeFailingOverEvent,
11+
NodeMigratedEvent,
12+
NodeMigratingEvent,
13+
NodeMovingEvent,
14+
)
515

616
if sys.version_info.major >= 3 and sys.version_info.minor >= 11:
717
from asyncio import timeout as async_timeout
@@ -50,6 +60,8 @@
5060
"Client sent AUTH, but no password is set": AuthenticationError,
5161
}
5262

63+
logger = logging.getLogger(__name__)
64+
5365

5466
class BaseParser(ABC):
5567
EXCEPTION_CLASSES = {
@@ -158,48 +170,195 @@ async def read_response(
158170
raise NotImplementedError()
159171

160172

161-
_INVALIDATION_MESSAGE = [b"invalidate", "invalidate"]
173+
class MaintenanceNotificationsParser:
174+
"""Protocol defining maintenance push notification parsing functionality"""
175+
176+
@staticmethod
177+
def parse_maintenance_start_msg(response, notification_type):
178+
# Expected message format is: <event_type> <seq_number> <time>
179+
id = response[1]
180+
ttl = response[2]
181+
return notification_type(id, ttl)
182+
183+
@staticmethod
184+
def parse_maintenance_completed_msg(response, notification_type):
185+
# Expected message format is: <event_type> <seq_number>
186+
id = response[1]
187+
return notification_type(id)
188+
189+
@staticmethod
190+
def parse_moving_msg(response):
191+
# Expected message format is: MOVING <seq_number> <time> <endpoint>
192+
id = response[1]
193+
ttl = response[2]
194+
if response[3] in [b"null", "null"]:
195+
host, port = None, None
196+
else:
197+
value = response[3]
198+
if isinstance(value, bytes):
199+
value = value.decode()
200+
host, port = value.split(":")
201+
port = int(port) if port is not None else None
202+
203+
return NodeMovingEvent(id, host, port, ttl)
204+
205+
206+
_INVALIDATION_MESSAGE = "invalidate"
207+
_MOVING_MESSAGE = "MOVING"
208+
_MIGRATING_MESSAGE = "MIGRATING"
209+
_MIGRATED_MESSAGE = "MIGRATED"
210+
_FAILING_OVER_MESSAGE = "FAILING_OVER"
211+
_FAILED_OVER_MESSAGE = "FAILED_OVER"
212+
213+
_MAINTENANCE_MESSAGES = (
214+
_MIGRATING_MESSAGE,
215+
_MIGRATED_MESSAGE,
216+
_FAILING_OVER_MESSAGE,
217+
_FAILED_OVER_MESSAGE,
218+
)
219+
220+
MSG_TYPE_TO_EVENT_PARSER_MAPPING: dict[str, tuple[type[MaintenanceEvent], Callable]] = {
221+
_MIGRATING_MESSAGE: (
222+
NodeMigratingEvent,
223+
MaintenanceNotificationsParser.parse_maintenance_start_msg,
224+
),
225+
_MIGRATED_MESSAGE: (
226+
NodeMigratedEvent,
227+
MaintenanceNotificationsParser.parse_maintenance_completed_msg,
228+
),
229+
_FAILING_OVER_MESSAGE: (
230+
NodeFailingOverEvent,
231+
MaintenanceNotificationsParser.parse_maintenance_start_msg,
232+
),
233+
_FAILED_OVER_MESSAGE: (
234+
NodeFailedOverEvent,
235+
MaintenanceNotificationsParser.parse_maintenance_completed_msg,
236+
),
237+
_MOVING_MESSAGE: (
238+
NodeMovingEvent,
239+
MaintenanceNotificationsParser.parse_moving_msg,
240+
),
241+
}
162242

163243

164244
class PushNotificationsParser(Protocol):
165245
"""Protocol defining RESP3-specific parsing functionality"""
166246

167247
pubsub_push_handler_func: Callable
168248
invalidation_push_handler_func: Optional[Callable] = None
249+
node_moving_push_handler_func: Optional[Callable] = None
250+
maintenance_push_handler_func: Optional[Callable] = None
169251

170252
def handle_pubsub_push_response(self, response):
171253
"""Handle pubsub push responses"""
172254
raise NotImplementedError()
173255

174256
def handle_push_response(self, response, **kwargs):
175-
if response[0] not in _INVALIDATION_MESSAGE:
257+
msg_type = response[0]
258+
if isinstance(msg_type, bytes):
259+
msg_type = msg_type.decode()
260+
261+
if msg_type not in (
262+
_INVALIDATION_MESSAGE,
263+
*_MAINTENANCE_MESSAGES,
264+
_MOVING_MESSAGE,
265+
):
176266
return self.pubsub_push_handler_func(response)
177-
if self.invalidation_push_handler_func:
178-
return self.invalidation_push_handler_func(response)
267+
268+
try:
269+
if (
270+
msg_type == _INVALIDATION_MESSAGE
271+
and self.invalidation_push_handler_func
272+
):
273+
return self.invalidation_push_handler_func(response)
274+
275+
if msg_type == _MOVING_MESSAGE and self.node_moving_push_handler_func:
276+
parser_function = MSG_TYPE_TO_EVENT_PARSER_MAPPING[msg_type][1]
277+
278+
notification = parser_function(response)
279+
return self.node_moving_push_handler_func(notification)
280+
281+
if msg_type in _MAINTENANCE_MESSAGES and self.maintenance_push_handler_func:
282+
parser_function = MSG_TYPE_TO_EVENT_PARSER_MAPPING[msg_type][1]
283+
notification_type = MSG_TYPE_TO_EVENT_PARSER_MAPPING[msg_type][0]
284+
notification = parser_function(response, notification_type)
285+
286+
if notification is not None:
287+
return self.maintenance_push_handler_func(notification)
288+
except Exception as e:
289+
logger.error(
290+
"Error handling {} message ({}): {}".format(msg_type, response, e)
291+
)
292+
293+
return None
179294

180295
def set_pubsub_push_handler(self, pubsub_push_handler_func):
181296
self.pubsub_push_handler_func = pubsub_push_handler_func
182297

183298
def set_invalidation_push_handler(self, invalidation_push_handler_func):
184299
self.invalidation_push_handler_func = invalidation_push_handler_func
185300

301+
def set_node_moving_push_handler(self, node_moving_push_handler_func):
302+
self.node_moving_push_handler_func = node_moving_push_handler_func
303+
304+
def set_maintenance_push_handler(self, maintenance_push_handler_func):
305+
self.maintenance_push_handler_func = maintenance_push_handler_func
306+
186307

187308
class AsyncPushNotificationsParser(Protocol):
188309
"""Protocol defining async RESP3-specific parsing functionality"""
189310

190311
pubsub_push_handler_func: Callable
191312
invalidation_push_handler_func: Optional[Callable] = None
313+
node_moving_push_handler_func: Optional[Callable[..., Awaitable[None]]] = None
314+
maintenance_push_handler_func: Optional[Callable[..., Awaitable[None]]] = None
192315

193316
async def handle_pubsub_push_response(self, response):
194317
"""Handle pubsub push responses asynchronously"""
195318
raise NotImplementedError()
196319

197320
async def handle_push_response(self, response, **kwargs):
198321
"""Handle push responses asynchronously"""
199-
if response[0] not in _INVALIDATION_MESSAGE:
322+
323+
msg_type = response[0]
324+
if isinstance(msg_type, bytes):
325+
msg_type = msg_type.decode()
326+
327+
if msg_type not in (
328+
_INVALIDATION_MESSAGE,
329+
*_MAINTENANCE_MESSAGES,
330+
_MOVING_MESSAGE,
331+
):
200332
return await self.pubsub_push_handler_func(response)
201-
if self.invalidation_push_handler_func:
202-
return await self.invalidation_push_handler_func(response)
333+
334+
try:
335+
if (
336+
msg_type == _INVALIDATION_MESSAGE
337+
and self.invalidation_push_handler_func
338+
):
339+
return await self.invalidation_push_handler_func(response)
340+
341+
if isinstance(msg_type, bytes):
342+
msg_type = msg_type.decode()
343+
344+
if msg_type == _MOVING_MESSAGE and self.node_moving_push_handler_func:
345+
parser_function = MSG_TYPE_TO_EVENT_PARSER_MAPPING[msg_type][1]
346+
notification = parser_function(response)
347+
return await self.node_moving_push_handler_func(notification)
348+
349+
if msg_type in _MAINTENANCE_MESSAGES and self.maintenance_push_handler_func:
350+
parser_function = MSG_TYPE_TO_EVENT_PARSER_MAPPING[msg_type][1]
351+
notification_type = MSG_TYPE_TO_EVENT_PARSER_MAPPING[msg_type][0]
352+
notification = parser_function(response, notification_type)
353+
354+
if notification is not None:
355+
return await self.maintenance_push_handler_func(notification)
356+
except Exception as e:
357+
logger.error(
358+
"Error handling {} message ({}): {}".format(msg_type, response, e)
359+
)
360+
361+
return None
203362

204363
def set_pubsub_push_handler(self, pubsub_push_handler_func):
205364
"""Set the pubsub push handler function"""
@@ -209,6 +368,12 @@ def set_invalidation_push_handler(self, invalidation_push_handler_func):
209368
"""Set the invalidation push handler function"""
210369
self.invalidation_push_handler_func = invalidation_push_handler_func
211370

371+
def set_node_moving_push_handler(self, node_moving_push_handler_func):
372+
self.node_moving_push_handler_func = node_moving_push_handler_func
373+
374+
def set_maintenance_push_handler(self, maintenance_push_handler_func):
375+
self.maintenance_push_handler_func = maintenance_push_handler_func
376+
212377

213378
class _AsyncRESPBase(AsyncBaseParser):
214379
"""Base class for async resp parsing"""

redis/_parsers/hiredis.py

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ def __init__(self, socket_read_size):
4747
self.socket_read_size = socket_read_size
4848
self._buffer = bytearray(socket_read_size)
4949
self.pubsub_push_handler_func = self.handle_pubsub_push_response
50+
self.node_moving_push_handler_func = None
51+
self.maintenance_push_handler_func = None
5052
self.invalidation_push_handler_func = None
5153
self._hiredis_PushNotificationType = None
5254

@@ -141,12 +143,15 @@ def read_response(self, disable_decoding=False, push_request=False):
141143
response, self._hiredis_PushNotificationType
142144
):
143145
response = self.handle_push_response(response)
144-
if not push_request:
145-
return self.read_response(
146-
disable_decoding=disable_decoding, push_request=push_request
147-
)
148-
else:
146+
147+
# if this is a push request return the push response
148+
if push_request:
149149
return response
150+
151+
return self.read_response(
152+
disable_decoding=disable_decoding,
153+
push_request=push_request,
154+
)
150155
return response
151156

152157
if disable_decoding:
@@ -169,12 +174,13 @@ def read_response(self, disable_decoding=False, push_request=False):
169174
response, self._hiredis_PushNotificationType
170175
):
171176
response = self.handle_push_response(response)
172-
if not push_request:
173-
return self.read_response(
174-
disable_decoding=disable_decoding, push_request=push_request
175-
)
176-
else:
177+
if push_request:
177178
return response
179+
return self.read_response(
180+
disable_decoding=disable_decoding,
181+
push_request=push_request,
182+
)
183+
178184
elif (
179185
isinstance(response, list)
180186
and response

redis/_parsers/resp3.py

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ class _RESP3Parser(_RESPBase, PushNotificationsParser):
1818
def __init__(self, socket_read_size):
1919
super().__init__(socket_read_size)
2020
self.pubsub_push_handler_func = self.handle_pubsub_push_response
21+
self.node_moving_push_handler_func = None
22+
self.maintenance_push_handler_func = None
2123
self.invalidation_push_handler_func = None
2224

2325
def handle_pubsub_push_response(self, response):
@@ -117,17 +119,21 @@ def _read_response(self, disable_decoding=False, push_request=False):
117119
for _ in range(int(response))
118120
]
119121
response = self.handle_push_response(response)
120-
if not push_request:
121-
return self._read_response(
122-
disable_decoding=disable_decoding, push_request=push_request
123-
)
124-
else:
122+
123+
# if this is a push request return the push response
124+
if push_request:
125125
return response
126+
127+
return self._read_response(
128+
disable_decoding=disable_decoding,
129+
push_request=push_request,
130+
)
126131
else:
127132
raise InvalidResponse(f"Protocol Error: {raw!r}")
128133

129134
if isinstance(response, bytes) and disable_decoding is False:
130135
response = self.encoder.decode(response)
136+
131137
return response
132138

133139

0 commit comments

Comments
 (0)