1111 NodeMigratedNotification ,
1212 NodeMigratingNotification ,
1313 NodeMovingNotification ,
14+ OSSNodeMigratedNotification ,
15+ OSSNodeMigratingNotification ,
1416)
1517
1618if sys .version_info .major >= 3 and sys .version_info .minor >= 11 :
@@ -179,16 +181,39 @@ async def read_response(
179181class MaintenanceNotificationsParser :
180182 """Protocol defining maintenance push notification parsing functionality"""
181183
184+ @staticmethod
185+ def parse_oss_maintenance_start_msg (response ):
186+ # Expected message format is:
187+ # SMIGRATING <seq_number> <slot, range1-range2,...>
188+ id = response [1 ]
189+ slots = response [2 ]
190+ return OSSNodeMigratingNotification (id , slots )
191+
192+ @staticmethod
193+ def parse_oss_maintenance_completed_msg (response ):
194+ # Expected message format is:
195+ # SMIGRATED <seq_number> <host:port> <slot, range1-range2,...>
196+ id = response [1 ]
197+ node_address = response [2 ]
198+ slots = response [3 ]
199+ return OSSNodeMigratedNotification (id , node_address , slots )
200+
182201 @staticmethod
183202 def parse_maintenance_start_msg (response , notification_type ):
184203 # Expected message format is: <notification_type> <seq_number> <time>
204+ # Examples:
205+ # MIGRATING 1 10
206+ # FAILING_OVER 2 20
185207 id = response [1 ]
186208 ttl = response [2 ]
187209 return notification_type (id , ttl )
188210
189211 @staticmethod
190212 def parse_maintenance_completed_msg (response , notification_type ):
191213 # Expected message format is: <notification_type> <seq_number>
214+ # Examples:
215+ # MIGRATED 1
216+ # FAILED_OVER 2
192217 id = response [1 ]
193218 return notification_type (id )
194219
@@ -215,12 +240,15 @@ def parse_moving_msg(response):
215240_MIGRATED_MESSAGE = "MIGRATED"
216241_FAILING_OVER_MESSAGE = "FAILING_OVER"
217242_FAILED_OVER_MESSAGE = "FAILED_OVER"
243+ _SMIGRATING_MESSAGE = "SMIGRATING"
244+ _SMIGRATED_MESSAGE = "SMIGRATED"
218245
219246_MAINTENANCE_MESSAGES = (
220247 _MIGRATING_MESSAGE ,
221248 _MIGRATED_MESSAGE ,
222249 _FAILING_OVER_MESSAGE ,
223250 _FAILED_OVER_MESSAGE ,
251+ _SMIGRATING_MESSAGE ,
224252)
225253
226254MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING : dict [
@@ -246,6 +274,14 @@ def parse_moving_msg(response):
246274 NodeMovingNotification ,
247275 MaintenanceNotificationsParser .parse_moving_msg ,
248276 ),
277+ _SMIGRATING_MESSAGE : (
278+ OSSNodeMigratingNotification ,
279+ MaintenanceNotificationsParser .parse_oss_maintenance_start_msg ,
280+ ),
281+ _SMIGRATED_MESSAGE : (
282+ OSSNodeMigratedNotification ,
283+ MaintenanceNotificationsParser .parse_oss_maintenance_completed_msg ,
284+ ),
249285}
250286
251287
@@ -256,6 +292,7 @@ class PushNotificationsParser(Protocol):
256292 invalidation_push_handler_func : Optional [Callable ] = None
257293 node_moving_push_handler_func : Optional [Callable ] = None
258294 maintenance_push_handler_func : Optional [Callable ] = None
295+ oss_cluster_maint_push_handler_func : Optional [Callable ] = None
259296
260297 def handle_pubsub_push_response (self , response ):
261298 """Handle pubsub push responses"""
@@ -270,6 +307,7 @@ def handle_push_response(self, response, **kwargs):
270307 _INVALIDATION_MESSAGE ,
271308 * _MAINTENANCE_MESSAGES ,
272309 _MOVING_MESSAGE ,
310+ _SMIGRATED_MESSAGE ,
273311 ):
274312 return self .pubsub_push_handler_func (response )
275313
@@ -292,13 +330,27 @@ def handle_push_response(self, response, **kwargs):
292330 parser_function = MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING [
293331 msg_type
294332 ][1 ]
295- notification_type = MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING [
296- msg_type
297- ][0 ]
298- notification = parser_function (response , notification_type )
333+ if msg_type == _SMIGRATING_MESSAGE :
334+ notification = parser_function (response )
335+ else :
336+ notification_type = MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING [
337+ msg_type
338+ ][0 ]
339+ notification = parser_function (response , notification_type )
299340
300341 if notification is not None :
301342 return self .maintenance_push_handler_func (notification )
343+ if (
344+ msg_type == _SMIGRATED_MESSAGE
345+ and self .oss_cluster_maint_push_handler_func
346+ ):
347+ parser_function = MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING [
348+ msg_type
349+ ][1 ]
350+ notification = parser_function (response )
351+
352+ if notification is not None :
353+ return self .oss_cluster_maint_push_handler_func (notification )
302354 except Exception as e :
303355 logger .error (
304356 "Error handling {} message ({}): {}" .format (msg_type , response , e )
@@ -318,6 +370,9 @@ def set_node_moving_push_handler(self, node_moving_push_handler_func):
318370 def set_maintenance_push_handler (self , maintenance_push_handler_func ):
319371 self .maintenance_push_handler_func = maintenance_push_handler_func
320372
373+ def set_oss_cluster_maint_push_handler (self , oss_cluster_maint_push_handler_func ):
374+ self .oss_cluster_maint_push_handler_func = oss_cluster_maint_push_handler_func
375+
321376
322377class AsyncPushNotificationsParser (Protocol ):
323378 """Protocol defining async RESP3-specific parsing functionality"""
@@ -326,6 +381,7 @@ class AsyncPushNotificationsParser(Protocol):
326381 invalidation_push_handler_func : Optional [Callable ] = None
327382 node_moving_push_handler_func : Optional [Callable [..., Awaitable [None ]]] = None
328383 maintenance_push_handler_func : Optional [Callable [..., Awaitable [None ]]] = None
384+ oss_cluster_maint_push_handler_func : Optional [Callable [..., Awaitable [None ]]] = None
329385
330386 async def handle_pubsub_push_response (self , response ):
331387 """Handle pubsub push responses asynchronously"""
@@ -342,6 +398,7 @@ async def handle_push_response(self, response, **kwargs):
342398 _INVALIDATION_MESSAGE ,
343399 * _MAINTENANCE_MESSAGES ,
344400 _MOVING_MESSAGE ,
401+ _SMIGRATED_MESSAGE ,
345402 ):
346403 return await self .pubsub_push_handler_func (response )
347404
@@ -366,13 +423,26 @@ async def handle_push_response(self, response, **kwargs):
366423 parser_function = MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING [
367424 msg_type
368425 ][1 ]
369- notification_type = MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING [
370- msg_type
371- ][0 ]
372- notification = parser_function (response , notification_type )
426+ if msg_type == _SMIGRATING_MESSAGE :
427+ notification = parser_function (response )
428+ else :
429+ notification_type = MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING [
430+ msg_type
431+ ][0 ]
432+ notification = parser_function (response , notification_type )
373433
374434 if notification is not None :
375435 return await self .maintenance_push_handler_func (notification )
436+ if (
437+ msg_type == _SMIGRATED_MESSAGE
438+ and self .oss_cluster_maint_push_handler_func
439+ ):
440+ parser_function = MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING [
441+ msg_type
442+ ][1 ]
443+ notification = parser_function (response )
444+ if notification is not None :
445+ return await self .oss_cluster_maint_push_handler_func (notification )
376446 except Exception as e :
377447 logger .error (
378448 "Error handling {} message ({}): {}" .format (msg_type , response , e )
@@ -394,6 +464,9 @@ def set_node_moving_push_handler(self, node_moving_push_handler_func):
394464 def set_maintenance_push_handler (self , maintenance_push_handler_func ):
395465 self .maintenance_push_handler_func = maintenance_push_handler_func
396466
467+ def set_oss_cluster_maint_push_handler (self , oss_cluster_maint_push_handler_func ):
468+ self .oss_cluster_maint_push_handler_func = oss_cluster_maint_push_handler_func
469+
397470
398471class _AsyncRESPBase (AsyncBaseParser ):
399472 """Base class for async resp parsing"""
0 commit comments