Skip to content

Commit 11b1463

Browse files
authored
Added support for MONITOR in clusters (#1756)
1 parent b7ffec0 commit 11b1463

File tree

7 files changed

+117
-45
lines changed

7 files changed

+117
-45
lines changed

redis/cluster.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -670,6 +670,23 @@ def set_default_node(self, node):
670670
log.info(f"Changed the default cluster node to {node}")
671671
return True
672672

673+
def monitor(self, target_node=None):
674+
"""
675+
Returns a Monitor object for the specified target node.
676+
The default cluster node will be selected if no target node was
677+
specified.
678+
Monitor is useful for handling the MONITOR command to the redis server.
679+
next_command() method returns one command from monitor
680+
listen() method yields commands from monitor.
681+
"""
682+
if target_node is None:
683+
target_node = self.get_default_node()
684+
if target_node.redis_connection is None:
685+
raise RedisClusterException(
686+
f"Cluster Node {target_node.name} has no redis_connection"
687+
)
688+
return target_node.redis_connection.monitor()
689+
673690
def pubsub(self, node=None, host=None, port=None, **kwargs):
674691
"""
675692
Allows passing a ClusterNode, or host&port, to get a pubsub instance

tests/conftest.py

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -151,12 +151,12 @@ def skip_ifmodversion_lt(min_version: str, module_name: str):
151151
raise AttributeError(f"No redis module named {module_name}")
152152

153153

154-
def skip_if_redis_enterprise(func):
154+
def skip_if_redis_enterprise():
155155
check = REDIS_INFO["enterprise"] is True
156156
return pytest.mark.skipif(check, reason="Redis enterprise")
157157

158158

159-
def skip_ifnot_redis_enterprise(func):
159+
def skip_ifnot_redis_enterprise():
160160
check = REDIS_INFO["enterprise"] is False
161161
return pytest.mark.skipif(check, reason="Not running in redis enterprise")
162162

@@ -324,16 +324,18 @@ def master_host(request):
324324
yield parts.hostname, parts.port
325325

326326

327-
def wait_for_command(client, monitor, command):
327+
def wait_for_command(client, monitor, command, key=None):
328328
# issue a command with a key name that's local to this process.
329329
# if we find a command with our key before the command we're waiting
330330
# for, something went wrong
331-
redis_version = REDIS_INFO["version"]
332-
if LooseVersion(redis_version) >= LooseVersion("5.0.0"):
333-
id_str = str(client.client_id())
334-
else:
335-
id_str = f"{random.randrange(2 ** 32):08x}"
336-
key = f"__REDIS-PY-{id_str}__"
331+
if key is None:
332+
# generate key
333+
redis_version = REDIS_INFO["version"]
334+
if LooseVersion(redis_version) >= LooseVersion("5.0.0"):
335+
id_str = str(client.client_id())
336+
else:
337+
id_str = f"{random.randrange(2 ** 32):08x}"
338+
key = f"__REDIS-PY-{id_str}__"
337339
client.get(key)
338340
while True:
339341
monitor_response = monitor.next_command()

tests/test_cluster.py

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
skip_if_redis_enterprise,
3737
skip_if_server_version_lt,
3838
skip_unless_arch_bits,
39+
wait_for_command,
3940
)
4041

4142
default_host = "127.0.0.1"
@@ -1774,7 +1775,7 @@ def test_cluster_randomkey(self, r):
17741775
assert r.randomkey(target_nodes=node) in (b"{foo}a", b"{foo}b", b"{foo}c")
17751776

17761777
@skip_if_server_version_lt("6.0.0")
1777-
@skip_if_redis_enterprise
1778+
@skip_if_redis_enterprise()
17781779
def test_acl_log(self, r, request):
17791780
key = "{cache}:"
17801781
node = r.get_node_from_key(key)
@@ -2631,3 +2632,54 @@ def test_readonly_pipeline_from_readonly_client(self, request):
26312632
if executed_on_replica:
26322633
break
26332634
assert executed_on_replica is True
2635+
2636+
2637+
@pytest.mark.onlycluster
2638+
class TestClusterMonitor:
2639+
def test_wait_command_not_found(self, r):
2640+
"Make sure the wait_for_command func works when command is not found"
2641+
key = "foo"
2642+
node = r.get_node_from_key(key)
2643+
with r.monitor(target_node=node) as m:
2644+
response = wait_for_command(r, m, "nothing", key=key)
2645+
assert response is None
2646+
2647+
def test_response_values(self, r):
2648+
db = 0
2649+
key = "foo"
2650+
node = r.get_node_from_key(key)
2651+
with r.monitor(target_node=node) as m:
2652+
r.ping(target_nodes=node)
2653+
response = wait_for_command(r, m, "PING", key=key)
2654+
assert isinstance(response["time"], float)
2655+
assert response["db"] == db
2656+
assert response["client_type"] in ("tcp", "unix")
2657+
assert isinstance(response["client_address"], str)
2658+
assert isinstance(response["client_port"], str)
2659+
assert response["command"] == "PING"
2660+
2661+
def test_command_with_quoted_key(self, r):
2662+
key = "{foo}1"
2663+
node = r.get_node_from_key(key)
2664+
with r.monitor(node) as m:
2665+
r.get('{foo}"bar')
2666+
response = wait_for_command(r, m, 'GET {foo}"bar', key=key)
2667+
assert response["command"] == 'GET {foo}"bar'
2668+
2669+
def test_command_with_binary_data(self, r):
2670+
key = "{foo}1"
2671+
node = r.get_node_from_key(key)
2672+
with r.monitor(target_node=node) as m:
2673+
byte_string = b"{foo}bar\x92"
2674+
r.get(byte_string)
2675+
response = wait_for_command(r, m, "GET {foo}bar\\x92", key=key)
2676+
assert response["command"] == "GET {foo}bar\\x92"
2677+
2678+
def test_command_with_escaped_data(self, r):
2679+
key = "{foo}1"
2680+
node = r.get_node_from_key(key)
2681+
with r.monitor(target_node=node) as m:
2682+
byte_string = b"{foo}bar\\x92"
2683+
r.get(byte_string)
2684+
response = wait_for_command(r, m, "GET {foo}bar\\\\x92", key=key)
2685+
assert response["command"] == "GET {foo}bar\\\\x92"

tests/test_commands.py

Lines changed: 27 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ def test_acl_cat_with_category(self, r):
8484
assert "get" in commands
8585

8686
@skip_if_server_version_lt("6.0.0")
87-
@skip_if_redis_enterprise
87+
@skip_if_redis_enterprise()
8888
def test_acl_deluser(self, r, request):
8989
username = "redis-py-user"
9090

@@ -109,7 +109,7 @@ def teardown():
109109
assert r.acl_getuser(users[4]) is None
110110

111111
@skip_if_server_version_lt("6.0.0")
112-
@skip_if_redis_enterprise
112+
@skip_if_redis_enterprise()
113113
def test_acl_genpass(self, r):
114114
password = r.acl_genpass()
115115
assert isinstance(password, str)
@@ -123,7 +123,7 @@ def test_acl_genpass(self, r):
123123
assert isinstance(password, str)
124124

125125
@skip_if_server_version_lt("6.0.0")
126-
@skip_if_redis_enterprise
126+
@skip_if_redis_enterprise()
127127
def test_acl_getuser_setuser(self, r, request):
128128
username = "redis-py-user"
129129

@@ -236,7 +236,7 @@ def test_acl_help(self, r):
236236
assert len(res) != 0
237237

238238
@skip_if_server_version_lt("6.0.0")
239-
@skip_if_redis_enterprise
239+
@skip_if_redis_enterprise()
240240
def test_acl_list(self, r, request):
241241
username = "redis-py-user"
242242

@@ -250,7 +250,8 @@ def teardown():
250250
assert len(users) == 2
251251

252252
@skip_if_server_version_lt("6.0.0")
253-
@skip_if_redis_enterprise
253+
@skip_if_redis_enterprise()
254+
@pytest.mark.onlynoncluster
254255
def test_acl_log(self, r, request):
255256
username = "redis-py-user"
256257

@@ -292,7 +293,7 @@ def teardown():
292293
assert r.acl_log_reset()
293294

294295
@skip_if_server_version_lt("6.0.0")
295-
@skip_if_redis_enterprise
296+
@skip_if_redis_enterprise()
296297
def test_acl_setuser_categories_without_prefix_fails(self, r, request):
297298
username = "redis-py-user"
298299

@@ -305,7 +306,7 @@ def teardown():
305306
r.acl_setuser(username, categories=["list"])
306307

307308
@skip_if_server_version_lt("6.0.0")
308-
@skip_if_redis_enterprise
309+
@skip_if_redis_enterprise()
309310
def test_acl_setuser_commands_without_prefix_fails(self, r, request):
310311
username = "redis-py-user"
311312

@@ -318,7 +319,7 @@ def teardown():
318319
r.acl_setuser(username, commands=["get"])
319320

320321
@skip_if_server_version_lt("6.0.0")
321-
@skip_if_redis_enterprise
322+
@skip_if_redis_enterprise()
322323
def test_acl_setuser_add_passwords_and_nopass_fails(self, r, request):
323324
username = "redis-py-user"
324325

@@ -363,7 +364,7 @@ def test_client_list_types_not_replica(self, r):
363364
clients = r.client_list(_type=client_type)
364365
assert isinstance(clients, list)
365366

366-
@skip_if_redis_enterprise
367+
@skip_if_redis_enterprise()
367368
def test_client_list_replica(self, r):
368369
clients = r.client_list(_type="replica")
369370
assert isinstance(clients, list)
@@ -529,7 +530,7 @@ def test_client_kill_filter_by_laddr(self, r, r2):
529530
assert r.client_kill_filter(laddr=client_2_addr)
530531

531532
@skip_if_server_version_lt("6.0.0")
532-
@skip_if_redis_enterprise
533+
@skip_if_redis_enterprise()
533534
def test_client_kill_filter_by_user(self, r, request):
534535
killuser = "user_to_kill"
535536
r.acl_setuser(
@@ -549,7 +550,7 @@ def test_client_kill_filter_by_user(self, r, request):
549550

550551
@pytest.mark.onlynoncluster
551552
@skip_if_server_version_lt("2.9.50")
552-
@skip_if_redis_enterprise
553+
@skip_if_redis_enterprise()
553554
def test_client_pause(self, r):
554555
assert r.client_pause(1)
555556
assert r.client_pause(timeout=1)
@@ -558,7 +559,7 @@ def test_client_pause(self, r):
558559

559560
@pytest.mark.onlynoncluster
560561
@skip_if_server_version_lt("6.2.0")
561-
@skip_if_redis_enterprise
562+
@skip_if_redis_enterprise()
562563
def test_client_unpause(self, r):
563564
assert r.client_unpause() == b"OK"
564565

@@ -578,7 +579,7 @@ def test_client_reply(self, r, r_timeout):
578579

579580
@pytest.mark.onlynoncluster
580581
@skip_if_server_version_lt("6.0.0")
581-
@skip_if_redis_enterprise
582+
@skip_if_redis_enterprise()
582583
def test_client_getredir(self, r):
583584
assert isinstance(r.client_getredir(), int)
584585
assert r.client_getredir() == -1
@@ -590,7 +591,7 @@ def test_config_get(self, r):
590591
# assert data['maxmemory'].isdigit()
591592

592593
@pytest.mark.onlynoncluster
593-
@skip_if_redis_enterprise
594+
@skip_if_redis_enterprise()
594595
def test_config_resetstat(self, r):
595596
r.ping()
596597
prior_commands_processed = int(r.info()["total_commands_processed"])
@@ -599,7 +600,7 @@ def test_config_resetstat(self, r):
599600
reset_commands_processed = int(r.info()["total_commands_processed"])
600601
assert reset_commands_processed < prior_commands_processed
601602

602-
@skip_if_redis_enterprise
603+
@skip_if_redis_enterprise()
603604
def test_config_set(self, r):
604605
r.config_set("timeout", 70)
605606
assert r.config_get()["timeout"] == "70"
@@ -626,7 +627,7 @@ def test_info(self, r):
626627
assert "redis_version" in info.keys()
627628

628629
@pytest.mark.onlynoncluster
629-
@skip_if_redis_enterprise
630+
@skip_if_redis_enterprise()
630631
def test_lastsave(self, r):
631632
assert isinstance(r.lastsave(), datetime.datetime)
632633

@@ -731,7 +732,7 @@ def test_time(self, r):
731732
assert isinstance(t[0], int)
732733
assert isinstance(t[1], int)
733734

734-
@skip_if_redis_enterprise
735+
@skip_if_redis_enterprise()
735736
def test_bgsave(self, r):
736737
assert r.bgsave()
737738
time.sleep(0.3)
@@ -1312,7 +1313,7 @@ def test_stralgo_lcs(self, r):
13121313
value2 = "mynewtext"
13131314
res = "mytext"
13141315

1315-
if skip_if_redis_enterprise(None).args[0] is True:
1316+
if skip_if_redis_enterprise().args[0] is True:
13161317
with pytest.raises(redis.exceptions.ResponseError):
13171318
assert r.stralgo("LCS", value1, value2) == res
13181319
return
@@ -1354,7 +1355,7 @@ def test_strlen(self, r):
13541355
def test_substr(self, r):
13551356
r["a"] = "0123456789"
13561357

1357-
if skip_if_redis_enterprise(None).args[0] is True:
1358+
if skip_if_redis_enterprise().args[0] is True:
13581359
with pytest.raises(redis.exceptions.ResponseError):
13591360
assert r.substr("a", 0) == b"0123456789"
13601361
return
@@ -2665,7 +2666,7 @@ def test_cluster_slaves(self, mock_cluster_resp_slaves):
26652666

26662667
@pytest.mark.onlynoncluster
26672668
@skip_if_server_version_lt("3.0.0")
2668-
@skip_if_redis_enterprise
2669+
@skip_if_redis_enterprise()
26692670
def test_readwrite(self, r):
26702671
assert r.readwrite()
26712672

@@ -4016,7 +4017,7 @@ def test_memory_doctor(self, r):
40164017

40174018
@skip_if_server_version_lt("4.0.0")
40184019
def test_memory_malloc_stats(self, r):
4019-
if skip_if_redis_enterprise(None).args[0] is True:
4020+
if skip_if_redis_enterprise().args[0] is True:
40204021
with pytest.raises(redis.exceptions.ResponseError):
40214022
assert r.memory_malloc_stats()
40224023
return
@@ -4029,7 +4030,7 @@ def test_memory_stats(self, r):
40294030
# has data
40304031
r.set("foo", "bar")
40314032

4032-
if skip_if_redis_enterprise(None).args[0] is True:
4033+
if skip_if_redis_enterprise().args[0] is True:
40334034
with pytest.raises(redis.exceptions.ResponseError):
40344035
stats = r.memory_stats()
40354036
return
@@ -4047,7 +4048,7 @@ def test_memory_usage(self, r):
40474048

40484049
@pytest.mark.onlynoncluster
40494050
@skip_if_server_version_lt("4.0.0")
4050-
@skip_if_redis_enterprise
4051+
@skip_if_redis_enterprise()
40514052
def test_module_list(self, r):
40524053
assert isinstance(r.module_list(), list)
40534054
for x in r.module_list():
@@ -4088,7 +4089,7 @@ def test_command(self, r):
40884089

40894090
@pytest.mark.onlynoncluster
40904091
@skip_if_server_version_lt("4.0.0")
4091-
@skip_if_redis_enterprise
4092+
@skip_if_redis_enterprise()
40924093
def test_module(self, r):
40934094
with pytest.raises(redis.exceptions.ModuleError) as excinfo:
40944095
r.module_load("/some/fake/path")
@@ -4144,7 +4145,7 @@ def test_restore_frequency(self, r):
41444145

41454146
@pytest.mark.onlynoncluster
41464147
@skip_if_server_version_lt("5.0.0")
4147-
@skip_if_redis_enterprise
4148+
@skip_if_redis_enterprise()
41484149
def test_replicaof(self, r):
41494150
with pytest.raises(redis.ResponseError):
41504151
assert r.replicaof("NO ONE")
@@ -4226,7 +4227,7 @@ def test_22_info(self, r):
42264227
assert "6" in parsed["allocation_stats"]
42274228
assert ">=256" in parsed["allocation_stats"]
42284229

4229-
@skip_if_redis_enterprise
4230+
@skip_if_redis_enterprise()
42304231
def test_large_responses(self, r):
42314232
"The PythonParser has some special cases for return values > 1MB"
42324233
# load up 5MB of data into a key

0 commit comments

Comments
 (0)