@@ -59,7 +59,12 @@ enum ReplicaOrder {
5959 RingOrder ,
6060}
6161
62- // TODO: LWT optimisation
/// Classifies a statement by whether it is a confirmed lightweight transaction (LWT).
///
/// The load balancing policy uses this to decide how replicas are ordered:
/// LWT statements are routed to replicas in a stable order, whereas other
/// statements may be routed to replicas in an arbitrary (randomised) order.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum StatementType {
    /// The statement is a confirmed lightweight transaction.
    Lwt,
    /// Any statement that is not a confirmed lightweight transaction.
    NonLwt,
}
67+
6368/// The default load balancing policy.
6469///
6570/// It can be configured to be datacenter-aware and token-aware.
@@ -105,6 +110,11 @@ or refrain from preferring datacenters (which may ban all other datacenters, if
105110 ) ;
106111 }
107112 }
113+ let statement_type = if query. is_confirmed_lwt {
114+ StatementType :: Lwt
115+ } else {
116+ StatementType :: NonLwt
117+ } ;
108118 if let Some ( ts) = & routing_info. token_with_strategy {
109119 if let ReplicaLocationPreference :: DatacenterAndRack ( dc, rack) = & self . preferences {
110120 // Try to pick some alive local rack random replica.
@@ -113,6 +123,7 @@ or refrain from preferring datacenters (which may ban all other datacenters, if
113123 ReplicaLocationCriteria :: DatacenterAndRack ( dc, rack) ,
114124 & self . pick_predicate ,
115125 cluster,
126+ statement_type,
116127 ) ;
117128
118129 if let Some ( alive_local_rack_replica) = local_rack_picked {
@@ -129,6 +140,7 @@ or refrain from preferring datacenters (which may ban all other datacenters, if
129140 ReplicaLocationCriteria :: Datacenter ( dc) ,
130141 & self . pick_predicate ,
131142 cluster,
143+ statement_type,
132144 ) ;
133145
134146 if let Some ( alive_local_replica) = picked {
@@ -146,6 +158,7 @@ or refrain from preferring datacenters (which may ban all other datacenters, if
146158 ReplicaLocationCriteria :: Any ,
147159 & self . pick_predicate ,
148160 cluster,
161+ statement_type,
149162 ) ;
150163 if let Some ( alive_remote_replica) = picked {
151164 return Some ( alive_remote_replica) ;
@@ -196,16 +209,22 @@ or refrain from preferring datacenters (which may ban all other datacenters, if
196209 cluster : & ' a ClusterData ,
197210 ) -> FallbackPlan < ' a > {
198211 let routing_info = self . routing_info ( query, cluster) ;
212+ let statement_type = if query. is_confirmed_lwt {
213+ StatementType :: Lwt
214+ } else {
215+ StatementType :: NonLwt
216+ } ;
199217
200218 // If token is available, get a shuffled list of alive replicas.
201219 let maybe_replicas = if let Some ( ts) = & routing_info. token_with_strategy {
202220 let maybe_local_rack_replicas =
203221 if let ReplicaLocationPreference :: DatacenterAndRack ( dc, rack) = & self . preferences {
204- let local_rack_replicas = self . shuffled_replicas (
222+ let local_rack_replicas = self . fallback_replicas (
205223 ts,
206224 ReplicaLocationCriteria :: DatacenterAndRack ( dc, rack) ,
207225 Self :: is_alive,
208226 cluster,
227+ statement_type,
209228 ) ;
210229 Either :: Left ( local_rack_replicas)
211230 } else {
@@ -216,11 +235,12 @@ or refrain from preferring datacenters (which may ban all other datacenters, if
216235 | ReplicaLocationPreference :: Datacenter ( dc) =
217236 & self . preferences
218237 {
219- let local_replicas = self . shuffled_replicas (
238+ let local_replicas = self . fallback_replicas (
220239 ts,
221240 ReplicaLocationCriteria :: Datacenter ( dc) ,
222241 Self :: is_alive,
223242 cluster,
243+ statement_type,
224244 ) ;
225245 Either :: Left ( local_replicas)
226246 } else {
@@ -231,11 +251,12 @@ or refrain from preferring datacenters (which may ban all other datacenters, if
231251 let maybe_remote_replicas = if self . preferences . datacenter ( ) . is_none ( )
232252 || self . is_datacenter_failover_possible ( & routing_info)
233253 {
234- let remote_replicas = self . shuffled_replicas (
254+ let remote_replicas = self . fallback_replicas (
235255 ts,
236256 ReplicaLocationCriteria :: Any ,
237257 Self :: is_alive,
238258 cluster,
259+ statement_type,
239260 ) ;
240261 Either :: Left ( remote_replicas)
241262 } else {
@@ -416,6 +437,80 @@ impl DefaultPolicy {
416437 replica_location : ReplicaLocationCriteria < ' a > ,
417438 predicate : & ' a impl Fn ( & NodeRef < ' a > ) -> bool ,
418439 cluster : & ' a ClusterData ,
440+ statement_type : StatementType ,
441+ ) -> Option < NodeRef < ' a > > {
442+ match statement_type {
443+ StatementType :: Lwt => self . pick_first_replica ( ts, replica_location, predicate, cluster) ,
444+ StatementType :: NonLwt => {
445+ self . pick_random_replica ( ts, replica_location, predicate, cluster)
446+ }
447+ }
448+ }
449+
450+ // This is to be used for LWT optimisation: in order to reduce contention
451+ // caused by Paxos conflicts, we always try to query replicas in the same, ring order.
452+ //
453+ // If preferred rack and DC are set, then the first (encountered on the ring) replica
454+ // that resides in that rack in that DC **and** satisfies the `predicate` is returned.
455+ //
456+ // If preferred DC is set, then the first (encountered on the ring) replica
457+ // that resides in that DC **and** satisfies the `predicate` is returned.
458+ //
459+ // If no DC/rack preferences are set, then the only possible replica to be returned
460+ // (due to expensive computation of the others, and we avoid expensive computation in `pick()`)
461+ // is the primary replica. It is returned **iff** it satisfies the predicate, else None.
462+ fn pick_first_replica < ' a > (
463+ & ' a self ,
464+ ts : & TokenWithStrategy < ' a > ,
465+ replica_location : ReplicaLocationCriteria < ' a > ,
466+ predicate : & ' a impl Fn ( & NodeRef < ' a > ) -> bool ,
467+ cluster : & ' a ClusterData ,
468+ ) -> Option < NodeRef < ' a > > {
469+ match replica_location {
470+ ReplicaLocationCriteria :: Any => {
471+ // ReplicaSet returned by ReplicaLocator for this case:
472+ // 1) can be precomputed and lated used cheaply,
473+ // 2) returns replicas in the **non-ring order** (this because ReplicaSet chains
474+ // ring-ordered replicas sequences from different DCs, thus not preserving
475+ // the global ring order).
476+ // Because of 2), we can't use a precomputed ReplicaSet, but instead we need ReplicasOrdered.
477+ // As ReplicasOrdered can compute cheaply only the primary global replica
478+ // (computation of the remaining ones is expensive), in case that the primary replica
479+ // does not satisfy the `predicate`, None is returned. All expensive computation
480+ // is to be done only when `fallback()` is called.
481+ self . nonfiltered_replica_set ( ts, replica_location, cluster)
482+ . into_replicas_ordered ( )
483+ . into_iter ( )
484+ . next ( )
485+ . and_then ( |primary_replica| {
486+ predicate ( & primary_replica) . then_some ( primary_replica)
487+ } )
488+ }
489+ ReplicaLocationCriteria :: Datacenter ( _)
490+ | ReplicaLocationCriteria :: DatacenterAndRack ( _, _) => {
491+ // ReplicaSet returned by ReplicaLocator for this case:
492+ // 1) can be precomputed and lated used cheaply,
493+ // 2) returns replicas in the ring order (this is not true for the case
494+ // when multiple DCs are allowed, because ReplicaSet chains replicas sequences
495+ // from different DCs, thus not preserving the global ring order)
496+ self . replicas (
497+ ts,
498+ replica_location,
499+ move |node| predicate ( node) ,
500+ cluster,
501+ ReplicaOrder :: RingOrder ,
502+ )
503+ . next ( )
504+ }
505+ }
506+ }
507+
508+ fn pick_random_replica < ' a > (
509+ & ' a self ,
510+ ts : & TokenWithStrategy < ' a > ,
511+ replica_location : ReplicaLocationCriteria < ' a > ,
512+ predicate : & ' a impl Fn ( & NodeRef < ' a > ) -> bool ,
513+ cluster : & ' a ClusterData ,
419514 ) -> Option < NodeRef < ' a > > {
420515 let predicate = Self :: make_rack_predicate ( predicate, replica_location) ;
421516
@@ -429,22 +524,27 @@ impl DefaultPolicy {
429524 }
430525 }
431526
432- fn shuffled_replicas < ' a > (
527+ fn fallback_replicas < ' a > (
433528 & ' a self ,
434529 ts : & TokenWithStrategy < ' a > ,
435530 replica_location : ReplicaLocationCriteria < ' a > ,
436531 predicate : impl Fn ( & NodeRef < ' a > ) -> bool + ' a ,
437532 cluster : & ' a ClusterData ,
533+ statement_type : StatementType ,
438534 ) -> impl Iterator < Item = NodeRef < ' a > > {
439- let replicas = self . replicas (
440- ts,
441- replica_location,
442- predicate,
443- cluster,
444- ReplicaOrder :: Arbitrary ,
445- ) ;
535+ let order = match statement_type {
536+ StatementType :: Lwt => ReplicaOrder :: RingOrder ,
537+ StatementType :: NonLwt => ReplicaOrder :: Arbitrary ,
538+ } ;
446539
447- self . shuffle ( replicas)
540+ let replicas = self . replicas ( ts, replica_location, predicate, cluster, order) ;
541+
542+ match statement_type {
543+ // As an LWT optimisation: in order to reduce contention caused by Paxos conflicts,
544+ // we always try to query replicas in the same order.
545+ StatementType :: Lwt => Either :: Left ( replicas) ,
546+ StatementType :: NonLwt => Either :: Right ( self . shuffle ( replicas) ) ,
547+ }
448548 }
449549
450550 fn randomly_rotated_nodes ( nodes : & [ Arc < Node > ] ) -> impl Iterator < Item = NodeRef < ' _ > > {
0 commit comments