@@ -1237,8 +1237,6 @@ def _execute_command(self, target_node, *args, **kwargs):
1237
1237
except AuthenticationError :
1238
1238
raise
1239
1239
except (ConnectionError , TimeoutError ) as e :
1240
- # Connection retries are being handled in the node's
1241
- # Retry object.
1242
1240
# ConnectionError can also be raised if we couldn't get a
1243
1241
# connection from the pool before timing out, so check that
1244
1242
# this is an actual connection before attempting to disconnect.
@@ -1854,7 +1852,7 @@ def remap_host_port(self, host: str, port: int) -> Tuple[str, int]:
1854
1852
1855
1853
def find_connection_owner (self , connection : Connection ) -> Optional [Redis ]:
1856
1854
node_name = get_node_name (connection .host , connection .port )
1857
- for node in self .nodes_cache .values ():
1855
+ for node in tuple ( self .nodes_cache .values () ):
1858
1856
if node .redis_connection :
1859
1857
conn_args = node .redis_connection .connection_pool .connection_kwargs
1860
1858
if node_name == get_node_name (
@@ -2550,7 +2548,7 @@ def multi(self):
2550
2548
"""
2551
2549
Starts transactional context.
2552
2550
2553
- See: ClusterPipeline.reset ()
2551
+ See: ClusterPipeline.multi ()
2554
2552
"""
2555
2553
pass
2556
2554
@@ -2706,9 +2704,9 @@ def send_cluster_commands(
2706
2704
self , stack , raise_on_error = True , allow_redirections = True
2707
2705
):
2708
2706
"""
2709
- Wrapper for CLUSTERDOWN error handling.
2707
+ Wrapper for RedisCluster.ERRORS_ALLOW_RETRY errors handling.
2710
2708
2711
- If the cluster reports it is down it is assumed that:
2709
+ If one of the retryable exceptions has been thrown we assume that:
2712
2710
- connection_pool was disconnected
2713
2711
- connection_pool was reseted
2714
2712
- refereh_table_asap set to True
@@ -2816,11 +2814,10 @@ def _send_cluster_commands(
2816
2814
# we write to all the open sockets for each node first,
2817
2815
# before reading anything
2818
2816
# this allows us to flush all the requests out across the
2819
- # network essentially in parallel
2820
- # so that we can read them all in parallel as they come back.
2817
+ # network
2818
+ # so that we can read them from different sockets as they come back.
2821
2819
# we dont' multiplex on the sockets as they come available,
2822
2820
# but that shouldn't make too much difference.
2823
- node_commands = nodes .values ()
2824
2821
try :
2825
2822
node_commands = nodes .values ()
2826
2823
for n in node_commands :
@@ -3024,7 +3021,7 @@ class TransactionStrategy(AbstractStrategy):
3024
3021
IMMEDIATE_EXECUTE_COMMANDS = {"WATCH" , "UNWATCH" }
3025
3022
UNWATCH_COMMANDS = {"DISCARD" , "EXEC" , "UNWATCH" }
3026
3023
SLOT_REDIRECT_ERRORS = (AskError , MovedError )
3027
- CONNECTION_ERRORS = (ConnectionError ,OSError ,ClusterDownError )
3024
+ CONNECTION_ERRORS = (ConnectionError , OSError , ClusterDownError )
3028
3025
3029
3026
def __init__ (self , pipe : ClusterPipeline ):
3030
3027
super ().__init__ (pipe )
@@ -3033,6 +3030,10 @@ def __init__(self, pipe: ClusterPipeline):
3033
3030
self ._pipeline_slots : Set [int ] = set ()
3034
3031
self ._transaction_connection : Optional [Connection ] = None
3035
3032
self ._executing = False
3033
+ self ._retry = copy (self ._pipe .retry )
3034
+ self ._retry .update_supported_errors (
3035
+ RedisCluster .ERRORS_ALLOW_RETRY + self .SLOT_REDIRECT_ERRORS
3036
+ )
3036
3037
3037
3038
def _get_client_and_connection_for_transaction (self ) -> Tuple [Redis , Connection ]:
3038
3039
"""
@@ -3105,9 +3106,7 @@ def _validate_watch(self):
3105
3106
self ._watching = True
3106
3107
3107
3108
def _immediate_execute_command (self , * args , ** options ):
3108
- retry = copy (self ._pipe .retry )
3109
- retry .update_supported_errors ([AskError , MovedError ])
3110
- return retry .call_with_retry (
3109
+ return self ._retry .call_with_retry (
3111
3110
lambda : self ._get_connection_and_send_command (* args , ** options ),
3112
3111
self ._reinitialize_on_error ,
3113
3112
)
@@ -3137,7 +3136,10 @@ def _reinitialize_on_error(self, error):
3137
3136
if type (error ) in self .SLOT_REDIRECT_ERRORS and self ._executing :
3138
3137
raise WatchError ("Slot rebalancing occurred while watching keys" )
3139
3138
3140
- if type (error ) in self .SLOT_REDIRECT_ERRORS or type (error ) in self .CONNECTION_ERRORS :
3139
+ if (
3140
+ type (error ) in self .SLOT_REDIRECT_ERRORS
3141
+ or type (error ) in self .CONNECTION_ERRORS
3142
+ ):
3141
3143
if self ._transaction_connection :
3142
3144
self ._transaction_connection = None
3143
3145
@@ -3169,9 +3171,7 @@ def execute(self, raise_on_error: bool = True) -> List[Any]:
3169
3171
def _execute_transaction_with_retries (
3170
3172
self , stack : List ["PipelineCommand" ], raise_on_error : bool
3171
3173
):
3172
- retry = copy (self ._pipe .retry )
3173
- retry .update_supported_errors ([AskError , MovedError ])
3174
- return retry .call_with_retry (
3174
+ return self ._retry .call_with_retry (
3175
3175
lambda : self ._execute_transaction (stack , raise_on_error ),
3176
3176
self ._reinitialize_on_error ,
3177
3177
)
0 commit comments