Skip to content

Commit fe8e061

Browse files
authored
Merge branch 'master' into feat/active-active
2 parents 1f3e5c6 + 3471e08 commit fe8e061

File tree

11 files changed

+144
-49
lines changed

11 files changed

+144
-49
lines changed

.github/workflows/integration.yaml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ env:
2727
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
2828
# this speeds up coverage with Python 3.12: https://github.com/nedbat/coveragepy/issues/1665
2929
COVERAGE_CORE: sysmon
30+
# patch releases get included in the base version image when they are published
31+
# for example after 8.2.1 is published, 8.2 image contains 8.2.1 content
3032
CURRENT_CLIENT_LIBS_TEST_STACK_IMAGE_TAG: '8.2'
3133
CURRENT_REDIS_VERSION: '8.2'
3234

@@ -74,7 +76,7 @@ jobs:
7476
max-parallel: 15
7577
fail-fast: false
7678
matrix:
77-
redis-version: ['8.2.1-pre', '${{ needs.redis_version.outputs.CURRENT }}', '8.0.2' ,'7.4.4', '7.2.9']
79+
redis-version: ['8.4-M01-pre', '${{ needs.redis_version.outputs.CURRENT }}', '8.0.2' ,'7.4.4', '7.2.9']
7880
python-version: ['3.9', '3.13']
7981
parser-backend: ['plain']
8082
event-loop: ['asyncio']

pyproject.toml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,12 @@ filterwarnings = [
8383
# Ignore a coverage warning when COVERAGE_CORE=sysmon for Pythons < 3.12.
8484
"ignore:sys.monitoring isn't available:coverage.exceptions.CoverageWarning",
8585
]
86+
log_cli_level = "INFO"
87+
log_cli_date_format = "%H:%M:%S:%f"
88+
log_cli = false
89+
log_cli_format = "%(asctime)s %(levelname)s %(threadName)s: %(message)s"
90+
log_level = "INFO"
91+
capture = "yes"
8692

8793
[tool.ruff]
8894
target-version = "py39"

