Skip to content

Commit b7bf8e3

Browse files
committed
Adding oss maint notifications handler configurations to parsers. Placeholder for smigrated handler in OSSMaintNotificationsHandler class
1 parent 08d13fb commit b7bf8e3

File tree

4 files changed

+111
-8
lines changed

4 files changed

+111
-8
lines changed

redis/_parsers/base.py

Lines changed: 79 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111
NodeMigratedNotification,
1212
NodeMigratingNotification,
1313
NodeMovingNotification,
14+
OSSNodeMigratedNotification,
15+
OSSNodeMigratingNotification,
1416
)
1517

1618
if sys.version_info.major >= 3 and sys.version_info.minor >= 11:
@@ -179,6 +181,25 @@ async def read_response(
179181
class MaintenanceNotificationsParser:
180182
"""Protocol defining maintenance push notification parsing functionality"""
181183

184+
@staticmethod
185+
def parse_oss_maintenance_start_msg(response):
186+
# TODO This format is not the final - will be changed later
187+
# Expected message format is: SMIGRATING <seq_number> <src_node> <dest_node> <slots>
188+
id = response[1]
189+
src_node = response[2]
190+
dest_node = response[3]
191+
slots = response[4]
192+
return OSSNodeMigratingNotification(id, src_node, dest_node, slots)
193+
194+
@staticmethod
195+
def parse_oss_maintenance_completed_msg(response):
196+
# TODO This format is not the final - will be changed later
197+
# Expected message format is: SMIGRATED <seq_number> <node_address> <slots>
198+
id = response[1]
199+
node_address = response[2]
200+
slots = response[3]
201+
return OSSNodeMigratedNotification(id, node_address, slots)
202+
182203
@staticmethod
183204
def parse_maintenance_start_msg(response, notification_type):
184205
# Expected message format is: <notification_type> <seq_number> <time>
@@ -215,12 +236,15 @@ def parse_moving_msg(response):
215236
_MIGRATED_MESSAGE = "MIGRATED"
216237
_FAILING_OVER_MESSAGE = "FAILING_OVER"
217238
_FAILED_OVER_MESSAGE = "FAILED_OVER"
239+
_SMIGRATING_MESSAGE = "SMIGRATING"
240+
_SMIGRATED_MESSAGE = "SMIGRATED"
218241

219242
_MAINTENANCE_MESSAGES = (
220243
_MIGRATING_MESSAGE,
221244
_MIGRATED_MESSAGE,
222245
_FAILING_OVER_MESSAGE,
223246
_FAILED_OVER_MESSAGE,
247+
_SMIGRATING_MESSAGE,
224248
)
225249

226250
MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING: dict[
@@ -246,6 +270,14 @@ def parse_moving_msg(response):
246270
NodeMovingNotification,
247271
MaintenanceNotificationsParser.parse_moving_msg,
248272
),
273+
_SMIGRATING_MESSAGE: (
274+
OSSNodeMigratingNotification,
275+
MaintenanceNotificationsParser.parse_oss_maintenance_start_msg,
276+
),
277+
_SMIGRATED_MESSAGE: (
278+
OSSNodeMigratedNotification,
279+
MaintenanceNotificationsParser.parse_oss_maintenance_completed_msg,
280+
),
249281
}
250282

251283

@@ -256,6 +288,7 @@ class PushNotificationsParser(Protocol):
256288
invalidation_push_handler_func: Optional[Callable] = None
257289
node_moving_push_handler_func: Optional[Callable] = None
258290
maintenance_push_handler_func: Optional[Callable] = None
291+
oss_maintenance_push_handler_func: Optional[Callable] = None
259292

260293
def handle_pubsub_push_response(self, response):
261294
"""Handle pubsub push responses"""
@@ -270,6 +303,7 @@ def handle_push_response(self, response, **kwargs):
270303
_INVALIDATION_MESSAGE,
271304
*_MAINTENANCE_MESSAGES,
272305
_MOVING_MESSAGE,
306+
_SMIGRATED_MESSAGE,
273307
):
274308
return self.pubsub_push_handler_func(response)
275309

@@ -292,13 +326,27 @@ def handle_push_response(self, response, **kwargs):
292326
parser_function = MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING[
293327
msg_type
294328
][1]
295-
notification_type = MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING[
296-
msg_type
297-
][0]
298-
notification = parser_function(response, notification_type)
329+
if msg_type == _SMIGRATING_MESSAGE:
330+
notification = parser_function(response)
331+
else:
332+
notification_type = MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING[
333+
msg_type
334+
][0]
335+
notification = parser_function(response, notification_type)
299336

