Skip to content

Commit 4ebfda8

Browse files
committed
default_policy: allow explicit min avg latency updates
For tests purposes, LatencyAwareness can now be created using new_for_test(), which returns a MinAvgUpdater for explicit tick control instead of scheduling a Tokio task that triggers updates periodically.
1 parent dcb1256 commit 4ebfda8

File tree

1 file changed

+35
-10
lines changed

1 file changed

+35
-10
lines changed

scylla/src/transport/load_balancing/default.rs

Lines changed: 35 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1518,20 +1518,23 @@ mod latency_awareness {
15181518

15191519
node_avgs: Arc<RwLock<HashMap<Uuid, RwLock<Option<TimestampedAverage>>>>>,
15201520

1521-
_updater_handle: RemoteHandle<()>,
1521+
// This is Some iff there is an associated updater running on a separate Tokio task
1522+
// For some tests, not to rely on timing, this is None. The updater is then tick'ed
1523+
// explicitly from outside this struct.
1524+
_updater_handle: Option<RemoteHandle<()>>,
15221525
}
15231526

15241527
impl LatencyAwareness {
15251528
pub(super) fn builder() -> LatencyAwarenessBuilder {
15261529
LatencyAwarenessBuilder::new()
15271530
}
15281531

1529-
fn new(
1532+
fn new_for_test(
15301533
exclusion_threshold: f64,
15311534
retry_period: Duration,
15321535
update_rate: Duration,
15331536
minimum_measurements: usize,
1534-
) -> Self {
1537+
) -> (Self, MinAvgUpdater) {
15351538
let min_latency = Arc::new(AtomicDuration::new());
15361539

15371540
let min_latency_clone = min_latency.clone();
@@ -1544,6 +1547,33 @@ mod latency_awareness {
15441547
minimum_measurements,
15451548
};
15461549

1550+
(
1551+
Self {
1552+
exclusion_threshold,
1553+
retry_period,
1554+
_update_rate: update_rate,
1555+
minimum_measurements,
1556+
last_min_latency: min_latency_clone,
1557+
node_avgs: node_avgs_clone,
1558+
_updater_handle: None,
1559+
},
1560+
updater,
1561+
)
1562+
}
1563+
1564+
fn new(
1565+
exclusion_threshold: f64,
1566+
retry_period: Duration,
1567+
update_rate: Duration,
1568+
minimum_measurements: usize,
1569+
) -> Self {
1570+
let (self_, updater) = Self::new_for_test(
1571+
exclusion_threshold,
1572+
retry_period,
1573+
update_rate,
1574+
minimum_measurements,
1575+
);
1576+
15471577
let (updater_fut, updater_handle) = async move {
15481578
let mut update_scheduler = tokio::time::interval(update_rate);
15491579
loop {
@@ -1555,13 +1585,8 @@ mod latency_awareness {
15551585
tokio::task::spawn(updater_fut.with_current_subscriber());
15561586

15571587
Self {
1558-
exclusion_threshold,
1559-
retry_period,
1560-
_update_rate: update_rate,
1561-
minimum_measurements,
1562-
last_min_latency: min_latency_clone,
1563-
node_avgs: node_avgs_clone,
1564-
_updater_handle: updater_handle,
1588+
_updater_handle: Some(updater_handle),
1589+
..self_
15651590
}
15661591
}
15671592

0 commit comments

Comments
 (0)