Skip to content

Hitless upgrade support implementation for synchronous Redis client. #3713

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 21 commits into
base: feat/hitless-upgrade-sync-standalone
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
092e33b
Handling of topology update push notifications for Standalone Redis c…
petyaslavova Jun 27, 2025
41a199e
Adding sequence id to the maintenance push notifications. Adding unit…
petyaslavova Jul 11, 2025
63d0c45
Adding integration-like tests for migrating/migrated events handling
petyaslavova Jul 11, 2025
5c71733
Removed unused imports
petyaslavova Jul 11, 2025
96c6e5d
Revert changing of the default retry object initialization for connec…
petyaslavova Jul 11, 2025
8691475
Complete migrating/migrated integration-like tests
petyaslavova Jul 14, 2025
7b57a22
Adding moving integration-like tests
petyaslavova Jul 15, 2025
bed2e40
Fixed BlockingConnectionPool locking strategy. Removed debug logging.…
petyaslavova Jul 17, 2025
0744ee5
Fixing linters
petyaslavova Jul 17, 2025
4c536f3
Applying Copilot's comments
petyaslavova Jul 17, 2025
6768d5d
Fixed type annotations not compatible with older python versions
petyaslavova Jul 17, 2025
ce31ec7
Add a few more tests and fix pool mock for python 3.9
petyaslavova Jul 17, 2025
d73cd35
Adding maintenance state to connections. Migrating and Migrated are n…
petyaslavova Jul 18, 2025
788cf52
Refactored the tmp host address and timeout storing and the way to ap…
petyaslavova Jul 22, 2025
6d496f0
Apply review comments
petyaslavova Jul 24, 2025
2d3731f
Applying moving/moved only on connections to the same proxy.
petyaslavova Jul 26, 2025
a82cbfe
Merge branch 'master' into ps_hitless_upgrade_sync_redis
petyaslavova Aug 8, 2025
2cdfa75
Applying review comments.
petyaslavova Aug 8, 2025
822fccd
Refactor to have less methods in pool classes and made some of the ex…
petyaslavova Aug 11, 2025
2736aaa
Fixing lint errors
petyaslavova Aug 11, 2025
1e2b96d
Fixing tests
petyaslavova Aug 11, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 19 additions & 11 deletions redis/_parsers/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,17 +202,22 @@ def handle_push_response(self, response, **kwargs):
if msg_type in _INVALIDATION_MESSAGE and self.invalidation_push_handler_func:
return self.invalidation_push_handler_func(response)
if msg_type in _MOVING_MESSAGE and self.node_moving_push_handler_func:
if msg_type in _MOVING_MESSAGE:
host, port = response[2].split(":")
ttl = response[1]
notification = NodeMovingEvent(host, port, ttl)
return self.node_moving_push_handler_func(notification)
# TODO: PARSE latest format when available
host, port = response[2].decode().split(":")
ttl = response[1]
id = 1 # Hardcoded value until the notification starts including the id
notification = NodeMovingEvent(id, host, port, ttl)
return self.node_moving_push_handler_func(notification)
if msg_type in _MAINTENANCE_MESSAGES and self.maintenance_push_handler_func:
if msg_type in _MIGRATING_MESSAGE:
# TODO: PARSE latest format when available
ttl = response[1]
notification = NodeMigratingEvent(ttl)
id = 2 # Hardcoded value until the notification starts including the id
notification = NodeMigratingEvent(id, ttl)
elif msg_type in _MIGRATED_MESSAGE:
notification = NodeMigratedEvent()
# TODO: PARSE latest format when available
id = 3 # Hardcoded value until the notification starts including the id
notification = NodeMigratedEvent(id)
else:
notification = None
if notification is not None:
Expand Down Expand Up @@ -258,18 +263,21 @@ async def handle_push_response(self, response, **kwargs):
return await self.invalidation_push_handler_func(response)
if msg_type in _MOVING_MESSAGE and self.node_moving_push_handler_func:
# push notification from enterprise cluster for node moving
# TODO: PARSE latest format when available
host, port = response[2].split(":")
ttl = response[1]
id = 1 # TODO: get unique id from push notification
id = 1 # Hardcoded value for async parser
notification = NodeMovingEvent(id, host, port, ttl)
return await self.node_moving_push_handler_func(notification)
if msg_type in _MAINTENANCE_MESSAGES and self.maintenance_push_handler_func:
if msg_type in _MIGRATING_MESSAGE:
# TODO: PARSE latest format when available
ttl = response[1]
id = 1 # TODO: get unique id from push notification
id = 2 # Hardcoded value for async parser
notification = NodeMigratingEvent(id, ttl)
elif msg_type in _MIGRATED_MESSAGE:
id = 1 # TODO: get unique id from push notification
# TODO: PARSE latest format when available
id = 3 # Hardcoded value for async parser
notification = NodeMigratedEvent(id)
return await self.maintenance_push_handler_func(notification)

Expand All @@ -281,7 +289,7 @@ def set_invalidation_push_handler(self, invalidation_push_handler_func):
"""Set the invalidation push handler function"""
self.invalidation_push_handler_func = invalidation_push_handler_func

def set_node_moving_push_handler_func(self, node_moving_push_handler_func):
def set_node_moving_push_handler(self, node_moving_push_handler_func):
self.node_moving_push_handler_func = node_moving_push_handler_func

def set_maintenance_push_handler(self, maintenance_push_handler_func):
Expand Down
1 change: 1 addition & 0 deletions redis/_parsers/hiredis.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ def read_response(self, disable_decoding=False, push_request=False):
disable_decoding=disable_decoding,
push_request=push_request,
)
return response

if disable_decoding:
response = self._reader.gets(False)
Expand Down
2 changes: 2 additions & 0 deletions redis/asyncio/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -1308,6 +1308,8 @@ def __init__(
)
self._condition = asyncio.Condition()
self.timeout = timeout
self._in_maintenance = False
self._locked = False

@deprecated_args(
args_to_warn=["*"],
Expand Down
9 changes: 1 addition & 8 deletions redis/client.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import copy
import logging
import re
import threading
import time
Expand Down Expand Up @@ -668,9 +667,6 @@ def _execute_command(self, *args, **options):

finally:
if conn and conn.should_reconnect():
logging.debug(
f"***** Redis reconnect before exit _execute_command --> notification for {conn._sock.getpeername()}"
)
self._close_connection(conn)
conn.connect()
if self._single_connection_client:
Expand Down Expand Up @@ -963,9 +959,6 @@ def _execute(self, conn, command, *args, **kwargs):
lambda _: self._reconnect(conn),
)
if conn.should_reconnect():
logging.debug(
f"***** PubSub --> Reconnect on notification for {conn._sock.getpeername()}"
)
self._reconnect(conn)

return response
Expand Down Expand Up @@ -1669,7 +1662,7 @@ def execute(self, raise_on_error: bool = True) -> List[Any]:
lambda error: self._disconnect_raise_on_watching(conn, error),
)
finally:
# in reset() the connection is diconnected before returned to the pool if
# in reset() the connection is disconnected before returned to the pool if
# it is marked for reconnect.
self.reset()

Expand Down
Loading