@@ -13,6 +13,13 @@ use scylla_cql::{errors::QueryError, frame::types::SerialConsistency, Consistenc
1313use std:: { fmt, sync:: Arc , time:: Duration } ;
1414use tracing:: warn;
1515
16+ #[ derive( Clone , Copy ) ]
17+ enum ReplicaLocation {
18+ Any ,
19+ Datacenter ,
20+ DatacenterAndRack ,
21+ }
22+
1623// TODO: LWT optimisation
1724/// The default load balancing policy.
1825///
@@ -62,17 +69,38 @@ or refrain from preferring datacenters (which may ban all other datacenters, if
6269 }
6370 }
6471 if let Some ( ts) = & routing_info. token_with_strategy {
72+ // Try to pick some alive local rack random replica.
73+ // If preferred rack is not specified, try to pick local DC replica.
74+ if self . preferred_rack . is_some ( ) {
75+ let local_rack_picked = self . pick_replica (
76+ ts,
77+ ReplicaLocation :: DatacenterAndRack ,
78+ & self . pick_predicate ,
79+ cluster,
80+ ) ;
81+
82+ if let Some ( alive_local_rack_replica) = local_rack_picked {
83+ return Some ( alive_local_rack_replica) ;
84+ }
85+ }
86+
6587 // Try to pick some alive local random replica.
6688 // If preferred datacenter is not specified, all replicas are treated as local.
67- let picked = self . pick_replica ( ts, true , & self . pick_predicate , cluster) ;
89+ let picked = self . pick_replica (
90+ ts,
91+ ReplicaLocation :: Datacenter ,
92+ & self . pick_predicate ,
93+ cluster,
94+ ) ;
6895
6996 if let Some ( alive_local_replica) = picked {
7097 return Some ( alive_local_replica) ;
7198 }
7299
73100 // If datacenter failover is possible, loosen restriction about locality.
74101 if self . is_datacenter_failover_possible ( & routing_info) {
75- let picked = self . pick_replica ( ts, false , & self . pick_predicate , cluster) ;
102+ let picked =
103+ self . pick_replica ( ts, ReplicaLocation :: Any , & self . pick_predicate , cluster) ;
76104 if let Some ( alive_remote_replica) = picked {
77105 return Some ( alive_remote_replica) ;
78106 }
@@ -125,19 +153,31 @@ or refrain from preferring datacenters (which may ban all other datacenters, if
125153
126154 // If token is available, get a shuffled list of alive replicas.
127155 let maybe_replicas = if let Some ( ts) = & routing_info. token_with_strategy {
128- let local_replicas = self . shuffled_replicas ( ts, true , Self :: is_alive, cluster) ;
156+ let local_rack_replicas = self . shuffled_replicas (
157+ ts,
158+ ReplicaLocation :: DatacenterAndRack ,
159+ Self :: is_alive,
160+ cluster,
161+ ) ;
162+ let local_replicas =
163+ self . shuffled_replicas ( ts, ReplicaLocation :: Datacenter , Self :: is_alive, cluster) ;
129164
130165 // If a datacenter failover is possible, loosen restriction about locality.
131166 let maybe_remote_replicas = if self . is_datacenter_failover_possible ( & routing_info) {
132- let remote_replicas = self . shuffled_replicas ( ts, false , Self :: is_alive, cluster) ;
167+ let remote_replicas =
168+ self . shuffled_replicas ( ts, ReplicaLocation :: Any , Self :: is_alive, cluster) ;
133169 Either :: Left ( remote_replicas)
134170 } else {
135171 Either :: Right ( std:: iter:: empty ( ) )
136172 } ;
137173
138174 // Produce an iterator, prioritizes local replicas.
139175 // If preferred datacenter is not specified, every replica is treated as a local one.
140- Either :: Left ( local_replicas. chain ( maybe_remote_replicas) )
176+ Either :: Left (
177+ local_rack_replicas
178+ . chain ( local_replicas)
179+ . chain ( maybe_remote_replicas) ,
180+ )
141181 } else {
142182 Either :: Right ( std:: iter:: empty :: < NodeRef < ' a > > ( ) )
143183 } ;
@@ -265,39 +305,59 @@ impl DefaultPolicy {
265305 fn replicas < ' a > (
266306 & ' a self ,
267307 ts : & TokenWithStrategy < ' a > ,
268- should_be_local : bool ,
308+ replica_location : ReplicaLocation ,
269309 predicate : impl Fn ( & NodeRef < ' a > ) -> bool ,
270310 cluster : & ' a ClusterData ,
271311 ) -> impl Iterator < Item = NodeRef < ' a > > {
312+ let predicate = move |node| match replica_location {
313+ ReplicaLocation :: Any | ReplicaLocation :: Datacenter => predicate ( & node) ,
314+ ReplicaLocation :: DatacenterAndRack => {
315+ predicate ( & node) && node. rack == self . preferred_rack
316+ }
317+ } ;
318+ let should_be_local = match replica_location {
319+ ReplicaLocation :: Any => false ,
320+ ReplicaLocation :: Datacenter | ReplicaLocation :: DatacenterAndRack => true ,
321+ } ;
272322 self . nonfiltered_replica_set ( ts, should_be_local, cluster)
273323 . into_iter ( )
274- . filter ( predicate)
324+ . filter ( move | node : & NodeRef < ' a > | predicate ( node ) )
275325 }
276326
277327 fn pick_replica < ' a > (
278328 & ' a self ,
279329 ts : & TokenWithStrategy < ' a > ,
280- should_be_local : bool ,
330+ replica_location : ReplicaLocation ,
281331 predicate : & impl Fn ( & NodeRef < ' a > ) -> bool ,
282332 cluster : & ' a ClusterData ,
283333 ) -> Option < NodeRef < ' a > > {
334+ let predicate = |node| match replica_location {
335+ ReplicaLocation :: Any | ReplicaLocation :: Datacenter => predicate ( & node) ,
336+ ReplicaLocation :: DatacenterAndRack => {
337+ predicate ( & node) && node. rack == self . preferred_rack
338+ }
339+ } ;
340+ let should_be_local = match replica_location {
341+ ReplicaLocation :: Any => false ,
342+ ReplicaLocation :: Datacenter | ReplicaLocation :: DatacenterAndRack => true ,
343+ } ;
284344 let replica_set = self . nonfiltered_replica_set ( ts, should_be_local, cluster) ;
285345 if let Some ( fixed) = self . fixed_shuffle_seed {
286346 let mut gen = Pcg32 :: new ( fixed, 0 ) ;
287- replica_set. choose_filtered ( & mut gen, |node| predicate ( & node ) )
347+ replica_set. choose_filtered ( & mut gen, predicate)
288348 } else {
289- replica_set. choose_filtered ( & mut thread_rng ( ) , |node| predicate ( & node ) )
349+ replica_set. choose_filtered ( & mut thread_rng ( ) , predicate)
290350 }
291351 }
292352
293353 fn shuffled_replicas < ' a > (
294354 & ' a self ,
295355 ts : & TokenWithStrategy < ' a > ,
296- should_be_local : bool ,
356+ replica_location : ReplicaLocation ,
297357 predicate : impl Fn ( & NodeRef < ' a > ) -> bool ,
298358 cluster : & ' a ClusterData ,
299359 ) -> impl Iterator < Item = NodeRef < ' a > > {
300- let replicas = self . replicas ( ts, should_be_local , predicate, cluster) ;
360+ let replicas = self . replicas ( ts, replica_location , predicate, cluster) ;
301361
302362 self . shuffle ( replicas)
303363 }
0 commit comments