Skip to content

Commit f72f9f2

Browse files
committed
load_balancing: add a fallback to token-aware policy
Fallback load balancing plan is chained to the primary plan. Generating this fallback is realized with a `plan` method of token-aware's child policy (instead of `apply_child_policy`). It was done to avoid unnecessary `Arc` cloning required to construct `plan` parameter for `apply_child_policy()`.
1 parent 6535c38 commit f72f9f2

File tree

2 files changed

+25
-1
lines changed

2 files changed

+25
-1
lines changed

scylla/src/transport/load_balancing/mod.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,15 @@ pub struct Statement<'a> {
2323
pub keyspace: Option<&'a str>,
2424
}
2525

26+
impl<'a> Statement<'a> {
27+
fn empty() -> Self {
28+
Self {
29+
token: None,
30+
keyspace: None,
31+
}
32+
}
33+
}
34+
2635
pub type Plan<'a> = Box<dyn Iterator<Item = Arc<Node>> + Send + Sync + 'a>;
2736

2837
/// Policy that decides which nodes to contact for each query

scylla/src/transport/load_balancing/token_aware.rs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ use crate::routing::Token;
33
use crate::transport::topology::Strategy;
44
use crate::transport::{cluster::ClusterData, node::Node};
55
use itertools::Itertools;
6+
use std::collections::HashSet;
7+
use std::net::SocketAddr;
68
use std::{collections::HashMap, sync::Arc};
79
use tracing::trace;
810

@@ -138,7 +140,20 @@ impl LoadBalancingPolicy for TokenAwarePolicy {
138140
"TokenAware"
139141
);
140142

141-
self.child_policy.apply_child_policy(replicas)
143+
let fallback_plan = {
144+
let replicas_set: HashSet<SocketAddr> =
145+
replicas.iter().map(|node| node.address).collect();
146+
147+
self.child_policy
148+
.plan(&Statement::empty(), cluster)
149+
.filter(move |node| !replicas_set.contains(&node.address))
150+
};
151+
152+
let plan = self
153+
.child_policy
154+
.apply_child_policy(replicas)
155+
.chain(fallback_plan);
156+
Box::new(plan)
142157
}
143158
// fallback to child policy
144159
None => {

0 commit comments

Comments
 (0)