Skip to content

Commit 0087b4f

Browse files
committed
Added test cases
1 parent 17e2118 commit 0087b4f

File tree

4 files changed

+636
-444
lines changed

4 files changed

+636
-444
lines changed

redis/cluster.py

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2116,6 +2116,7 @@ def __init__(
21162116
) if not transaction else TransactionStrategy(
21172117
self
21182118
)
2119+
self.command_stack = self._execution_strategy.command_queue
21192120

21202121
def __repr__(self):
21212122
""" """
@@ -2137,7 +2138,7 @@ def __del__(self):
21372138

21382139
def __len__(self):
21392140
""" """
2140-
return len(self._execution_strategy.command_queue)
2141+
return len(self.command_stack)
21412142

21422143
def __bool__(self):
21432144
"Pipeline instances should always evaluate to True on Python 3+"
@@ -2147,7 +2148,7 @@ def execute_command(self, *args, **kwargs):
21472148
"""
21482149
Wrapper function for pipeline_execute_command
21492150
"""
2150-
return self.pipeline_execute_command(*args, **kwargs)
2151+
return self._execution_strategy.execute_command(*args, **kwargs)
21512152

21522153
def pipeline_execute_command(self, *args, **options):
21532154
"""
@@ -2161,8 +2162,7 @@ def pipeline_execute_command(self, *args, **options):
21612162
At some other point, you can then run: pipe.execute(),
21622163
which will execute all commands queued in the pipe.
21632164
"""
2164-
self._execution_strategy.pipeline_execute_command(*args, **options)
2165-
return self
2165+
return self._execution_strategy.execute_command(*args, **options)
21662166

