Skip to content

Commit 3a872bd

Browse files
committed
Adding laod balancing strategy configuration to cluster clients(replacement for 'read_from_replicas' config)
1 parent 9251650 commit 3a872bd

File tree

5 files changed

+341
-42
lines changed

5 files changed

+341
-42
lines changed

redis/asyncio/cluster.py

Lines changed: 31 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
SLOT_ID,
3939
AbstractRedisCluster,
4040
LoadBalancer,
41+
LoadBalancingStrategy,
4142
block_pipeline_command,
4243
get_node_name,
4344
parse_cluster_slots,
@@ -65,6 +66,7 @@
6566
from redis.typing import AnyKeyT, EncodableT, KeyT
6667
from redis.utils import (
6768
SSL_AVAILABLE,
69+
deprecated_args,
6870
deprecated_function,
6971
get_lib_version,
7072
safe_str,
@@ -121,9 +123,14 @@ class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommand
121123
| See:
122124
https://redis.io/docs/manual/scaling/#redis-cluster-configuration-parameters
123125
:param read_from_replicas:
126+
| @deprecated - please use load_balancing_strategy instead
124127
| Enable read from replicas in READONLY mode. You can read possibly stale data.
125128
When set to true, read commands will be assigned between the primary and
126129
its replications in a Round-Robin manner.
130+
:param load_balancing_strategy:
131+
| Enable read from replicas in READONLY mode and defines the load balancing
132+
strategy that will be used for cluster node selection.
133+
You can read possibly stale data (when reading from replicas).
127134
:param reinitialize_steps:
128135
| Specifies the number of MOVED errors that need to occur before reinitializing
129136
the whole cluster topology. If a MOVED error occurs and the cluster does not
@@ -216,6 +223,11 @@ def from_url(cls, url: str, **kwargs: Any) -> "RedisCluster":
216223
"result_callbacks",
217224
)
218225

