@@ -14,10 +14,19 @@ use std::{fmt, sync::Arc, time::Duration};
1414use tracing:: warn;
1515
1616#[ derive( Clone , Copy ) ]
17- enum ReplicaLocationCriteria {
17+ enum ReplicaLocationCriteria < ' a > {
1818 Any ,
19- Datacenter ,
20- DatacenterAndRack ,
19+ Datacenter ( & ' a str ) ,
20+ DatacenterAndRack ( & ' a str , & ' a str ) ,
21+ }
22+
23+ impl < ' a > ReplicaLocationCriteria < ' a > {
24+ fn datacenter ( & self ) -> Option < & ' a str > {
25+ match self {
26+ Self :: Any => None ,
27+ Self :: Datacenter ( dc) | Self :: DatacenterAndRack ( dc, _) => Some ( dc) ,
28+ }
29+ }
2130}
2231
2332#[ derive( Debug , Clone ) ]
@@ -35,6 +44,7 @@ impl ReplicaLocationPreference {
3544 }
3645 }
3746
47+ #[ allow( unused) ]
3848 fn rack ( & self ) -> Option < & str > {
3949 match self {
4050 Self :: Any | Self :: Datacenter ( _) => None ,
@@ -90,12 +100,11 @@ or refrain from preferring datacenters (which may ban all other datacenters, if
90100 }
91101 }
92102 if let Some ( ts) = & routing_info. token_with_strategy {
93- // Try to pick some alive local rack random replica.
94- // If preferred rack is not specified, try to pick local DC replica.
95- if self . preferences . rack ( ) . is_some ( ) {
103+ if let ReplicaLocationPreference :: DatacenterAndRack ( dc, rack) = & self . preferences {
104+ // Try to pick some alive local rack random replica.
96105 let local_rack_picked = self . pick_replica (
97106 ts,
98- ReplicaLocationCriteria :: DatacenterAndRack ,
107+ ReplicaLocationCriteria :: DatacenterAndRack ( dc , rack ) ,
99108 & self . pick_predicate ,
100109 cluster,
101110 ) ;
@@ -105,21 +114,27 @@ or refrain from preferring datacenters (which may ban all other datacenters, if
105114 }
106115 }
107116
108- // Try to pick some alive local random replica.
109- // If preferred datacenter is not specified, all replicas are treated as local.
110- let picked = self . pick_replica (
111- ts,
112- ReplicaLocationCriteria :: Datacenter ,
113- & self . pick_predicate ,
114- cluster,
115- ) ;
117+ if let ReplicaLocationPreference :: DatacenterAndRack ( dc, _)
118+ | ReplicaLocationPreference :: Datacenter ( dc) = & self . preferences
119+ {
120+ // Try to pick some alive local random replica.
121+ let picked = self . pick_replica (
122+ ts,
123+ ReplicaLocationCriteria :: Datacenter ( dc) ,
124+ & self . pick_predicate ,
125+ cluster,
126+ ) ;
116127
117- if let Some ( alive_local_replica) = picked {
118- return Some ( alive_local_replica) ;
128+ if let Some ( alive_local_replica) = picked {
129+ return Some ( alive_local_replica) ;
130+ }
119131 }
120132
121- // If datacenter failover is possible, loosen restriction about locality.
122- if self . is_datacenter_failover_possible ( & routing_info) {
133+ // If preferred datacenter is not specified, or if datacenter failover is possible, loosen restriction about locality.
134+ if self . preferences . datacenter ( ) . is_none ( )
135+ || self . is_datacenter_failover_possible ( & routing_info)
136+ {
137+ // Try to pick some alive random replica.
123138 let picked = self . pick_replica (
124139 ts,
125140 ReplicaLocationCriteria :: Any ,
@@ -178,21 +193,38 @@ or refrain from preferring datacenters (which may ban all other datacenters, if
178193
179194 // If token is available, get a shuffled list of alive replicas.
180195 let maybe_replicas = if let Some ( ts) = & routing_info. token_with_strategy {
181- let local_rack_replicas = self . shuffled_replicas (
182- ts,
183- ReplicaLocationCriteria :: DatacenterAndRack ,
184- Self :: is_alive,
185- cluster,
186- ) ;
187- let local_replicas = self . shuffled_replicas (
188- ts,
189- ReplicaLocationCriteria :: Datacenter ,
190- Self :: is_alive,
191- cluster,
192- ) ;
196+ let maybe_local_rack_replicas =
197+ if let ReplicaLocationPreference :: DatacenterAndRack ( dc, rack) = & self . preferences {
198+ let local_rack_replicas = self . shuffled_replicas (
199+ ts,
200+ ReplicaLocationCriteria :: DatacenterAndRack ( dc, rack) ,
201+ Self :: is_alive,
202+ cluster,
203+ ) ;
204+ Either :: Left ( local_rack_replicas)
205+ } else {
206+ Either :: Right ( std:: iter:: empty ( ) )
207+ } ;
208+
209+ let maybe_local_replicas = if let ReplicaLocationPreference :: DatacenterAndRack ( dc, _)
210+ | ReplicaLocationPreference :: Datacenter ( dc) =
211+ & self . preferences
212+ {
213+ let local_replicas = self . shuffled_replicas (
214+ ts,
215+ ReplicaLocationCriteria :: Datacenter ( dc) ,
216+ Self :: is_alive,
217+ cluster,
218+ ) ;
219+ Either :: Left ( local_replicas)
220+ } else {
221+ Either :: Right ( std:: iter:: empty ( ) )
222+ } ;
193223
194- // If a datacenter failover is possible, loosen restriction about locality.
195- let maybe_remote_replicas = if self . is_datacenter_failover_possible ( & routing_info) {
224+ // If no datacenter is preferred, or datacenter failover is possible, loosen restriction about locality.
225+ let maybe_remote_replicas = if self . preferences . datacenter ( ) . is_none ( )
226+ || self . is_datacenter_failover_possible ( & routing_info)
227+ {
196228 let remote_replicas = self . shuffled_replicas (
197229 ts,
198230 ReplicaLocationCriteria :: Any ,
@@ -204,11 +236,11 @@ or refrain from preferring datacenters (which may ban all other datacenters, if
204236 Either :: Right ( std:: iter:: empty ( ) )
205237 } ;
206238
207- // Produce an iterator, prioritizes local replicas.
208- // If preferred datacenter is not specified, every replica is treated as a local one.
239+ // Produce an iterator, prioritizing local replicas.
240+ // If preferred datacenter is not specified, every replica is treated as a remote one.
209241 Either :: Left (
210- local_rack_replicas
211- . chain ( local_replicas )
242+ maybe_local_rack_replicas
243+ . chain ( maybe_local_replicas )
212244 . chain ( maybe_remote_replicas) ,
213245 )
214246 } else {
@@ -323,19 +355,10 @@ impl DefaultPolicy {
323355 fn nonfiltered_replica_set < ' a > (
324356 & ' a self ,
325357 ts : & TokenWithStrategy < ' a > ,
326- replica_location : ReplicaLocationCriteria ,
358+ replica_location : ReplicaLocationCriteria < ' a > ,
327359 cluster : & ' a ClusterData ,
328360 ) -> ReplicaSet < ' a > {
329- let should_be_local = match replica_location {
330- ReplicaLocationCriteria :: Any => false ,
331- ReplicaLocationCriteria :: Datacenter | ReplicaLocationCriteria :: DatacenterAndRack => {
332- true
333- }
334- } ;
335-
336- let datacenter = should_be_local
337- . then_some ( self . preferences . datacenter ( ) )
338- . flatten ( ) ;
361+ let datacenter = replica_location. datacenter ( ) ;
339362
340363 cluster
341364 . replica_locator ( )
@@ -345,25 +368,26 @@ impl DefaultPolicy {
345368 /// Wraps the provided predicate, adding the requirement for rack to match.
346369 fn make_rack_predicate < ' a > (
347370 predicate : impl Fn ( & NodeRef < ' a > ) -> bool + ' a ,
348- replica_location : ReplicaLocationCriteria ,
349- preferences : & ' a ReplicaLocationPreference ,
371+ replica_location : ReplicaLocationCriteria < ' a > ,
350372 ) -> impl Fn ( NodeRef < ' a > ) -> bool {
351373 move |node| match replica_location {
352- ReplicaLocationCriteria :: Any | ReplicaLocationCriteria :: Datacenter => predicate ( & node) ,
353- ReplicaLocationCriteria :: DatacenterAndRack => {
354- predicate ( & node) && node. rack . as_deref ( ) == preferences. rack ( )
374+ ReplicaLocationCriteria :: Any | ReplicaLocationCriteria :: Datacenter ( _) => {
375+ predicate ( & node)
376+ }
377+ ReplicaLocationCriteria :: DatacenterAndRack ( _, rack) => {
378+ predicate ( & node) && node. rack . as_deref ( ) == Some ( rack)
355379 }
356380 }
357381 }
358382
359383 fn replicas < ' a > (
360384 & ' a self ,
361385 ts : & TokenWithStrategy < ' a > ,
362- replica_location : ReplicaLocationCriteria ,
386+ replica_location : ReplicaLocationCriteria < ' a > ,
363387 predicate : impl Fn ( & NodeRef < ' a > ) -> bool + ' a ,
364388 cluster : & ' a ClusterData ,
365389 ) -> impl Iterator < Item = NodeRef < ' a > > {
366- let predicate = Self :: make_rack_predicate ( predicate, replica_location, & self . preferences ) ;
390+ let predicate = Self :: make_rack_predicate ( predicate, replica_location) ;
367391
368392 self . nonfiltered_replica_set ( ts, replica_location, cluster)
369393 . into_iter ( )
@@ -373,11 +397,11 @@ impl DefaultPolicy {
373397 fn pick_replica < ' a > (
374398 & ' a self ,
375399 ts : & TokenWithStrategy < ' a > ,
376- replica_location : ReplicaLocationCriteria ,
400+ replica_location : ReplicaLocationCriteria < ' a > ,
377401 predicate : & ' a impl Fn ( & NodeRef < ' a > ) -> bool ,
378402 cluster : & ' a ClusterData ,
379403 ) -> Option < NodeRef < ' a > > {
380- let predicate = Self :: make_rack_predicate ( predicate, replica_location, & self . preferences ) ;
404+ let predicate = Self :: make_rack_predicate ( predicate, replica_location) ;
381405
382406 let replica_set = self . nonfiltered_replica_set ( ts, replica_location, cluster) ;
383407
@@ -392,7 +416,7 @@ impl DefaultPolicy {
392416 fn shuffled_replicas < ' a > (
393417 & ' a self ,
394418 ts : & TokenWithStrategy < ' a > ,
395- replica_location : ReplicaLocationCriteria ,
419+ replica_location : ReplicaLocationCriteria < ' a > ,
396420 predicate : impl Fn ( & NodeRef < ' a > ) -> bool + ' a ,
397421 cluster : & ' a ClusterData ,
398422 ) -> impl Iterator < Item = NodeRef < ' a > > {
0 commit comments