Skip to content

Commit e3c8b3f

Browse files
committed
Adding oss maint notifications handler configurations to parsers. Placeholder for smigrated handler in OSSMaintNotificationsHandler class
1 parent e0847c9 commit e3c8b3f

File tree

6 files changed

+315
-64
lines changed

6 files changed

+315
-64
lines changed

redis/_parsers/base.py

Lines changed: 103 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111
NodeMigratedNotification,
1212
NodeMigratingNotification,
1313
NodeMovingNotification,
14+
OSSNodeMigratedNotification,
15+
OSSNodeMigratingNotification,
1416
)
1517

1618
if sys.version_info.major >= 3 and sys.version_info.minor >= 11:
@@ -179,16 +181,59 @@ async def read_response(
179181
class MaintenanceNotificationsParser:
180182
"""Protocol defining maintenance push notification parsing functionality"""
181183

184+
@staticmethod
185+
def parse_oss_maintenance_start_msg(response):
186+
# Expected message format is:
187+
# TODO Clarify which format will be the final one
188+
# SMIGRATING <seq_number> <slot, range1-range2,...>
189+
# or
190+
# (using the one below for testing until the server changes are done)
191+
# SMIGRATING <transaction_id> TO <endpoint> <list of slots and slot-ranges> ## received on source endpoint
192+
# SMIGRATING <transaction_id> FROM <list of slots and slot-ranges>. ## received on target endpoint
193+
id = response[1]
194+
195+
address_value = response[3]
196+
if isinstance(address_value, bytes):
197+
address_value = address_value.decode()
198+
if response[2] in ("TO", b"TO"):
199+
dest_node = address_value
200+
src_node = None
201+
else:
202+
dest_node = None
203+
src_node = address_value
204+
205+
slots = response[4]
206+
return OSSNodeMigratingNotification(id, src_node, dest_node, slots)
207+
208+
@staticmethod
209+
def parse_oss_maintenance_completed_msg(response):
210+
# Expected message format is:
211+
# TODO Clarify which format will be the final one
212+
# SMIGRATED <seq_number> NodeA’<host:port> <slot, range1-range2,...>
213+
# or
214+
# SMIGRATED <transaction_id> <list of slots and slot-ranges>
215+
# received on source and target endpoints when migration is over
216+
id = response[1]
217+
node_address = None
218+
slots = response[2]
219+
return OSSNodeMigratedNotification(id, node_address, slots)
220+
182221
@staticmethod
183222
def parse_maintenance_start_msg(response, notification_type):
184223
# Expected message format is: <notification_type> <seq_number> <time>
224+
# Examples:
225+
# MIGRATING 1 10
226+
# FAILING_OVER 2 20
185227
id = response[1]
186228
ttl = response[2]
187229
return notification_type(id, ttl)
188230

189231
@staticmethod
190232
def parse_maintenance_completed_msg(response, notification_type):
191233
# Expected message format is: <notification_type> <seq_number>
234+
# Examples:
235+
# MIGRATED 1
236+
# FAILED_OVER 2
192237
id = response[1]
193238
return notification_type(id)
194239

@@ -215,12 +260,15 @@ def parse_moving_msg(response):
215260
_MIGRATED_MESSAGE = "MIGRATED"
216261
_FAILING_OVER_MESSAGE = "FAILING_OVER"
217262
_FAILED_OVER_MESSAGE = "FAILED_OVER"
263+
_SMIGRATING_MESSAGE = "SMIGRATING"
264+
_SMIGRATED_MESSAGE = "SMIGRATED"
218265

219266
_MAINTENANCE_MESSAGES = (
220267
_MIGRATING_MESSAGE,
221268
_MIGRATED_MESSAGE,
222269
_FAILING_OVER_MESSAGE,
223270
_FAILED_OVER_MESSAGE,
271+
_SMIGRATING_MESSAGE,
224272
)
225273

226274
MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING: dict[
@@ -246,6 +294,14 @@ def parse_moving_msg(response):
246294
NodeMovingNotification,
247295
MaintenanceNotificationsParser.parse_moving_msg,
248296
),
297+
_SMIGRATING_MESSAGE: (
298+
OSSNodeMigratingNotification,
299+
MaintenanceNotificationsParser.parse_oss_maintenance_start_msg,
300+
),
301+
_SMIGRATED_MESSAGE: (
302+
OSSNodeMigratedNotification,
303+
MaintenanceNotificationsParser.parse_oss_maintenance_completed_msg,
304+
),
249305
}
250306

251307

@@ -256,6 +312,7 @@ class PushNotificationsParser(Protocol):
256312
invalidation_push_handler_func: Optional[Callable] = None
257313
node_moving_push_handler_func: Optional[Callable] = None
258314
maintenance_push_handler_func: Optional[Callable] = None
315+
oss_cluster_maint_push_handler_func: Optional[Callable] = None
259316

