Skip to content

Commit a623b2e

Browse files
authored
Adding OSS Notification Classes for SMIGRATING and SMIGRATED. Handling of SMIGRATING is completed and covered with tests. (#3849)
* Adding maintenance notifications for OSS API enabled connections * Adding OSS maint notifications handler configurations to parsers. Placeholder for SMIGRATED handler in OSSMaintNotificationsHandler class * Applying review comments. Finalized the notifications format.
1 parent b881d55 commit a623b2e

File tree

7 files changed

+631
-65
lines changed

7 files changed

+631
-65
lines changed

redis/_parsers/base.py

Lines changed: 81 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111
NodeMigratedNotification,
1212
NodeMigratingNotification,
1313
NodeMovingNotification,
14+
OSSNodeMigratedNotification,
15+
OSSNodeMigratingNotification,
1416
)
1517

1618
if sys.version_info.major >= 3 and sys.version_info.minor >= 11:
@@ -179,16 +181,39 @@ async def read_response(
179181
class MaintenanceNotificationsParser:
180182
"""Protocol defining maintenance push notification parsing functionality"""
181183

184+
@staticmethod
def parse_oss_maintenance_start_msg(response):
    """Build an OSSNodeMigratingNotification from an SMIGRATING push message.

    Expected message format is:
        SMIGRATING <seq_number> <slot, range1-range2,...>

    Args:
        response: Indexable push message. Element 1 is used as the
            notification id and element 2 as the slots payload.

    Returns:
        The OSSNodeMigratingNotification created from those two elements.
        The slots payload is forwarded unchanged; nothing is parsed here.
    """
    # Named notification_id (not id) so the builtin id() is not shadowed.
    notification_id = response[1]
    slots = response[2]
    return OSSNodeMigratingNotification(notification_id, slots)
191+
192+
@staticmethod
def parse_oss_maintenance_completed_msg(response):
    """Build an OSSNodeMigratedNotification from an SMIGRATED push message.

    Expected message format is:
        SMIGRATED <seq_number> <host:port> <slot, range1-range2,...>

    Args:
        response: Indexable push message. Element 1 is used as the
            notification id, element 2 as the node address and element 3
            as the slots payload.

    Returns:
        The OSSNodeMigratedNotification created from those three elements.
        Address and slots are forwarded unchanged; nothing is parsed here.
    """
    # Named notification_id (not id) so the builtin id() is not shadowed.
    notification_id = response[1]
    node_address = response[2]
    slots = response[3]
    return OSSNodeMigratedNotification(notification_id, node_address, slots)
200+
182201
@staticmethod
183202
def parse_maintenance_start_msg(response, notification_type):
184203
# Expected message format is: <notification_type> <seq_number> <time>
204+
# Examples:
205+
# MIGRATING 1 10
206+
# FAILING_OVER 2 20
185207
id = response[1]
186208
ttl = response[2]
187209
return notification_type(id, ttl)
188210

189211
@staticmethod
190212
def parse_maintenance_completed_msg(response, notification_type):
191213
# Expected message format is: <notification_type> <seq_number>
214+
# Examples:
215+
# MIGRATED 1
216+
# FAILED_OVER 2
192217
id = response[1]
193218
return notification_type(id)
194219

@@ -215,12 +240,15 @@ def parse_moving_msg(response):
215240
_MIGRATED_MESSAGE = "MIGRATED"
216241
_FAILING_OVER_MESSAGE = "FAILING_OVER"
217242
_FAILED_OVER_MESSAGE = "FAILED_OVER"
243+
_SMIGRATING_MESSAGE = "SMIGRATING"
244+
_SMIGRATED_MESSAGE = "SMIGRATED"
218245

219246
_MAINTENANCE_MESSAGES = (
220247
_MIGRATING_MESSAGE,
221248
_MIGRATED_MESSAGE,
222249
_FAILING_OVER_MESSAGE,
223250
_FAILED_OVER_MESSAGE,
251+
_SMIGRATING_MESSAGE,
224252
)
225253

226254
MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING: dict[
@@ -246,6 +274,14 @@ def parse_moving_msg(response):
246274
NodeMovingNotification,
247275
MaintenanceNotificationsParser.parse_moving_msg,
248276
),
277+
_SMIGRATING_MESSAGE: (
278+
OSSNodeMigratingNotification,
279+
MaintenanceNotificationsParser.parse_oss_maintenance_start_msg,
280+
),
281+
_SMIGRATED_MESSAGE: (
282+
OSSNodeMigratedNotification,
283+
MaintenanceNotificationsParser.parse_oss_maintenance_completed_msg,
284+
),
249285
}
250286

251287

