Skip to content

Commit 9ecae0a

Browse files
committed
Applying review comments. Finilized the notifications format.
1 parent e3c8b3f commit 9ecae0a

File tree

4 files changed

+17
-67
lines changed

4 files changed

+17
-67
lines changed

redis/_parsers/base.py

Lines changed: 15 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -184,38 +184,18 @@ class MaintenanceNotificationsParser:
184184
@staticmethod
185185
def parse_oss_maintenance_start_msg(response):
186186
# Expected message format is:
187-
# TODO Clarify which format will be the final one
188187
# SMIGRATING <seq_number> <slot, range1-range2,...>
189-
# or
190-
# (using the one below for testing until the server changes are done)
191-
# SMIGRATING <transaction_id> TO <endpoint> <list of slots and slot-ranges> ## received on source endpoint
192-
# SMIGRATING <transaction_id> FROM <list of slots and slot-ranges>. ## received on target endpoint
193188
id = response[1]
194-
195-
address_value = response[3]
196-
if isinstance(address_value, bytes):
197-
address_value = address_value.decode()
198-
if response[2] in ("TO", b"TO"):
199-
dest_node = address_value
200-
src_node = None
201-
else:
202-
dest_node = None
203-
src_node = address_value
204-
205-
slots = response[4]
206-
return OSSNodeMigratingNotification(id, src_node, dest_node, slots)
189+
slots = response[2]
190+
return OSSNodeMigratingNotification(id, slots)
207191

208192
@staticmethod
209193
def parse_oss_maintenance_completed_msg(response):
210194
# Expected message format is:
211-
# TODO Clarify which format will be the final one
212-
# SMIGRATED <seq_number> NodeA’<host:port> <slot, range1-range2,...>
213-
# or
214-
# SMIGRATED <transaction_id> <list of slots and slot-ranges>
215-
# received on source and target endpoints when migration is over
195+
# SMIGRATED <seq_number> <host:port> <slot, range1-range2,...>
216196
id = response[1]
217-
node_address = None
218-
slots = response[2]
197+
node_address = response[2]
198+
slots = response[3]
219199
return OSSNodeMigratedNotification(id, node_address, slots)
220200

221201
@staticmethod
@@ -453,18 +433,16 @@ async def handle_push_response(self, response, **kwargs):
453433

