Skip to content

Commit 920e6df

Browse files
committed
locator: implement ReplicasOrdered (ring-wise)
In order to be consistent with LWT optimisation implementation in other drivers, it is mandatory to return query plan consisting of replicas put in ring order. To accomplish that, the (abstractly unordered) ReplicaSet becomes convertible into a newly defined ReplicasOrdered struct. Iterating over ReplicasOrdered yields replicas in ring order, so policies can depend on it to perform LWT optimisation correctly. Note that the computation of ReplicasOrdered is lazy and performed in two steps. - the first step is relatively cheap and free of allocations and involves finding the primary replica by iterating over the global ring. It is triggered by the first call to ReplicasOrderedIterator::next(). - the second step is relatively expensive and involves allocations. It computes the whole ReplicaArray of all replicas, put in the ring over. Precomputation is not an option, as it would be very expensive to compute and store all precomputed arrays that would respect the global ring order (as opposed to only local, per-DC ring order).
1 parent 88f4605 commit 920e6df

File tree

1 file changed

+258
-2
lines changed
  • scylla/src/transport/locator

1 file changed

+258
-2
lines changed

scylla/src/transport/locator/mod.rs

Lines changed: 258 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,11 @@ use itertools::Itertools;
1414
use precomputed_replicas::PrecomputedReplicas;
1515
use replicas::{ReplicasArray, EMPTY_REPLICAS};
1616
use replication_info::ReplicationInfo;
17-
use std::{cmp, collections::HashMap, sync::Arc};
17+
use std::{
18+
cmp,
19+
collections::{HashMap, HashSet},
20+
sync::Arc,
21+
};
1822
use tracing::debug;
1923

