Skip to content

Commit 88f4605

Browse files
committed
{lb,plan}: call fallback() even if pick() is None
We want to keep regarding pick() as a cheap, happy path function, which does not allocate. In LWT optimisation, however, only the primary replica can be computed without allocations (due to specifics of Network Topology Strategy, replicas are not assigned greedily, but their distribution across racks is being balanced). If a policy (in our case, the default policy) recognizes that a picked replica is down, it would try to pick another, and this computation would be expensive for ReplicasOrdered. Instead, having recognized during LWT optimised case that a picked replica is down, the policy returns None from pick() to hint that further computation will be expensive. The plan logic is hence altered to call fallback() even if pick() returns None. In the non-LWT case, as subsequent calls to pick() are still cheap, pick() will still try to find next replicas if one is recognized to be down. A test is added. It asserts that `fallback()` is called if `pick()` returned None. For the test to be possible to be written, a convenience constructor is added for `Node` under `cfg(test)`.
1 parent c2091df commit 88f4605

File tree

3 files changed

+104
-6
lines changed

3 files changed

+104
-6
lines changed

scylla/src/transport/load_balancing/mod.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,9 +53,8 @@ pub type FallbackPlan<'a> = Box<dyn Iterator<Item = NodeRef<'a>> + Send + Sync +
5353
/// `pick` and `fallback`. `pick` returns a first node to contact for a given query, `fallback`
5454
/// returns the rest of the load balancing plan.
5555
///
56-
/// `fallback` is called only after a failed send to `pick`ed node (or when executing
57-
/// speculatively).
58-
/// If a `pick` returns `None`, `fallback` will not be called.
56+
/// `fallback` is called not only if a send to `pick`ed node failed (or when executing
57+
/// speculatively), but also if `pick` returns `None`.
5958
///
6059
/// Usually the driver needs only the first node from load balancing plan (most queries are send
6160
/// successfully, and there is no need to retry).

scylla/src/transport/load_balancing/plan.rs

Lines changed: 80 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,9 +54,24 @@ impl<'a> Iterator for Plan<'a> {
5454
self.state = PlanState::Picked(picked);
5555
Some(picked)
5656
} else {
57-
error!("Load balancing policy returned an empty plan! The query cannot be executed. Routing info: {:?}", self.routing_info);
58-
self.state = PlanState::PickedNone;
59-
None
57+
// `pick()` returned None, which semantically means that a first node cannot be computed _cheaply_.
58+
// This, however, does not imply that fallback would return an empty plan, too.
59+
// For instance, as a side effect of LWT optimisation in Default Policy, pick() may return None
60+
// when the primary replica is down. `fallback()` will nevertheless return the remaining replicas,
61+
// if there are such.
62+
let mut iter = self.policy.fallback(self.routing_info, self.cluster);
63+
let first_fallback_node = iter.next();
64+
if let Some(node) = first_fallback_node {
65+
self.state = PlanState::Fallback {
66+
iter,
67+
node_to_filter_out: node,
68+
};
69+
Some(node)
70+
} else {
71+
error!("Load balancing policy returned an empty plan! The query cannot be executed. Routing info: {:?}", self.routing_info);
72+
self.state = PlanState::PickedNone;
73+
None
74+
}
6075
}
6176
}
6277
PlanState::Picked(node) => {
@@ -85,3 +100,65 @@ impl<'a> Iterator for Plan<'a> {
85100
}
86101
}
87102
}
103+
104+
#[cfg(test)]
105+
mod tests {
106+
use std::{net::SocketAddr, str::FromStr, sync::Arc};
107+
108+
use crate::transport::{
109+
locator::test::{create_locator, mock_metadata_for_token_aware_tests},
110+
Node, NodeAddr,
111+
};
112+
113+
use super::*;
114+
115+
fn expected_nodes() -> Vec<Arc<Node>> {
116+
vec![Arc::new(Node::new_for_test(
117+
NodeAddr::Translatable(SocketAddr::from_str("127.0.0.1:9042").unwrap()),
118+
None,
119+
None,
120+
))]
121+
}
122+
123+
#[derive(Debug)]
124+
struct PickingNonePolicy {
125+
expected_nodes: Vec<Arc<Node>>,
126+
}
127+
impl LoadBalancingPolicy for PickingNonePolicy {
128+
fn pick<'a>(
129+
&'a self,
130+
_query: &'a RoutingInfo,
131+
_cluster: &'a ClusterData,
132+
) -> Option<NodeRef<'a>> {
133+
None
134+
}
135+
136+
fn fallback<'a>(
137+
&'a self,
138+
_query: &'a RoutingInfo,
139+
_cluster: &'a ClusterData,
140+
) -> FallbackPlan<'a> {
141+
Box::new(self.expected_nodes.iter())
142+
}
143+
144+
fn name(&self) -> String {
145+
"PickingNone".into()
146+
}
147+
}
148+
149+
#[tokio::test]
150+
async fn plan_calls_fallback_even_if_pick_returned_none() {
151+
let policy = PickingNonePolicy {
152+
expected_nodes: expected_nodes(),
153+
};
154+
let locator = create_locator(&mock_metadata_for_token_aware_tests());
155+
let cluster_data = ClusterData {
156+
known_peers: Default::default(),
157+
keyspaces: Default::default(),
158+
locator,
159+
};
160+
let routing_info = RoutingInfo::default();
161+
let plan = Plan::new(&policy, &routing_info, &cluster_data);
162+
assert_eq!(Vec::from_iter(plan.cloned()), policy.expected_nodes);
163+
}
164+
}

scylla/src/transport/node.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -214,3 +214,25 @@ impl Hash for Node {
214214
self.host_id.hash(state);
215215
}
216216
}
217+
218+
#[cfg(test)]
219+
mod tests {
220+
use super::*;
221+
222+
impl Node {
223+
pub(crate) fn new_for_test(
224+
address: NodeAddr,
225+
datacenter: Option<String>,
226+
rack: Option<String>,
227+
) -> Self {
228+
Self {
229+
host_id: Uuid::new_v4(),
230+
address,
231+
datacenter,
232+
rack,
233+
pool: None,
234+
down_marker: false.into(),
235+
}
236+
}
237+
}
238+
}

0 commit comments

Comments
 (0)