@@ -256,6 +292,7 @@ class PushNotificationsParser(Protocol):
256292
invalidation_push_handler_func: Optional[Callable] = None
257293
node_moving_push_handler_func: Optional[Callable] = None
258294
maintenance_push_handler_func: Optional[Callable] = None
295+
oss_cluster_maint_push_handler_func: Optional[Callable] = None
259296

260297
def handle_pubsub_push_response(self, response):
261298
"""Handle pubsub push responses"""
@@ -270,6 +307,7 @@ def handle_push_response(self, response, **kwargs):
270307
_INVALIDATION_MESSAGE,
271308
*_MAINTENANCE_MESSAGES,
272309
_MOVING_MESSAGE,
310+
_SMIGRATED_MESSAGE,
273311
):
274312
return self.pubsub_push_handler_func(response)
275313

@@ -292,13 +330,27 @@ def handle_push_response(self, response, **kwargs):
292330
parser_function = MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING[
293331
msg_type
294332
][1]
295-
notification_type = MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING[
296-
msg_type
297-
][0]
298-
notification = parser_function(response, notification_type)
333+
if msg_type == _SMIGRATING_MESSAGE:
334+
notification = parser_function(response)
335+
else:
336+
notification_type = MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING[
337+
msg_type
338+
][0]
339+
notification = parser_function(response, notification_type)
299340

300341
if notification is not None:
301342
return self.maintenance_push_handler_func(notification)
343+
if (
344+
msg_type == _SMIGRATED_MESSAGE
345+
and self.oss_cluster_maint_push_handler_func
346+
):
347+
parser_function = MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING[
348+
msg_type
349+
][1]
350+
notification = parser_function(response)
351+
352+
if notification is not None:
353+
return self.oss_cluster_maint_push_handler_func(notification)
302354
except Exception as e:
303355
logger.error(
304356
"Error handling {} message ({}): {}".format(msg_type, response, e)
@@ -318,6 +370,9 @@ def set_node_moving_push_handler(self, node_moving_push_handler_func):
318370
def set_maintenance_push_handler(self, maintenance_push_handler_func):
    """Register the callable that receives parsed maintenance notifications.

    The value is stored on ``maintenance_push_handler_func``; passing
    ``None`` clears a previously registered handler.
    """
    self.maintenance_push_handler_func = maintenance_push_handler_func
320372

373+
def set_oss_cluster_maint_push_handler(self, oss_cluster_maint_push_handler_func):
    """Register the callable that receives parsed SMIGRATED notifications.

    The value is stored on ``oss_cluster_maint_push_handler_func``; passing
    ``None`` clears a previously registered handler.
    """
    self.oss_cluster_maint_push_handler_func = oss_cluster_maint_push_handler_func
375+
321376

322377
class AsyncPushNotificationsParser(Protocol):
323378
"""Protocol defining async RESP3-specific parsing functionality"""
@@ -326,6 +381,7 @@ class AsyncPushNotificationsParser(Protocol):
326381
invalidation_push_handler_func: Optional[Callable] = None
327382
node_moving_push_handler_func: Optional[Callable[..., Awaitable[None]]] = None
328383
maintenance_push_handler_func: Optional[Callable[..., Awaitable[None]]] = None
384+
oss_cluster_maint_push_handler_func: Optional[Callable[..., Awaitable[None]]] = None
329385

330386
async def handle_pubsub_push_response(self, response):
331387
"""Handle pubsub push responses asynchronously"""
@@ -342,6 +398,7 @@ async def handle_push_response(self, response, **kwargs):
342398
_INVALIDATION_MESSAGE,
343399
*_MAINTENANCE_MESSAGES,
344400
_MOVING_MESSAGE,
401+
_SMIGRATED_MESSAGE,
345402
):
346403
return await self.pubsub_push_handler_func(response)
347404

@@ -366,13 +423,26 @@ async def handle_push_response(self, response, **kwargs):
366423
parser_function = MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING[
367424
msg_type
368425
][1]
369-
notification_type = MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING[
370-
msg_type
371-
][0]
372-
notification = parser_function(response, notification_type)
426+
if msg_type == _SMIGRATING_MESSAGE:
427+
notification = parser_function(response)
428+
else:
429+
notification_type = MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING[
430+
msg_type
431+
][0]
432+
notification = parser_function(response, notification_type)
373433

