5
5
from typing import Awaitable , Callable , List , Optional , Protocol , Union
6
6
7
7
from redis .maintenance_events import (
8
+ MaintenanceEvent ,
9
+ NodeFailedOverEvent ,
10
+ NodeFailingOverEvent ,
8
11
NodeMigratedEvent ,
9
12
NodeMigratingEvent ,
10
13
NodeMovingEvent ,
@@ -167,20 +170,76 @@ async def read_response(
167
170
raise NotImplementedError ()
168
171
169
172
170
- _INVALIDATION_MESSAGE = (b"invalidate" , "invalidate" )
171
- _MOVING_MESSAGE = (b"MOVING" , "MOVING" )
172
- _MIGRATING_MESSAGE = (b"MIGRATING" , "MIGRATING" )
173
- _MIGRATED_MESSAGE = (b"MIGRATED" , "MIGRATED" )
174
- _FAILING_OVER_MESSAGE = (b"FAILING_OVER" , "FAILING_OVER" )
175
- _FAILED_OVER_MESSAGE = (b"FAILED_OVER" , "FAILED_OVER" )
173
+ class MaintenanceNotificationsParser :
174
+ """Protocol defining maintenance push notification parsing functionality"""
175
+
176
+ @staticmethod
177
+ def parse_maintenance_start_msg (response , notification_type ):
178
+ # Expected message format is: <event_type> <seq_number> <time>
179
+ id = response [1 ]
180
+ ttl = response [2 ]
181
+ return notification_type (id , ttl )
182
+
183
+ @staticmethod
184
+ def parse_maintenance_completed_msg (response , notification_type ):
185
+ # Expected message format is: <event_type> <seq_number>
186
+ id = response [1 ]
187
+ return notification_type (id )
188
+
189
+ @staticmethod
190
+ def parse_moving_msg (response ):
191
+ # Expected message format is: MOVING <seq_number> <time> <endpoint>
192
+ id = response [1 ]
193
+ ttl = response [2 ]
194
+ if response [3 ] in [b"null" , "null" ]:
195
+ host , port = None , None
196
+ else :
197
+ value = response [3 ]
198
+ if isinstance (value , bytes ):
199
+ value = value .decode ()
200
+ host , port = value .split (":" )
201
+ port = int (port ) if port is not None else None
202
+
203
+ return NodeMovingEvent (id , host , port , ttl )
204
+
205
+
206
+ _INVALIDATION_MESSAGE = "invalidate"
207
+ _MOVING_MESSAGE = "MOVING"
208
+ _MIGRATING_MESSAGE = "MIGRATING"
209
+ _MIGRATED_MESSAGE = "MIGRATED"
210
+ _FAILING_OVER_MESSAGE = "FAILING_OVER"
211
+ _FAILED_OVER_MESSAGE = "FAILED_OVER"
176
212
177
213
_MAINTENANCE_MESSAGES = (
178
- * _MIGRATING_MESSAGE ,
179
- * _MIGRATED_MESSAGE ,
180
- * _FAILING_OVER_MESSAGE ,
181
- * _FAILED_OVER_MESSAGE ,
214
+ _MIGRATING_MESSAGE ,
215
+ _MIGRATED_MESSAGE ,
216
+ _FAILING_OVER_MESSAGE ,
217
+ _FAILED_OVER_MESSAGE ,
182
218
)
183
219
220
+ MSG_TYPE_TO_EVENT_PARSER_MAPPING : dict [str , tuple [type [MaintenanceEvent ], Callable ]] = {
221
+ _MIGRATING_MESSAGE : (
222
+ NodeMigratingEvent ,
223
+ MaintenanceNotificationsParser .parse_maintenance_start_msg ,
224
+ ),
225
+ _MIGRATED_MESSAGE : (
226
+ NodeMigratedEvent ,
227
+ MaintenanceNotificationsParser .parse_maintenance_completed_msg ,
228
+ ),
229
+ _FAILING_OVER_MESSAGE : (
230
+ NodeFailingOverEvent ,
231
+ MaintenanceNotificationsParser .parse_maintenance_start_msg ,
232
+ ),
233
+ _FAILED_OVER_MESSAGE : (
234
+ NodeFailedOverEvent ,
235
+ MaintenanceNotificationsParser .parse_maintenance_completed_msg ,
236
+ ),
237
+ _MOVING_MESSAGE : (
238
+ NodeMovingEvent ,
239
+ MaintenanceNotificationsParser .parse_moving_msg ,
240
+ ),
241
+ }
242
+
184
243
185
244
class PushNotificationsParser (Protocol ):
186
245
"""Protocol defining RESP3-specific parsing functionality"""
@@ -196,39 +255,33 @@ def handle_pubsub_push_response(self, response):
196
255
197
256
def handle_push_response (self , response , ** kwargs ):
198
257
msg_type = response [0 ]
258
+ if isinstance (msg_type , bytes ):
259
+ msg_type = msg_type .decode ()
260
+
199
261
if msg_type not in (
200
- * _INVALIDATION_MESSAGE ,
262
+ _INVALIDATION_MESSAGE ,
201
263
* _MAINTENANCE_MESSAGES ,
202
- * _MOVING_MESSAGE ,
264
+ _MOVING_MESSAGE ,
203
265
):
204
266
return self .pubsub_push_handler_func (response )
205
267
206
268
try :
207
269
if (
208
- msg_type in _INVALIDATION_MESSAGE
270
+ msg_type == _INVALIDATION_MESSAGE
209
271
and self .invalidation_push_handler_func
210
272
):
211
273
return self .invalidation_push_handler_func (response )
212
274
213
- if msg_type in _MOVING_MESSAGE and self .node_moving_push_handler_func :
214
- # Expected message format is: MOVING <seq_number> <time> <endpoint>
215
- id = response [1 ]
216
- ttl = response [2 ]
217
- host , port = response [3 ].decode ().split (":" )
218
- notification = NodeMovingEvent (id , host , port , ttl )
275
+ if msg_type == _MOVING_MESSAGE and self .node_moving_push_handler_func :
276
+ parser_function = MSG_TYPE_TO_EVENT_PARSER_MAPPING [msg_type ][1 ]
277
+
278
+ notification = parser_function (response )
219
279
return self .node_moving_push_handler_func (notification )
220
280
221
281
if msg_type in _MAINTENANCE_MESSAGES and self .maintenance_push_handler_func :
222
- notification = None
223
-
224
- if msg_type in _MIGRATING_MESSAGE :
225
- # Expected message format is: MIGRATING <seq_number> <time> <shard_id-s>
226
- id = response [1 ]
227
- ttl = response [2 ]
228
- notification = NodeMigratingEvent (id , ttl )
229
- elif msg_type in _MIGRATED_MESSAGE :
230
- id = response [1 ]
231
- notification = NodeMigratedEvent (id )
282
+ parser_function = MSG_TYPE_TO_EVENT_PARSER_MAPPING [msg_type ][1 ]
283
+ notification_type = MSG_TYPE_TO_EVENT_PARSER_MAPPING [msg_type ][0 ]
284
+ notification = parser_function (response , notification_type )
232
285
233
286
if notification is not None :
234
287
return self .maintenance_push_handler_func (notification )
@@ -268,38 +321,35 @@ async def handle_push_response(self, response, **kwargs):
268
321
"""Handle push responses asynchronously"""
269
322
270
323
msg_type = response [0 ]
324
+ if isinstance (msg_type , bytes ):
325
+ msg_type = msg_type .decode ()
326
+
271
327
if msg_type not in (
272
- * _INVALIDATION_MESSAGE ,
328
+ _INVALIDATION_MESSAGE ,
273
329
* _MAINTENANCE_MESSAGES ,
274
- * _MOVING_MESSAGE ,
330
+ _MOVING_MESSAGE ,
275
331
):
276
332
return await self .pubsub_push_handler_func (response )
277
333
278
334
try :
279
335
if (
280
- msg_type in _INVALIDATION_MESSAGE
336
+ msg_type == _INVALIDATION_MESSAGE
281
337
and self .invalidation_push_handler_func
282
338
):
283
339
return await self .invalidation_push_handler_func (response )
284
340
285
- if msg_type in _MOVING_MESSAGE and self . node_moving_push_handler_func :
286
- # push notification from enterprise cluster for node moving
287
- id = response [ 1 ]
288
- ttl = response [ 2 ]
289
- host , port = response [ 3 ]. split ( ":" )
290
- notification = NodeMovingEvent ( id , host , port , ttl )
341
+ if isinstance ( msg_type , bytes ) :
342
+ msg_type = msg_type . decode ()
343
+
344
+ if msg_type == _MOVING_MESSAGE and self . node_moving_push_handler_func :
345
+ parser_function = MSG_TYPE_TO_EVENT_PARSER_MAPPING [ msg_type ][ 1 ]
346
+ notification = parser_function ( response )
291
347
return await self .node_moving_push_handler_func (notification )
292
348
293
349
if msg_type in _MAINTENANCE_MESSAGES and self .maintenance_push_handler_func :
294
- notification = None
295
-
296
- if msg_type in _MIGRATING_MESSAGE :
297
- id = response [1 ]
298
- ttl = response [2 ]
299
- notification = NodeMigratingEvent (id , ttl )
300
- elif msg_type in _MIGRATED_MESSAGE :
301
- id = response [1 ]
302
- notification = NodeMigratedEvent (id )
350
+ parser_function = MSG_TYPE_TO_EVENT_PARSER_MAPPING [msg_type ][1 ]
351
+ notification_type = MSG_TYPE_TO_EVENT_PARSER_MAPPING [msg_type ][0 ]
352
+ notification = parser_function (response , notification_type )
303
353
304
354
if notification is not None :
305
355
return await self .maintenance_push_handler_func (notification )
0 commit comments