Skip to content

Commit 10ded34

Browse files
committed
Adding handling of FAILING_OVER and FAILED_OVER events/push notifications
1 parent 4c6eb44 commit 10ded34

File tree

3 files changed

+379
-32
lines changed

3 files changed

+379
-32
lines changed

redis/maintenance_events.py

Lines changed: 110 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ class MaintenanceState(enum.Enum):
1212
NONE = "none"
1313
MOVING = "moving"
1414
MIGRATING = "migrating"
15+
FAILING_OVER = "failing_over"
1516

1617

1718
if TYPE_CHECKING:
@@ -261,6 +262,105 @@ def __hash__(self) -> int:
261262
return hash((self.__class__, self.id))
262263

263264

265+
class NodeFailingOverEvent(MaintenanceEvent):
266+
"""
267+
Event for when a Redis cluster node is in the process of failing over.
268+
269+
This event is received when a node starts a failover process during
270+
cluster maintenance operations or when handling node failures.
271+
272+
Args:
273+
id (int): Unique identifier for this event
274+
ttl (int): Time-to-live in seconds for this notification
275+
"""
276+
277+
def __init__(self, id: int, ttl: int):
278+
super().__init__(id, ttl)
279+
280+
def __repr__(self) -> str:
281+
expiry_time = self.creation_time + self.ttl
282+
remaining = max(0, expiry_time - time.monotonic())
283+
return (
284+
f"{self.__class__.__name__}("
285+
f"id={self.id}, "
286+
f"ttl={self.ttl}, "
287+
f"creation_time={self.creation_time}, "
288+
f"expires_at={expiry_time}, "
289+
f"remaining={remaining:.1f}s, "
290+
f"expired={self.is_expired()}"
291+
f")"
292+
)
293+
294+
def __eq__(self, other) -> bool:
295+
"""
296+
Two NodeFailingOverEvent events are considered equal if they have the same
297+
id and are of the same type.
298+
"""
299+
if not isinstance(other, NodeFailingOverEvent):
300+
return False
301+
return self.id == other.id and type(self) is type(other)
302+
303+
def __hash__(self) -> int:
304+
"""
305+
Return a hash value for the event to allow
306+
instances to be used in sets and as dictionary keys.
307+
308+
Returns:
309+
int: Hash value based on event type and id
310+
"""
311+
return hash((self.__class__, self.id))
312+
313+
314+
class NodeFailedOverEvent(MaintenanceEvent):
315+
"""
316+
Event for when a Redis cluster node has completed a failover.
317+
318+
This event is received when a node has finished the failover process
319+
during cluster maintenance operations or after handling node failures.
320+
321+
Args:
322+
id (int): Unique identifier for this event
323+
"""
324+
325+
DEFAULT_TTL = 5
326+
327+
def __init__(self, id: int):
328+
super().__init__(id, NodeFailedOverEvent.DEFAULT_TTL)
329+
330+
def __repr__(self) -> str:
331+
expiry_time = self.creation_time + self.ttl
332+
remaining = max(0, expiry_time - time.monotonic())
333+
return (
334+
f"{self.__class__.__name__}("
335+
f"id={self.id}, "
336+
f"ttl={self.ttl}, "
337+
f"creation_time={self.creation_time}, "
338+
f"expires_at={expiry_time}, "
339+
f"remaining={remaining:.1f}s, "
340+
f"expired={self.is_expired()}"
341+
f")"
342+
)
343+
344+
def __eq__(self, other) -> bool:
345+
"""
346+
Two NodeFailedOverEvent events are considered equal if they have the same
347+
id and are of the same type.
348+
"""
349+
if not isinstance(other, NodeFailedOverEvent):
350+
return False
351+
return self.id == other.id and type(self) is type(other)
352+
353+
def __hash__(self) -> int:
354+
"""
355+
Return a hash value for the event to allow
356+
instances to be used in sets and as dictionary keys.
357+
358+
Returns:
359+
int: Hash value based on event type and id
360+
"""
361+
return hash((self.__class__, self.id))
362+
363+
264364
class MaintenanceEventsConfig:
265365
"""
266366
Configuration class for maintenance events handling behaviour. Events are received through
@@ -465,32 +565,36 @@ def __init__(
465565

466566
def handle_event(self, event: MaintenanceEvent):
467567
if isinstance(event, NodeMigratingEvent):
468-
return self.handle_migrating_event(event)
568+
return self.handle_maintenance_start_event(MaintenanceState.MIGRATING)
469569
elif isinstance(event, NodeMigratedEvent):
470-
return self.handle_migration_completed_event(event)
570+
return self.handle_maintenance_completed_event()
571+
elif isinstance(event, NodeFailingOverEvent):
572+
return self.handle_maintenance_start_event(MaintenanceState.FAILING_OVER)
573+
elif isinstance(event, NodeFailedOverEvent):
574+
return self.handle_maintenance_completed_event()
471575
else:
472576
logging.error(f"Unhandled event type: {event}")
473577

474-
def handle_migrating_event(self, notification: NodeMigratingEvent):
578+
def handle_maintenance_start_event(self, maintenance_state: MaintenanceState):
475579
if (
476580
self.connection.maintenance_state == MaintenanceState.MOVING
477581
or not self.config.is_relax_timeouts_enabled()
478582
):
479583
return
480-
self.connection.maintenance_state = MaintenanceState.MIGRATING
584+
self.connection.maintenance_state = maintenance_state
481585
self.connection.set_tmp_settings(tmp_relax_timeout=self.config.relax_timeout)
482586
# extend the timeout for all created connections
483587
self.connection.update_current_socket_timeout(self.config.relax_timeout)
484588

485-
def handle_migration_completed_event(self, notification: "NodeMigratedEvent"):
589+
def handle_maintenance_completed_event(self):
486590
# Only reset timeouts if state is not MOVING and relax timeouts are enabled
487591
if (
488592
self.connection.maintenance_state == MaintenanceState.MOVING
489593
or not self.config.is_relax_timeouts_enabled()
490594
):
491595
return
492596
self.connection.reset_tmp_settings(reset_relax_timeout=True)
493-
# Node migration completed - reset the connection
597+
# Maintenance completed - reset the connection
494598
# timeouts by providing -1 as the relax timeout
495599
self.connection.update_current_socket_timeout(-1)
496600
self.connection.maintenance_state = MaintenanceState.NONE

tests/test_maintenance_events.py

Lines changed: 151 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,12 @@
77
NodeMovingEvent,
88
NodeMigratingEvent,
99
NodeMigratedEvent,
10+
NodeFailingOverEvent,
11+
NodeFailedOverEvent,
1012
MaintenanceEventsConfig,
1113
MaintenanceEventPoolHandler,
1214
MaintenanceEventConnectionHandler,
15+
MaintenanceState,
1316
)
1417

1518

@@ -281,6 +284,84 @@ def test_equality_and_hash(self):
281284
assert hash(event1) != hash(event3)
282285

283286

287+
class TestNodeFailingOverEvent:
288+
"""Test the NodeFailingOverEvent class."""
289+
290+
def test_init(self):
291+
"""Test NodeFailingOverEvent initialization."""
292+
with patch("time.monotonic", return_value=1000):
293+
event = NodeFailingOverEvent(id=1, ttl=5)
294+
assert event.id == 1
295+
assert event.ttl == 5
296+
assert event.creation_time == 1000
297+
298+
def test_repr(self):
299+
"""Test NodeFailingOverEvent string representation."""
300+
with patch("time.monotonic", return_value=1000):
301+
event = NodeFailingOverEvent(id=1, ttl=5)
302+
303+
with patch("time.monotonic", return_value=1002): # 2 seconds later
304+
repr_str = repr(event)
305+
assert "NodeFailingOverEvent" in repr_str
306+
assert "id=1" in repr_str
307+
assert "ttl=5" in repr_str
308+
assert "remaining=3.0s" in repr_str
309+
assert "expired=False" in repr_str
310+
311+
def test_equality_and_hash(self):
312+
"""Test equality and hash for NodeFailingOverEvent."""
313+
event1 = NodeFailingOverEvent(id=1, ttl=5)
314+
event2 = NodeFailingOverEvent(id=1, ttl=10) # Same id, different ttl
315+
event3 = NodeFailingOverEvent(id=2, ttl=5) # Different id
316+
317+
assert event1 == event2
318+
assert event1 != event3
319+
assert hash(event1) == hash(event2)
320+
assert hash(event1) != hash(event3)
321+
322+
323+
class TestNodeFailedOverEvent:
324+
"""Test the NodeFailedOverEvent class."""
325+
326+
def test_init(self):
327+
"""Test NodeFailedOverEvent initialization."""
328+
with patch("time.monotonic", return_value=1000):
329+
event = NodeFailedOverEvent(id=1)
330+
assert event.id == 1
331+
assert event.ttl == NodeFailedOverEvent.DEFAULT_TTL
332+
assert event.creation_time == 1000
333+
334+
def test_default_ttl(self):
335+
"""Test that DEFAULT_TTL is used correctly."""
336+
assert NodeFailedOverEvent.DEFAULT_TTL == 5
337+
event = NodeFailedOverEvent(id=1)
338+
assert event.ttl == 5
339+
340+
def test_repr(self):
341+
"""Test NodeFailedOverEvent string representation."""
342+
with patch("time.monotonic", return_value=1000):
343+
event = NodeFailedOverEvent(id=1)
344+
345+
with patch("time.monotonic", return_value=1001): # 1 second later
346+
repr_str = repr(event)
347+
assert "NodeFailedOverEvent" in repr_str
348+
assert "id=1" in repr_str
349+
assert "ttl=5" in repr_str
350+
assert "remaining=4.0s" in repr_str
351+
assert "expired=False" in repr_str
352+
353+
def test_equality_and_hash(self):
354+
"""Test equality and hash for NodeFailedOverEvent."""
355+
event1 = NodeFailedOverEvent(id=1)
356+
event2 = NodeFailedOverEvent(id=1) # Same id
357+
event3 = NodeFailedOverEvent(id=2) # Different id
358+
359+
assert event1 == event2
360+
assert event1 != event3
361+
assert hash(event1) == hash(event2)
362+
assert hash(event1) != hash(event3)
363+
364+
284365
class TestMaintenanceEventsConfig:
285366
"""Test the MaintenanceEventsConfig class."""
286367

@@ -477,19 +558,41 @@ def test_handle_event_migrating(self):
477558
"""Test handling of NodeMigratingEvent."""
478559
event = NodeMigratingEvent(id=1, ttl=5)
479560

480-
with patch.object(self.handler, "handle_migrating_event") as mock_handle:
561+
with patch.object(
562+
self.handler, "handle_maintenance_start_event"
563+
) as mock_handle:
481564
self.handler.handle_event(event)
482-
mock_handle.assert_called_once_with(event)
565+
mock_handle.assert_called_once_with(MaintenanceState.MIGRATING)
483566

484567
def test_handle_event_migrated(self):
485568
"""Test handling of NodeMigratedEvent."""
486569
event = NodeMigratedEvent(id=1)
487570

488571
with patch.object(
489-
self.handler, "handle_migration_completed_event"
572+
self.handler, "handle_maintenance_completed_event"
490573
) as mock_handle:
491574
self.handler.handle_event(event)
492-
mock_handle.assert_called_once_with(event)
575+
mock_handle.assert_called_once_with()
576+
577+
def test_handle_event_failing_over(self):
578+
"""Test handling of NodeFailingOverEvent."""
579+
event = NodeFailingOverEvent(id=1, ttl=5)
580+
581+
with patch.object(
582+
self.handler, "handle_maintenance_start_event"
583+
) as mock_handle:
584+
self.handler.handle_event(event)
585+
mock_handle.assert_called_once_with(MaintenanceState.FAILING_OVER)
586+
587+
def test_handle_event_failed_over(self):
588+
"""Test handling of NodeFailedOverEvent."""
589+
event = NodeFailedOverEvent(id=1)
590+
591+
with patch.object(
592+
self.handler, "handle_maintenance_completed_event"
593+
) as mock_handle:
594+
self.handler.handle_event(event)
595+
mock_handle.assert_called_once_with()
493596

494597
def test_handle_event_unknown_type(self):
495598
"""Test handling of unknown event type."""
@@ -500,43 +603,71 @@ def test_handle_event_unknown_type(self):
500603
result = self.handler.handle_event(event)
501604
assert result is None
502605

503-
def test_handle_migrating_event_disabled(self):
504-
"""Test migrating event handling when relax timeouts are disabled."""
606+
def test_handle_maintenance_start_event_disabled(self):
607+
"""Test maintenance start event handling when relax timeouts are disabled."""
505608
config = MaintenanceEventsConfig(relax_timeout=-1)
506609
handler = MaintenanceEventConnectionHandler(self.mock_connection, config)
507-
event = NodeMigratingEvent(id=1, ttl=5)
508610

509-
result = handler.handle_migrating_event(event)
611+
result = handler.handle_maintenance_start_event(MaintenanceState.MIGRATING)
510612
assert result is None
511613
self.mock_connection.update_current_socket_timeout.assert_not_called()
512614

513-
def test_handle_migrating_event_success(self):
514-
"""Test successful migrating event handling."""
515-
event = NodeMigratingEvent(id=1, ttl=5)
615+
def test_handle_maintenance_start_event_moving_state(self):
616+
"""Test maintenance start event handling when connection is in MOVING state."""
617+
self.mock_connection.maintenance_state = MaintenanceState.MOVING
516618

517-
self.handler.handle_migrating_event(event)
619+
result = self.handler.handle_maintenance_start_event(MaintenanceState.MIGRATING)
620+
assert result is None
621+
self.mock_connection.update_current_socket_timeout.assert_not_called()
518622

623+
def test_handle_maintenance_start_event_migrating_success(self):
624+
"""Test successful maintenance start event handling for migrating."""
625+
self.mock_connection.maintenance_state = MaintenanceState.NONE
626+
627+
self.handler.handle_maintenance_start_event(MaintenanceState.MIGRATING)
628+
629+
assert self.mock_connection.maintenance_state == MaintenanceState.MIGRATING
519630
self.mock_connection.update_current_socket_timeout.assert_called_once_with(20)
520631
self.mock_connection.set_tmp_settings.assert_called_once_with(
521632
tmp_relax_timeout=20
522633
)
523634

524-
def test_handle_migration_completed_event_disabled(self):
525-
"""Test migration completed event handling when relax timeouts are disabled."""
635+
def test_handle_maintenance_start_event_failing_over_success(self):
636+
"""Test successful maintenance start event handling for failing over."""
637+
self.mock_connection.maintenance_state = MaintenanceState.NONE
638+
639+
self.handler.handle_maintenance_start_event(MaintenanceState.FAILING_OVER)
640+
641+
assert self.mock_connection.maintenance_state == MaintenanceState.FAILING_OVER
642+
self.mock_connection.update_current_socket_timeout.assert_called_once_with(20)
643+
self.mock_connection.set_tmp_settings.assert_called_once_with(
644+
tmp_relax_timeout=20
645+
)
646+
647+
def test_handle_maintenance_completed_event_disabled(self):
648+
"""Test maintenance completed event handling when relax timeouts are disabled."""
526649
config = MaintenanceEventsConfig(relax_timeout=-1)
527650
handler = MaintenanceEventConnectionHandler(self.mock_connection, config)
528-
event = NodeMigratedEvent(id=1)
529651

530-
result = handler.handle_migration_completed_event(event)
652+
result = handler.handle_maintenance_completed_event()
531653
assert result is None
532654
self.mock_connection.update_current_socket_timeout.assert_not_called()
533655

534-
def test_handle_migration_completed_event_success(self):
535-
"""Test successful migration completed event handling."""
536-
event = NodeMigratedEvent(id=1)
656+
def test_handle_maintenance_completed_event_moving_state(self):
657+
"""Test maintenance completed event handling when connection is in MOVING state."""
658+
self.mock_connection.maintenance_state = MaintenanceState.MOVING
659+
660+
result = self.handler.handle_maintenance_completed_event()
661+
assert result is None
662+
self.mock_connection.update_current_socket_timeout.assert_not_called()
663+
664+
def test_handle_maintenance_completed_event_success(self):
665+
"""Test successful maintenance completed event handling."""
666+
self.mock_connection.maintenance_state = MaintenanceState.MIGRATING
537667

538-
self.handler.handle_migration_completed_event(event)
668+
self.handler.handle_maintenance_completed_event()
539669

670+
assert self.mock_connection.maintenance_state == MaintenanceState.NONE
540671
self.mock_connection.update_current_socket_timeout.assert_called_once_with(-1)
541672
self.mock_connection.reset_tmp_settings.assert_called_once_with(
542673
reset_relax_timeout=True

0 commit comments

Comments
 (0)