Skip to content

Commit 72615a7

Browse files
committed
Handling of topology update push notifications for Standalone Redis client.
1 parent a001416 commit 72615a7

File tree

7 files changed

+982
-96
lines changed

7 files changed

+982
-96
lines changed

redis/_parsers/base.py

Lines changed: 83 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,12 @@
33
from asyncio import IncompleteReadError, StreamReader, TimeoutError
44
from typing import Callable, List, Optional, Protocol, Union
55

6+
from redis.maintenance_events import (
7+
NodeMigratedEvent,
8+
NodeMigratingEvent,
9+
NodeMovingEvent,
10+
)
11+
612
if sys.version_info.major >= 3 and sys.version_info.minor >= 11:
713
from asyncio import timeout as async_timeout
814
else:
@@ -158,48 +164,114 @@ async def read_response(
158164
raise NotImplementedError()
159165

160166

161-
_INVALIDATION_MESSAGE = [b"invalidate", "invalidate"]
167+
_INVALIDATION_MESSAGE = (b"invalidate", "invalidate")
168+
_MOVING_MESSAGE = (b"MOVING", "MOVING")
169+
_MIGRATING_MESSAGE = (b"MIGRATING", "MIGRATING")
170+
_MIGRATED_MESSAGE = (b"MIGRATED", "MIGRATED")
171+
_FAILING_OVER_MESSAGE = (b"FAILING_OVER", "FAILING_OVER")
172+
_FAILED_OVER_MESSAGE = (b"FAILED_OVER", "FAILED_OVER")
173+
174+
_MAINTENANCE_MESSAGES = (
175+
*_MIGRATING_MESSAGE,
176+
*_MIGRATED_MESSAGE,
177+
*_FAILING_OVER_MESSAGE,
178+
*_FAILED_OVER_MESSAGE,
179+
)
162180

163181

164182
class PushNotificationsParser(Protocol):
165183
"""Protocol defining RESP3-specific parsing functionality"""
166184

167185
pubsub_push_handler_func: Callable
168186
invalidation_push_handler_func: Optional[Callable] = None
187+
node_moving_push_handler_func: Optional[Callable] = None
188+
maintenance_push_handler_func: Optional[Callable] = None
169189

170190
def handle_pubsub_push_response(self, response):
171191
"""Handle pubsub push responses"""
172192
raise NotImplementedError()
173193

174194
def handle_push_response(self, response, **kwargs):
175-
if response[0] not in _INVALIDATION_MESSAGE:
195+
msg_type = response[0]
196+
if msg_type not in (
197+
*_INVALIDATION_MESSAGE,
198+
*_MAINTENANCE_MESSAGES,
199+
*_MOVING_MESSAGE,
200+
):
176201
return self.pubsub_push_handler_func(response)
177-
if self.invalidation_push_handler_func:
202+
if msg_type in _INVALIDATION_MESSAGE and self.invalidation_push_handler_func:
178203
return self.invalidation_push_handler_func(response)
204+
if msg_type in _MOVING_MESSAGE and self.node_moving_push_handler_func:
205+
if msg_type in _MOVING_MESSAGE:
206+
host, port = response[2].split(":")
207+
ttl = response[1]
208+
notification = NodeMovingEvent(host, port, ttl)
209+
return self.node_moving_push_handler_func(notification)
210+
if msg_type in _MAINTENANCE_MESSAGES and self.maintenance_push_handler_func:
211+
if msg_type in _MIGRATING_MESSAGE:
212+
ttl = response[1]
213+
notification = NodeMigratingEvent(ttl)
214+
elif msg_type in _MIGRATED_MESSAGE:
215+
notification = NodeMigratedEvent()
216+
else:
217+
notification = None
218+
if notification is not None:
219+
return self.maintenance_push_handler_func(notification)
220+
else:
221+
return None
179222

180223
def set_pubsub_push_handler(self, pubsub_push_handler_func):
181224
self.pubsub_push_handler_func = pubsub_push_handler_func
182225

183226
def set_invalidation_push_handler(self, invalidation_push_handler_func):
184227
self.invalidation_push_handler_func = invalidation_push_handler_func
185228

229+
def set_node_moving_push_handler(self, node_moving_push_handler_func):
230+
self.node_moving_push_handler_func = node_moving_push_handler_func
231+
232+
def set_maintenance_push_handler(self, maintenance_push_handler_func):
233+
self.maintenance_push_handler_func = maintenance_push_handler_func
234+
186235

187236
class AsyncPushNotificationsParser(Protocol):
188237
"""Protocol defining async RESP3-specific parsing functionality"""
189238

190239
pubsub_push_handler_func: Callable
191240
invalidation_push_handler_func: Optional[Callable] = None
241+
node_moving_push_handler_func: Optional[Callable] = None
242+
maintenance_push_handler_func: Optional[Callable] = None
192243

193244
async def handle_pubsub_push_response(self, response):
194245
"""Handle pubsub push responses asynchronously"""
195246
raise NotImplementedError()
196247

197248
async def handle_push_response(self, response, **kwargs):
198249
"""Handle push responses asynchronously"""
199-
if response[0] not in _INVALIDATION_MESSAGE:
250+
msg_type = response[0]
251+
if msg_type not in (
252+
*_INVALIDATION_MESSAGE,
253+
*_MAINTENANCE_MESSAGES,
254+
*_MOVING_MESSAGE,
255+
):
200256
return await self.pubsub_push_handler_func(response)
201-
if self.invalidation_push_handler_func:
257+
if msg_type in _INVALIDATION_MESSAGE and self.invalidation_push_handler_func:
202258
return await self.invalidation_push_handler_func(response)
259+
if msg_type in _MOVING_MESSAGE and self.node_moving_push_handler_func:
260+
# push notification from enterprise cluster for node moving
261+
host, port = response[2].split(":")
262+
ttl = response[1]
263+
id = 1 # TODO: get unique id from push notification
264+
notification = NodeMovingEvent(id, host, port, ttl)
265+
return await self.node_moving_push_handler_func(notification)
266+
if msg_type in _MAINTENANCE_MESSAGES and self.maintenance_push_handler_func:
267+
if msg_type in _MIGRATING_MESSAGE:
268+
ttl = response[1]
269+
id = 1 # TODO: get unique id from push notification
270+
notification = NodeMigratingEvent(id, ttl)
271+
elif msg_type in _MIGRATED_MESSAGE:
272+
id = 1 # TODO: get unique id from push notification
273+
notification = NodeMigratedEvent(id)
274+
return await self.maintenance_push_handler_func(notification)
203275

204276
def set_pubsub_push_handler(self, pubsub_push_handler_func):
205277
"""Set the pubsub push handler function"""
@@ -209,6 +281,12 @@ def set_invalidation_push_handler(self, invalidation_push_handler_func):
209281
"""Set the invalidation push handler function"""
210282
self.invalidation_push_handler_func = invalidation_push_handler_func
211283

284+
def set_node_moving_push_handler_func(self, node_moving_push_handler_func):
285+
self.node_moving_push_handler_func = node_moving_push_handler_func
286+
287+
def set_maintenance_push_handler(self, maintenance_push_handler_func):
288+
self.maintenance_push_handler_func = maintenance_push_handler_func
289+
212290

213291
class _AsyncRESPBase(AsyncBaseParser):
214292
"""Base class for async resp parsing"""

redis/_parsers/hiredis.py

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ def __init__(self, socket_read_size):
4747
self.socket_read_size = socket_read_size
4848
self._buffer = bytearray(socket_read_size)
4949
self.pubsub_push_handler_func = self.handle_pubsub_push_response
50+
self.node_moving_push_handler_func = None
51+
self.maintenance_push_handler_func = None
5052
self.invalidation_push_handler_func = None
5153
self._hiredis_PushNotificationType = None
5254

@@ -141,13 +143,15 @@ def read_response(self, disable_decoding=False, push_request=False):
141143
response, self._hiredis_PushNotificationType
142144
):
143145
response = self.handle_push_response(response)
144-
if not push_request:
145-
return self.read_response(
146-
disable_decoding=disable_decoding, push_request=push_request
147-
)
148-
else:
146+
147+
# if this is a push request return the push response
148+
if push_request:
149149
return response
150-
return response
150+
151+
return self.read_response(
152+
disable_decoding=disable_decoding,
153+
push_request=push_request,
154+
)
151155

