@@ -1518,65 +1518,75 @@ mod latency_awareness {
15181518
15191519 node_avgs : Arc < RwLock < HashMap < Uuid , RwLock < Option < TimestampedAverage > > > > > ,
15201520
1521- _updater_handle : RemoteHandle < ( ) > ,
1521+ // This is Some iff there is an associated updater running on a separate Tokio task
1522+ // For some tests, not to rely on timing, this is None. The updater is then tick'ed
1523+ // explicitly from outside this struct.
1524+ _updater_handle : Option < RemoteHandle < ( ) > > ,
15221525 }
15231526
15241527 impl LatencyAwareness {
15251528 pub ( super ) fn builder ( ) -> LatencyAwarenessBuilder {
15261529 LatencyAwarenessBuilder :: new ( )
15271530 }
15281531
1529- fn new (
1532+ fn new_for_test (
15301533 exclusion_threshold : f64 ,
15311534 retry_period : Duration ,
15321535 update_rate : Duration ,
15331536 minimum_measurements : usize ,
1534- ) -> Self {
1537+ ) -> ( Self , MinAvgUpdater ) {
15351538 let min_latency = Arc :: new ( AtomicDuration :: new ( ) ) ;
1536- let mut update_scheduler = tokio:: time:: interval ( update_rate) ;
15371539
15381540 let min_latency_clone = min_latency. clone ( ) ;
15391541 let node_avgs = Arc :: new ( RwLock :: new ( HashMap :: new ( ) ) ) ;
15401542 let node_avgs_clone = node_avgs. clone ( ) ;
15411543
1544+ let updater = MinAvgUpdater {
1545+ node_avgs,
1546+ min_latency,
1547+ minimum_measurements,
1548+ } ;
1549+
1550+ (
1551+ Self {
1552+ exclusion_threshold,
1553+ retry_period,
1554+ _update_rate : update_rate,
1555+ minimum_measurements,
1556+ last_min_latency : min_latency_clone,
1557+ node_avgs : node_avgs_clone,
1558+ _updater_handle : None ,
1559+ } ,
1560+ updater,
1561+ )
1562+ }
1563+
1564+ fn new (
1565+ exclusion_threshold : f64 ,
1566+ retry_period : Duration ,
1567+ update_rate : Duration ,
1568+ minimum_measurements : usize ,
1569+ ) -> Self {
1570+ let ( self_, updater) = Self :: new_for_test (
1571+ exclusion_threshold,
1572+ retry_period,
1573+ update_rate,
1574+ minimum_measurements,
1575+ ) ;
1576+
15421577 let ( updater_fut, updater_handle) = async move {
1578+ let mut update_scheduler = tokio:: time:: interval ( update_rate) ;
15431579 loop {
15441580 update_scheduler. tick ( ) . await ;
1545- let averages: & HashMap < Uuid , RwLock < Option < TimestampedAverage > > > =
1546- & node_avgs. read ( ) . unwrap ( ) ;
1547- if averages. is_empty ( ) {
1548- continue ; // No nodes queries registered to LAP performed yet.
1549- }
1550-
1551- let min_avg = averages
1552- . values ( )
1553- . filter_map ( |avg| {
1554- avg. read ( ) . unwrap ( ) . and_then ( |timestamped_average| {
1555- ( timestamped_average. num_measures >= minimum_measurements)
1556- . then_some ( timestamped_average. average )
1557- } )
1558- } )
1559- . min ( ) ;
1560- if let Some ( min_avg) = min_avg {
1561- min_latency. store ( min_avg) ;
1562- trace ! (
1563- "Latency awareness: updated min average latency to {} ms" ,
1564- min_avg. as_secs_f64( ) * 1000.
1565- ) ;
1566- }
1581+ updater. tick ( ) . await ;
15671582 }
15681583 }
15691584 . remote_handle ( ) ;
15701585 tokio:: task:: spawn ( updater_fut. with_current_subscriber ( ) ) ;
15711586
15721587 Self {
1573- exclusion_threshold,
1574- retry_period,
1575- _update_rate : update_rate,
1576- minimum_measurements,
1577- last_min_latency : min_latency_clone,
1578- node_avgs : node_avgs_clone,
1579- _updater_handle : updater_handle,
1588+ _updater_handle : Some ( updater_handle) ,
1589+ ..self_
15801590 }
15811591 }
15821592
@@ -1674,6 +1684,41 @@ mod latency_awareness {
16741684 }
16751685 }
16761686
1687+ /// Updates minimum average latency upon request each request to `tick()`.
1688+ /// The said average is a crucial criterium for penalising "too slow" nodes.
1689+ struct MinAvgUpdater {
1690+ node_avgs : Arc < RwLock < HashMap < Uuid , RwLock < Option < TimestampedAverage > > > > > ,
1691+ min_latency : Arc < AtomicDuration > ,
1692+ minimum_measurements : usize ,
1693+ }
1694+
1695+ impl MinAvgUpdater {
1696+ async fn tick ( & self ) {
1697+ let averages: & HashMap < Uuid , RwLock < Option < TimestampedAverage > > > =
1698+ & self . node_avgs . read ( ) . unwrap ( ) ;
1699+ if averages. is_empty ( ) {
1700+ return ; // No nodes queries registered to LAP performed yet.
1701+ }
1702+
1703+ let min_avg = averages
1704+ . values ( )
1705+ . filter_map ( |avg| {
1706+ avg. read ( ) . unwrap ( ) . and_then ( |timestamped_average| {
1707+ ( timestamped_average. num_measures >= self . minimum_measurements )
1708+ . then_some ( timestamped_average. average )
1709+ } )
1710+ } )
1711+ . min ( ) ;
1712+ if let Some ( min_avg) = min_avg {
1713+ self . min_latency . store ( min_avg) ;
1714+ trace ! (
1715+ "Latency awareness: updated min average latency to {} ms" ,
1716+ min_avg. as_secs_f64( ) * 1000.
1717+ ) ;
1718+ }
1719+ }
1720+ }
1721+
16771722 /// The builder of LatencyAwareness module of DefaultPolicy.
16781723 ///
16791724 /// (For more information about latency awareness, see [DefaultPolicyBuilder::latency_awareness()](super::DefaultPolicyBuilder::latency_awareness)).
@@ -1792,6 +1837,22 @@ mod latency_awareness {
17921837 minimum_measurements,
17931838 )
17941839 }
1840+
1841+ #[ cfg( test) ]
1842+ fn build_for_test ( self ) -> ( LatencyAwareness , MinAvgUpdater ) {
1843+ let Self {
1844+ exclusion_threshold,
1845+ retry_period,
1846+ update_rate,
1847+ minimum_measurements,
1848+ } = self ;
1849+ LatencyAwareness :: new_for_test (
1850+ exclusion_threshold,
1851+ retry_period,
1852+ update_rate,
1853+ minimum_measurements,
1854+ )
1855+ }
17951856 }
17961857
17971858 impl Default for LatencyAwarenessBuilder {
@@ -1962,9 +2023,9 @@ mod latency_awareness {
19622023 }
19632024 }
19642025
1965- pub fn latency_aware_default_policy ( ) -> DefaultPolicy {
1966- let latency_awareness = LatencyAwareness :: builder ( ) . build ( ) ;
1967-
2026+ fn default_policy_with_given_latency_awareness (
2027+ latency_awareness : LatencyAwareness ,
2028+ ) -> DefaultPolicy {
19682029 let pick_predicate = {
19692030 let latency_predicate = latency_awareness. generate_predicate ( ) ;
19702031 Box :: new ( move |node : & NodeRef | {
@@ -1983,6 +2044,32 @@ mod latency_awareness {
19832044 }
19842045 }
19852046
2047+ fn latency_aware_default_policy_customised (
2048+ configurer : impl FnOnce ( LatencyAwarenessBuilder ) -> LatencyAwarenessBuilder ,
2049+ ) -> DefaultPolicy {
2050+ let latency_awareness = configurer ( LatencyAwareness :: builder ( ) ) . build ( ) ;
2051+ default_policy_with_given_latency_awareness ( latency_awareness)
2052+ }
2053+
2054+ fn latency_aware_default_policy ( ) -> DefaultPolicy {
2055+ latency_aware_default_policy_customised ( |b| b)
2056+ }
2057+
2058+ fn latency_aware_policy_with_explicit_updater_customised (
2059+ configurer : impl FnOnce ( LatencyAwarenessBuilder ) -> LatencyAwarenessBuilder ,
2060+ ) -> ( DefaultPolicy , MinAvgUpdater ) {
2061+ let ( latency_awareness, updater) =
2062+ configurer ( LatencyAwareness :: builder ( ) ) . build_for_test ( ) ;
2063+ (
2064+ default_policy_with_given_latency_awareness ( latency_awareness) ,
2065+ updater,
2066+ )
2067+ }
2068+
2069+ fn latency_aware_default_policy_with_explicit_updater ( ) -> ( DefaultPolicy , MinAvgUpdater ) {
2070+ latency_aware_policy_with_explicit_updater_customised ( |b| b)
2071+ }
2072+
19862073 #[ tokio:: test]
19872074 async fn latency_aware_default_policy_does_not_penalise_if_no_latency_info_available_yet ( ) {
19882075 let policy = latency_aware_default_policy ( ) ;
@@ -2124,8 +2211,9 @@ mod latency_awareness {
21242211
21252212 #[ tokio:: test]
21262213 async fn latency_aware_default_policy_does_not_penalise_if_retry_period_expired ( ) {
2127- let mut policy = latency_aware_default_policy ( ) ;
2128- policy. latency_awareness . as_mut ( ) . unwrap ( ) . retry_period = Duration :: from_millis ( 10 ) ;
2214+ let policy = latency_aware_default_policy_customised ( |b| {
2215+ b. retry_period ( Duration :: from_millis ( 10 ) )
2216+ } ) ;
21292217
21302218 let cluster = tests:: mock_cluster_data_for_token_unaware_tests ( ) . await ;
21312219
@@ -2191,7 +2279,7 @@ mod latency_awareness {
21912279 . with_env_filter ( tracing_subscriber:: EnvFilter :: from_default_env ( ) )
21922280 . without_time ( )
21932281 . try_init ( ) ;
2194- let policy = latency_aware_default_policy ( ) ;
2282+ let ( policy, updater ) = latency_aware_default_policy_with_explicit_updater ( ) ;
21952283 let cluster = tests:: mock_cluster_data_for_token_unaware_tests ( ) . await ;
21962284
21972285 let min_avg = Duration :: from_millis ( 10 ) ;
@@ -2256,8 +2344,7 @@ mod latency_awareness {
22562344 ) ;
22572345
22582346 // Await last min average updater.
2259- // policy.latency_awareness.as_ref().unwrap().refresh_last_min_avg_nodes(&cluster.all_nodes); FIXME:
2260- tokio:: time:: sleep ( policy. latency_awareness . as_ref ( ) . unwrap ( ) . _update_rate * 5 ) . await ;
2347+ updater. tick ( ) . await ;
22612348
22622349 let expected_groups = ExpectedGroupsBuilder :: new ( )
22632350 . group ( [ 2 , 3 ] ) // pick + fallback local nodes
@@ -2278,7 +2365,8 @@ mod latency_awareness {
22782365 #[ tokio:: test]
22792366 async fn latency_aware_default_policy_stops_penalising_after_min_average_increases_enough_only_after_update_rate_elapses (
22802367 ) {
2281- let policy = latency_aware_default_policy ( ) ;
2368+ let ( policy, updater) = latency_aware_default_policy_with_explicit_updater ( ) ;
2369+
22822370 let cluster = tests:: mock_cluster_data_for_token_unaware_tests ( ) . await ;
22832371
22842372 let min_avg = Duration :: from_millis ( 10 ) ;
@@ -2322,7 +2410,7 @@ mod latency_awareness {
23222410 ) ;
23232411
23242412 // Await last min average updater.
2325- tokio :: time :: sleep ( policy . latency_awareness . as_ref ( ) . unwrap ( ) . _update_rate ) . await ;
2413+ updater . tick ( ) . await ;
23262414 {
23272415 // min_avg is low enough to penalise node 1
23282416 let expected_groups = ExpectedGroupsBuilder :: new ( )
@@ -2380,7 +2468,7 @@ mod latency_awareness {
23802468 . await ;
23812469 }
23822470
2383- tokio :: time :: sleep ( policy . latency_awareness . as_ref ( ) . unwrap ( ) . _update_rate ) . await ;
2471+ updater . tick ( ) . await ;
23842472 {
23852473 // min_avg has been updated and is already high enough to stop penalising node 1
23862474 let expected_groups = ExpectedGroupsBuilder :: new ( )
@@ -2561,7 +2649,7 @@ mod latency_awareness {
25612649 ] ;
25622650
25632651 for test in & tests {
2564- let policy = latency_aware_default_policy ( ) ;
2652+ let ( policy, updater ) = latency_aware_default_policy_with_explicit_updater ( ) ;
25652653
25662654 if let Some ( preset_min_avg) = test. preset_min_avg {
25672655 policy. set_nodes_latency_stats (
@@ -2576,14 +2664,14 @@ mod latency_awareness {
25762664 ) ] ,
25772665 ) ;
25782666 // Await last min average updater for update with a forged min_avg.
2579- tokio :: time :: sleep ( latency_awareness_defaults . _update_rate ) . await ;
2667+ updater . tick ( ) . await ;
25802668 policy. set_nodes_latency_stats ( & cluster, & [ ( 1 , None ) ] ) ;
25812669 }
25822670 policy. set_nodes_latency_stats ( & cluster, test. latency_stats ) ;
25832671
25842672 if test. preset_min_avg . is_none ( ) {
25852673 // Await last min average updater for update with None min_avg.
2586- tokio :: time :: sleep ( latency_awareness_defaults . _update_rate ) . await ;
2674+ updater . tick ( ) . await ;
25872675 }
25882676
25892677 test_default_policy_with_given_cluster_and_routing_info (
0 commit comments