21672167
def annotate_exception(self, exception, number, command):
21682168
"""
@@ -2557,6 +2557,7 @@ def pipeline_execute_command(self, *args, **options):
25572557
self._command_queue.append(
25582558
PipelineCommand(args, options, len(self._command_queue))
25592559
)
2560+
return self._pipe
25602561

25612562
@abstractmethod
25622563
def execute(self, raise_on_error: bool = True) -> List[Any]:
@@ -2605,7 +2606,7 @@ def __init__(self, pipe: ClusterPipeline):
26052606
self.command_flags = pipe.command_flags
26062607

26072608
def execute_command(self, *args, **kwargs):
2608-
self.pipeline_execute_command(*args, **kwargs)
2609+
return self.pipeline_execute_command(*args, **kwargs)
26092610

26102611
def _raise_first_error(self, stack):
26112612
"""
@@ -3150,6 +3151,10 @@ def _execute_transaction(
31503151
except ResponseError as e:
31513152
self.annotate_exception(e, 0, "MULTI")
31523153
errors.append(e)
3154+
except (ClusterDownError, ConnectionError) as cluster_error:
3155+
self._cluster_error = True
3156+
self.annotate_exception(cluster_error, 0, "MULTI")
3157+
raise
31533158

31543159
# and all the other commands
31553160
for i, command in enumerate(self._command_queue):

tests/test_cluster.py

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -265,12 +265,12 @@ def moved_redirection_helper(request, failover=False):
265265
slot = 12182
266266
redirect_node = None
267267
# Get the current primary that holds this slot
268-
prev_primary = rc._nodes_manager.get_node_from_slot(slot)
268+
prev_primary = rc.nodes_manager.get_node_from_slot(slot)
269269
if failover:
270-
if len(rc._nodes_manager.slots_cache[slot]) < 2:
270+
if len(rc.nodes_manager.slots_cache[slot]) < 2:
271271
warnings.warn("Skipping this test since it requires to have a replica")
272272
return
273-
redirect_node = rc._nodes_manager.slots_cache[slot][1]
273+
redirect_node = rc.nodes_manager.slots_cache[slot][1]
274274
else:
275275
# Use one of the primaries to be the redirected node
276276
redirect_node = rc.get_primaries()[0]
@@ -290,7 +290,7 @@ def ok_response(connection, *args, **options):
290290

291291
parse_response.side_effect = moved_redirect_effect
292292
assert rc.execute_command("SET", "foo", "bar") == "MOCK_OK"
293-
slot_primary = rc._nodes_manager.slots_cache[slot][0]
293+
slot_primary = rc.nodes_manager.slots_cache[slot][0]
294294
assert slot_primary == redirect_node
295295
if failover:
296296
assert rc.get_node(host=r_host, port=r_port).server_type == PRIMARY
@@ -722,7 +722,7 @@ def test_all_nodes(self, r):
722722
"""
723723
Set a list of nodes and it should be possible to iterate over all
724724
"""
725-
nodes = [node for node in r._nodes_manager.nodes_cache.values()]
725+
nodes = [node for node in r.nodes_manager.nodes_cache.values()]
726726

727727
for i, node in enumerate(r.get_nodes()):
728728
assert node in nodes
@@ -734,7 +734,7 @@ def test_all_nodes_masters(self, r):
734734
"""
735735
nodes = [
736736
node
737-
for node in r._nodes_manager.nodes_cache.values()
737+
for node in r.nodes_manager.nodes_cache.values()
738738
if node.server_type == PRIMARY
739739
]
740740

@@ -805,7 +805,7 @@ def test_get_node_from_key(self, r):
805805
"""
806806
key = "bar"
807807
slot = r.keyslot(key)
808-
slot_nodes = r._nodes_manager.slots_cache.get(slot)
808+
slot_nodes = r.nodes_manager.slots_cache.get(slot)
809809
primary = slot_nodes[0]
810810
assert r.get_node_from_key(key, replica=False) == primary
811811
replica = r.get_node_from_key(key, replica=True)
@@ -1003,8 +1003,8 @@ class TestClusterRedisCommands:
10031003

10041004
def test_case_insensitive_command_names(self, r):
10051005
assert (
1006-
r._cluster_response_callbacks["cluster slots"]
1007-
== r._cluster_response_callbacks["CLUSTER SLOTS"]
1006+
r.cluster_response_callbacks["cluster slots"]
1007+
== r.cluster_response_callbacks["CLUSTER SLOTS"]
10081008
)
10091009

10101010
def test_get_and_set(self, r):
@@ -1275,7 +1275,7 @@ def test_cluster_addslotsrange(self, r):
12751275

12761276
@skip_if_redis_enterprise()
12771277
def test_cluster_countkeysinslot(self, r):
1278-
node = r._nodes_manager.get_node_from_slot(1)
1278+
node = r.nodes_manager.get_node_from_slot(1)
12791279
mock_node_resp(node, 2)
12801280
assert r.cluster_countkeysinslot(1) == 2
12811281

@@ -1450,7 +1450,7 @@ def test_cluster_save_config(self, r):
14501450
@skip_if_redis_enterprise()
14511451
def test_cluster_get_keys_in_slot(self, r):
14521452
response = ["{foo}1", "{foo}2"]
1453-
node = r._nodes_manager.get_node_from_slot(12182)
1453+
node = r.nodes_manager.get_node_from_slot(12182)
14541454
mock_node_resp(node, response)
14551455
keys = r.cluster_get_keys_in_slot(12182, 4)
14561456
assert keys == response
@@ -1476,7 +1476,7 @@ def test_cluster_setslot(self, r):
14761476
r.cluster_failover(node, "STATE")
14771477

14781478
def test_cluster_setslot_stable(self, r):
1479-
node = r._nodes_manager.get_node_from_slot(12182)
1479+
node = r.nodes_manager.get_node_from_slot(12182)
14801480
mock_node_resp(node, "OK")
14811481
assert r.cluster_setslot_stable(12182) is True
14821482
assert node.redis_connection.connection.read_response.called
@@ -1562,7 +1562,7 @@ def test_info(self, r):
15621562
r.set("z{1}", 3)
15631563
# Get node that handles the slot
15641564
slot = r.keyslot("x{1}")
1565-
node = r._nodes_manager.get_node_from_slot(slot)
1565+
node = r.nodes_manager.get_node_from_slot(slot)
15661566
# Run info on that node
15671567
info = r.info(target_nodes=node)
15681568
assert isinstance(info, dict)
@@ -1618,7 +1618,7 @@ def test_slowlog_get_limit(self, r, slowlog):
16181618

16191619
def test_slowlog_length(self, r, slowlog):
16201620
r.get("foo")
1621-
node = r._nodes_manager.get_node_from_slot(key_slot(b"foo"))
1621+
node = r.nodes_manager.get_node_from_slot(key_slot(b"foo"))
16221622
slowlog_len = r.slowlog_len(target_nodes=node)
16231623
assert isinstance(slowlog_len, int)
16241624

@@ -1644,7 +1644,7 @@ def test_memory_stats(self, r):
16441644
# put a key into the current db to make sure that "db.<current-db>"
16451645
# has data
16461646
r.set("foo", "bar")
1647-
node = r._nodes_manager.get_node_from_slot(key_slot(b"foo"))
1647+
node = r.nodes_manager.get_node_from_slot(key_slot(b"foo"))
16481648
stats = r.memory_stats(target_nodes=node)
16491649
assert isinstance(stats, dict)
16501650
for key, value in stats.items():
@@ -2530,7 +2530,7 @@ class TestNodesManager:
25302530
"""
25312531

25322532
def test_load_balancer(self, r):
2533-
n_manager = r._nodes_manager
2533+
n_manager = r.nodes_manager
25342534
lb = n_manager.read_load_balancer
25352535
slot_1 = 1257
25362536
slot_2 = 8975
@@ -3428,7 +3428,7 @@ def test_readonly_pipeline_from_readonly_client(self, request):
34283428
mock_all_nodes_resp(ro, "MOCK_OK")
34293429
assert readonly_pipe.read_from_replicas is True
34303430
assert readonly_pipe.get(key).get(key).execute() == ["MOCK_OK", "MOCK_OK"]
3431-
slot_nodes = ro._nodes_manager.slots_cache[ro.keyslot(key)]
3431+
slot_nodes = ro.nodes_manager.slots_cache[ro.keyslot(key)]
34323432
if len(slot_nodes) > 1:
34333433
executed_on_replica = False
34343434
for node in slot_nodes:
@@ -3468,7 +3468,7 @@ def test_readonly_pipeline_with_reading_from_replicas_strategies(
34683468
mock_all_nodes_resp(ro, "MOCK_OK")
34693469
assert readonly_pipe.load_balancing_strategy == load_balancing_strategy
34703470
assert readonly_pipe.get(key).get(key).execute() == ["MOCK_OK", "MOCK_OK"]
3471-
slot_nodes = ro._nodes_manager.slots_cache[ro.keyslot(key)]
3471+
slot_nodes = ro.nodes_manager.slots_cache[ro.keyslot(key)]
34723472
executed_on_replicas_only = True
34733473
for node in slot_nodes:
34743474
if node.server_type == PRIMARY:

0 commit comments

Comments
 (0)