3
3
from asyncio import IncompleteReadError , StreamReader , TimeoutError
4
4
from typing import Callable , List , Optional , Protocol , Union
5
5
6
+ from redis .maintenance_events import (
7
+ NodeMigratedEvent ,
8
+ NodeMigratingEvent ,
9
+ NodeMovingEvent ,
10
+ )
11
+
6
12
if sys .version_info .major >= 3 and sys .version_info .minor >= 11 :
7
13
from asyncio import timeout as async_timeout
8
14
else :
@@ -158,48 +164,114 @@ async def read_response(
158
164
raise NotImplementedError ()
159
165
160
166
161
- _INVALIDATION_MESSAGE = [b"invalidate" , "invalidate" ]
167
+ _INVALIDATION_MESSAGE = (b"invalidate" , "invalidate" )
168
+ _MOVING_MESSAGE = (b"MOVING" , "MOVING" )
169
+ _MIGRATING_MESSAGE = (b"MIGRATING" , "MIGRATING" )
170
+ _MIGRATED_MESSAGE = (b"MIGRATED" , "MIGRATED" )
171
+ _FAILING_OVER_MESSAGE = (b"FAILING_OVER" , "FAILING_OVER" )
172
+ _FAILED_OVER_MESSAGE = (b"FAILED_OVER" , "FAILED_OVER" )
173
+
174
+ _MAINTENANCE_MESSAGES = (
175
+ * _MIGRATING_MESSAGE ,
176
+ * _MIGRATED_MESSAGE ,
177
+ * _FAILING_OVER_MESSAGE ,
178
+ * _FAILED_OVER_MESSAGE ,
179
+ )
162
180
163
181
164
182
class PushNotificationsParser (Protocol ):
165
183
"""Protocol defining RESP3-specific parsing functionality"""
166
184
167
185
pubsub_push_handler_func : Callable
168
186
invalidation_push_handler_func : Optional [Callable ] = None
187
+ node_moving_push_handler_func : Optional [Callable ] = None
188
+ maintenance_push_handler_func : Optional [Callable ] = None
169
189
170
190
def handle_pubsub_push_response (self , response ):
171
191
"""Handle pubsub push responses"""
172
192
raise NotImplementedError ()
173
193
174
194
def handle_push_response (self , response , ** kwargs ):
175
- if response [0 ] not in _INVALIDATION_MESSAGE :
195
+ msg_type = response [0 ]
196
+ if msg_type not in (
197
+ * _INVALIDATION_MESSAGE ,
198
+ * _MAINTENANCE_MESSAGES ,
199
+ * _MOVING_MESSAGE ,
200
+ ):
176
201
return self .pubsub_push_handler_func (response )
177
- if self .invalidation_push_handler_func :
202
+ if msg_type in _INVALIDATION_MESSAGE and self .invalidation_push_handler_func :
178
203
return self .invalidation_push_handler_func (response )
204
+ if msg_type in _MOVING_MESSAGE and self .node_moving_push_handler_func :
205
+ if msg_type in _MOVING_MESSAGE :
206
+ host , port = response [2 ].split (":" )
207
+ ttl = response [1 ]
208
+ notification = NodeMovingEvent (host , port , ttl )
209
+ return self .node_moving_push_handler_func (notification )
210
+ if msg_type in _MAINTENANCE_MESSAGES and self .maintenance_push_handler_func :
211
+ if msg_type in _MIGRATING_MESSAGE :
212
+ ttl = response [1 ]
213
+ notification = NodeMigratingEvent (ttl )
214
+ elif msg_type in _MIGRATED_MESSAGE :
215
+ notification = NodeMigratedEvent ()
216
+ else :
217
+ notification = None
218
+ if notification is not None :
219
+ return self .maintenance_push_handler_func (notification )
220
+ else :
221
+ return None
179
222
180
223
def set_pubsub_push_handler (self , pubsub_push_handler_func ):
181
224
self .pubsub_push_handler_func = pubsub_push_handler_func
182
225
183
226
def set_invalidation_push_handler (self , invalidation_push_handler_func ):
184
227
self .invalidation_push_handler_func = invalidation_push_handler_func
185
228
229
+ def set_node_moving_push_handler (self , node_moving_push_handler_func ):
230
+ self .node_moving_push_handler_func = node_moving_push_handler_func
231
+
232
+ def set_maintenance_push_handler (self , maintenance_push_handler_func ):
233
+ self .maintenance_push_handler_func = maintenance_push_handler_func
234
+
186
235
187
236
class AsyncPushNotificationsParser (Protocol ):
188
237
"""Protocol defining async RESP3-specific parsing functionality"""
189
238
190
239
pubsub_push_handler_func : Callable
191
240
invalidation_push_handler_func : Optional [Callable ] = None
241
+ node_moving_push_handler_func : Optional [Callable ] = None
242
+ maintenance_push_handler_func : Optional [Callable ] = None
192
243
193
244
async def handle_pubsub_push_response (self , response ):
194
245
"""Handle pubsub push responses asynchronously"""
195
246
raise NotImplementedError ()
196
247
197
248
async def handle_push_response (self , response , ** kwargs ):
198
249
"""Handle push responses asynchronously"""
199
- if response [0 ] not in _INVALIDATION_MESSAGE :
250
+ msg_type = response [0 ]
251
+ if msg_type not in (
252
+ * _INVALIDATION_MESSAGE ,
253
+ * _MAINTENANCE_MESSAGES ,
254
+ * _MOVING_MESSAGE ,
255
+ ):
200
256
return await self .pubsub_push_handler_func (response )
201
- if self .invalidation_push_handler_func :
257
+ if msg_type in _INVALIDATION_MESSAGE and self .invalidation_push_handler_func :
202
258
return await self .invalidation_push_handler_func (response )
259
+ if msg_type in _MOVING_MESSAGE and self .node_moving_push_handler_func :
260
+ # push notification from enterprise cluster for node moving
261
+ host , port = response [2 ].split (":" )
262
+ ttl = response [1 ]
263
+ id = 1 # TODO: get unique id from push notification
264
+ notification = NodeMovingEvent (id , host , port , ttl )
265
+ return await self .node_moving_push_handler_func (notification )
266
+ if msg_type in _MAINTENANCE_MESSAGES and self .maintenance_push_handler_func :
267
+ if msg_type in _MIGRATING_MESSAGE :
268
+ ttl = response [1 ]
269
+ id = 1 # TODO: get unique id from push notification
270
+ notification = NodeMigratingEvent (id , ttl )
271
+ elif msg_type in _MIGRATED_MESSAGE :
272
+ id = 1 # TODO: get unique id from push notification
273
+ notification = NodeMigratedEvent (id )
274
+ return await self .maintenance_push_handler_func (notification )
203
275
204
276
def set_pubsub_push_handler (self , pubsub_push_handler_func ):
205
277
"""Set the pubsub push handler function"""
@@ -209,6 +281,12 @@ def set_invalidation_push_handler(self, invalidation_push_handler_func):
209
281
"""Set the invalidation push handler function"""
210
282
self .invalidation_push_handler_func = invalidation_push_handler_func
211
283
284
+ def set_node_moving_push_handler_func (self , node_moving_push_handler_func ):
285
+ self .node_moving_push_handler_func = node_moving_push_handler_func
286
+
287
+ def set_maintenance_push_handler (self , maintenance_push_handler_func ):
288
+ self .maintenance_push_handler_func = maintenance_push_handler_func
289
+
212
290
213
291
class _AsyncRESPBase (AsyncBaseParser ):
214
292
"""Base class for async resp parsing"""
0 commit comments