2024
/// `ReplicaLocator` provides a way to find the set of owning nodes for a given (token, replication
@@ -376,7 +380,7 @@ impl<'a> IntoIterator for ReplicaSet<'a> {
376380
locator,
377381
token,
378382
} => {
379-
if let Some(datacenter) = &locator.datacenters.first() {
383+
if let Some(datacenter) = locator.datacenters.first() {
380384
let repfactor = *datacenter_repfactors.get(datacenter.as_str()).unwrap_or(&0);
381385
ReplicaSetIteratorInner::ChainedNTS {
382386
replicas: locator
@@ -549,3 +553,255 @@ impl<'a> Iterator for ReplicaSetIterator<'a> {
549553
}
550554
}
551555
}
556+
557+
impl<'a> ReplicaSet<'a> {
558+
pub fn into_replicas_ordered(self) -> ReplicasOrdered<'a> {
559+
ReplicasOrdered { replica_set: self }
560+
}
561+
}
562+
563+
/// Represents a sequence of replicas for a given token and strategy,
564+
/// ordered according to the ring order.
565+
///
566+
/// This container can only be created by calling `ReplicaSet::into_replicas_ordered()`,
567+
/// and either it can borrow precomputed replica lists living in the locator (in case of SimpleStrategy)
568+
/// or it must compute them on-demand (in case of NetworkTopologyStrategy).
569+
/// The computation is lazy (performed by `ReplicasOrderedIterator` upon call to `next()`).
570+
/// For obtaining the primary replica, no allocations are needed. Therefore, the first call
571+
/// to `next()` is optimised and doesn not allocate.
572+
/// For the remaining others, unfortunately, allocation is unevitable.
573+
pub struct ReplicasOrdered<'a> {
574+
replica_set: ReplicaSet<'a>,
575+
}
576+
577+
/// Iterator that returns replicas from some replica sequence, ordered according to the ring order.
578+
pub struct ReplicasOrderedIterator<'a> {
579+
inner: ReplicasOrderedIteratorInner<'a>,
580+
}
581+
582+
enum ReplicasOrderedIteratorInner<'a> {
583+
AlreadyRingOrdered {
584+
// In case of Plain and FilteredSimple variants, ReplicaSetIterator respects ring order.
585+
replica_set_iter: ReplicaSetIterator<'a>,
586+
},
587+
PolyDatacenterNTS {
588+
// In case of ChainedNTS variant, ReplicaSetIterator does not respect ring order,
589+
// so specific code is needed to yield replicas according to that order.
590+
replicas_ordered_iter: ReplicasOrderedNTSIterator<'a>,
591+
},
592+
}
593+
594+
enum ReplicasOrderedNTSIterator<'a> {
595+
FreshForPick {
596+
datacenter_repfactors: &'a HashMap<String, usize>,
597+
locator: &'a ReplicaLocator,
598+
token: Token,
599+
},
600+
Picked {
601+
datacenter_repfactors: &'a HashMap<String, usize>,
602+
locator: &'a ReplicaLocator,
603+
token: Token,
604+
picked: NodeRef<'a>,
605+
},
606+
ComputedFallback {
607+
replicas: ReplicasArray<'a>,
608+
idx: usize,
609+
},
610+
}
611+
612+
impl<'a> Iterator for ReplicasOrderedNTSIterator<'a> {
613+
type Item = NodeRef<'a>;
614+
615+
fn next(&mut self) -> Option<Self::Item> {
616+
match *self {
617+
Self::FreshForPick {
618+
datacenter_repfactors,
619+
locator,
620+
token,
621+
} => {
622+
// We're going to find the primary replica for the given token.
623+
let nodes_on_ring = locator.replication_data.get_global_ring().ring_range(token);
624+
for node in nodes_on_ring {
625+
// If this node's DC has some replicas in this NTS...
626+
if let Some(dc) = &node.datacenter {
627+
if datacenter_repfactors.get(dc).is_some() {
628+
// ...then this node must be the primary replica.
629+
*self = Self::Picked {
630+
datacenter_repfactors,
631+
locator,
632+
token,
633+
picked: node,
634+
};
635+
return Some(node);
636+
}
637+
}
638+
}
639+
None
640+
}
641+
Self::Picked {
642+
datacenter_repfactors,
643+
locator,
644+
token,
645+
picked,
646+
} => {
647+
// Clippy can't check that in Eq and Hash impls we don't actually use any field with interior mutability
648+
// (in Node only `down_marker` is such, being an AtomicBool).
649+
// https://rust-lang.github.io/rust-clippy/master/index.html#mutable_key_type
650+
#[allow(clippy::mutable_key_type)]
651+
let mut all_replicas: HashSet<&'a Arc<Node>> = HashSet::new();
652+
for (datacenter, repfactor) in datacenter_repfactors.iter() {
653+
all_replicas.extend(
654+
locator
655+
.get_network_strategy_replicas(token, datacenter, *repfactor)
656+
.iter(),
657+
);
658+
}
659+
// It's no use returning a node that was already picked.
660+
all_replicas.remove(picked);
661+
662+
let mut replicas_ordered = vec![];
663+
let nodes_on_ring = locator.replication_data.get_global_ring().ring_range(token);
664+
for node in nodes_on_ring {
665+
if all_replicas.is_empty() {
666+
// All replicas were put in order.
667+
break;
668+
}
669+
if all_replicas.remove(node) {
670+
replicas_ordered.push(node);
671+
}
672+
}
673+
assert!(
674+
all_replicas.is_empty(),
675+
"all_replicas somehow contained a node that wasn't present in the global ring!"
676+
);
677+
678+
*self = Self::ComputedFallback {
679+
replicas: ReplicasArray::Owned(replicas_ordered),
680+
idx: 0,
681+
};
682+
self.next()
683+
}
684+
Self::ComputedFallback {
685+
ref replicas,
686+
ref mut idx,
687+
} => {
688+
if let Some(replica) = replicas.get(*idx) {
689+
*idx += 1;
690+
Some(replica)
691+
} else {
692+
None
693+
}
694+
}
695+
}
696+
}
697+
}
698+
699+
impl<'a> Iterator for ReplicasOrderedIterator<'a> {
700+
type Item = NodeRef<'a>;
701+
702+
fn next(&mut self) -> Option<Self::Item> {
703+
match &mut self.inner {
704+
ReplicasOrderedIteratorInner::AlreadyRingOrdered { replica_set_iter } => {
705+
replica_set_iter.next()
706+
}
707+
ReplicasOrderedIteratorInner::PolyDatacenterNTS {
708+
replicas_ordered_iter,
709+
} => replicas_ordered_iter.next(),
710+
}
711+
}
712+
}
713+
714+
impl<'a> IntoIterator for ReplicasOrdered<'a> {
715+
type Item = NodeRef<'a>;
716+
type IntoIter = ReplicasOrderedIterator<'a>;
717+
718+
fn into_iter(self) -> Self::IntoIter {
719+
let Self { replica_set } = self;
720+
Self::IntoIter {
721+
inner: match replica_set.inner {
722+
ReplicaSetInner::Plain(_) | ReplicaSetInner::FilteredSimple { .. } => {
723+
ReplicasOrderedIteratorInner::AlreadyRingOrdered {
724+
replica_set_iter: replica_set.into_iter(),
725+
}
726+
}
727+
ReplicaSetInner::ChainedNTS {
728+
datacenter_repfactors,
729+
locator,
730+
token,
731+
} => ReplicasOrderedIteratorInner::PolyDatacenterNTS {
732+
replicas_ordered_iter: ReplicasOrderedNTSIterator::FreshForPick {
733+
datacenter_repfactors,
734+
locator,
735+
token,
736+
},
737+
},
738+
},
739+
}
740+
}
741+
}
742+
743+
#[cfg(test)]
744+
mod tests {
745+
use crate::{routing::Token, transport::locator::test::*};
746+
747+
#[tokio::test]
748+
async fn test_replicas_ordered() {
749+
let metadata = mock_metadata_for_token_aware_tests();
750+
let locator = create_locator(&metadata);
751+
752+
// For each case (token, limit_to_dc, strategy), we are checking
753+
// that ReplicasOrdered yields replicas in the expected order.
754+
let check = |token, limit_to_dc, strategy, expected| {
755+
let replica_set =
756+
locator.replicas_for_token(Token { value: token }, strategy, limit_to_dc);
757+
let replicas_ordered = replica_set.into_replicas_ordered();
758+
let ids: Vec<_> = replicas_ordered
759+
.into_iter()
760+
.map(|node| node.address.port())
761+
.collect();
762+
assert_eq!(expected, ids);
763+
};
764+
765+
// In all these tests:
766+
// going through the ring, we get order: F , A , C , D , G , B , E
767+
// us eu eu us eu eu us
768+
// r2 r1 r1 r1 r2 r1 r1
769+
check(
770+
160,
771+
None,
772+
&metadata.keyspaces.get(KEYSPACE_NTS_RF_3).unwrap().strategy,
773+
vec![F, A, C, D, G, E],
774+
);
775+
check(
776+
160,
777+
None,
778+
&metadata.keyspaces.get(KEYSPACE_NTS_RF_2).unwrap().strategy,
779+
vec![F, A, D, G],
780+
);
781+
check(
782+
160,
783+
None,
784+
&metadata.keyspaces.get(KEYSPACE_SS_RF_2).unwrap().strategy,
785+
vec![F, A],
786+
);
787+
788+
check(
789+
160,
790+
Some("eu"),
791+
&metadata.keyspaces.get(KEYSPACE_NTS_RF_3).unwrap().strategy,
792+
vec![A, C, G],
793+
);
794+
check(
795+
160,
796+
Some("us"),
797+
&metadata.keyspaces.get(KEYSPACE_NTS_RF_3).unwrap().strategy,
798+
vec![F, D, E],
799+
);
800+
check(
801+
160,
802+
Some("eu"),
803+
&metadata.keyspaces.get(KEYSPACE_SS_RF_2).unwrap().strategy,
804+
vec![A],
805+
);
806+
}
807+
}

0 commit comments

Comments
 (0)