redis/asyncio/client.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1161,9 +1161,12 @@ async def get_message(
11611161
return await self.handle_message(response, ignore_subscribe_messages)
11621162
return None
11631163

1164-
def ping(self, message=None) -> Awaitable:
1164+
def ping(self, message=None) -> Awaitable[bool]:
11651165
"""
1166-
Ping the Redis server
1166+
Ping the Redis server to test connectivity.
1167+
1168+
Sends a PING command to the Redis server and returns True if the server
1169+
responds with "PONG".
11671170
"""
11681171
args = ["PING", message] if message is not None else ["PING"]
11691172
return self.execute_command(*args)

redis/client.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1186,7 +1186,10 @@ def get_message(
11861186

11871187
def ping(self, message: Union[str, None] = None) -> bool:
11881188
"""
1189-
Ping the Redis server
1189+
Ping the Redis server to test connectivity.
1190+
1191+
Sends a PING command to the Redis server and returns True if the server
1192+
responds with "PONG".
11901193
"""
11911194
args = ["PING", message] if message is not None else ["PING"]
11921195
return self.execute_command(*args)

redis/commands/core.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1210,11 +1210,18 @@ def latency_reset(self, *events: str) -> ResponseT:
12101210
"""
12111211
return self.execute_command("LATENCY RESET", *events)
12121212

1213-
def ping(self, **kwargs) -> ResponseT:
1213+
def ping(self, **kwargs) -> Union[Awaitable[bool], bool]:
12141214
"""
1215-
Ping the Redis server
1215+
Ping the Redis server to test connectivity.
12161216
1217-
For more information, see https://redis.io/commands/ping
1217+
Sends a PING command to the Redis server and returns True if the server
1218+
responds with "PONG".
1219+
1220+
This command is useful for:
1221+
- Testing whether a connection is still alive
1222+
- Verifying the server's ability to serve data
1223+
1224+
For more information on the underlying ping command, see https://redis.io/commands/ping
12181225
"""
12191226
return self.execute_command("PING", **kwargs)
12201227

tests/test_asyncio/test_commands.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3158,6 +3158,7 @@ async def test_xgroup_destroy(self, r: redis.Redis):
31583158
assert await r.xgroup_destroy(stream, group)
31593159

31603160
@skip_if_server_version_lt("7.0.0")
3161+
@skip_if_server_version_gte("8.2.2")
31613162
async def test_xgroup_setid(self, r: redis.Redis):
31623163
stream = "stream"
31633164
group = "group"
@@ -3178,6 +3179,28 @@ async def test_xgroup_setid(self, r: redis.Redis):
31783179
]
31793180
assert await r.xinfo_groups(stream) == expected
31803181

3182+
@skip_if_server_version_lt("8.2.2")
3183+
async def test_xgroup_setid_fixed_max_entries_read(self, r):
3184+
stream = "stream"
3185+
group = "group"
3186+
message_id = await r.xadd(stream, {"foo": "bar"})
3187+
await r.xadd(stream, {"foo1": "bar1"})
3188+
3189+
await r.xgroup_create(stream, group, 0)
3190+
# advance the last_delivered_id to the message_id
3191+
await r.xgroup_setid(stream, group, message_id, entries_read=2)
3192+
expected = [
3193+
{
3194+
"name": group.encode(),
3195+
"consumers": 0,
3196+
"pending": 0,
3197+
"last-delivered-id": message_id,
3198+
"entries-read": 2,
3199+
"lag": 0,
3200+
}
3201+
]
3202+
assert await r.xinfo_groups(stream) == expected
3203+
31813204
@skip_if_server_version_lt("7.2.0")
31823205
async def test_xinfo_consumers(self, r: redis.Redis):
31833206
stream = "stream"

tests/test_asyncio/test_vsets.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -502,8 +502,8 @@ async def test_vsim_epsilon(d_client):
502502
await d_client.vset().vadd("myset", [2, 1, 1], "a")
503503
await d_client.vset().vadd("myset", [2, 0, 1], "b")
504504
await d_client.vset().vadd("myset", [2, 0, 0], "c")
505-
await d_client.vset().vadd("myset", [2, 0, -1], "d")
506-
await d_client.vset().vadd("myset", [2, -1, -1], "e")
505+
await d_client.vset().vadd("myset", [2, 0, 2], "d")
506+
await d_client.vset().vadd("myset", [-2, -1, -1], "e")
507507

508508
res1 = await d_client.vset().vsim("myset", [2, 1, 1])
509509
assert 5 == len(res1)

tests/test_commands.py

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4607,6 +4607,7 @@ def test_xgroup_create_mkstream(self, r):
46074607
assert r.xinfo_groups(stream) == expected
46084608

46094609
@skip_if_server_version_lt("7.0.0")
4610+
@skip_if_server_version_gte("8.2.2")
46104611
def test_xgroup_create_entriesread(self, r: redis.Redis):
46114612
stream = "stream"
46124613
group = "group"
@@ -4628,6 +4629,33 @@ def test_xgroup_create_entriesread(self, r: redis.Redis):
46284629
]
46294630
assert r.xinfo_groups(stream) == expected
46304631

4632+
@skip_if_server_version_lt("8.2.2")
4633+
def test_xgroup_create_entriesread_fixed_max_entries_read(self, r: redis.Redis):
4634+
stream = "stream"
4635+
group = "group"
4636+
r.xadd(stream, {"foo": "bar"})
4637+
r.xadd(stream, {"foo1": "bar1"})
4638+
r.xadd(stream, {"foo2": "bar2"})
4639+
4640+
# no group is set up yet, no info to obtain
4641+
assert r.xinfo_groups(stream) == []
4642+
4643+
assert r.xgroup_create(stream, group, 0, entries_read=7)
4644+
# validate that entries-read is capped at the number of entries
4645+
# in the stream and lag shows the entries between
4646+
# last_delivered_id and entries_added
4647+
expected = [
4648+
{
4649+
"name": group.encode(),
4650+
"consumers": 0,
4651+
"pending": 0,
4652+
"last-delivered-id": b"0-0",
4653+
"entries-read": 3,
4654+
"lag": 3,
4655+
}
4656+
]
4657+
assert r.xinfo_groups(stream) == expected
4658+
46314659
@skip_if_server_version_lt("5.0.0")
46324660
def test_xgroup_delconsumer(self, r):
46334661
stream = "stream"
@@ -4675,6 +4703,7 @@ def test_xgroup_destroy(self, r):
46754703
assert r.xgroup_destroy(stream, group)
46764704

46774705
@skip_if_server_version_lt("7.0.0")
4706+
@skip_if_server_version_gte("8.2.2")
46784707
def test_xgroup_setid(self, r):
46794708
stream = "stream"
46804709
group = "group"
@@ -4695,6 +4724,28 @@ def test_xgroup_setid(self, r):
46954724
]
46964725
assert r.xinfo_groups(stream) == expected
46974726

4727+
@skip_if_server_version_lt("8.2.2")
4728+
def test_xgroup_setid_fixed_max_entries_read(self, r):
4729+
stream = "stream"
4730+
group = "group"
4731+
message_id = r.xadd(stream, {"foo": "bar"})
4732+
r.xadd(stream, {"foo1": "bar1"})
4733+
4734+
r.xgroup_create(stream, group, 0)
4735+
# advance the last_delivered_id to the message_id
4736+
r.xgroup_setid(stream, group, message_id, entries_read=5)
4737+
expected = [
4738+
{
4739+
"name": group.encode(),
4740+
"consumers": 0,
4741+
"pending": 0,
4742+
"last-delivered-id": message_id,
4743+
"entries-read": 2,
4744+
"lag": 0,
4745+
}
4746+
]
4747+
assert r.xinfo_groups(stream) == expected
4748+
46984749
@skip_if_server_version_lt("7.2.0")
46994750
def test_xinfo_consumers(self, r):
47004751
stream = "stream"

tests/test_scenario/maint_notifications_helpers.py

Lines changed: 29 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -116,8 +116,9 @@ def find_target_node_and_empty_node(
116116

117117
# Get all node IDs from CLUSTER NODES section
118118
all_nodes = set()
119-
nodes_with_shards = set()
120-
master_nodes = set()
119+
nodes_with_any_shards = set() # Nodes with shards from ANY database
120+
nodes_with_target_db_shards = set() # Nodes with shards from target database
121+
master_nodes = set() # Master nodes for target database only
121122

122123
for line in lines:
123124
line = line.strip()
@@ -146,31 +147,45 @@ def find_target_node_and_empty_node(
146147
# Parse shard line: db:1 m-standard redis:1 node:2 master 0-8191 1.4MB OK
147148
parts = line.split()
148149
if len(parts) >= 5:
150+
db_id = parts[0] # db:1, db:2, etc.
149151
node_id = parts[3] # node:2
150152
shard_role = parts[4] # master/slave - this is what matters
151153

152-
nodes_with_shards.add(node_id)
153-
if shard_role == "master":
154-
master_nodes.add(node_id)
154+
# Track ALL nodes with shards (for finding truly empty nodes)
155+
nodes_with_any_shards.add(node_id)
156+
157+
# Only track master nodes for the specific database we're testing
158+
bdb_id = endpoint_config.get("bdb_id")
159+
if db_id == f"db:{bdb_id}":
160+
nodes_with_target_db_shards.add(node_id)
161+
if shard_role == "master":
162+
master_nodes.add(node_id)
155163
elif line.startswith("ENDPOINTS:") or not line:
156164
shards_section_started = False
157165

158-
# Find empty node (node with no shards)
159-
empty_nodes = all_nodes - nodes_with_shards
166+
# Find empty node (node with no shards from the TARGET database)
167+
nodes_with_no_shards_target_bdb = all_nodes - nodes_with_target_db_shards
160168

161169
logging.debug(f"All nodes: {all_nodes}")
162-
logging.debug(f"Nodes with shards: {nodes_with_shards}")
163-
logging.debug(f"Master nodes: {master_nodes}")
164-
logging.debug(f"Empty nodes: {empty_nodes}")
170+
logging.debug(f"Nodes with shards from any database: {nodes_with_any_shards}")
171+
logging.debug(
172+
f"Nodes with target database shards: {nodes_with_target_db_shards}"
173+
)
174+
logging.debug(f"Master nodes (target database only): {master_nodes}")
175+
logging.debug(
176+
f"Nodes with no shards from target database: {nodes_with_no_shards_target_bdb}"
177+
)
165178

166-
if not empty_nodes:
167-
raise ValueError("No empty nodes (nodes without shards) found")
179+
if not nodes_with_no_shards_target_bdb:
180+
raise ValueError("All nodes have shards from target database")
168181

169182
if not master_nodes:
170-
raise ValueError("No nodes with master shards found")
183+
raise ValueError("No nodes with master shards from target database found")
171184

172185
# Return the first available empty node and master node (numeric part only)
173-
empty_node = next(iter(empty_nodes)).split(":")[1] # node:1 -> 1
186+
empty_node = next(iter(nodes_with_no_shards_target_bdb)).split(":")[
187+
1
188+
] # node:1 -> 1
174189
target_node = next(iter(master_nodes)).split(":")[1] # node:2 -> 2
175190

176191
return target_node, empty_node

tests/test_scenario/test_maint_notifications.py

Lines changed: 9 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
datefmt="%Y-%m-%d %H:%M:%S:%f",
3737
)
3838

39-
BIND_TIMEOUT = 30
39+
BIND_TIMEOUT = 60
4040
MIGRATE_TIMEOUT = 60
4141
FAILOVER_TIMEOUT = 15
4242

@@ -109,29 +109,6 @@ def setup_and_cleanup(
109109
except Exception as e:
110110
logging.error(f"Failed to revert failover: {e}")
111111

112-
if self._migration_executed:
113-
try:
114-
if self.target_node and self.empty_node:
115-
self._execute_migration(
116-
fault_injector_client=fault_injector_client,
117-
endpoints_config=endpoints_config,
118-
target_node=self.empty_node,
119-
empty_node=self.target_node,
120-
)
121-
logging.info("Migration cleanup completed")
122-
except Exception as e:
123-
logging.error(f"Failed to revert migration: {e}")
124-
125-
if self._bind_executed:
126-
try:
127-
if self.endpoint_id:
128-
self._execute_bind(
129-
fault_injector_client, endpoints_config, self.endpoint_id
130-
)
131-
logging.info("Bind cleanup completed")
132-
except Exception as e:
133-
logging.error(f"Failed to revert bind endpoint: {e}")
134-
135112
logging.info("Cleanup finished")
136113

137114
def _execute_failover(
@@ -916,7 +893,15 @@ def test_new_connections_receive_migrating(
916893
)
917894

918895
migrate_thread.join()
896+
logging.info("Executing rladmin bind endpoint command for cleanup...")
919897

898+
bind_thread = Thread(
899+
target=self._execute_bind,
900+
name="bind_thread",
901+
args=(fault_injector_client, endpoints_config, self.endpoint_id),
902+
)
903+
bind_thread.start()
904+
bind_thread.join()
920905
client_maint_notifications.connection_pool.release(first_conn)
921906
client_maint_notifications.connection_pool.release(second_connection)
922907

0 commit comments

Comments
 (0)