Skip to content

Commit 41a199e

Browse files
committed
Adding sequence id to the maintenance push notifications. Adding unit tests for maintenance_events.py file
1 parent 092e33b commit 41a199e

File tree

3 files changed

+665
-29
lines changed

3 files changed

+665
-29
lines changed

redis/_parsers/base.py

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -205,14 +205,17 @@ def handle_push_response(self, response, **kwargs):
205205
if msg_type in _MOVING_MESSAGE:
206206
host, port = response[2].split(":")
207207
ttl = response[1]
208-
notification = NodeMovingEvent(host, port, ttl)
208+
id = 1 # Hardcoded value for sync parser
209+
notification = NodeMovingEvent(id, host, port, ttl)
209210
return self.node_moving_push_handler_func(notification)
210211
if msg_type in _MAINTENANCE_MESSAGES and self.maintenance_push_handler_func:
211212
if msg_type in _MIGRATING_MESSAGE:
212213
ttl = response[1]
213-
notification = NodeMigratingEvent(ttl)
214+
id = 2 # Hardcoded value for sync parser
215+
notification = NodeMigratingEvent(id, ttl)
214216
elif msg_type in _MIGRATED_MESSAGE:
215-
notification = NodeMigratedEvent()
217+
id = 3 # Hardcoded value for sync parser
218+
notification = NodeMigratedEvent(id)
216219
else:
217220
notification = None
218221
if notification is not None:
@@ -260,16 +263,16 @@ async def handle_push_response(self, response, **kwargs):
260263
# push notification from enterprise cluster for node moving
261264
host, port = response[2].split(":")
262265
ttl = response[1]
263-
id = 1 # TODO: get unique id from push notification
266+
id = 1 # Hardcoded value for async parser
264267
notification = NodeMovingEvent(id, host, port, ttl)
265268
return await self.node_moving_push_handler_func(notification)
266269
if msg_type in _MAINTENANCE_MESSAGES and self.maintenance_push_handler_func:
267270
if msg_type in _MIGRATING_MESSAGE:
268271
ttl = response[1]
269-
id = 1 # TODO: get unique id from push notification
272+
id = 2 # Hardcoded value for async parser
270273
notification = NodeMigratingEvent(id, ttl)
271274
elif msg_type in _MIGRATED_MESSAGE:
272-
id = 1 # TODO: get unique id from push notification
275+
id = 3 # Hardcoded value for async parser
273276
notification = NodeMigratedEvent(id)
274277
return await self.maintenance_push_handler_func(notification)
275278

redis/maintenance_events.py

Lines changed: 113 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,35 +1,39 @@
11
import logging
22
import threading
33
import time
4-
from typing import TYPE_CHECKING
4+
from abc import ABC, abstractmethod
5+
from typing import TYPE_CHECKING, Optional
56

67
from redis.typing import Number
78

89
if TYPE_CHECKING:
910
from redis.connection import ConnectionInterface, ConnectionPool
1011

1112

12-
class MaintenanceEvent:
13+
class MaintenanceEvent(ABC):
1314
"""
1415
Base class for maintenance events sent through push messages by Redis server.
1516
16-
This class provides common TTL (Time-To-Live) functionality for all
17-
maintenance events.
17+
This class provides common functionality for all maintenance events including
18+
unique identification and TTL (Time-To-Live) functionality.
1819
1920
Attributes:
21+
id (int): Unique identifier for this event
2022
ttl (int): Time-to-live in seconds for this notification
2123
creation_time (float): Timestamp when the notification was created/read
2224
"""
2325

24-
def __init__(self, ttl: int):
26+
def __init__(self, id: int, ttl: int):
2527
"""
26-
Initialize a new MaintenanceEvent with TTL functionality.
28+
Initialize a new MaintenanceEvent with unique ID and TTL functionality.
2729
2830
Args:
31+
id (int): Unique identifier for this event
2932
ttl (int): Time-to-live in seconds for this notification
3033
"""
34+
self.id = id
3135
self.ttl = ttl
32-
self.creation_time = int(time.time())
36+
self.creation_time = time.monotonic()
3337
self.expire_at = self.creation_time + self.ttl
3438

