5
5
import time
6
6
from abc import ABC , abstractmethod
7
7
from collections import OrderedDict
8
+ from copy import copy
8
9
from enum import Enum
9
10
from itertools import chain
10
11
from typing import Any , Callable , Dict , List , Optional , Set , Tuple , Union
@@ -2166,7 +2167,7 @@ def __init__(
2166
2167
else :
2167
2168
self .retry = Retry (
2168
2169
backoff = ExponentialWithJitterBackoff (base = 1 , cap = 10 ),
2169
- retries = self . cluster_error_retry_attempts ,
2170
+ retries = cluster_error_retry_attempts ,
2170
2171
)
2171
2172
2172
2173
self .encoder = Encoder (
@@ -2178,10 +2179,8 @@ def __init__(
2178
2179
lock = threading .Lock ()
2179
2180
self ._lock = lock
2180
2181
self .parent_execute_command = super ().execute_command
2181
- self ._execution_strategy : ExecutionStrategy = PipelineStrategy (
2182
- self
2183
- ) if not transaction else TransactionStrategy (
2184
- self
2182
+ self ._execution_strategy : ExecutionStrategy = (
2183
+ PipelineStrategy (self ) if not transaction else TransactionStrategy (self )
2185
2184
)
2186
2185
self .command_stack = self ._execution_strategy .command_queue
2187
2186
@@ -2477,7 +2476,6 @@ def read(self):
2477
2476
2478
2477
2479
2478
class ExecutionStrategy (ABC ):
2480
-
2481
2479
@property
2482
2480
@abstractmethod
2483
2481
def command_queue (self ):
@@ -2520,7 +2518,9 @@ def execute(self, raise_on_error: bool = True) -> List[Any]:
2520
2518
pass
2521
2519
2522
2520
@abstractmethod
2523
- def send_cluster_commands (self , stack , raise_on_error = True , allow_redirections = True ):
2521
+ def send_cluster_commands (
2522
+ self , stack , raise_on_error = True , allow_redirections = True
2523
+ ):
2524
2524
"""
2525
2525
Sends commands according to current execution strategy.
2526
2526
@@ -2599,10 +2599,9 @@ def discard(self):
2599
2599
2600
2600
2601
2601
class AbstractStrategy (ExecutionStrategy ):
2602
-
2603
2602
def __init__ (
2604
- self ,
2605
- pipe : ClusterPipeline ,
2603
+ self ,
2604
+ pipe : ClusterPipeline ,
2606
2605
):
2607
2606
self ._command_queue : List [PipelineCommand ] = []
2608
2607
self ._pipe = pipe
@@ -2631,7 +2630,9 @@ def execute(self, raise_on_error: bool = True) -> List[Any]:
2631
2630
pass
2632
2631
2633
2632
@abstractmethod
2634
- def send_cluster_commands (self , stack , raise_on_error = True , allow_redirections = True ):
2633
+ def send_cluster_commands (
2634
+ self , stack , raise_on_error = True , allow_redirections = True
2635
+ ):
2635
2636
pass
2636
2637
2637
2638
@abstractmethod
@@ -2666,8 +2667,8 @@ def annotate_exception(self, exception, number, command):
2666
2667
)
2667
2668
exception .args = (msg ,) + exception .args [1 :]
2668
2669
2669
- class PipelineStrategy (AbstractStrategy ):
2670
2670
2671
+ class PipelineStrategy (AbstractStrategy ):
2671
2672
def __init__ (self , pipe : ClusterPipeline ):
2672
2673
super ().__init__ (pipe )
2673
2674
self .command_flags = pipe .command_flags
@@ -2702,10 +2703,7 @@ def reset(self):
2702
2703
self ._command_queue = []
2703
2704
2704
2705
def send_cluster_commands (
2705
- self ,
2706
- stack ,
2707
- raise_on_error = True ,
2708
- allow_redirections = True
2706
+ self , stack , raise_on_error = True , allow_redirections = True
2709
2707
):
2710
2708
"""
2711
2709
Wrapper for CLUSTERDOWN error handling.
@@ -2724,7 +2722,7 @@ def send_cluster_commands(
2724
2722
"""
2725
2723
if not stack :
2726
2724
return []
2727
- retry_attempts = self ._pipe .cluster_error_retry_attempts
2725
+ retry_attempts = self ._pipe .retry . get_retries ()
2728
2726
while True :
2729
2727
try :
2730
2728
return self ._send_cluster_commands (
@@ -2742,10 +2740,7 @@ def send_cluster_commands(
2742
2740
raise e
2743
2741
2744
2742
def _send_cluster_commands (
2745
- self ,
2746
- stack ,
2747
- raise_on_error = True ,
2748
- allow_redirections = True
2743
+ self , stack , raise_on_error = True , allow_redirections = True
2749
2744
):
2750
2745
"""
2751
2746
Send a bunch of cluster commands to the redis cluster.
@@ -2945,7 +2940,10 @@ def _determine_nodes(self, *args, **kwargs) -> List["ClusterNode"]:
2945
2940
# Determine which nodes should be executed the command on.
2946
2941
# Returns a list of target nodes.
2947
2942
command = args [0 ].upper ()
2948
- if len (args ) >= 2 and f"{ args [0 ]} { args [1 ]} " .upper () in self ._pipe .command_flags :
2943
+ if (
2944
+ len (args ) >= 2
2945
+ and f"{ args [0 ]} { args [1 ]} " .upper () in self ._pipe .command_flags
2946
+ ):
2949
2947
command = f"{ args [0 ]} { args [1 ]} " .upper ()
2950
2948
2951
2949
nodes_flag = kwargs .pop ("nodes_flag" , None )
@@ -2978,7 +2976,9 @@ def _determine_nodes(self, *args, **kwargs) -> List["ClusterNode"]:
2978
2976
node = self ._nodes_manager .get_node_from_slot (
2979
2977
slot ,
2980
2978
self ._pipe .read_from_replicas and command in READ_COMMANDS ,
2981
- self ._pipe .load_balancing_strategy if command in READ_COMMANDS else None ,
2979
+ self ._pipe .load_balancing_strategy
2980
+ if command in READ_COMMANDS
2981
+ else None ,
2982
2982
)
2983
2983
return [node ]
2984
2984
@@ -3012,7 +3012,6 @@ def unlink(self, *names):
3012
3012
3013
3013
3014
3014
class TransactionStrategy (AbstractStrategy ):
3015
-
3016
3015
NO_SLOTS_COMMANDS = {"UNWATCH" }
3017
3016
IMMEDIATE_EXECUTE_COMMANDS = {"WATCH" , "UNWATCH" }
3018
3017
UNWATCH_COMMANDS = {"DISCARD" , "EXEC" , "UNWATCH" }
@@ -3066,7 +3065,7 @@ def execute_command(self, *args, **kwargs):
3066
3065
slot_number = self ._pipe .determine_slot (* args )
3067
3066
3068
3067
if (
3069
- self ._watching or args [0 ] in self .IMMEDIATE_EXECUTE_COMMANDS
3068
+ self ._watching or args [0 ] in self .IMMEDIATE_EXECUTE_COMMANDS
3070
3069
) and not self ._explicit_transaction :
3071
3070
if args [0 ] == "WATCH" :
3072
3071
self ._validate_watch ()
@@ -3098,10 +3097,7 @@ def _validate_watch(self):
3098
3097
self ._watching = True
3099
3098
3100
3099
def _immediate_execute_command (self , * args , ** options ):
3101
- retry = Retry (
3102
- default_backoff (),
3103
- self ._pipe .cluster_error_retry_attempts ,
3104
- )
3100
+ retry = copy (self ._pipe .retry )
3105
3101
retry .update_supported_errors ([AskError , MovedError ])
3106
3102
return retry .call_with_retry (
3107
3103
lambda : self ._get_connection_and_send_command (* args , ** options ),
@@ -3175,10 +3171,7 @@ def execute(self, raise_on_error: bool = True) -> List[Any]:
3175
3171
def _execute_transaction_with_retries (
3176
3172
self , stack : List ["PipelineCommand" ], raise_on_error : bool
3177
3173
):
3178
- retry = Retry (
3179
- default_backoff (),
3180
- self ._pipe .cluster_error_retry_attempts ,
3181
- )
3174
+ retry = copy (self ._pipe .retry )
3182
3175
retry .update_supported_errors ([AskError , MovedError ])
3183
3176
return retry .call_with_retry (
3184
3177
lambda : self ._execute_transaction (stack , raise_on_error ),
@@ -3284,7 +3277,9 @@ def _execute_transaction(
3284
3277
if not isinstance (r , Exception ):
3285
3278
command_name = cmd .args [0 ]
3286
3279
if command_name in self ._pipe .cluster_response_callbacks :
3287
- r = self ._pipe .cluster_response_callbacks [command_name ](r , ** cmd .options )
3280
+ r = self ._pipe .cluster_response_callbacks [command_name ](
3281
+ r , ** cmd .options
3282
+ )
3288
3283
data .append (r )
3289
3284
return data
3290
3285
@@ -3321,8 +3316,12 @@ def reset(self):
3321
3316
self ._cluster_error = False
3322
3317
self ._executing = False
3323
3318
3324
- def send_cluster_commands (self , stack , raise_on_error = True , allow_redirections = True ):
3325
- raise NotImplementedError ("send_cluster_commands cannot be executed in transactional context." )
3319
+ def send_cluster_commands (
3320
+ self , stack , raise_on_error = True , allow_redirections = True
3321
+ ):
3322
+ raise NotImplementedError (
3323
+ "send_cluster_commands cannot be executed in transactional context."
3324
+ )
3326
3325
3327
3326
def multi (self ):
3328
3327
if self ._explicit_transaction :
@@ -3356,4 +3355,4 @@ def delete(self, *names):
3356
3355
return self .execute_command ("DEL" , * names )
3357
3356
3358
3357
def unlink (self , * names ):
3359
- return self .execute_command ("UNLINK" , * names )
3358
+ return self .execute_command ("UNLINK" , * names )
0 commit comments