374434
if notification is not None:
375435
return await self.maintenance_push_handler_func(notification)
436+
if (
437+
msg_type == _SMIGRATED_MESSAGE
438+
and self.oss_cluster_maint_push_handler_func
439+
):
440+
parser_function = MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING[
441+
msg_type
442+
][1]
443+
notification = parser_function(response)
444+
if notification is not None:
445+
return await self.oss_cluster_maint_push_handler_func(notification)
376446
except Exception as e:
377447
logger.error(
378448
"Error handling {} message ({}): {}".format(msg_type, response, e)
@@ -394,6 +464,9 @@ def set_node_moving_push_handler(self, node_moving_push_handler_func):
394464
def set_maintenance_push_handler(self, maintenance_push_handler_func):
    """Register the awaitable-returning maintenance notification handler.

    The value is stored on ``maintenance_push_handler_func``; passing
    ``None`` clears a previously registered handler.
    """
    self.maintenance_push_handler_func = maintenance_push_handler_func
396466

467+
def set_oss_cluster_maint_push_handler(self, oss_cluster_maint_push_handler_func):
    """Register the awaitable-returning SMIGRATED notification handler.

    The value is stored on ``oss_cluster_maint_push_handler_func``; passing
    ``None`` clears a previously registered handler.
    """
    self.oss_cluster_maint_push_handler_func = oss_cluster_maint_push_handler_func
469+
397470

398471
class _AsyncRESPBase(AsyncBaseParser):
399472
"""Base class for async resp parsing"""

redis/_parsers/hiredis.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ def __init__(self, socket_read_size):
4949
self.pubsub_push_handler_func = self.handle_pubsub_push_response
5050
self.node_moving_push_handler_func = None
5151
self.maintenance_push_handler_func = None
52+
self.oss_cluster_maint_push_handler_func = None
5253
self.invalidation_push_handler_func = None
5354
self._hiredis_PushNotificationType = None
5455

redis/_parsers/resp3.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ def __init__(self, socket_read_size):
2020
self.pubsub_push_handler_func = self.handle_pubsub_push_response
2121
self.node_moving_push_handler_func = None
2222
self.maintenance_push_handler_func = None
23+
self.oss_cluster_maint_push_handler_func = None
2324
self.invalidation_push_handler_func = None
2425

2526
def handle_pubsub_push_response(self, response):

redis/maint_notifications.py

Lines changed: 152 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,13 @@
55
import threading
66
import time
77
from abc import ABC, abstractmethod
8-
from typing import TYPE_CHECKING, Literal, Optional, Union
8+
from typing import TYPE_CHECKING, List, Literal, Optional, Union
99

1010
from redis.typing import Number
1111

12+
if TYPE_CHECKING:
13+
from redis.cluster import NodesManager
14+
1215

1316
class MaintenanceState(enum.Enum):
1417
NONE = "none"
@@ -394,6 +397,125 @@ def __hash__(self) -> int:
394397
return hash((self.__class__.__name__, int(self.id)))
395398

396399

400+
class OSSNodeMigratingNotification(MaintenanceNotification):
    """
    Maintenance notification for an OSS API client telling it that a node
    has started migrating slots to another node (cluster rebalancing or
    maintenance).

    The notification always uses ``DEFAULT_TTL`` as its time-to-live; the
    constructor takes no ttl argument.

    Args:
        id (int): Unique identifier for this notification
        slots (Optional[List[int]]): Slots being migrated
            NOTE(review): the SMIGRATING message format is documented as
            "<slot, range1-range2,...>"; confirm whether callers pass a
            parsed list or the raw payload here.
    """

    DEFAULT_TTL = 30

    def __init__(
        self,
        id: int,
        slots: Optional[List[int]] = None,
    ):
        # The TTL is read from this exact class, not from type(self).
        super().__init__(id, OSSNodeMigratingNotification.DEFAULT_TTL)
        self.slots = slots

    def __repr__(self) -> str:
        """Return a debug string with the id, slots and expiry details."""
        expires_at = self.creation_time + self.ttl
        # Clamp at zero so an expired notification never shows negative time.
        seconds_left = max(0, expires_at - time.monotonic())
        fields = ", ".join(
            (
                f"id={self.id}",
                f"slots={self.slots}",
                f"ttl={self.ttl}",
                f"creation_time={self.creation_time}",
                f"expires_at={expires_at}",
                f"remaining={seconds_left:.1f}s",
                f"expired={self.is_expired()}",
            )
        )
        return f"{self.__class__.__name__}({fields})"

    def __eq__(self, other) -> bool:
        """
        Equality requires an OSSNodeMigratingNotification with the same id
        and exactly the same concrete type; slots are not compared.
        """
        return (
            isinstance(other, OSSNodeMigratingNotification)
            and self.id == other.id
            and type(self) is type(other)
        )

    def __hash__(self) -> int:
        """
        Hash on the class name and the integer value of the id, so that
        instances can be stored in sets and used as dictionary keys.

        Returns:
            int: Hash value based on notification type and id
        """
        identity = (self.__class__.__name__, int(self.id))
        return hash(identity)
455+
456+
457+
class OSSNodeMigratedNotification(MaintenanceNotification):
    """
    Maintenance notification for an OSS API client telling it that a node
    has finished migrating slots (cluster rebalancing or maintenance).

    The notification always uses ``DEFAULT_TTL`` as its time-to-live; the
    constructor takes no ttl argument.

    Args:
        id (int): Unique identifier for this notification
        node_address (Optional[str]): Address of the node that has
            completed migration - this is the destination node.
        slots (Optional[List[int]]): Slots that have been migrated
            NOTE(review): the SMIGRATED message format is documented as
            "<host:port> <slot, range1-range2,...>"; confirm whether callers
            pass a parsed list or the raw payload here.
    """

    DEFAULT_TTL = 30

    def __init__(
        self,
        id: int,
        node_address: Optional[str] = None,
        slots: Optional[List[int]] = None,
    ):
        # The TTL is read from this exact class, not from type(self).
        super().__init__(id, OSSNodeMigratedNotification.DEFAULT_TTL)
        self.node_address = node_address
        self.slots = slots

    def __repr__(self) -> str:
        """Return a debug string with id, address, slots and expiry details."""
        expires_at = self.creation_time + self.ttl
        # Clamp at zero so an expired notification never shows negative time.
        seconds_left = max(0, expires_at - time.monotonic())
        fields = ", ".join(
            (
                f"id={self.id}",
                f"node_address={self.node_address}",
                f"slots={self.slots}",
                f"ttl={self.ttl}",
                f"creation_time={self.creation_time}",
                f"expires_at={expires_at}",
                f"remaining={seconds_left:.1f}s",
                f"expired={self.is_expired()}",
            )
        )
        return f"{self.__class__.__name__}({fields})"

    def __eq__(self, other) -> bool:
        """
        Equality requires an OSSNodeMigratedNotification with the same id
        and exactly the same concrete type; address and slots are not
        compared.
        """
        return (
            isinstance(other, OSSNodeMigratedNotification)
            and self.id == other.id
            and type(self) is type(other)
        )

    def __hash__(self) -> int:
        """
        Hash on the class name and the integer value of the id, so that
        instances can be stored in sets and used as dictionary keys.

        Returns:
            int: Hash value based on notification type and id
        """
        identity = (self.__class__.__name__, int(self.id))
        return hash(identity)
517+
518+
397519
def _is_private_fqdn(host: str) -> bool:
398520
"""
399521
Determine if an FQDN is likely to be internal/private.
@@ -755,6 +877,7 @@ class MaintNotificationsConnectionHandler:
755877
_NOTIFICATION_TYPES: dict[type["MaintenanceNotification"], int] = {
756878
NodeMigratingNotification: 1,
757879
NodeFailingOverNotification: 1,
880+
OSSNodeMigratingNotification: 1,
758881
NodeMigratedNotification: 0,
759882
NodeFailedOverNotification: 0,
760883
}
@@ -808,3 +931,31 @@ def handle_maintenance_completed_notification(self):
808931
# timeouts by providing -1 as the relaxed timeout
809932
self.connection.update_current_socket_timeout(-1)
810933
self.connection.maintenance_state = MaintenanceState.NONE
934+
935+
936+
class OSSMaintNotificationsHandler:
    """
    Handler for OSS cluster maintenance notifications.

    Only OSSNodeMigratedNotification is dispatched; any other notification
    is reported as unhandled. The completed-notification handler currently
    only prunes expired entries and logs the notification.

    NOTE(review): nothing in this class adds entries to
    ``_processed_notifications`` yet, so the expiry sweep has no effect
    until something populates the set.
    """

    def __init__(
        self, nodes_manager: "NodesManager", config: MaintNotificationsConfig
    ) -> None:
        """Store the collaborators and create the bookkeeping set and lock."""
        self.nodes_manager = nodes_manager
        self.config = config
        self._processed_notifications = set()
        # Re-entrant, so a method already holding the lock may call another.
        self._lock = threading.RLock()

    def remove_expired_notifications(self):
        """Drop every tracked notification whose ``is_expired()`` is true."""
        with self._lock:
            # Collect first, then remove, so the set is never mutated while
            # it is being iterated.
            expired = {
                tracked
                for tracked in self._processed_notifications
                if tracked.is_expired()
            }
            self._processed_notifications.difference_update(expired)

    def handle_notification(self, notification: MaintenanceNotification):
        """Dispatch a notification; log an error for unsupported types."""
        if isinstance(notification, OSSNodeMigratedNotification):
            self.handle_oss_maintenance_completed_notification(notification)
            return
        # Lazy %-args: the notification is only formatted if the record is
        # actually emitted.
        logging.error("Unhandled notification type: %s", notification)

    def handle_oss_maintenance_completed_notification(
        self, notification: OSSNodeMigratedNotification
    ):
        """Prune expired notifications, then log the completed migration."""
        self.remove_expired_notifications()
        # Lazy %-args: skips building the notification's repr when INFO
        # logging is disabled.
        logging.info(
            "Received OSS maintenance completed notification: %s", notification
        )

0 commit comments

Comments
 (0)