3539
def is_expired(self) -> bool:
@@ -40,7 +44,49 @@ def is_expired(self) -> bool:
4044
Returns:
4145
bool: True if the event has expired, False otherwise
4246
"""
43-
return int(time.time()) > (self.creation_time + self.ttl)
47+
return time.monotonic() > (self.creation_time + self.ttl)
48+
49+
@abstractmethod
50+
def __repr__(self) -> str:
51+
"""
52+
Return a string representation of the maintenance event.
53+
54+
This method must be implemented by all concrete subclasses.
55+
56+
Returns:
57+
str: String representation of the event
58+
"""
59+
pass
60+
61+
@abstractmethod
62+
def __eq__(self, other) -> bool:
63+
"""
64+
Compare two maintenance events for equality.
65+
66+
This method must be implemented by all concrete subclasses.
67+
Events are typically considered equal if they have the same id
68+
and are of the same type.
69+
70+
Args:
71+
other: The other object to compare with
72+
73+
Returns:
74+
bool: True if the events are equal, False otherwise
75+
"""
76+
pass
77+
78+
@abstractmethod
79+
def __hash__(self) -> int:
80+
"""
81+
Return a hash value for the maintenance event.
82+
83+
This method must be implemented by all concrete subclasses to allow
84+
instances to be used in sets and as dictionary keys.
85+
86+
Returns:
87+
int: Hash value for the event
88+
"""
89+
pass
4490

4591

4692
class NodeMovingEvent(MaintenanceEvent):
@@ -49,25 +95,27 @@ class NodeMovingEvent(MaintenanceEvent):
4995
during cluster rebalancing or maintenance operations.
5096
"""
5197

52-
def __init__(self, new_node_host: str, new_node_port: int, ttl: int):
98+
def __init__(self, id: int, new_node_host: str, new_node_port: int, ttl: int):
5399
"""
54100
Initialize a new NodeMovingEvent.
55101
56102
Args:
103+
id (int): Unique identifier for this event
57104
new_node_host (str): Hostname or IP address of the new replacement node
58105
new_node_port (int): Port number of the new replacement node
59106
ttl (int): Time-to-live in seconds for this notification
60107
"""
61-
super().__init__(ttl)
108+
super().__init__(id, ttl)
62109
self.new_node_host = new_node_host
63110
self.new_node_port = new_node_port
64111

65112
def __repr__(self) -> str:
66113
expiry_time = self.expire_at
67-
remaining = max(0, expiry_time - time.time())
114+
remaining = max(0, expiry_time - time.monotonic())
68115

