Skip to content

Commit 4ad601a

Browse files
authored
Merge pull request #405 from havaker/fallback
Load balancing fallback
2 parents 13c0b4d + 87453ba commit 4ad601a

File tree

4 files changed

+69
-44
lines changed

4 files changed

+69
-44
lines changed

scylla/src/transport/load_balancing/dc_aware_round_robin.rs

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use super::{ChildLoadBalancingPolicy, LoadBalancingPolicy, Statement};
1+
use super::{ChildLoadBalancingPolicy, LoadBalancingPolicy, Plan, Statement};
22
use crate::transport::{cluster::ClusterData, node::Node};
33
use std::sync::{
44
atomic::{AtomicUsize, Ordering},
@@ -51,11 +51,7 @@ const EMPTY_NODE_LIST: &Vec<Arc<Node>> = &vec![];
5151
const ORDER_TYPE: Ordering = Ordering::Relaxed;
5252

5353
impl LoadBalancingPolicy for DcAwareRoundRobinPolicy {
54-
fn plan<'a>(
55-
&self,
56-
_statement: &Statement,
57-
cluster: &'a ClusterData,
58-
) -> Box<dyn Iterator<Item = Arc<Node>> + Send + Sync + 'a> {
54+
fn plan<'a>(&self, _statement: &Statement, cluster: &'a ClusterData) -> Plan<'a> {
5955
let index = self.index.fetch_add(1, ORDER_TYPE);
6056

6157
let local_nodes = self.retrieve_local_nodes(cluster);
@@ -123,6 +119,7 @@ mod tests {
123119
use super::*;
124120

125121
use crate::transport::load_balancing::tests;
122+
use std::collections::HashSet;
126123

127124
#[tokio::test]
128125
async fn test_dc_aware_round_robin_policy() {
@@ -131,23 +128,27 @@ mod tests {
131128
let local_dc = "eu".to_string();
132129
let policy = DcAwareRoundRobinPolicy::new(local_dc);
133130

134-
let plans = (0..4)
131+
let plans = (0..32)
135132
.map(|_| {
136133
tests::get_plan_and_collect_node_identifiers(
137134
&policy,
138135
&tests::EMPTY_STATEMENT,
139136
&cluster,
140137
)
141138
})
142-
.collect::<Vec<_>>();
139+
.collect::<HashSet<_>>();
143140

144141
let expected_plans = vec![
145142
vec![1, 2, 3, 4, 5],
143+
vec![1, 2, 3, 5, 4],
146144
vec![2, 3, 1, 5, 4],
145+
vec![2, 3, 1, 4, 5],
147146
vec![3, 1, 2, 4, 5],
148-
vec![1, 2, 3, 5, 4],
149-
];
147+
vec![3, 1, 2, 5, 4],
148+
]
149+
.into_iter()
150+
.collect::<HashSet<_>>();
150151

151-
assert_eq!(plans, expected_plans);
152+
assert_eq!(expected_plans, plans);
152153
}
153154
}

scylla/src/transport/load_balancing/mod.rs

Lines changed: 28 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
use super::{cluster::ClusterData, node::Node};
77
use crate::routing::Token;
88

9-
use std::sync::Arc;
9+
use std::{collections::hash_map::DefaultHasher, hash::Hasher, sync::Arc};
1010

1111
mod dc_aware_round_robin;
1212
mod round_robin;
@@ -23,14 +23,21 @@ pub struct Statement<'a> {
2323
pub keyspace: Option<&'a str>,
2424
}
2525

26+
impl<'a> Statement<'a> {
27+
fn empty() -> Self {
28+
Self {
29+
token: None,
30+
keyspace: None,
31+
}
32+
}
33+
}
34+
35+
pub type Plan<'a> = Box<dyn Iterator<Item = Arc<Node>> + Send + Sync + 'a>;
36+
2637
/// Policy that decides which nodes to contact for each query
2738
pub trait LoadBalancingPolicy: Send + Sync {
2839
/// It is used for each query to find which nodes to query first
29-
fn plan<'a>(
30-
&self,
31-
statement: &Statement,
32-
cluster: &'a ClusterData,
33-
) -> Box<dyn Iterator<Item = Arc<Node>> + Send + Sync + 'a>;
40+
fn plan<'a>(&self, statement: &Statement, cluster: &'a ClusterData) -> Plan<'a>;
3441

3542
/// Returns name of load balancing policy
3643
fn name(&self) -> String;
@@ -46,10 +53,21 @@ pub trait ChildLoadBalancingPolicy: LoadBalancingPolicy {
4653
) -> Box<dyn Iterator<Item = Arc<Node>> + Send + Sync>;
4754
}
4855