226+
@deprecated_args(
227+
args_to_warn=["read_from_replicas"],
228+
reason="Please configure the 'load_balancing_strategy' instead",
229+
version="5.0.3",
230+
)
219231
def __init__(
220232
self,
221233
host: Optional[str] = None,
@@ -224,6 +236,7 @@ def __init__(
224236
startup_nodes: Optional[List["ClusterNode"]] = None,
225237
require_full_coverage: bool = True,
226238
read_from_replicas: bool = False,
239+
load_balancing_strategy: Union[LoadBalancingStrategy, None] = None,
227240
reinitialize_steps: int = 5,
228241
cluster_error_retry_attempts: int = 3,
229242
connection_error_retry_attempts: int = 3,
@@ -322,7 +335,7 @@ def __init__(
322335
}
323336
)
324337

325-
if read_from_replicas:
338+
if read_from_replicas or load_balancing_strategy:
326339
# Call our on_connect function to configure READONLY mode
327340
kwargs["redis_connect_func"] = self.on_connect
328341

@@ -371,6 +384,7 @@ def __init__(
371384
)
372385
self.encoder = Encoder(encoding, encoding_errors, decode_responses)
373386
self.read_from_replicas = read_from_replicas
387+
self.load_balancing_strategy = load_balancing_strategy
374388
self.reinitialize_steps = reinitialize_steps
375389
self.cluster_error_retry_attempts = cluster_error_retry_attempts
376390
self.connection_error_retry_attempts = connection_error_retry_attempts
@@ -589,6 +603,7 @@ async def _determine_nodes(
589603
self.nodes_manager.get_node_from_slot(
590604
await self._determine_slot(command, *args),
591605
self.read_from_replicas and command in READ_COMMANDS,
606+
self.load_balancing_strategy if command in READ_COMMANDS else None,
592607
)
593608
]
594609

@@ -769,7 +784,11 @@ async def _execute_command(
769784
# refresh the target node
770785
slot = await self._determine_slot(*args)
771786
target_node = self.nodes_manager.get_node_from_slot(
772-
slot, self.read_from_replicas and args[0] in READ_COMMANDS
787+
slot,
788+
self.read_from_replicas and args[0] in READ_COMMANDS,
789+
self.load_balancing_strategy
790+
if args[0] in READ_COMMANDS
791+
else None,
773792
)
774793
moved = False
775794

@@ -1231,17 +1250,23 @@ def _update_moved_slots(self) -> None:
12311250
self._moved_exception = None
12321251

12331252
def get_node_from_slot(
1234-
self, slot: int, read_from_replicas: bool = False
1253+
self,
1254+
slot: int,
1255+
read_from_replicas: bool = False,
1256+
load_balancing_strategy=None,
12351257
) -> "ClusterNode":
12361258
if self._moved_exception:
12371259
self._update_moved_slots()
12381260

1261+
if read_from_replicas is True and load_balancing_strategy is None:
1262+
load_balancing_strategy = LoadBalancingStrategy.ROUND_ROBIN
1263+
12391264
try:
1240-
if read_from_replicas:
1241-
# get the server index in a Round-Robin manner
1265+
if len(self.slots_cache[slot]) > 1 and load_balancing_strategy:
1266+
# get the server index using the strategy defined in load_balancing_strategy
12421267
primary_name = self.slots_cache[slot][0].name
12431268
node_idx = self.read_load_balancer.get_server_index(
1244-
primary_name, len(self.slots_cache[slot])
1269+
primary_name, len(self.slots_cache[slot]), load_balancing_strategy
12451270
)
12461271
return self.slots_cache[slot][node_idx]
12471272
return self.slots_cache[slot][0]

redis/cluster.py

Lines changed: 85 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
from enum import Enum
12
import random
23
import socket
34
import sys
@@ -482,6 +483,11 @@ class initializer. In the case of conflicting arguments, querystring
482483
"""
483484
return cls(url=url, **kwargs)
484485

486+
@deprecated_args(
487+
args_to_warn=["read_from_replicas"],
488+
reason="Please configure the 'load_balancing_strategy' instead",
489+
version="5.0.3",
490+
)
485491
def __init__(
486492
self,
487493
host: Optional[str] = None,
@@ -492,6 +498,7 @@ def __init__(
492498
require_full_coverage: bool = False,
493499
reinitialize_steps: int = 5,
494500
read_from_replicas: bool = False,
501+
load_balancing_strategy: Union["LoadBalancingStrategy", None] = None,
495502
dynamic_startup_nodes: bool = True,
496503
url: Optional[str] = None,
497504
address_remap: Optional[Callable[[Tuple[str, int]], Tuple[str, int]]] = None,
@@ -520,11 +527,16 @@ def __init__(
520527
cluster client. If not all slots are covered, RedisClusterException
521528
will be thrown.
522529
:param read_from_replicas:
530+
@deprecated - please use load_balancing_strategy instead
523531
Enable read from replicas in READONLY mode. You can read possibly
524532
stale data.
525533
When set to true, read commands will be assigned between the
526534
primary and its replications in a Round-Robin manner.
527-
:param dynamic_startup_nodes:
535+
:param load_balancing_strategy:
536+
Enable read from replicas in READONLY mode and defines the load balancing
537+
strategy that will be used for cluster node selection.
538+
You can read possibly stale data (when reading from replicas).
539+
:param dynamic_startup_nodes:
528540
Set the RedisCluster's startup nodes to all of the discovered nodes.
529541
If true (default value), the cluster's discovered nodes will be used to
530542
determine the cluster nodes-slots mapping in the next topology refresh.
@@ -629,6 +641,7 @@ def __init__(
629641
self.command_flags = self.__class__.COMMAND_FLAGS.copy()
630642
self.node_flags = self.__class__.NODE_FLAGS.copy()
631643
self.read_from_replicas = read_from_replicas
644+
self.load_balancing_strategy = load_balancing_strategy
632645
self.reinitialize_counter = 0
633646
self.reinitialize_steps = reinitialize_steps
634647
if event_dispatcher is None:
@@ -683,7 +696,7 @@ def on_connect(self, connection):
683696
"""
684697
connection.on_connect()
685698

686-
if self.read_from_replicas:
699+
if self.read_from_replicas or self.load_balancing_strategy:
687700
# Sending READONLY command to server to configure connection as
688701
# readonly. Since each cluster node may change its server type due
689702
# to a failover, we should establish a READONLY connection
@@ -810,6 +823,7 @@ def pipeline(self, transaction=None, shard_hint=None):
810823
cluster_response_callbacks=self.cluster_response_callbacks,
811824
cluster_error_retry_attempts=self.cluster_error_retry_attempts,
812825
read_from_replicas=self.read_from_replicas,
826+
load_balancing_strategy=self.load_balancing_strategy,
813827
reinitialize_steps=self.reinitialize_steps,
814828
lock=self._lock,
815829
)
@@ -934,7 +948,9 @@ def _determine_nodes(self, *args, **kwargs) -> List["ClusterNode"]:
934948
# get the node that holds the key's slot
935949
slot = self.determine_slot(*args)
936950
node = self.nodes_manager.get_node_from_slot(
937-
slot, self.read_from_replicas and command in READ_COMMANDS
951+
slot,
952+
self.read_from_replicas and command in READ_COMMANDS,
953+
self.load_balancing_strategy if command in READ_COMMANDS else None,
938954
)
939955
return [node]
940956

@@ -1158,7 +1174,11 @@ def _execute_command(self, target_node, *args, **kwargs):
11581174
# refresh the target node
11591175
slot = self.determine_slot(*args)
11601176
target_node = self.nodes_manager.get_node_from_slot(
1161-
slot, self.read_from_replicas and command in READ_COMMANDS
1177+
slot,
1178+
self.read_from_replicas and command in READ_COMMANDS,
1179+
self.load_balancing_strategy
1180+
if command in READ_COMMANDS
1181+
else None,
11621182
)
11631183
moved = False
11641184

@@ -1307,6 +1327,12 @@ def __del__(self):
13071327
self.redis_connection.close()
13081328

13091329

1330+
class LoadBalancingStrategy(Enum):
1331+
ROUND_ROBIN = "round_robin"
1332+
ROUND_ROBIN_REPLICAS = "round_robin_replicas"
1333+
RANDOM_REPLICA = "random_replica"
1334+
1335+
13101336
class LoadBalancer:
13111337
"""
13121338
Round-Robin Load Balancing
@@ -1316,15 +1342,38 @@ def __init__(self, start_index: int = 0) -> None:
13161342
self.primary_to_idx = {}
13171343
self.start_index = start_index
13181344

1319-
def get_server_index(self, primary: str, list_size: int) -> int:
1320-
server_index = self.primary_to_idx.setdefault(primary, self.start_index)
1321-
# Update the index
1322-
self.primary_to_idx[primary] = (server_index + 1) % list_size
1323-
return server_index
1345+
def get_server_index(
1346+
self,
1347+
primary: str,
1348+
list_size: int,
1349+
load_balancing_strategy: LoadBalancingStrategy = LoadBalancingStrategy.ROUND_ROBIN,
1350+
) -> int:
1351+
if load_balancing_strategy == LoadBalancingStrategy.RANDOM_REPLICA:
1352+
return self._get_random_replica_index(list_size)
1353+
else:
1354+
return self._get_round_robin_index(
1355+
primary,
1356+
list_size,
1357+
load_balancing_strategy == LoadBalancingStrategy.ROUND_ROBIN_REPLICAS,
1358+
)
13241359

13251360
def reset(self) -> None:
13261361
self.primary_to_idx.clear()
13271362

1363+
def _get_random_replica_index(self, list_size: int) -> int:
1364+
return random.randint(1, list_size - 1)
1365+
1366+
def _get_round_robin_index(
1367+
self, primary: str, list_size: int, replicas_only: bool
1368+
) -> int:
1369+
server_index = self.primary_to_idx.setdefault(primary, self.start_index)
1370+
if replicas_only and server_index == 0:
1371+
# skip the primary node index
1372+
server_index = 1
1373+
# Update the index for the next round
1374+
self.primary_to_idx[primary] = (server_index + 1) % list_size
1375+
return server_index
1376+
13281377

13291378
class NodesManager:
13301379
def __init__(
@@ -1428,7 +1477,21 @@ def _update_moved_slots(self):
14281477
# Reset moved_exception
14291478
self._moved_exception = None
14301479

1431-
def get_node_from_slot(self, slot, read_from_replicas=False, server_type=None):
1480+
@deprecated_args(
1481+
args_to_warn=["server_type"],
1482+
reason=(
1483+
"In case you need select some load balancing strategy "
1484+
"that will use replicas, please set it through 'load_balancing_strategy'"
1485+
),
1486+
version="5.0.3",
1487+
)
1488+
def get_node_from_slot(
1489+
self,
1490+
slot,
1491+
read_from_replicas=False,
1492+
load_balancing_strategy=None,
1493+
server_type=None,
1494+
):
14321495
"""
14331496
Gets a node that servers this hash slot
14341497
"""
@@ -1443,11 +1506,14 @@ def get_node_from_slot(self, slot, read_from_replicas=False, server_type=None):
14431506
f'"require_full_coverage={self._require_full_coverage}"'
14441507
)
14451508

1446-
if read_from_replicas is True:
1447-
# get the server index in a Round-Robin manner
1509+
if read_from_replicas is True and load_balancing_strategy is None:
1510+
load_balancing_strategy = LoadBalancingStrategy.ROUND_ROBIN
1511+
1512+
if len(self.slots_cache[slot]) > 1 and load_balancing_strategy:
1513+
# get the server index using the strategy defined in load_balancing_strategy
14481514
primary_name = self.slots_cache[slot][0].name
14491515
node_idx = self.read_load_balancer.get_server_index(
1450-
primary_name, len(self.slots_cache[slot])
1516+
primary_name, len(self.slots_cache[slot]), load_balancing_strategy
14511517
)
14521518
elif (
14531519
server_type is None
@@ -1730,7 +1796,7 @@ def __init__(
17301796
first command execution. The node will be determined by:
17311797
1. Hashing the channel name in the request to find its keyslot
17321798
2. Selecting a node that handles the keyslot: If read_from_replicas is
1733-
set to true, a replica can be selected.
1799+
set to true or load_balancing_strategy is set, a replica can be selected.
17341800
17351801
:type redis_cluster: RedisCluster
17361802
:type node: ClusterNode
@@ -1826,7 +1892,9 @@ def execute_command(self, *args):
18261892
channel = args[1]
18271893
slot = self.cluster.keyslot(channel)
18281894
node = self.cluster.nodes_manager.get_node_from_slot(
1829-
slot, self.cluster.read_from_replicas
1895+
slot,
1896+
self.cluster.read_from_replicas,
1897+
self.cluster.load_balancing_strategy,
18301898
)
18311899
else:
18321900
# Get a random node
@@ -1969,6 +2037,7 @@ def __init__(
19692037
cluster_response_callbacks: Optional[Dict[str, Callable]] = None,
19702038
startup_nodes: Optional[List["ClusterNode"]] = None,
19712039
read_from_replicas: bool = False,
2040+
load_balancing_strategy: Optional[LoadBalancingStrategy] = None,
19722041
cluster_error_retry_attempts: int = 3,
19732042
reinitialize_steps: int = 5,
19742043
lock=None,
@@ -1984,6 +2053,7 @@ def __init__(
19842053
)
19852054
self.startup_nodes = startup_nodes if startup_nodes else []
19862055
self.read_from_replicas = read_from_replicas
2056+
self.load_balancing_strategy = load_balancing_strategy
19872057
self.command_flags = self.__class__.COMMAND_FLAGS.copy()
19882058
self.cluster_response_callbacks = cluster_response_callbacks
19892059
self.cluster_error_retry_attempts = cluster_error_retry_attempts

0 commit comments

Comments
 (0)