69116
return (
70117
f"{self.__class__.__name__}("
118+
f"id={self.id}, "
71119
f"new_node_host='{self.new_node_host}', "
72120
f"new_node_port={self.new_node_port}, "
73121
f"ttl={self.ttl}, "
@@ -81,12 +129,13 @@ def __repr__(self) -> str:
81129
def __eq__(self, other) -> bool:
82130
"""
83131
Two NodeMovingEvent events are considered equal if they have the same
84-
new_node_host and new_node_port.
132+
id, new_node_host, and new_node_port.
85133
"""
86134
if not isinstance(other, NodeMovingEvent):
87135
return False
88136
return (
89-
self.new_node_host == other.new_node_host
137+
self.id == other.id
138+
and self.new_node_host == other.new_node_host
90139
and self.new_node_port == other.new_node_port
91140
)
92141

@@ -96,9 +145,9 @@ def __hash__(self) -> int:
96145
instances to be used in sets and as dictionary keys.
97146
98147
Returns:
99-
int: Hash value based on new_node_host and new_node_port
148+
int: Hash value based on event type, id, new_node_host, and new_node_port
100149
"""
101-
return hash((self.__class__, self.new_node_host, self.new_node_port))
150+
return hash((self.__class__, self.id, self.new_node_host, self.new_node_port))
102151

103152

104153
class NodeMigratingEvent(MaintenanceEvent):
@@ -109,17 +158,19 @@ class NodeMigratingEvent(MaintenanceEvent):
109158
during cluster rebalancing or maintenance operations.
110159
111160
Args:
161+
id (int): Unique identifier for this event
112162
ttl (int): Time-to-live in seconds for this notification
113163
"""
114164

115-
def __init__(self, ttl: int):
116-
super().__init__(ttl)
165+
def __init__(self, id: int, ttl: int):
166+
super().__init__(id, ttl)
117167

118168
def __repr__(self) -> str:
119169
expiry_time = self.creation_time + self.ttl
120-
remaining = max(0, expiry_time - time.time())
170+
remaining = max(0, expiry_time - time.monotonic())
121171
return (
122172
f"{self.__class__.__name__}("
173+
f"id={self.id}, "
123174
f"ttl={self.ttl}, "
124175
f"creation_time={self.creation_time}, "
125176
f"expires_at={expiry_time}, "
@@ -128,6 +179,25 @@ def __repr__(self) -> str:
128179
f")"
129180
)
130181

182+
def __eq__(self, other) -> bool:
183+
"""
184+
Two NodeMigratingEvent events are considered equal if they have the same
185+
id and are of the same type.
186+
"""
187+
if not isinstance(other, NodeMigratingEvent):
188+
return False
189+
return self.id == other.id and type(self) is type(other)
190+
191+
def __hash__(self) -> int:
192+
"""
193+
Return a hash value for the event to allow
194+
instances to be used in sets and as dictionary keys.
195+
196+
Returns:
197+
int: Hash value based on event type and id
198+
"""
199+
return hash((self.__class__, self.id))
200+
131201

132202
class NodeMigratedEvent(MaintenanceEvent):
133203
"""
@@ -137,19 +207,20 @@ class NodeMigratedEvent(MaintenanceEvent):
137207
to other nodes during cluster rebalancing or maintenance operations.
138208
139209
Args:
140-
ttl (int): Time-to-live in seconds for this notification
210+
id (int): Unique identifier for this event
141211
"""
142212

143213
DEFAULT_TTL = 5
144214

145-
def __init__(self):
146-
super().__init__(NodeMigratedEvent.DEFAULT_TTL)
215+
def __init__(self, id: int):
216+
super().__init__(id, NodeMigratedEvent.DEFAULT_TTL)
147217

148218
def __repr__(self) -> str:
149219
expiry_time = self.creation_time + self.ttl
150-
remaining = max(0, expiry_time - time.time())
220+
remaining = max(0, expiry_time - time.monotonic())
151221
return (
152222
f"{self.__class__.__name__}("
223+
f"id={self.id}, "
153224
f"ttl={self.ttl}, "
154225
f"creation_time={self.creation_time}, "
155226
f"expires_at={expiry_time}, "
@@ -158,6 +229,25 @@ def __repr__(self) -> str:
158229
f")"
159230
)
160231

232+
def __eq__(self, other) -> bool:
233+
"""
234+
Two NodeMigratedEvent events are considered equal if they have the same
235+
id and are of the same type.
236+
"""
237+
if not isinstance(other, NodeMigratedEvent):
238+
return False
239+
return self.id == other.id and type(self) is type(other)
240+
241+
def __hash__(self) -> int:
242+
"""
243+
Return a hash value for the event to allow
244+
instances to be used in sets and as dictionary keys.
245+
246+
Returns:
247+
int: Hash value based on event type and id
248+
"""
249+
return hash((self.__class__, self.id))
250+
161251

162252
class MaintenanceEventsConfig:
163253
"""
@@ -173,7 +263,7 @@ def __init__(
173263
self,
174264
enabled: bool = False,
175265
proactive_reconnect: bool = True,
176-
relax_timeout: Number = 20,
266+
relax_timeout: Optional[Number] = 20,
177267
):
178268
"""
179269
Initialize a new MaintenanceEventsConfig.

0 commit comments

Comments
 (0)