17
17
ClusterCrossSlotError ,
18
18
ClusterDownError ,
19
19
ClusterError ,
20
+ ConnectionError ,
20
21
DataError ,
21
22
MasterDownError ,
22
23
MovedError ,
@@ -374,6 +375,12 @@ class RedisCluster(RedisClusterCommands):
374
375
),
375
376
)
376
377
378
+ ERRORS_ALLOW_RETRY = (
379
+ ConnectionError ,
380
+ TimeoutError ,
381
+ ClusterDownError ,
382
+ )
383
+
377
384
def __init__ (
378
385
self ,
379
386
host = None ,
@@ -385,8 +392,6 @@ def __init__(
385
392
reinitialize_steps = 10 ,
386
393
read_from_replicas = False ,
387
394
url = None ,
388
- retry_on_timeout = False ,
389
- retry = None ,
390
395
** kwargs ,
391
396
):
392
397
"""
@@ -417,11 +422,6 @@ def __init__(
417
422
:cluster_error_retry_attempts: 'int'
418
423
Retry command execution attempts when encountering ClusterDownError
419
424
or ConnectionError
420
- :retry_on_timeout: 'bool'
421
- To specify a retry policy, first set `retry_on_timeout` to `True`
422
- then set `retry` to a valid `Retry` object
423
- :retry: 'Retry'
424
- a `Retry` object
425
425
:reinitialize_steps: 'int'
426
426
Specifies the number of MOVED errors that need to occur before
427
427
reinitializing the whole cluster topology. If a MOVED error occurs
@@ -452,9 +452,6 @@ def __init__(
452
452
"Argument 'db' is not possible to use in cluster mode"
453
453
)
454
454
455
- if retry_on_timeout :
456
- kwargs .update ({"retry_on_timeout" : retry_on_timeout , "retry" : retry })
457
-
458
455
# Get the startup node/s
459
456
from_url = False
460
457
if url is not None :
@@ -850,7 +847,7 @@ def _parse_target_nodes(self, target_nodes):
850
847
851
848
def execute_command (self , * args , ** kwargs ):
852
849
"""
853
- Wrapper for ClusterDownError and ConnectionError error handling.
850
+ Wrapper for ERRORS_ALLOW_RETRY error handling.
854
851
855
852
It will try the number of times specified by the config option
856
853
"self.cluster_error_retry_attempts" which defaults to 3 unless manually
@@ -865,18 +862,19 @@ def execute_command(self, *args, **kwargs):
865
862
dict<Any, ClusterNode>
866
863
"""
867
864
target_nodes_specified = False
868
- target_nodes = kwargs .pop ("target_nodes" , None )
869
- if target_nodes is not None and not self ._is_nodes_flag (target_nodes ):
870
- target_nodes = self ._parse_target_nodes (target_nodes )
865
+ target_nodes = None
866
+ passed_targets = kwargs .pop ("target_nodes" , None )
867
+ if passed_targets is not None and not self ._is_nodes_flag (passed_targets ):
868
+ target_nodes = self ._parse_target_nodes (passed_targets )
871
869
target_nodes_specified = True
872
- # If ClusterDownError/ConnectionError were thrown, the nodes
873
- # and slots cache were reinitialized. We will retry executing the
874
- # command with the updated cluster setup only when the target nodes
875
- # can be determined again with the new cache tables. Therefore,
876
- # when target nodes were passed to this function, we cannot retry
877
- # the command execution since the nodes may not be valid anymore
878
- # after the tables were reinitialized. So in case of passed target
879
- # nodes, retry_attempts will be set to 1.
870
+ # If an error that allows retrying was thrown, the nodes and slots
871
+ # cache were reinitialized. We will retry executing the command with
872
+ # the updated cluster setup only when the target nodes can be
873
+ # determined again with the new cache tables. Therefore, when target
874
+ # nodes were passed to this function, we cannot retry the command
875
+ # execution since the nodes may not be valid anymore after the tables
876
+ # were reinitialized. So in case of passed target nodes,
877
+ # retry_attempts will be set to 1.
880
878
retry_attempts = (
881
879
1 if target_nodes_specified else self .cluster_error_retry_attempts
882
880
)
@@ -887,7 +885,7 @@ def execute_command(self, *args, **kwargs):
887
885
if not target_nodes_specified :
888
886
# Determine the nodes to execute the command on
889
887
target_nodes = self ._determine_nodes (
890
- * args , ** kwargs , nodes_flag = target_nodes
888
+ * args , ** kwargs , nodes_flag = passed_targets
891
889
)
892
890
if not target_nodes :
893
891
raise RedisClusterException (
@@ -897,11 +895,14 @@ def execute_command(self, *args, **kwargs):
897
895
res [node .name ] = self ._execute_command (node , * args , ** kwargs )
898
896
# Return the processed result
899
897
return self ._process_result (args [0 ], res , ** kwargs )
900
- except (ClusterDownError , ConnectionError ) as e :
901
- # The nodes and slots cache were reinitialized.
902
- # Try again with the new cluster setup. All other errors
903
- # should be raised.
904
- exception = e
898
+ except BaseException as e :
899
+ if type (e ) in RedisCluster .ERRORS_ALLOW_RETRY :
900
+ # The nodes and slots cache were reinitialized.
901
+ # Try again with the new cluster setup.
902
+ exception = e
903
+ else :
904
+ # All other errors should be raised.
905
+ raise e
905
906
906
907
# If it fails the configured number of times then raise exception back
907
908
# to caller of this method
@@ -953,11 +954,11 @@ def _execute_command(self, target_node, *args, **kwargs):
953
954
)
954
955
return response
955
956
956
- except (RedisClusterException , BusyLoadingError ):
957
- log .exception ("RedisClusterException || BusyLoadingError" )
957
+ except (RedisClusterException , BusyLoadingError ) as e :
958
+ log .exception (type ( e ) )
958
959
raise
959
- except ConnectionError :
960
- log .exception ("ConnectionError" )
960
+ except ( ConnectionError , TimeoutError ) as e :
961
+ log .exception (type ( e ) )
961
962
# ConnectionError can also be raised if we couldn't get a
962
963
# connection from the pool before timing out, so check that
963
964
# this is an actual connection before attempting to disconnect.
@@ -976,13 +977,6 @@ def _execute_command(self, target_node, *args, **kwargs):
976
977
# and try again with the new setup
977
978
self .nodes_manager .initialize ()
978
979
raise
979
- except TimeoutError :
980
- log .exception ("TimeoutError" )
981
- if connection is not None :
982
- connection .disconnect ()
983
-
984
- if ttl < self .RedisClusterRequestTTL / 2 :
985
- time .sleep (0.05 )
986
980
except MovedError as e :
987
981
# First, we will try to patch the slots/nodes cache with the
988
982
# redirected node output and try again. If MovedError exceeds
@@ -1016,7 +1010,7 @@ def _execute_command(self, target_node, *args, **kwargs):
1016
1010
# ClusterDownError can occur during a failover and to get
1017
1011
# self-healed, we will try to reinitialize the cluster layout
1018
1012
# and retry executing the command
1019
- time .sleep (0.05 )
1013
+ time .sleep (0.25 )
1020
1014
self .nodes_manager .initialize ()
1021
1015
raise e
1022
1016
except ResponseError as e :
@@ -1342,7 +1336,7 @@ def initialize(self):
1342
1336
raise RedisClusterException (
1343
1337
"Cluster mode is not enabled on this node"
1344
1338
)
1345
- cluster_slots = r .execute_command ("CLUSTER SLOTS" )
1339
+ cluster_slots = str_if_bytes ( r .execute_command ("CLUSTER SLOTS" ) )
1346
1340
startup_nodes_reachable = True
1347
1341
except (ConnectionError , TimeoutError ) as e :
1348
1342
msg = e .__str__
@@ -1631,29 +1625,28 @@ def get_redis_connection(self):
1631
1625
return self .node .redis_connection
1632
1626
1633
1627
1634
- ERRORS_ALLOW_RETRY = (
1635
- ConnectionError ,
1636
- TimeoutError ,
1637
- MovedError ,
1638
- AskError ,
1639
- TryAgainError ,
1640
- )
1641
-
1642
-
1643
1628
class ClusterPipeline (RedisCluster ):
1644
1629
"""
1645
1630
Support for Redis pipeline
1646
1631
in cluster mode
1647
1632
"""
1648
1633
1634
+ ERRORS_ALLOW_RETRY = (
1635
+ ConnectionError ,
1636
+ TimeoutError ,
1637
+ MovedError ,
1638
+ AskError ,
1639
+ TryAgainError ,
1640
+ )
1641
+
1649
1642
def __init__ (
1650
1643
self ,
1651
1644
nodes_manager ,
1652
1645
result_callbacks = None ,
1653
1646
cluster_response_callbacks = None ,
1654
1647
startup_nodes = None ,
1655
1648
read_from_replicas = False ,
1656
- cluster_error_retry_attempts = 3 ,
1649
+ cluster_error_retry_attempts = 5 ,
1657
1650
reinitialize_steps = 10 ,
1658
1651
** kwargs ,
1659
1652
):
@@ -1915,7 +1908,11 @@ def _send_cluster_commands(
1915
1908
# collect all the commands we are allowed to retry.
1916
1909
# (MOVED, ASK, or connection errors or timeout errors)
1917
1910
attempt = sorted (
1918
- (c for c in attempt if isinstance (c .result , ERRORS_ALLOW_RETRY )),
1911
+ (
1912
+ c
1913
+ for c in attempt
1914
+ if isinstance (c .result , ClusterPipeline .ERRORS_ALLOW_RETRY )
1915
+ ),
1919
1916
key = lambda x : x .position ,
1920
1917
)
1921
1918
if attempt and allow_redirections :
0 commit comments