Skip to content

Commit dcb1256

Browse files
committed
default_policy: extract code into MinAvgUpdater
This is a preparation step for allowing explicit control over update occurences, for test purposes.
1 parent 3e86592 commit dcb1256

File tree

1 file changed

+43
-23
lines changed

1 file changed

+43
-23
lines changed

scylla/src/transport/load_balancing/default.rs

Lines changed: 43 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1533,37 +1533,22 @@ mod latency_awareness {
15331533
minimum_measurements: usize,
15341534
) -> Self {
15351535
let min_latency = Arc::new(AtomicDuration::new());
1536-
let mut update_scheduler = tokio::time::interval(update_rate);
15371536

15381537
let min_latency_clone = min_latency.clone();
15391538
let node_avgs = Arc::new(RwLock::new(HashMap::new()));
15401539
let node_avgs_clone = node_avgs.clone();
15411540

1541+
let updater = MinAvgUpdater {
1542+
node_avgs,
1543+
min_latency,
1544+
minimum_measurements,
1545+
};
1546+
15421547
let (updater_fut, updater_handle) = async move {
1548+
let mut update_scheduler = tokio::time::interval(update_rate);
15431549
loop {
15441550
update_scheduler.tick().await;
1545-
let averages: &HashMap<Uuid, RwLock<Option<TimestampedAverage>>> =
1546-
&node_avgs.read().unwrap();
1547-
if averages.is_empty() {
1548-
continue; // No nodes queries registered to LAP performed yet.
1549-
}
1550-
1551-
let min_avg = averages
1552-
.values()
1553-
.filter_map(|avg| {
1554-
avg.read().unwrap().and_then(|timestamped_average| {
1555-
(timestamped_average.num_measures >= minimum_measurements)
1556-
.then_some(timestamped_average.average)
1557-
})
1558-
})
1559-
.min();
1560-
if let Some(min_avg) = min_avg {
1561-
min_latency.store(min_avg);
1562-
trace!(
1563-
"Latency awareness: updated min average latency to {} ms",
1564-
min_avg.as_secs_f64() * 1000.
1565-
);
1566-
}
1551+
updater.tick().await;
15671552
}
15681553
}
15691554
.remote_handle();
@@ -1674,6 +1659,41 @@ mod latency_awareness {
16741659
}
16751660
}
16761661

1662+
/// Updates minimum average latency upon request each request to `tick()`.
1663+
/// The said average is a crucial criterium for penalising "too slow" nodes.
1664+
struct MinAvgUpdater {
1665+
node_avgs: Arc<RwLock<HashMap<Uuid, RwLock<Option<TimestampedAverage>>>>>,
1666+
min_latency: Arc<AtomicDuration>,
1667+
minimum_measurements: usize,
1668+
}
1669+
1670+
impl MinAvgUpdater {
1671+
async fn tick(&self) {
1672+
let averages: &HashMap<Uuid, RwLock<Option<TimestampedAverage>>> =
1673+
&self.node_avgs.read().unwrap();
1674+
if averages.is_empty() {
1675+
return; // No nodes queries registered to LAP performed yet.
1676+
}
1677+
1678+
let min_avg = averages
1679+
.values()
1680+
.filter_map(|avg| {
1681+
avg.read().unwrap().and_then(|timestamped_average| {
1682+
(timestamped_average.num_measures >= self.minimum_measurements)
1683+
.then_some(timestamped_average.average)
1684+
})
1685+
})
1686+
.min();
1687+
if let Some(min_avg) = min_avg {
1688+
self.min_latency.store(min_avg);
1689+
trace!(
1690+
"Latency awareness: updated min average latency to {} ms",
1691+
min_avg.as_secs_f64() * 1000.
1692+
);
1693+
}
1694+
}
1695+
}
1696+
16771697
/// The builder of LatencyAwareness module of DefaultPolicy.
16781698
///
16791699
/// (For more information about latency awareness, see [DefaultPolicyBuilder::latency_awareness()](super::DefaultPolicyBuilder::latency_awareness)).

0 commit comments

Comments
 (0)