@@ -2455,14 +2455,38 @@ mod latency_awareness {
24552455 None => return Either :: Left ( fallback) , // noop, as no latency data has been collected yet
24562456 } ;
24572457
2458- Either :: Right ( IteratorWithSkippedNodes :: new (
2459- self . node_avgs . read ( ) . unwrap ( ) . deref ( ) ,
2460- fallback,
2461- self . exclusion_threshold ,
2462- self . retry_period ,
2463- self . minimum_measurements ,
2464- min_avg_latency,
2465- ) )
2458+ let average_latencies = self . node_avgs . read ( ) . unwrap ( ) ;
2459+ let targets = fallback;
2460+
2461+ let mut fast_targets = vec ! [ ] ;
2462+ let mut penalised_targets = vec ! [ ] ;
2463+
2464+ for node_and_shard @ ( node, _shard) in targets {
2465+ match fast_enough (
2466+ average_latencies. deref ( ) ,
2467+ node. host_id ,
2468+ self . exclusion_threshold ,
2469+ self . retry_period ,
2470+ self . minimum_measurements ,
2471+ min_avg_latency,
2472+ ) {
2473+ FastEnough :: Yes => fast_targets. push ( node_and_shard) ,
2474+ FastEnough :: No { average } => {
2475+ trace ! ( "Latency awareness: Penalising node {{address={}, datacenter={:?}, rack={:?}}} for being on average at least {} times slower (latency: {}ms) than the fastest ({}ms)." ,
2476+ node. address, node. datacenter, node. rack, self . exclusion_threshold, average. as_millis( ) , min_avg_latency. as_millis( ) ) ;
2477+ penalised_targets. push ( node_and_shard) ;
2478+ }
2479+ }
2480+ }
2481+
2482+ let mut fast_targets = fast_targets. into_iter ( ) ;
2483+ let mut penalised_targets = penalised_targets. into_iter ( ) ;
2484+
2485+ let skipping_penalised_targets_iterator = std:: iter:: from_fn ( move || {
2486+ fast_targets. next ( ) . or_else ( || penalised_targets. next ( ) )
2487+ } ) ;
2488+
2489+ Either :: Right ( skipping_penalised_targets_iterator)
24662490 }
24672491
24682492 pub ( super ) fn report_query ( & self , node : & Node , latency : Duration ) {
@@ -2768,71 +2792,6 @@ mod latency_awareness {
27682792 }
27692793 }
27702794
2771- struct IteratorWithSkippedNodes < ' a , Fast , Penalised >
2772- where
2773- Fast : Iterator < Item = ( NodeRef < ' a > , Option < Shard > ) > ,
2774- Penalised : Iterator < Item = ( NodeRef < ' a > , Option < Shard > ) > ,
2775- {
2776- fast_nodes : Fast ,
2777- penalised_nodes : Penalised ,
2778- }
2779-
2780- impl < ' a >
2781- IteratorWithSkippedNodes <
2782- ' a ,
2783- std:: vec:: IntoIter < ( NodeRef < ' a > , Option < Shard > ) > ,
2784- std:: vec:: IntoIter < ( NodeRef < ' a > , Option < Shard > ) > ,
2785- >
2786- {
2787- fn new (
2788- average_latencies : & HashMap < Uuid , RwLock < Option < TimestampedAverage > > > ,
2789- nodes : impl Iterator < Item = ( NodeRef < ' a > , Option < Shard > ) > ,
2790- exclusion_threshold : f64 ,
2791- retry_period : Duration ,
2792- minimum_measurements : usize ,
2793- min_avg : Duration ,
2794- ) -> Self {
2795- let mut fast_nodes = vec ! [ ] ;
2796- let mut penalised_nodes = vec ! [ ] ;
2797-
2798- for node_and_shard @ ( node, _shard) in nodes {
2799- match fast_enough (
2800- average_latencies,
2801- node. host_id ,
2802- exclusion_threshold,
2803- retry_period,
2804- minimum_measurements,
2805- min_avg,
2806- ) {
2807- FastEnough :: Yes => fast_nodes. push ( node_and_shard) ,
2808- FastEnough :: No { average } => {
2809- trace ! ( "Latency awareness: Penalising node {{address={}, datacenter={:?}, rack={:?}}} for being on average at least {} times slower (latency: {}ms) than the fastest ({}ms)." ,
2810- node. address, node. datacenter, node. rack, exclusion_threshold, average. as_millis( ) , min_avg. as_millis( ) ) ;
2811- penalised_nodes. push ( node_and_shard) ;
2812- }
2813- }
2814- }
2815-
2816- Self {
2817- fast_nodes : fast_nodes. into_iter ( ) ,
2818- penalised_nodes : penalised_nodes. into_iter ( ) ,
2819- }
2820- }
2821- }
2822-
2823- impl < ' a , Fast , Penalised > Iterator for IteratorWithSkippedNodes < ' a , Fast , Penalised >
2824- where
2825- Fast : Iterator < Item = ( NodeRef < ' a > , Option < Shard > ) > ,
2826- Penalised : Iterator < Item = ( NodeRef < ' a > , Option < Shard > ) > ,
2827- {
2828- type Item = ( NodeRef < ' a > , Option < Shard > ) ;
2829-
2830- fn next ( & mut self ) -> Option < Self :: Item > {
2831- self . fast_nodes
2832- . next ( )
2833- . or_else ( || self . penalised_nodes . next ( ) )
2834- }
2835- }
28362795 #[ cfg( test) ]
28372796 mod tests {
28382797 use scylla_cql:: Consistency ;
0 commit comments