300337
if notification is not None:
301338
return self.maintenance_push_handler_func(notification)
339+
if (
340+
msg_type == _SMIGRATED_MESSAGE
341+
and self.oss_maintenance_push_handler_func
342+
):
343+
parser_function = MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING[
344+
msg_type
345+
][1]
346+
notification = parser_function(response)
347+
348+
if notification is not None:
349+
return self.oss_maintenance_push_handler_func(notification)
302350
except Exception as e:
303351
logger.error(
304352
"Error handling {} message ({}): {}".format(msg_type, response, e)
@@ -318,6 +366,9 @@ def set_node_moving_push_handler(self, node_moving_push_handler_func):
318366
def set_maintenance_push_handler(self, maintenance_push_handler_func):
319367
self.maintenance_push_handler_func = maintenance_push_handler_func
320368

369+
def set_oss_maintenance_push_handler(self, oss_maintenance_push_handler_func):
370+
self.oss_maintenance_push_handler_func = oss_maintenance_push_handler_func
371+
321372

322373
class AsyncPushNotificationsParser(Protocol):
323374
"""Protocol defining async RESP3-specific parsing functionality"""
@@ -326,6 +377,7 @@ class AsyncPushNotificationsParser(Protocol):
326377
invalidation_push_handler_func: Optional[Callable] = None
327378
node_moving_push_handler_func: Optional[Callable[..., Awaitable[None]]] = None
328379
maintenance_push_handler_func: Optional[Callable[..., Awaitable[None]]] = None
380+
oss_maintenance_push_handler_func: Optional[Callable[..., Awaitable[None]]] = None
329381

330382
async def handle_pubsub_push_response(self, response):
331383
"""Handle pubsub push responses asynchronously"""
@@ -342,6 +394,7 @@ async def handle_push_response(self, response, **kwargs):
342394
_INVALIDATION_MESSAGE,
343395
*_MAINTENANCE_MESSAGES,
344396
_MOVING_MESSAGE,
397+
_SMIGRATED_MESSAGE,
345398
):
346399
return await self.pubsub_push_handler_func(response)
347400

@@ -366,13 +419,28 @@ async def handle_push_response(self, response, **kwargs):
366419
parser_function = MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING[
367420
msg_type
368421
][1]
369-
notification_type = MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING[
370-
msg_type
371-
][0]
372-
notification = parser_function(response, notification_type)
422+
if msg_type == _SMIGRATING_MESSAGE:
423+
notification = parser_function(response)
424+
else:
425+
notification_type = MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING[
426+
msg_type
427+
][0]
428+
notification = parser_function(response, notification_type)
373429

374430
if notification is not None:
375431
return await self.maintenance_push_handler_func(notification)
432+
if (
433+
msg_type == _SMIGRATED_MESSAGE
434+
and self.oss_maintenance_push_handler_func
435+
):
436+
parser_function = MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING[
437+
msg_type
438+
][1]
439+
notification = parser_function(response)
440+
if notification is not None:
441+
return await self.oss_maintenance_push_handler_func(
442+
notification
443+
)
376444
except Exception as e:
377445
logger.error(
378446
"Error handling {} message ({}): {}".format(msg_type, response, e)
@@ -394,6 +462,9 @@ def set_node_moving_push_handler(self, node_moving_push_handler_func):
394462
def set_maintenance_push_handler(self, maintenance_push_handler_func):
395463
self.maintenance_push_handler_func = maintenance_push_handler_func
396464

465+
def set_oss_maintenance_push_handler(self, oss_maintenance_push_handler_func):
466+
self.oss_maintenance_push_handler_func = oss_maintenance_push_handler_func
467+
397468

398469
class _AsyncRESPBase(AsyncBaseParser):
399470
"""Base class for async resp parsing"""

redis/_parsers/hiredis.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ def __init__(self, socket_read_size):
4949
self.pubsub_push_handler_func = self.handle_pubsub_push_response
5050
self.node_moving_push_handler_func = None
5151
self.maintenance_push_handler_func = None
52+
self.oss_maintenance_push_handler_func = None
5253
self.invalidation_push_handler_func = None
5354
self._hiredis_PushNotificationType = None
5455

redis/_parsers/resp3.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ def __init__(self, socket_read_size):
2020
self.pubsub_push_handler_func = self.handle_pubsub_push_response
2121
self.node_moving_push_handler_func = None
2222
self.maintenance_push_handler_func = None
23+
self.oss_maintenance_push_handler_func = None
2324
self.invalidation_push_handler_func = None
2425

2526
def handle_pubsub_push_response(self, response):

redis/maint_notifications.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from abc import ABC, abstractmethod
88
from typing import TYPE_CHECKING, List, Literal, Optional, Union
99

10+
from redis.cluster import NodesManager
1011
from redis.typing import Number
1112

1213

@@ -886,6 +887,7 @@ class MaintNotificationsConnectionHandler:
886887
_NOTIFICATION_TYPES: dict[type["MaintenanceNotification"], int] = {
887888
NodeMigratingNotification: 1,
888889
NodeFailingOverNotification: 1,
890+
OSSNodeMigratingNotification: 1,
889891
NodeMigratedNotification: 0,
890892
NodeFailedOverNotification: 0,
891893
}
@@ -939,3 +941,31 @@ def handle_maintenance_completed_notification(self):
939941
# timeouts by providing -1 as the relaxed timeout
940942
self.connection.update_current_socket_timeout(-1)
941943
self.connection.maintenance_state = MaintenanceState.NONE
944+
945+
946+
class OSSMaintNotificationsHandler:
947+
def __init__(
948+
self, nodes_manager: "NodesManager", config: MaintNotificationsConfig
949+
) -> None:
950+
self.nodes_manager = nodes_manager
951+
self.config = config
952+
self._processed_notifications = set()
953+
self._lock = threading.RLock()
954+
955+
def remove_expired_notifications(self):
956+
with self._lock:
957+
for notification in tuple(self._processed_notifications):
958+
if notification.is_expired():
959+
self._processed_notifications.remove(notification)
960+
961+
def handle_notification(self, notification: MaintenanceNotification):
962+
if isinstance(notification, OSSNodeMigratedNotification):
963+
self.handle_oss_maintenance_completed_notification(notification)
964+
else:
965+
logging.error(f"Unhandled notification type: {notification}")
966+
967+
def handle_oss_maintenance_completed_notification(
968+
self, notification: OSSNodeMigratedNotification
969+
):
970+
self.remove_expired_notifications()
971+
logging.info(f"Received OSS maintenance completed notification: {notification}")

0 commit comments

Comments
 (0)