260317
def handle_pubsub_push_response(self, response):
261318
"""Handle pubsub push responses"""
@@ -270,6 +327,7 @@ def handle_push_response(self, response, **kwargs):
270327
_INVALIDATION_MESSAGE,
271328
*_MAINTENANCE_MESSAGES,
272329
_MOVING_MESSAGE,
330+
_SMIGRATED_MESSAGE,
273331
):
274332
return self.pubsub_push_handler_func(response)
275333

@@ -292,13 +350,27 @@ def handle_push_response(self, response, **kwargs):
292350
parser_function = MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING[
293351
msg_type
294352
][1]
295-
notification_type = MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING[
296-
msg_type
297-
][0]
298-
notification = parser_function(response, notification_type)
353+
if msg_type == _SMIGRATING_MESSAGE:
354+
notification = parser_function(response)
355+
else:
356+
notification_type = MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING[
357+
msg_type
358+
][0]
359+
notification = parser_function(response, notification_type)
299360

300361
if notification is not None:
301362
return self.maintenance_push_handler_func(notification)
363+
if (
364+
msg_type == _SMIGRATED_MESSAGE
365+
and self.oss_cluster_maint_push_handler_func
366+
):
367+
parser_function = MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING[
368+
msg_type
369+
][1]
370+
notification = parser_function(response)
371+
372+
if notification is not None:
373+
return self.oss_cluster_maint_push_handler_func(notification)
302374
except Exception as e:
303375
logger.error(
304376
"Error handling {} message ({}): {}".format(msg_type, response, e)
@@ -318,6 +390,9 @@ def set_node_moving_push_handler(self, node_moving_push_handler_func):
318390
def set_maintenance_push_handler(self, maintenance_push_handler_func):
319391
self.maintenance_push_handler_func = maintenance_push_handler_func
320392

393+
def set_oss_cluster_maint_push_handler(self, oss_cluster_maint_push_handler_func):
394+
self.oss_cluster_maint_push_handler_func = oss_cluster_maint_push_handler_func
395+
321396

322397
class AsyncPushNotificationsParser(Protocol):
323398
"""Protocol defining async RESP3-specific parsing functionality"""
@@ -326,6 +401,7 @@ class AsyncPushNotificationsParser(Protocol):
326401
invalidation_push_handler_func: Optional[Callable] = None
327402
node_moving_push_handler_func: Optional[Callable[..., Awaitable[None]]] = None
328403
maintenance_push_handler_func: Optional[Callable[..., Awaitable[None]]] = None
404+
oss_cluster_maint_push_handler_func: Optional[Callable[..., Awaitable[None]]] = None
329405

330406
async def handle_pubsub_push_response(self, response):
331407
"""Handle pubsub push responses asynchronously"""
@@ -342,6 +418,7 @@ async def handle_push_response(self, response, **kwargs):
342418
_INVALIDATION_MESSAGE,
343419
*_MAINTENANCE_MESSAGES,
344420
_MOVING_MESSAGE,
421+
_SMIGRATED_MESSAGE,
345422
):
346423
return await self.pubsub_push_handler_func(response)
347424

@@ -366,13 +443,28 @@ async def handle_push_response(self, response, **kwargs):
366443
parser_function = MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING[
367444
msg_type
368445
][1]
369-
notification_type = MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING[
370-
msg_type
371-
][0]
372-
notification = parser_function(response, notification_type)
446+
if msg_type == _SMIGRATING_MESSAGE:
447+
notification = parser_function(response)
448+
else:
449+
notification_type = MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING[
450+
msg_type
451+
][0]
452+
notification = parser_function(response, notification_type)
373453

