@@ -14,7 +14,11 @@ use itertools::Itertools;
1414use precomputed_replicas:: PrecomputedReplicas ;
1515use replicas:: { ReplicasArray , EMPTY_REPLICAS } ;
1616use replication_info:: ReplicationInfo ;
17- use std:: { cmp, collections:: HashMap , sync:: Arc } ;
17+ use std:: {
18+ cmp,
19+ collections:: { HashMap , HashSet } ,
20+ sync:: Arc ,
21+ } ;
1822use tracing:: debug;
1923
2024/// `ReplicaLocator` provides a way to find the set of owning nodes for a given (token, replication
@@ -376,7 +380,7 @@ impl<'a> IntoIterator for ReplicaSet<'a> {
376380 locator,
377381 token,
378382 } => {
379- if let Some ( datacenter) = & locator. datacenters . first ( ) {
383+ if let Some ( datacenter) = locator. datacenters . first ( ) {
380384 let repfactor = * datacenter_repfactors. get ( datacenter. as_str ( ) ) . unwrap_or ( & 0 ) ;
381385 ReplicaSetIteratorInner :: ChainedNTS {
382386 replicas : locator
@@ -549,3 +553,255 @@ impl<'a> Iterator for ReplicaSetIterator<'a> {
549553 }
550554 }
551555}
556+
557+ impl < ' a > ReplicaSet < ' a > {
558+ pub fn into_replicas_ordered ( self ) -> ReplicasOrdered < ' a > {
559+ ReplicasOrdered { replica_set : self }
560+ }
561+ }
562+
563+ /// Represents a sequence of replicas for a given token and strategy,
564+ /// ordered according to the ring order.
565+ ///
566+ /// This container can only be created by calling `ReplicaSet::into_replicas_ordered()`,
567+ /// and either it can borrow precomputed replica lists living in the locator (in case of SimpleStrategy)
568+ /// or it must compute them on-demand (in case of NetworkTopologyStrategy).
569+ /// The computation is lazy (performed by `ReplicasOrderedIterator` upon call to `next()`).
570+ /// For obtaining the primary replica, no allocations are needed. Therefore, the first call
571+ /// to `next()` is optimised and doesn not allocate.
572+ /// For the remaining others, unfortunately, allocation is unevitable.
573+ pub struct ReplicasOrdered < ' a > {
574+ replica_set : ReplicaSet < ' a > ,
575+ }
576+
577+ /// Iterator that returns replicas from some replica sequence, ordered according to the ring order.
578+ pub struct ReplicasOrderedIterator < ' a > {
579+ inner : ReplicasOrderedIteratorInner < ' a > ,
580+ }
581+
582+ enum ReplicasOrderedIteratorInner < ' a > {
583+ AlreadyRingOrdered {
584+ // In case of Plain and FilteredSimple variants, ReplicaSetIterator respects ring order.
585+ replica_set_iter : ReplicaSetIterator < ' a > ,
586+ } ,
587+ PolyDatacenterNTS {
588+ // In case of ChainedNTS variant, ReplicaSetIterator does not respect ring order,
589+ // so specific code is needed to yield replicas according to that order.
590+ replicas_ordered_iter : ReplicasOrderedNTSIterator < ' a > ,
591+ } ,
592+ }
593+
594+ enum ReplicasOrderedNTSIterator < ' a > {
595+ FreshForPick {
596+ datacenter_repfactors : & ' a HashMap < String , usize > ,
597+ locator : & ' a ReplicaLocator ,
598+ token : Token ,
599+ } ,
600+ Picked {
601+ datacenter_repfactors : & ' a HashMap < String , usize > ,
602+ locator : & ' a ReplicaLocator ,
603+ token : Token ,
604+ picked : NodeRef < ' a > ,
605+ } ,
606+ ComputedFallback {
607+ replicas : ReplicasArray < ' a > ,
608+ idx : usize ,
609+ } ,
610+ }
611+
612+ impl < ' a > Iterator for ReplicasOrderedNTSIterator < ' a > {
613+ type Item = NodeRef < ' a > ;
614+
615+ fn next ( & mut self ) -> Option < Self :: Item > {
616+ match * self {
617+ Self :: FreshForPick {
618+ datacenter_repfactors,
619+ locator,
620+ token,
621+ } => {
622+ // We're going to find the primary replica for the given token.
623+ let nodes_on_ring = locator. replication_data . get_global_ring ( ) . ring_range ( token) ;
624+ for node in nodes_on_ring {
625+ // If this node's DC has some replicas in this NTS...
626+ if let Some ( dc) = & node. datacenter {
627+ if datacenter_repfactors. get ( dc) . is_some ( ) {
628+ // ...then this node must be the primary replica.
629+ * self = Self :: Picked {
630+ datacenter_repfactors,
631+ locator,
632+ token,
633+ picked : node,
634+ } ;
635+ return Some ( node) ;
636+ }
637+ }
638+ }
639+ None
640+ }
641+ Self :: Picked {
642+ datacenter_repfactors,
643+ locator,
644+ token,
645+ picked,
646+ } => {
647+ // Clippy can't check that in Eq and Hash impls we don't actually use any field with interior mutability
648+ // (in Node only `down_marker` is such, being an AtomicBool).
649+ // https://rust-lang.github.io/rust-clippy/master/index.html#mutable_key_type
650+ #[ allow( clippy:: mutable_key_type) ]
651+ let mut all_replicas: HashSet < & ' a Arc < Node > > = HashSet :: new ( ) ;
652+ for ( datacenter, repfactor) in datacenter_repfactors. iter ( ) {
653+ all_replicas. extend (
654+ locator
655+ . get_network_strategy_replicas ( token, datacenter, * repfactor)
656+ . iter ( ) ,
657+ ) ;
658+ }
659+ // It's no use returning a node that was already picked.
660+ all_replicas. remove ( picked) ;
661+
662+ let mut replicas_ordered = vec ! [ ] ;
663+ let nodes_on_ring = locator. replication_data . get_global_ring ( ) . ring_range ( token) ;
664+ for node in nodes_on_ring {
665+ if all_replicas. is_empty ( ) {
666+ // All replicas were put in order.
667+ break ;
668+ }
669+ if all_replicas. remove ( node) {
670+ replicas_ordered. push ( node) ;
671+ }
672+ }
673+ assert ! (
674+ all_replicas. is_empty( ) ,
675+ "all_replicas somehow contained a node that wasn't present in the global ring!"
676+ ) ;
677+
678+ * self = Self :: ComputedFallback {
679+ replicas : ReplicasArray :: Owned ( replicas_ordered) ,
680+ idx : 0 ,
681+ } ;
682+ self . next ( )
683+ }
684+ Self :: ComputedFallback {
685+ ref replicas,
686+ ref mut idx,
687+ } => {
688+ if let Some ( replica) = replicas. get ( * idx) {
689+ * idx += 1 ;
690+ Some ( replica)
691+ } else {
692+ None
693+ }
694+ }
695+ }
696+ }
697+ }
698+
699+ impl < ' a > Iterator for ReplicasOrderedIterator < ' a > {
700+ type Item = NodeRef < ' a > ;
701+
702+ fn next ( & mut self ) -> Option < Self :: Item > {
703+ match & mut self . inner {
704+ ReplicasOrderedIteratorInner :: AlreadyRingOrdered { replica_set_iter } => {
705+ replica_set_iter. next ( )
706+ }
707+ ReplicasOrderedIteratorInner :: PolyDatacenterNTS {
708+ replicas_ordered_iter,
709+ } => replicas_ordered_iter. next ( ) ,
710+ }
711+ }
712+ }
713+
714+ impl < ' a > IntoIterator for ReplicasOrdered < ' a > {
715+ type Item = NodeRef < ' a > ;
716+ type IntoIter = ReplicasOrderedIterator < ' a > ;
717+
718+ fn into_iter ( self ) -> Self :: IntoIter {
719+ let Self { replica_set } = self ;
720+ Self :: IntoIter {
721+ inner : match replica_set. inner {
722+ ReplicaSetInner :: Plain ( _) | ReplicaSetInner :: FilteredSimple { .. } => {
723+ ReplicasOrderedIteratorInner :: AlreadyRingOrdered {
724+ replica_set_iter : replica_set. into_iter ( ) ,
725+ }
726+ }
727+ ReplicaSetInner :: ChainedNTS {
728+ datacenter_repfactors,
729+ locator,
730+ token,
731+ } => ReplicasOrderedIteratorInner :: PolyDatacenterNTS {
732+ replicas_ordered_iter : ReplicasOrderedNTSIterator :: FreshForPick {
733+ datacenter_repfactors,
734+ locator,
735+ token,
736+ } ,
737+ } ,
738+ } ,
739+ }
740+ }
741+ }
742+
743+ #[ cfg( test) ]
744+ mod tests {
745+ use crate :: { routing:: Token , transport:: locator:: test:: * } ;
746+
747+ #[ tokio:: test]
748+ async fn test_replicas_ordered ( ) {
749+ let metadata = mock_metadata_for_token_aware_tests ( ) ;
750+ let locator = create_locator ( & metadata) ;
751+
752+ // For each case (token, limit_to_dc, strategy), we are checking
753+ // that ReplicasOrdered yields replicas in the expected order.
754+ let check = |token, limit_to_dc, strategy, expected| {
755+ let replica_set =
756+ locator. replicas_for_token ( Token { value : token } , strategy, limit_to_dc) ;
757+ let replicas_ordered = replica_set. into_replicas_ordered ( ) ;
758+ let ids: Vec < _ > = replicas_ordered
759+ . into_iter ( )
760+ . map ( |node| node. address . port ( ) )
761+ . collect ( ) ;
762+ assert_eq ! ( expected, ids) ;
763+ } ;
764+
765+ // In all these tests:
766+ // going through the ring, we get order: F , A , C , D , G , B , E
767+ // us eu eu us eu eu us
768+ // r2 r1 r1 r1 r2 r1 r1
769+ check (
770+ 160 ,
771+ None ,
772+ & metadata. keyspaces . get ( KEYSPACE_NTS_RF_3 ) . unwrap ( ) . strategy ,
773+ vec ! [ F , A , C , D , G , E ] ,
774+ ) ;
775+ check (
776+ 160 ,
777+ None ,
778+ & metadata. keyspaces . get ( KEYSPACE_NTS_RF_2 ) . unwrap ( ) . strategy ,
779+ vec ! [ F , A , D , G ] ,
780+ ) ;
781+ check (
782+ 160 ,
783+ None ,
784+ & metadata. keyspaces . get ( KEYSPACE_SS_RF_2 ) . unwrap ( ) . strategy ,
785+ vec ! [ F , A ] ,
786+ ) ;
787+
788+ check (
789+ 160 ,
790+ Some ( "eu" ) ,
791+ & metadata. keyspaces . get ( KEYSPACE_NTS_RF_3 ) . unwrap ( ) . strategy ,
792+ vec ! [ A , C , G ] ,
793+ ) ;
794+ check (
795+ 160 ,
796+ Some ( "us" ) ,
797+ & metadata. keyspaces . get ( KEYSPACE_NTS_RF_3 ) . unwrap ( ) . strategy ,
798+ vec ! [ F , D , E ] ,
799+ ) ;
800+ check (
801+ 160 ,
802+ Some ( "eu" ) ,
803+ & metadata. keyspaces . get ( KEYSPACE_SS_RF_2 ) . unwrap ( ) . strategy ,
804+ vec ! [ A ] ,
805+ ) ;
806+ }
807+ }
0 commit comments