454434
if notification is not None:
455435
return await self.maintenance_push_handler_func(notification)
456-
if (
457-
msg_type == _SMIGRATED_MESSAGE
458-
and self.oss_cluster_maint_push_handler_func
459-
):
460-
parser_function = MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING[
461-
msg_type
462-
][1]
463-
notification = parser_function(response)
464-
if notification is not None:
465-
return await self.oss_cluster_maint_push_handler_func(
466-
notification
467-
)
436+
if (
437+
msg_type == _SMIGRATED_MESSAGE
438+
and self.oss_cluster_maint_push_handler_func
439+
):
440+
parser_function = MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING[
441+
msg_type
442+
][1]
443+
notification = parser_function(response)
444+
if notification is not None:
445+
return await self.oss_cluster_maint_push_handler_func(notification)
468446
except Exception as e:
469447
logger.error(
470448
"Error handling {} message ({}): {}".format(msg_type, response, e)

redis/maint_notifications.py

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -406,12 +406,6 @@ class OSSNodeMigratingNotification(MaintenanceNotification):
406406
407407
Args:
408408
id (int): Unique identifier for this notification
409-
src_node (Optional[str]): Source node address - the notifications
410-
received by the connections to the src node will
411-
receive the dest node address
412-
dest_node (Optional[str]): Destination node address - the notifications
413-
received by the connections to the dst node will
414-
receive the src node address
415409
slots (Optional[List[int]]): List of slots being migrated
416410
"""
417411

@@ -420,23 +414,17 @@ class OSSNodeMigratingNotification(MaintenanceNotification):
420414
def __init__(
421415
self,
422416
id: int,
423-
src_node: Optional[str] = None,
424-
dest_node: Optional[str] = None,
425417
slots: Optional[List[int]] = None,
426418
):
427419
super().__init__(id, OSSNodeMigratingNotification.DEFAULT_TTL)
428420
self.slots = slots
429-
self.src_node = src_node
430-
self.dest_node = dest_node
431421

432422
def __repr__(self) -> str:
433423
expiry_time = self.creation_time + self.ttl
434424
remaining = max(0, expiry_time - time.monotonic())
435425
return (
436426
f"{self.__class__.__name__}("
437427
f"id={self.id}, "
438-
f"src_node={self.src_node}, "
439-
f"dest_node={self.dest_node}, "
440428
f"slots={self.slots}, "
441429
f"ttl={self.ttl}, "
442430
f"creation_time={self.creation_time}, "

tests/maint_notifications/test_cluster_maint_notifications_handling.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -322,7 +322,7 @@ def _get_expected_node_state(
322322
) -> Optional[ConnectionStateExpectation]:
323323
"""Get the expected state for a node."""
324324
for expectation in expectations_list:
325-
if int(expectation.node_port) == (node_port):
325+
if expectation.node_port == node_port:
326326
return expectation
327327
return None
328328

@@ -397,7 +397,7 @@ def test_receive_oss_maintenance_notification(self):
397397
],
398398
)
399399

400-
# execute a commands that will receive the notification
400+
# execute a command that will receive the notification
401401
res = self.cluster.set("anyprefix:{3}:k", "VAL")
402402
assert res is True
403403

tests/maint_notifications/test_maint_notifications.py

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -393,8 +393,6 @@ def test_init_with_defaults(self):
393393
assert notification.id == 1
394394
assert notification.ttl == OSSNodeMigratingNotification.DEFAULT_TTL
395395
assert notification.creation_time == 1000
396-
assert notification.src_node is None
397-
assert notification.dest_node is None
398396
assert notification.slots is None
399397

400398
def test_init_with_all_parameters(self):
@@ -403,15 +401,11 @@ def test_init_with_all_parameters(self):
403401
slots = [1, 2, 3, 4, 5]
404402
notification = OSSNodeMigratingNotification(
405403
id=1,
406-
src_node="127.0.0.1:6379",
407-
dest_node="127.0.0.1:6380",
408404
slots=slots,
409405
)
410406
assert notification.id == 1
411407
assert notification.ttl == OSSNodeMigratingNotification.DEFAULT_TTL
412408
assert notification.creation_time == 1000
413-
assert notification.src_node == "127.0.0.1:6379"
414-
assert notification.dest_node == "127.0.0.1:6380"
415409
assert notification.slots == slots
416410

417411
def test_default_ttl(self):
@@ -425,8 +419,6 @@ def test_repr(self):
425419
with patch("time.monotonic", return_value=1000):
426420
notification = OSSNodeMigratingNotification(
427421
id=1,
428-
src_node="127.0.0.1:6379",
429-
dest_node="127.0.0.1:6380",
430422
slots=[1, 2, 3],
431423
)
432424

@@ -442,14 +434,10 @@ def test_equality_same_id_and_type(self):
442434
"""Test equality for notifications with same id and type."""
443435
notification1 = OSSNodeMigratingNotification(
444436
id=1,
445-
src_node="127.0.0.1:6379",
446-
dest_node="127.0.0.1:6380",
447437
slots=[1, 2, 3],
448438
)
449439
notification2 = OSSNodeMigratingNotification(
450440
id=1,
451-
src_node="127.0.0.1:6381",
452-
dest_node="127.0.0.1:6382",
453441
slots=[4, 5, 6],
454442
)
455443
# Should be equal because id and type are the same
@@ -471,14 +459,10 @@ def test_hash_same_id_and_type(self):
471459
"""Test hash for notifications with same id and type."""
472460
notification1 = OSSNodeMigratingNotification(
473461
id=1,
474-
src_node="127.0.0.1:6379",
475-
dest_node="127.0.0.1:6380",
476462
slots=[1, 2, 3],
477463
)
478464
notification2 = OSSNodeMigratingNotification(
479465
id=1,
480-
src_node="127.0.0.1:6381",
481-
dest_node="127.0.0.1:6382",
482466
slots=[4, 5, 6],
483467
)
484468
# Should have same hash because id and type are the same

0 commit comments

Comments
 (0)