374454
if notification is not None:
375455
return await self.maintenance_push_handler_func(notification)
456+
if (
457+
msg_type == _SMIGRATED_MESSAGE
458+
and self.oss_cluster_maint_push_handler_func
459+
):
460+
parser_function = MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING[
461+
msg_type
462+
][1]
463+
notification = parser_function(response)
464+
if notification is not None:
465+
return await self.oss_cluster_maint_push_handler_func(
466+
notification
467+
)
376468
except Exception as e:
377469
logger.error(
378470
"Error handling {} message ({}): {}".format(msg_type, response, e)
@@ -394,6 +486,9 @@ def set_node_moving_push_handler(self, node_moving_push_handler_func):
394486
def set_maintenance_push_handler(self, maintenance_push_handler_func):
395487
self.maintenance_push_handler_func = maintenance_push_handler_func
396488

489+
def set_oss_cluster_maint_push_handler(self, oss_cluster_maint_push_handler_func):
490+
self.oss_cluster_maint_push_handler_func = oss_cluster_maint_push_handler_func
491+
397492

398493
class _AsyncRESPBase(AsyncBaseParser):
399494
"""Base class for async resp parsing"""

redis/_parsers/hiredis.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ def __init__(self, socket_read_size):
4949
self.pubsub_push_handler_func = self.handle_pubsub_push_response
5050
self.node_moving_push_handler_func = None
5151
self.maintenance_push_handler_func = None
52+
self.oss_cluster_maint_push_handler_func = None
5253
self.invalidation_push_handler_func = None
5354
self._hiredis_PushNotificationType = None
5455

redis/_parsers/resp3.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ def __init__(self, socket_read_size):
2020
self.pubsub_push_handler_func = self.handle_pubsub_push_response
2121
self.node_moving_push_handler_func = None
2222
self.maintenance_push_handler_func = None
23+
self.oss_cluster_maint_push_handler_func = None
2324
self.invalidation_push_handler_func = None
2425

2526
def handle_pubsub_push_response(self, response):

redis/maint_notifications.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@
99

1010
from redis.typing import Number
1111

12+
if TYPE_CHECKING:
13+
from redis.cluster import NodesManager
14+
1215

1316
class MaintenanceState(enum.Enum):
1417
NONE = "none"
@@ -886,6 +889,7 @@ class MaintNotificationsConnectionHandler:
886889
_NOTIFICATION_TYPES: dict[type["MaintenanceNotification"], int] = {
887890
NodeMigratingNotification: 1,
888891
NodeFailingOverNotification: 1,
892+
OSSNodeMigratingNotification: 1,
889893
NodeMigratedNotification: 0,
890894
NodeFailedOverNotification: 0,
891895
}
@@ -939,3 +943,31 @@ def handle_maintenance_completed_notification(self):
939943
# timeouts by providing -1 as the relaxed timeout
940944
self.connection.update_current_socket_timeout(-1)
941945
self.connection.maintenance_state = MaintenanceState.NONE
946+
947+
948+
class OSSMaintNotificationsHandler:
949+
def __init__(
950+
self, nodes_manager: "NodesManager", config: MaintNotificationsConfig
951+
) -> None:
952+
self.nodes_manager = nodes_manager
953+
self.config = config
954+
self._processed_notifications = set()
955+
self._lock = threading.RLock()
956+
957+
def remove_expired_notifications(self):
958+
with self._lock:
959+
for notification in tuple(self._processed_notifications):
960+
if notification.is_expired():
961+
self._processed_notifications.remove(notification)
962+
963+
def handle_notification(self, notification: MaintenanceNotification):
964+
if isinstance(notification, OSSNodeMigratedNotification):
965+
self.handle_oss_maintenance_completed_notification(notification)
966+
else:
967+
logging.error(f"Unhandled notification type: {notification}")
968+
969+
def handle_oss_maintenance_completed_notification(
970+
self, notification: OSSNodeMigratedNotification
971+
):
972+
self.remove_expired_notifications()
973+
logging.info(f"Received OSS maintenance completed notification: {notification}")

tests/maint_notifications/proxy_server_helpers.py

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import base64
22
import logging
3+
import re
34
from typing import Union
45

56
from redis.http.http_client import HttpClient, HttpError
@@ -10,6 +11,19 @@
1011
class RespTranslator:
1112
"""Helper class to translate between RESP and other encodings."""
1213

14+
@staticmethod
15+
def str_or_list_to_resp(txt: str) -> str:
16+
"""
17+
Convert specific string or list to RESP format.
18+
"""
19+
if re.match(r"^<.*>$", txt):
20+
items = txt[1:-1].split(",")
21+
return f"*{len(items)}\r\n" + "\r\n".join(
22+
f"${len(x)}\r\n{x}" for x in items
23+
)
24+
else:
25+
return f"${len(txt)}\r\n{txt}"
26+
1327
@staticmethod
1428
def cluster_slots_to_resp(resp: str) -> str:
1529
"""Convert query to RESP format."""
@@ -24,7 +38,9 @@ def smigrating_to_resp(resp: str) -> str:
2438
"""Convert query to RESP format."""
2539
return (
2640
f">{len(resp.split())}\r\n"
27-
+ "\r\n".join(f"${len(x)}\r\n{x}" for x in resp.split())
41+
+ "\r\n".join(
42+
f"{RespTranslator.str_or_list_to_resp(x)}" for x in resp.split()
43+
)
2844
+ "\r\n"
2945
)
3046

@@ -118,6 +134,8 @@ def get_stats(self) -> dict:
118134

119135
try:
120136
response = self.http_client.get(url)
137+
if isinstance(response, dict):
138+
return response
121139
return response.json()
122140

123141
except HttpError as e:
@@ -134,6 +152,8 @@ def get_connections(self) -> dict:
134152

135153
try:
136154
response = self.http_client.get(url)
155+
if isinstance(response, dict):
156+
return response
137157
return response.json()
138158
except HttpError as e:
139159
raise RuntimeError(f"Failed to get connections: {e}")
@@ -192,7 +212,9 @@ def send_notification(
192212
data = base64.b64encode(notification.encode("utf-8"))
193213

194214
try:
195-
response = self.http_client.post(url, json_body=data)
215+
response = self.http_client.post(url, data=data)
216+
if isinstance(response, dict):
217+
return response
196218
results = response.json()
197219
except HttpError as e:
198220
results = {"error": str(e)}

0 commit comments

Comments
 (0)