49-
// Does safe modulo
50-
fn compute_rotation(index: usize, count: usize) -> usize {
51-
if count != 0 {
52-
index % count
56+
// Hashing round robin's index is a mitigation to problems that occur when a
57+
// `RoundRobin::apply_child_policy()` is called twice by a parent policy.
58+
fn round_robin_index_hash(index: usize) -> u64 {
59+
let mut hasher = DefaultHasher::new();
60+
hasher.write_usize(index);
61+
62+
hasher.finish()
63+
}
64+
65+
// Does safe modulo and additionally hashes the index
66+
fn compute_rotation(round_robin_index: usize, sequence_length: usize) -> usize {
67+
if sequence_length > 1 {
68+
let hash = round_robin_index_hash(round_robin_index);
69+
70+
(hash % sequence_length as u64) as usize
5371
} else {
5472
0
5573
}

scylla/src/transport/load_balancing/round_robin.rs

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use super::{ChildLoadBalancingPolicy, LoadBalancingPolicy, Statement};
1+
use super::{ChildLoadBalancingPolicy, LoadBalancingPolicy, Plan, Statement};
22
use crate::transport::{cluster::ClusterData, node::Node};
33
use std::sync::{
44
atomic::{AtomicUsize, Ordering},
@@ -28,11 +28,7 @@ impl Default for RoundRobinPolicy {
2828
const ORDER_TYPE: Ordering = Ordering::Relaxed;
2929

3030
impl LoadBalancingPolicy for RoundRobinPolicy {
31-
fn plan<'a>(
32-
&self,
33-
_statement: &Statement,
34-
cluster: &'a ClusterData,
35-
) -> Box<dyn Iterator<Item = Arc<Node>> + Send + Sync + 'a> {
31+
fn plan<'a>(&self, _statement: &Statement, cluster: &'a ClusterData) -> Plan<'a> {
3632
let index = self.index.fetch_add(1, ORDER_TYPE);
3733

3834
let nodes_count = cluster.all_nodes.len();
@@ -75,6 +71,7 @@ mod tests {
7571
use super::*;
7672

7773
use crate::transport::load_balancing::tests;
74+
use std::collections::HashSet;
7875

7976
// ConnectionKeeper (which lives in Node) requires context of Tokio runtime
8077
#[tokio::test]
@@ -83,25 +80,27 @@ mod tests {
8380

8481
let policy = RoundRobinPolicy::new();
8582

86-
let plans = (0..6)
83+
let plans = (0..16)
8784
.map(|_| {
8885
tests::get_plan_and_collect_node_identifiers(
8986
&policy,
9087
&tests::EMPTY_STATEMENT,
9188
&cluster,
9289
)
9390
})
94-
.collect::<Vec<_>>();
91+
.collect::<HashSet<_>>();
9592

93+
// Check if `plans` contains all possible round robin plans
9694
let expected_plans = vec![
9795
vec![1, 2, 3, 4, 5],
9896
vec![2, 3, 4, 5, 1],
9997
vec![3, 4, 5, 1, 2],
10098
vec![4, 5, 1, 2, 3],
10199
vec![5, 1, 2, 3, 4],
102-
vec![1, 2, 3, 4, 5],
103-
];
100+
]
101+
.into_iter()
102+
.collect::<HashSet<Vec<_>>>();
104103

105-
assert_eq!(plans, expected_plans);
104+
assert_eq!(expected_plans, plans);
106105
}
107106
}

scylla/src/transport/load_balancing/token_aware.rs

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
1-
use super::{ChildLoadBalancingPolicy, LoadBalancingPolicy, Statement};
1+
use super::{ChildLoadBalancingPolicy, LoadBalancingPolicy, Plan, Statement};
22
use crate::routing::Token;
33
use crate::transport::topology::Strategy;
44
use crate::transport::{cluster::ClusterData, node::Node};
55
use itertools::Itertools;
6+
use std::collections::HashSet;
7+
use std::net::SocketAddr;
68
use std::{collections::HashMap, sync::Arc};
79
use tracing::trace;
810

@@ -103,11 +105,7 @@ impl TokenAwarePolicy {
103105
}
104106

105107
impl LoadBalancingPolicy for TokenAwarePolicy {
106-
fn plan<'a>(
107-
&self,
108-
statement: &Statement,
109-
cluster: &'a ClusterData,
110-
) -> Box<dyn Iterator<Item = Arc<Node>> + Send + Sync + 'a> {
108+
fn plan<'a>(&self, statement: &Statement, cluster: &'a ClusterData) -> Plan<'a> {
111109
match statement.token {
112110
Some(token) => {
113111
let keyspace = statement.keyspace.and_then(|k| cluster.keyspaces.get(k));
@@ -142,7 +140,20 @@ impl LoadBalancingPolicy for TokenAwarePolicy {
142140
"TokenAware"
143141
);
144142

145-
self.child_policy.apply_child_policy(replicas)
143+
let fallback_plan = {
144+
let replicas_set: HashSet<SocketAddr> =
145+
replicas.iter().map(|node| node.address).collect();
146+
147+
self.child_policy
148+
.plan(&Statement::empty(), cluster)
149+
.filter(move |node| !replicas_set.contains(&node.address))
150+
};
151+
152+
let plan = self
153+
.child_policy
154+
.apply_child_policy(replicas)
155+
.chain(fallback_plan);
156+
Box::new(plan)
146157
}
147158
// fallback to child policy
148159
None => {
@@ -422,11 +433,7 @@ mod tests {
422433
struct DumbPolicy {}
423434

424435
impl LoadBalancingPolicy for DumbPolicy {
425-
fn plan<'a>(
426-
&self,
427-
_: &Statement,
428-
_: &'a ClusterData,
429-
) -> Box<dyn Iterator<Item = Arc<Node>> + Send + Sync + 'a> {
436+
fn plan<'a>(&self, _: &Statement, _: &'a ClusterData) -> Plan<'a> {
430437
let empty_node_list: Vec<Arc<Node>> = Vec::new();
431438

432439
Box::new(empty_node_list.into_iter())

0 commit comments

Comments
 (0)