From e0847c9c9776be49e688e55c5089e632e1274757 Mon Sep 17 00:00:00 2001 From: Petya Slavova Date: Mon, 17 Nov 2025 17:50:17 +0200 Subject: [PATCH 1/3] Adding maintenance notifications for OSS API enabled connections --- redis/maint_notifications.py | 133 +++++++++- .../test_maint_notifications.py | 234 ++++++++++++++++++ 2 files changed, 366 insertions(+), 1 deletion(-) diff --git a/redis/maint_notifications.py b/redis/maint_notifications.py index 5b8b08c1be..824d9b5c56 100644 --- a/redis/maint_notifications.py +++ b/redis/maint_notifications.py @@ -5,7 +5,7 @@ import threading import time from abc import ABC, abstractmethod -from typing import TYPE_CHECKING, Literal, Optional, Union +from typing import TYPE_CHECKING, List, Literal, Optional, Union from redis.typing import Number @@ -394,6 +394,137 @@ def __hash__(self) -> int: return hash((self.__class__.__name__, int(self.id))) +class OSSNodeMigratingNotification(MaintenanceNotification): + """ + Notification for when a Redis OSS API client is used and a node is in the process of migrating slots. + + This notification is received when a node starts migrating its slots to another node + during cluster rebalancing or maintenance operations. + + Args: + id (int): Unique identifier for this notification + src_node (Optional[str]): Source node address - the notifications + received by the connections to the src node will + receive the dest node address + dest_node (Optional[str]): Destination node address - the notifications + received by the connections to the dst node will + receive the src node address + slots (Optional[List[int]]): List of slots being migrated + """ + + DEFAULT_TTL = 30 + + def __init__( + self, + id: int, + src_node: Optional[str] = None, + dest_node: Optional[str] = None, + slots: Optional[List[int]] = None, + ): + super().__init__(id, OSSNodeMigratingNotification.DEFAULT_TTL) + self.slots = slots + self.src_node = src_node + self.dest_node = dest_node + + def __repr__(self) -> str: + expiry_time = self.creation_time + self.ttl + remaining = max(0, expiry_time - time.monotonic()) + return ( + f"{self.__class__.__name__}(" + f"id={self.id}, " + f"src_node={self.src_node}, " + f"dest_node={self.dest_node}, " + f"slots={self.slots}, " + f"ttl={self.ttl}, " + f"creation_time={self.creation_time}, " + f"expires_at={expiry_time}, " + f"remaining={remaining:.1f}s, " + f"expired={self.is_expired()}" + f")" + ) + + def __eq__(self, other) -> bool: + """ + Two OSSNodeMigratingNotification notifications are considered equal if they have the same + id and are of the same type. + """ + if not isinstance(other, OSSNodeMigratingNotification): + return False + return self.id == other.id and type(self) is type(other) + + def __hash__(self) -> int: + """ + Return a hash value for the notification to allow + instances to be used in sets and as dictionary keys. + + Returns: + int: Hash value based on notification type and id + """ + return hash((self.__class__.__name__, int(self.id))) + + +class OSSNodeMigratedNotification(MaintenanceNotification): + """ + Notification for when a Redis OSS API client is used and a node has completed migrating slots. + + This notification is received when a node has finished migrating all its slots + to other nodes during cluster rebalancing or maintenance operations. + + Args: + id (int): Unique identifier for this notification + node_address (Optional[str]): Address of the node that has + completed migration - this is the destination node. + slots (Optional[List[int]]): List of slots that have been migrated + """ + + DEFAULT_TTL = 30 + + def __init__( + self, + id: int, + node_address: Optional[str] = None, + slots: Optional[List[int]] = None, + ): + super().__init__(id, OSSNodeMigratedNotification.DEFAULT_TTL) + self.node_address = node_address + self.slots = slots + + def __repr__(self) -> str: + expiry_time = self.creation_time + self.ttl + remaining = max(0, expiry_time - time.monotonic()) + return ( + f"{self.__class__.__name__}(" + f"id={self.id}, " + f"node_address={self.node_address}, " + f"slots={self.slots}, " + f"ttl={self.ttl}, " + f"creation_time={self.creation_time}, " + f"expires_at={expiry_time}, " + f"remaining={remaining:.1f}s, " + f"expired={self.is_expired()}" + f")" + ) + + def __eq__(self, other) -> bool: + """ + Two OSSNodeMigratedNotification notifications are considered equal if they have the same + id and are of the same type. + """ + if not isinstance(other, OSSNodeMigratedNotification): + return False + return self.id == other.id and type(self) is type(other) + + def __hash__(self) -> int: + """ + Return a hash value for the notification to allow + instances to be used in sets and as dictionary keys. + + Returns: + int: Hash value based on notification type and id + """ + return hash((self.__class__.__name__, int(self.id))) + + def _is_private_fqdn(host: str) -> bool: """ Determine if an FQDN is likely to be internal/private. diff --git a/tests/maint_notifications/test_maint_notifications.py b/tests/maint_notifications/test_maint_notifications.py index 85aa671390..d2a15775d5 100644 --- a/tests/maint_notifications/test_maint_notifications.py +++ b/tests/maint_notifications/test_maint_notifications.py @@ -11,6 +11,8 @@ NodeMigratedNotification, NodeFailingOverNotification, NodeFailedOverNotification, + OSSNodeMigratingNotification, + OSSNodeMigratedNotification, MaintNotificationsConfig, MaintNotificationsPoolHandler, MaintNotificationsConnectionHandler, @@ -381,6 +383,238 @@ def test_equality_and_hash(self): assert hash(notification1) != hash(notification3) +class TestOSSNodeMigratingNotification: + """Test the OSSNodeMigratingNotification class.""" + + def test_init_with_defaults(self): + """Test OSSNodeMigratingNotification initialization with default values.""" + with patch("time.monotonic", return_value=1000): + notification = OSSNodeMigratingNotification(id=1) + assert notification.id == 1 + assert notification.ttl == OSSNodeMigratingNotification.DEFAULT_TTL + assert notification.creation_time == 1000 + assert notification.src_node is None + assert notification.dest_node is None + assert notification.slots is None + + def test_init_with_all_parameters(self): + """Test OSSNodeMigratingNotification initialization with all parameters.""" + with patch("time.monotonic", return_value=1000): + slots = [1, 2, 3, 4, 5] + notification = OSSNodeMigratingNotification( + id=1, + src_node="127.0.0.1:6379", + dest_node="127.0.0.1:6380", + slots=slots, + ) + assert notification.id == 1 + assert notification.ttl == OSSNodeMigratingNotification.DEFAULT_TTL + assert notification.creation_time == 1000 + assert notification.src_node == "127.0.0.1:6379" + assert notification.dest_node == "127.0.0.1:6380" + assert notification.slots == slots + + def test_default_ttl(self): + """Test that DEFAULT_TTL is used correctly.""" + assert OSSNodeMigratingNotification.DEFAULT_TTL == 30 + notification = OSSNodeMigratingNotification(id=1) + assert notification.ttl == 30 + + def test_repr(self): + """Test OSSNodeMigratingNotification string representation.""" + with patch("time.monotonic", return_value=1000): + notification = OSSNodeMigratingNotification( + id=1, + src_node="127.0.0.1:6379", + dest_node="127.0.0.1:6380", + slots=[1, 2, 3], + ) + + with patch("time.monotonic", return_value=1005): # 5 seconds later + repr_str = repr(notification) + assert "OSSNodeMigratingNotification" in repr_str + assert "id=1" in repr_str + assert "ttl=30" in repr_str + assert "remaining=25.0s" in repr_str + assert "expired=False" in repr_str + + def test_equality_same_id_and_type(self): + """Test equality for notifications with same id and type.""" + notification1 = OSSNodeMigratingNotification( + id=1, + src_node="127.0.0.1:6379", + dest_node="127.0.0.1:6380", + slots=[1, 2, 3], + ) + notification2 = OSSNodeMigratingNotification( + id=1, + src_node="127.0.0.1:6381", + dest_node="127.0.0.1:6382", + slots=[4, 5, 6], + ) + # Should be equal because id and type are the same + assert notification1 == notification2 + + def test_equality_different_id(self): + """Test inequality for notifications with different id.""" + notification1 = OSSNodeMigratingNotification(id=1) + notification2 = OSSNodeMigratingNotification(id=2) + assert notification1 != notification2 + + def test_equality_different_type(self): + """Test inequality for notifications of different types.""" + notification1 = OSSNodeMigratingNotification(id=1) + notification2 = NodeMigratingNotification(id=1, ttl=30) + assert notification1 != notification2 + + def test_hash_same_id_and_type(self): + """Test hash for notifications with same id and type.""" + notification1 = OSSNodeMigratingNotification( + id=1, + src_node="127.0.0.1:6379", + dest_node="127.0.0.1:6380", + slots=[1, 2, 3], + ) + notification2 = OSSNodeMigratingNotification( + id=1, + src_node="127.0.0.1:6381", + dest_node="127.0.0.1:6382", + slots=[4, 5, 6], + ) + # Should have same hash because id and type are the same + assert hash(notification1) == hash(notification2) + + def test_hash_different_id(self): + """Test hash for notifications with different id.""" + notification1 = OSSNodeMigratingNotification(id=1) + notification2 = OSSNodeMigratingNotification(id=2) + assert hash(notification1) != hash(notification2) + + def test_in_set(self): + """Test that notifications can be used in sets.""" + notification1 = OSSNodeMigratingNotification(id=1) + notification2 = OSSNodeMigratingNotification(id=1) + notification3 = OSSNodeMigratingNotification(id=2) + notification4 = OSSNodeMigratingNotification(id=2) + + notification_set = {notification1, notification2, notification3, notification4} + assert ( + len(notification_set) == 2 + ) # notification1 and notification2 should be the same + + +class TestOSSNodeMigratedNotification: + """Test the OSSNodeMigratedNotification class.""" + + def test_init_with_defaults(self): + """Test OSSNodeMigratedNotification initialization with default values.""" + with patch("time.monotonic", return_value=1000): + notification = OSSNodeMigratedNotification(id=1) + assert notification.id == 1 + assert notification.ttl == OSSNodeMigratedNotification.DEFAULT_TTL + assert notification.creation_time == 1000 + assert notification.node_address is None + assert notification.slots is None + + def test_init_with_all_parameters(self): + """Test OSSNodeMigratedNotification initialization with all parameters.""" + with patch("time.monotonic", return_value=1000): + slots = [1, 2, 3, 4, 5] + notification = OSSNodeMigratedNotification( + id=1, + node_address="127.0.0.1:6380", + slots=slots, + ) + assert notification.id == 1 + assert notification.ttl == OSSNodeMigratedNotification.DEFAULT_TTL + assert notification.creation_time == 1000 + assert notification.node_address == "127.0.0.1:6380" + assert notification.slots == slots + + def test_default_ttl(self): + """Test that DEFAULT_TTL is used correctly.""" + assert OSSNodeMigratedNotification.DEFAULT_TTL == 30 + notification = OSSNodeMigratedNotification(id=1) + assert notification.ttl == 30 + + def test_repr(self): + """Test OSSNodeMigratedNotification string representation.""" + with patch("time.monotonic", return_value=1000): + notification = OSSNodeMigratedNotification( + id=1, + node_address="127.0.0.1:6380", + slots=[1, 2, 3], + ) + + with patch("time.monotonic", return_value=1010): # 10 seconds later + repr_str = repr(notification) + assert "OSSNodeMigratedNotification" in repr_str + assert "id=1" in repr_str + assert "ttl=30" in repr_str + assert "remaining=20.0s" in repr_str + assert "expired=False" in repr_str + + def test_equality_same_id_and_type(self): + """Test equality for notifications with same id and type.""" + notification1 = OSSNodeMigratedNotification( + id=1, + node_address="127.0.0.1:6380", + slots=[1, 2, 3], + ) + notification2 = OSSNodeMigratedNotification( + id=1, + node_address="127.0.0.1:6381", + slots=[4, 5, 6], + ) + # Should be equal because id and type are the same + assert notification1 == notification2 + + def test_equality_different_id(self): + """Test inequality for notifications with different id.""" + notification1 = OSSNodeMigratedNotification(id=1) + notification2 = OSSNodeMigratedNotification(id=2) + assert notification1 != notification2 + + def test_equality_different_type(self): + """Test inequality for notifications of different types.""" + notification1 = OSSNodeMigratedNotification(id=1) + notification2 = NodeMigratedNotification(id=1) + assert notification1 != notification2 + + def test_hash_same_id_and_type(self): + """Test hash for notifications with same id and type.""" + notification1 = OSSNodeMigratedNotification( + id=1, + node_address="127.0.0.1:6380", + slots=[1, 2, 3], + ) + notification2 = OSSNodeMigratedNotification( + id=1, + node_address="127.0.0.1:6381", + slots=[4, 5, 6], + ) + # Should have same hash because id and type are the same + assert hash(notification1) == hash(notification2) + + def test_hash_different_id(self): + """Test hash for notifications with different id.""" + notification1 = OSSNodeMigratedNotification(id=1) + notification2 = OSSNodeMigratedNotification(id=2) + assert hash(notification1) != hash(notification2) + + def test_in_set(self): + """Test that notifications can be used in sets.""" + notification1 = OSSNodeMigratedNotification(id=1) + notification2 = OSSNodeMigratedNotification(id=1) + notification3 = OSSNodeMigratedNotification(id=2) + notification4 = OSSNodeMigratedNotification(id=2) + + notification_set = {notification1, notification2, notification3, notification4} + assert ( + len(notification_set) == 2 + ) # notification1 and notification2 should be the same + + class TestMaintNotificationsConfig: """Test the MaintNotificationsConfig class.""" From e3c8b3fd34b7c03252a2fb5fadc1b683e484d94e Mon Sep 17 00:00:00 2001 From: Petya Slavova Date: Mon, 17 Nov 2025 19:25:25 +0200 Subject: [PATCH 2/3] Adding oss maint notifications handler configurations to parsers. Placeholder for smigrated handler in OSSMaintNotificationsHandler class --- redis/_parsers/base.py | 111 +++++++++- redis/_parsers/hiredis.py | 1 + redis/_parsers/resp3.py | 1 + redis/maint_notifications.py | 32 +++ .../proxy_server_helpers.py | 26 ++- ...st_cluster_maint_notifications_handling.py | 208 +++++++++++++----- 6 files changed, 315 insertions(+), 64 deletions(-) diff --git a/redis/_parsers/base.py b/redis/_parsers/base.py index 63bf5d7795..bb9c4f5fea 100644 --- a/redis/_parsers/base.py +++ b/redis/_parsers/base.py @@ -11,6 +11,8 @@ NodeMigratedNotification, NodeMigratingNotification, NodeMovingNotification, + OSSNodeMigratedNotification, + OSSNodeMigratingNotification, ) if sys.version_info.major >= 3 and sys.version_info.minor >= 11: @@ -179,9 +181,49 @@ async def read_response( class MaintenanceNotificationsParser: """Protocol defining maintenance push notification parsing functionality""" + @staticmethod + def parse_oss_maintenance_start_msg(response): + # Expected message format is: + # TODO Clarify which format will be the final one + # SMIGRATING + # or + # (using the one below for testing until the server changes are done) + # SMIGRATING TO ## received on source endpoint + # SMIGRATING FROM . ## received on target endpoint + id = response[1] + + address_value = response[3] + if isinstance(address_value, bytes): + address_value = address_value.decode() + if response[2] in ("TO", b"TO"): + dest_node = address_value + src_node = None + else: + dest_node = None + src_node = address_value + + slots = response[4] + return OSSNodeMigratingNotification(id, src_node, dest_node, slots) + + @staticmethod + def parse_oss_maintenance_completed_msg(response): + # Expected message format is: + # TODO Clarify which format will be the final one + # SMIGRATED NodeA’ + # or + # SMIGRATED + # received on source and target endpoints when migration is over + id = response[1] + node_address = None + slots = response[2] + return OSSNodeMigratedNotification(id, node_address, slots) + @staticmethod def parse_maintenance_start_msg(response, notification_type): # Expected message format is: