@@ -13,6 +13,13 @@ use scylla_cql::{errors::QueryError, frame::types::SerialConsistency, Consistenc
1313use std:: { fmt, sync:: Arc , time:: Duration } ;
1414use tracing:: warn;
1515
16+ #[ derive( Clone , Copy ) ]
17+ enum ReplicaLocation {
18+ Any ,
19+ Datacenter ,
20+ DatacenterAndRack ,
21+ }
22+
1623// TODO: LWT optimisation
1724/// The default load balancing policy.
1825///
@@ -21,6 +28,7 @@ use tracing::warn;
2128/// Latency awareness is available, although not recommended.
2229pub struct DefaultPolicy {
2330 preferred_datacenter : Option < String > ,
31+ preferred_rack : Option < String > ,
2432 is_token_aware : bool ,
2533 permit_dc_failover : bool ,
2634 pick_predicate : Box < dyn Fn ( & NodeRef ) -> bool + Send + Sync > ,
@@ -32,6 +40,7 @@ impl fmt::Debug for DefaultPolicy {
3240 fn fmt ( & self , f : & mut fmt:: Formatter < ' _ > ) -> fmt:: Result {
3341 f. debug_struct ( "DefaultPolicy" )
3442 . field ( "preferred_datacenter" , & self . preferred_datacenter )
43+ . field ( "preferred_rack" , & self . preferred_rack )
3544 . field ( "is_token_aware" , & self . is_token_aware )
3645 . field ( "permit_dc_failover" , & self . permit_dc_failover )
3746 . field ( "latency_awareness" , & self . latency_awareness )
@@ -60,17 +69,38 @@ or refrain from preferring datacenters (which may ban all other datacenters, if
6069 }
6170 }
6271 if let Some ( ts) = & routing_info. token_with_strategy {
72+ // Try to pick some alive local rack random replica.
73+ // If preferred rack is not specified, try to pick local DC replica.
74+ if self . preferred_rack . is_some ( ) {
75+ let local_rack_picked = self . pick_replica (
76+ ts,
77+ ReplicaLocation :: DatacenterAndRack ,
78+ & self . pick_predicate ,
79+ cluster,
80+ ) ;
81+
82+ if let Some ( alive_local_rack_replica) = local_rack_picked {
83+ return Some ( alive_local_rack_replica) ;
84+ }
85+ }
86+
6387 // Try to pick some alive local random replica.
6488 // If preferred datacenter is not specified, all replicas are treated as local.
65- let picked = self . pick_replica ( ts, true , & self . pick_predicate , cluster) ;
89+ let picked = self . pick_replica (
90+ ts,
91+ ReplicaLocation :: Datacenter ,
92+ & self . pick_predicate ,
93+ cluster,
94+ ) ;
6695
6796 if let Some ( alive_local_replica) = picked {
6897 return Some ( alive_local_replica) ;
6998 }
7099
71100 // If datacenter failover is possible, loosen restriction about locality.
72101 if self . is_datacenter_failover_possible ( & routing_info) {
73- let picked = self . pick_replica ( ts, false , & self . pick_predicate , cluster) ;
102+ let picked =
103+ self . pick_replica ( ts, ReplicaLocation :: Any , & self . pick_predicate , cluster) ;
74104 if let Some ( alive_remote_replica) = picked {
75105 return Some ( alive_remote_replica) ;
76106 }
@@ -123,19 +153,31 @@ or refrain from preferring datacenters (which may ban all other datacenters, if
123153
124154 // If token is available, get a shuffled list of alive replicas.
125155 let maybe_replicas = if let Some ( ts) = & routing_info. token_with_strategy {
126- let local_replicas = self . shuffled_replicas ( ts, true , Self :: is_alive, cluster) ;
156+ let local_rack_replicas = self . shuffled_replicas (
157+ ts,
158+ ReplicaLocation :: DatacenterAndRack ,
159+ Self :: is_alive,
160+ cluster,
161+ ) ;
162+ let local_replicas =
163+ self . shuffled_replicas ( ts, ReplicaLocation :: Datacenter , Self :: is_alive, cluster) ;
127164
128165 // If a datacenter failover is possible, loosen restriction about locality.
129166 let maybe_remote_replicas = if self . is_datacenter_failover_possible ( & routing_info) {
130- let remote_replicas = self . shuffled_replicas ( ts, false , Self :: is_alive, cluster) ;
167+ let remote_replicas =
168+ self . shuffled_replicas ( ts, ReplicaLocation :: Any , Self :: is_alive, cluster) ;
131169 Either :: Left ( remote_replicas)
132170 } else {
133171 Either :: Right ( std:: iter:: empty ( ) )
134172 } ;
135173
136174 // Produce an iterator, prioritizes local replicas.
137175 // If preferred datacenter is not specified, every replica is treated as a local one.
138- Either :: Left ( local_replicas. chain ( maybe_remote_replicas) )
176+ Either :: Left (
177+ local_rack_replicas
178+ . chain ( local_replicas)
179+ . chain ( maybe_remote_replicas) ,
180+ )
139181 } else {
140182 Either :: Right ( std:: iter:: empty :: < NodeRef < ' a > > ( ) )
141183 } ;
@@ -263,39 +305,59 @@ impl DefaultPolicy {
263305 fn replicas < ' a > (
264306 & ' a self ,
265307 ts : & TokenWithStrategy < ' a > ,
266- should_be_local : bool ,
308+ replica_location : ReplicaLocation ,
267309 predicate : impl Fn ( & NodeRef < ' a > ) -> bool ,
268310 cluster : & ' a ClusterData ,
269311 ) -> impl Iterator < Item = NodeRef < ' a > > {
312+ let predicate = move |node| match replica_location {
313+ ReplicaLocation :: Any | ReplicaLocation :: Datacenter => predicate ( & node) ,
314+ ReplicaLocation :: DatacenterAndRack => {
315+ predicate ( & node) && node. rack == self . preferred_rack
316+ }
317+ } ;
318+ let should_be_local = match replica_location {
319+ ReplicaLocation :: Any => false ,
320+ ReplicaLocation :: Datacenter | ReplicaLocation :: DatacenterAndRack => true ,
321+ } ;
270322 self . nonfiltered_replica_set ( ts, should_be_local, cluster)
271323 . into_iter ( )
272- . filter ( predicate)
324+ . filter ( move | node : & NodeRef < ' a > | predicate ( node ) )
273325 }
274326
275327 fn pick_replica < ' a > (
276328 & ' a self ,
277329 ts : & TokenWithStrategy < ' a > ,
278- should_be_local : bool ,
330+ replica_location : ReplicaLocation ,
279331 predicate : & impl Fn ( & NodeRef < ' a > ) -> bool ,
280332 cluster : & ' a ClusterData ,
281333 ) -> Option < NodeRef < ' a > > {
334+ let predicate = |node| match replica_location {
335+ ReplicaLocation :: Any | ReplicaLocation :: Datacenter => predicate ( & node) ,
336+ ReplicaLocation :: DatacenterAndRack => {
337+ predicate ( & node) && node. rack == self . preferred_rack
338+ }
339+ } ;
340+ let should_be_local = match replica_location {
341+ ReplicaLocation :: Any => false ,
342+ ReplicaLocation :: Datacenter | ReplicaLocation :: DatacenterAndRack => true ,
343+ } ;
282344 let replica_set = self . nonfiltered_replica_set ( ts, should_be_local, cluster) ;
283345 if let Some ( fixed) = self . fixed_shuffle_seed {
284346 let mut gen = Pcg32 :: new ( fixed, 0 ) ;
285- replica_set. choose_filtered ( & mut gen, |node| predicate ( & node ) )
347+ replica_set. choose_filtered ( & mut gen, predicate)
286348 } else {
287- replica_set. choose_filtered ( & mut thread_rng ( ) , |node| predicate ( & node ) )
349+ replica_set. choose_filtered ( & mut thread_rng ( ) , predicate)
288350 }
289351 }
290352
291353 fn shuffled_replicas < ' a > (
292354 & ' a self ,
293355 ts : & TokenWithStrategy < ' a > ,
294- should_be_local : bool ,
356+ replica_location : ReplicaLocation ,
295357 predicate : impl Fn ( & NodeRef < ' a > ) -> bool ,
296358 cluster : & ' a ClusterData ,
297359 ) -> impl Iterator < Item = NodeRef < ' a > > {
298- let replicas = self . replicas ( ts, should_be_local , predicate, cluster) ;
360+ let replicas = self . replicas ( ts, replica_location , predicate, cluster) ;
299361
300362 self . shuffle ( replicas)
301363 }
@@ -364,6 +426,7 @@ impl Default for DefaultPolicy {
364426 fn default ( ) -> Self {
365427 Self {
366428 preferred_datacenter : None ,
429+ preferred_rack : None ,
367430 is_token_aware : true ,
368431 permit_dc_failover : false ,
369432 pick_predicate : Box :: new ( Self :: is_alive) ,
@@ -390,6 +453,7 @@ impl Default for DefaultPolicy {
390453#[ derive( Clone , Debug ) ]
391454pub struct DefaultPolicyBuilder {
392455 preferred_datacenter : Option < String > ,
456+ preferred_rack : Option < String > ,
393457 is_token_aware : bool ,
394458 permit_dc_failover : bool ,
395459 latency_awareness : Option < LatencyAwarenessBuilder > ,
@@ -401,6 +465,7 @@ impl DefaultPolicyBuilder {
401465 pub fn new ( ) -> Self {
402466 Self {
403467 preferred_datacenter : None ,
468+ preferred_rack : None ,
404469 is_token_aware : true ,
405470 permit_dc_failover : false ,
406471 latency_awareness : None ,
@@ -421,6 +486,7 @@ impl DefaultPolicyBuilder {
421486
422487 Arc :: new ( DefaultPolicy {
423488 preferred_datacenter : self . preferred_datacenter ,
489+ preferred_rack : self . preferred_rack ,
424490 is_token_aware : self . is_token_aware ,
425491 permit_dc_failover : self . permit_dc_failover ,
426492 pick_predicate,
@@ -429,6 +495,19 @@ impl DefaultPolicyBuilder {
429495 } )
430496 }
431497
498+ /// Sets the rack to be preferred by this policy
499+ ///
500+ /// Allows the load balancing policy to prioritize nodes based on their availability zones
501+ /// in the preferred datacenter.
502+ /// When a preferred rack is set, the policy will first return replicas in the local rack
503+ /// in the preferred datacenter, and then the other replicas in the datacenter.
504+ ///
505+ /// When a preferred datacenter is not set, setting preferred rack will not have any effect.
506+ pub fn prefer_rack ( mut self , rack_name : String ) -> Self {
507+ self . preferred_rack = Some ( rack_name) ;
508+ self
509+ }
510+
432511 /// Sets the datacenter to be preferred by this policy.
433512 ///
434513 /// Allows the load balancing policy to prioritize nodes based on their location.
@@ -1234,6 +1313,102 @@ mod tests {
12341313 . group ( [ B , C , E ] ) // remote nodes
12351314 . build ( ) ,
12361315 } ,
1316+ // Keyspace SS with RF=2 with enabled DC failover and rack-awareness
1317+ Test {
1318+ policy : DefaultPolicy {
1319+ preferred_datacenter : Some ( "eu" . to_owned ( ) ) ,
1320+ preferred_rack : Some ( "r1" . to_owned ( ) ) ,
1321+ is_token_aware : true ,
1322+ permit_dc_failover : true ,
1323+ ..Default :: default ( )
1324+ } ,
1325+ routing_info : RoutingInfo {
1326+ token : Some ( Token { value : 160 } ) ,
1327+ keyspace : Some ( KEYSPACE_NTS_RF_3 ) ,
1328+ consistency : Consistency :: One ,
1329+ ..Default :: default ( )
1330+ } ,
1331+ // going though the ring, we get order: F , A , C , D , G , B , E
1332+ // us eu eu us eu eu us
1333+ // r2 r1 r1 r1 r2 r1 r1
1334+ expected_groups : ExpectedGroupsBuilder :: new ( )
1335+ . group ( [ A , C ] ) // pick local rack replicas
1336+ . group ( [ G ] ) // local DC replicas
1337+ . group ( [ F , D , E ] ) // remote replicas
1338+ . group ( [ B ] ) // local nodes
1339+ . build ( ) ,
1340+ } ,
1341+ // Keyspace SS with RF=2 with enabled rack-awareness, shuffling replicas disabled
1342+ Test {
1343+ policy : DefaultPolicy {
1344+ preferred_datacenter : Some ( "eu" . to_owned ( ) ) ,
1345+ preferred_rack : Some ( "r1" . to_owned ( ) ) ,
1346+ is_token_aware : true ,
1347+ permit_dc_failover : false ,
1348+ fixed_shuffle_seed : Some ( 123 ) ,
1349+ ..Default :: default ( )
1350+ } ,
1351+ routing_info : RoutingInfo {
1352+ token : Some ( Token { value : 560 } ) ,
1353+ keyspace : Some ( KEYSPACE_SS_RF_2 ) ,
1354+ consistency : Consistency :: Two ,
1355+ ..Default :: default ( )
1356+ } ,
1357+ // going though the ring, we get order: B , C , E , G , A , F , D
1358+ // eu eu us eu eu us us
1359+ // r1 r1 r1 r2 r1 r2 r1
1360+ expected_groups : ExpectedGroupsBuilder :: new ( )
1361+ . deterministic ( [ B ] ) // pick local rack replicas
1362+ . deterministic ( [ C ] ) // fallback replicas
1363+ . group ( [ A , G ] ) // local nodes
1364+ . build ( ) ,
1365+ } ,
1366+ // Keyspace SS with RF=2 with enabled rack-awareness and no local-rack replica
1367+ Test {
1368+ policy : DefaultPolicy {
1369+ preferred_datacenter : Some ( "eu" . to_owned ( ) ) ,
1370+ preferred_rack : Some ( "r2" . to_owned ( ) ) ,
1371+ is_token_aware : true ,
1372+ permit_dc_failover : false ,
1373+ ..Default :: default ( )
1374+ } ,
1375+ routing_info : RoutingInfo {
1376+ token : Some ( Token { value : 160 } ) ,
1377+ keyspace : Some ( KEYSPACE_SS_RF_2 ) ,
1378+ consistency : Consistency :: One ,
1379+ ..Default :: default ( )
1380+ } ,
1381+ // going though the ring, we get order: F , A , C , D , G , B , E
1382+ // us eu eu us eu eu us
1383+ // r2 r1 r1 r1 r2 r1 r1
1384+ expected_groups : ExpectedGroupsBuilder :: new ( )
1385+ . group ( [ A ] ) // pick local DC
1386+ . group ( [ C , G , B ] ) // local nodes
1387+ . build ( ) ,
1388+ } ,
1389+ // No preferred DC, preferred rack should be ignored, failover permitted
1390+ Test {
1391+ policy : DefaultPolicy {
1392+ preferred_datacenter : None ,
1393+ preferred_rack : Some ( "r2" . to_owned ( ) ) ,
1394+ is_token_aware : true ,
1395+ permit_dc_failover : true ,
1396+ ..Default :: default ( )
1397+ } ,
1398+ routing_info : RoutingInfo {
1399+ token : Some ( Token { value : 160 } ) ,
1400+ keyspace : Some ( KEYSPACE_NTS_RF_2 ) ,
1401+ consistency : Consistency :: Quorum ,
1402+ ..Default :: default ( )
1403+ } ,
1404+ // going though the ring, we get order: F , A , C , D , G , B , E
1405+ // us eu eu us eu eu us
1406+ // r2 r1 r1 r1 r2 r1 r1
1407+ expected_groups : ExpectedGroupsBuilder :: new ( )
1408+ . group ( [ A , D , F , G ] ) // remote replicas
1409+ . group ( [ B , C , E ] ) // remote nodes
1410+ . build ( ) ,
1411+ } ,
12371412 ] ;
12381413
12391414 for Test {
@@ -1786,6 +1961,7 @@ mod latency_awareness {
17861961
17871962 DefaultPolicy {
17881963 preferred_datacenter : Some ( "eu" . to_owned ( ) ) ,
1964+ preferred_rack : None ,
17891965 permit_dc_failover : true ,
17901966 is_token_aware : true ,
17911967 pick_predicate,
0 commit comments