152156
if disable_decoding:
153157
response = self._reader.gets(False)
@@ -169,12 +173,13 @@ def read_response(self, disable_decoding=False, push_request=False):
169173
response, self._hiredis_PushNotificationType
170174
):
171175
response = self.handle_push_response(response)
172-
if not push_request:
173-
return self.read_response(
174-
disable_decoding=disable_decoding, push_request=push_request
175-
)
176-
else:
176+
if push_request:
177177
return response
178+
return self.read_response(
179+
disable_decoding=disable_decoding,
180+
push_request=push_request,
181+
)
182+
178183
elif (
179184
isinstance(response, list)
180185
and response

redis/_parsers/resp3.py

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ class _RESP3Parser(_RESPBase, PushNotificationsParser):
1818
def __init__(self, socket_read_size):
1919
super().__init__(socket_read_size)
2020
self.pubsub_push_handler_func = self.handle_pubsub_push_response
21+
self.node_moving_push_handler_func = None
22+
self.maintenance_push_handler_func = None
2123
self.invalidation_push_handler_func = None
2224

2325
def handle_pubsub_push_response(self, response):
@@ -117,17 +119,21 @@ def _read_response(self, disable_decoding=False, push_request=False):
117119
for _ in range(int(response))
118120
]
119121
response = self.handle_push_response(response)
120-
if not push_request:
121-
return self._read_response(
122-
disable_decoding=disable_decoding, push_request=push_request
123-
)
124-
else:
122+
123+
# if this is a push request return the push response
124+
if push_request:
125125
return response
126+
127+
return self._read_response(
128+
disable_decoding=disable_decoding,
129+
push_request=push_request,
130+
)
126131
else:
127132
raise InvalidResponse(f"Protocol Error: {raw!r}")
128133

129134
if isinstance(response, bytes) and disable_decoding is False:
130135
response = self.encoder.decode(response)
136+
131137
return response
132138

133139

0 commit comments

Comments
 (0)