22import collections
33import random
44import socket
5- import ssl
65import warnings
76from typing import (
87 Any ,
3938 SLOT_ID ,
4039 AbstractRedisCluster ,
4140 LoadBalancer ,
41+ LoadBalancingStrategy ,
4242 block_pipeline_command ,
4343 get_node_name ,
4444 parse_cluster_slots ,
6464 TryAgainError ,
6565)
6666from redis .typing import AnyKeyT , EncodableT , KeyT
67- from redis .utils import deprecated_function , get_lib_version , safe_str , str_if_bytes
67+ from redis .utils import (
68+ SSL_AVAILABLE ,
69+ deprecated_args ,
70+ deprecated_function ,
71+ get_lib_version ,
72+ safe_str ,
73+ str_if_bytes ,
74+ )
75+
76+ if SSL_AVAILABLE :
77+ from ssl import TLSVersion
78+ else :
79+ TLSVersion = None
6880
6981TargetNodesT = TypeVar (
7082 "TargetNodesT" , str , "ClusterNode" , List ["ClusterNode" ], Dict [Any , "ClusterNode" ]
@@ -111,9 +123,15 @@ class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommand
111123 | See:
112124 https://redis.io/docs/manual/scaling/#redis-cluster-configuration-parameters
113125 :param read_from_replicas:
114- | Enable read from replicas in READONLY mode. You can read possibly stale data.
126+ | @deprecated - please use load_balancing_strategy instead
127+ | Enable read from replicas in READONLY mode.
115128 When set to true, read commands will be assigned between the primary and
116129 its replications in a Round-Robin manner.
130+ The data read from replicas is eventually consistent with the data in primary nodes.
131+ :param load_balancing_strategy:
132+ | Enable read from replicas in READONLY mode and defines the load balancing
133+ strategy that will be used for cluster node selection.
134+ The data read from replicas is eventually consistent with the data in primary nodes.
117135 :param reinitialize_steps:
118136 | Specifies the number of MOVED errors that need to occur before reinitializing
119137 the whole cluster topology. If a MOVED error occurs and the cluster does not
@@ -206,6 +224,11 @@ def from_url(cls, url: str, **kwargs: Any) -> "RedisCluster":
206224 "result_callbacks" ,
207225 )
208226
227+ @deprecated_args (
228+ args_to_warn = ["read_from_replicas" ],
229+ reason = "Please configure the 'load_balancing_strategy' instead" ,
230+ version = "5.0.3" ,
231+ )
209232 def __init__ (
210233 self ,
211234 host : Optional [str ] = None ,
@@ -214,6 +237,7 @@ def __init__(
214237 startup_nodes : Optional [List ["ClusterNode" ]] = None ,
215238 require_full_coverage : bool = True ,
216239 read_from_replicas : bool = False ,
240+ load_balancing_strategy : Optional [LoadBalancingStrategy ] = None ,
217241 reinitialize_steps : int = 5 ,
218242 cluster_error_retry_attempts : int = 3 ,
219243 connection_error_retry_attempts : int = 3 ,
@@ -247,7 +271,7 @@ def __init__(
247271 ssl_certfile : Optional [str ] = None ,
248272 ssl_check_hostname : bool = False ,
249273 ssl_keyfile : Optional [str ] = None ,
250- ssl_min_version : Optional [ssl . TLSVersion ] = None ,
274+ ssl_min_version : Optional [TLSVersion ] = None ,
251275 ssl_ciphers : Optional [str ] = None ,
252276 protocol : Optional [int ] = 2 ,
253277 address_remap : Optional [Callable [[Tuple [str , int ]], Tuple [str , int ]]] = None ,
@@ -312,7 +336,7 @@ def __init__(
312336 }
313337 )
314338
315- if read_from_replicas :
339+ if read_from_replicas or load_balancing_strategy :
316340 # Call our on_connect function to configure READONLY mode
317341 kwargs ["redis_connect_func" ] = self .on_connect
318342
@@ -361,6 +385,7 @@ def __init__(
361385 )
362386 self .encoder = Encoder (encoding , encoding_errors , decode_responses )
363387 self .read_from_replicas = read_from_replicas
388+ self .load_balancing_strategy = load_balancing_strategy
364389 self .reinitialize_steps = reinitialize_steps
365390 self .cluster_error_retry_attempts = cluster_error_retry_attempts
366391 self .connection_error_retry_attempts = connection_error_retry_attempts
@@ -579,6 +604,7 @@ async def _determine_nodes(
579604 self .nodes_manager .get_node_from_slot (
580605 await self ._determine_slot (command , * args ),
581606 self .read_from_replicas and command in READ_COMMANDS ,
607+ self .load_balancing_strategy if command in READ_COMMANDS else None ,
582608 )
583609 ]
584610
@@ -759,7 +785,11 @@ async def _execute_command(
759785 # refresh the target node
760786 slot = await self ._determine_slot (* args )
761787 target_node = self .nodes_manager .get_node_from_slot (
762- slot , self .read_from_replicas and args [0 ] in READ_COMMANDS
788+ slot ,
789+ self .read_from_replicas and args [0 ] in READ_COMMANDS ,
790+ self .load_balancing_strategy
791+ if args [0 ] in READ_COMMANDS
792+ else None ,
763793 )
764794 moved = False
765795
@@ -1221,17 +1251,23 @@ def _update_moved_slots(self) -> None:
12211251 self ._moved_exception = None
12221252
12231253 def get_node_from_slot (
1224- self , slot : int , read_from_replicas : bool = False
1254+ self ,
1255+ slot : int ,
1256+ read_from_replicas : bool = False ,
1257+ load_balancing_strategy = None ,
12251258 ) -> "ClusterNode" :
12261259 if self ._moved_exception :
12271260 self ._update_moved_slots ()
12281261
1262+ if read_from_replicas is True and load_balancing_strategy is None :
1263+ load_balancing_strategy = LoadBalancingStrategy .ROUND_ROBIN
1264+
12291265 try :
1230- if read_from_replicas :
1231- # get the server index in a Round-Robin manner
1266+ if len ( self . slots_cache [ slot ]) > 1 and load_balancing_strategy :
1267+ # get the server index using the strategy defined in load_balancing_strategy
12321268 primary_name = self .slots_cache [slot ][0 ].name
12331269 node_idx = self .read_load_balancer .get_server_index (
1234- primary_name , len (self .slots_cache [slot ])
1270+ primary_name , len (self .slots_cache [slot ]), load_balancing_strategy
12351271 )
12361272 return self .slots_cache [slot ][node_idx ]
12371273 return self .slots_cache [slot ][0 ]
0 commit comments