diff --git a/packages/metrics/README.md b/packages/metrics/README.md
index 9f3883fba..3d1d94c5f 100644
--- a/packages/metrics/README.md
+++ b/packages/metrics/README.md
@@ -67,7 +67,7 @@ println!("{}", prometheus_output);
 ### Metric Aggregation
 
 ```rust
-use torrust_tracker_metrics::metric_collection::aggregate::Sum;
+use torrust_tracker_metrics::metric_collection::aggregate::{avg::Avg, sum::Sum};
 
 // Sum all counter values matching specific labels
 let total_requests = metrics.sum(
@@ -76,6 +76,14 @@ let total_requests = metrics.sum(
 );
 
 println!("Total requests: {:?}", total_requests);
+
+// Calculate average of gauge values matching specific labels
+let avg_response_time = metrics.avg(
+    &metric_name!("response_time_seconds"),
+    &[("endpoint", "/announce")].into(),
+);
+
+println!("Average response time: {:?}", avg_response_time);
 ```
 
 ## Architecture
diff --git a/packages/metrics/src/metric/aggregate/avg.rs b/packages/metrics/src/metric/aggregate/avg.rs
new file mode 100644
index 000000000..95628450b
--- /dev/null
+++ b/packages/metrics/src/metric/aggregate/avg.rs
@@ -0,0 +1,294 @@
+use crate::counter::Counter;
+use crate::gauge::Gauge;
+use crate::label::LabelSet;
+use crate::metric::aggregate::sum::Sum;
+use crate::metric::Metric;
+
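+/// Averages the values of all samples in a single metric whose label sets
+/// match the given criteria. A minimal usage sketch (illustrative only, not a
+/// compiled doc-test; it assumes a populated `Metric<Counter>`):
+///
+/// ```rust,ignore
+/// use torrust_tracker_metrics::metric::aggregate::avg::Avg;
+///
+/// // Average only the samples labeled with ("request_kind", "connect").
+/// let avg: f64 = metric.avg(&[("request_kind", "connect")].into());
+/// ```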
"l1_value")].into()) + .build(), + [("l1", "l1_value")].into(), + 1.0, + ), + // Metric with two samples, different label sets, average all + ( + MetricBuilder::default() + .with_sample(1.into(), &[("l1", "l1_value")].into()) + .with_sample(3.into(), &[("l2", "l2_value")].into()) + .build(), + LabelSet::empty(), + 2.0, // (1 + 3) / 2 = 2.0 + ), + // Metric with two samples, different label sets, average one + ( + MetricBuilder::default() + .with_sample(1.into(), &[("l1", "l1_value")].into()) + .with_sample(2.into(), &[("l2", "l2_value")].into()) + .build(), + [("l1", "l1_value")].into(), + 1.0, + ), + // Metric with three samples, same label key, different label values, average by key + ( + MetricBuilder::default() + .with_sample(2.into(), &[("l1", "l1_value"), ("la", "la_value")].into()) + .with_sample(4.into(), &[("l1", "l1_value"), ("lb", "lb_value")].into()) + .with_sample(6.into(), &[("l1", "l1_value"), ("lc", "lc_value")].into()) + .build(), + [("l1", "l1_value")].into(), + 4.0, // (2 + 4 + 6) / 3 = 4.0 + ), + // Metric with two samples, different label values, average by subkey + ( + MetricBuilder::default() + .with_sample(5.into(), &[("l1", "l1_value"), ("la", "la_value")].into()) + .with_sample(7.into(), &[("l1", "l1_value"), ("lb", "lb_value")].into()) + .build(), + [("la", "la_value")].into(), + 5.0, + ), + // Edge: Metric with no samples at all + (MetricBuilder::default().build(), LabelSet::empty(), 0.0), + // Edge: Metric with samples but no matching labels + ( + MetricBuilder::default() + .with_sample(5.into(), &[("foo", "bar")].into()) + .build(), + [("not", "present")].into(), + 0.0, + ), + // Edge: Metric with zero value + ( + MetricBuilder::default() + .with_sample(0.into(), &[("l3", "l3_value")].into()) + .build(), + [("l3", "l3_value")].into(), + 0.0, + ), + // Edge: Metric with a very large value + ( + MetricBuilder::default() + .with_sample((u64::MAX / 2).into(), &[("edge", "large1")].into()) + .with_sample((u64::MAX / 2).into(), &[("edge", "large2")].into()) + .build(), + LabelSet::empty(), + #[allow(clippy::cast_precision_loss)] + (u64::MAX as f64 / 2.0), // Average of (max/2) and (max/2) + ), + ] + } + + fn gauge_cases() -> Vec<(Metric, LabelSet, f64)> { + // (metric, label set criteria, expected_average_value) + vec![ + // Metric with one sample without label set + ( + MetricBuilder::default().with_sample(1.0.into(), &LabelSet::empty()).build(), + LabelSet::empty(), + 1.0, + ), + // Metric with one sample with a label set + ( + MetricBuilder::default() + .with_sample(1.0.into(), &[("l1", "l1_value")].into()) + .build(), + [("l1", "l1_value")].into(), + 1.0, + ), + // Metric with two samples, different label sets, average all + ( + MetricBuilder::default() + .with_sample(1.0.into(), &[("l1", "l1_value")].into()) + .with_sample(3.0.into(), &[("l2", "l2_value")].into()) + .build(), + LabelSet::empty(), + 2.0, // (1.0 + 3.0) / 2 = 2.0 + ), + // Metric with two samples, different label sets, average one + ( + MetricBuilder::default() + .with_sample(1.0.into(), &[("l1", "l1_value")].into()) + .with_sample(2.0.into(), &[("l2", "l2_value")].into()) + .build(), + [("l1", "l1_value")].into(), + 1.0, + ), + // Metric with three samples, same label key, different label values, average by key + ( + MetricBuilder::default() + .with_sample(2.0.into(), &[("l1", "l1_value"), ("la", "la_value")].into()) + .with_sample(4.0.into(), &[("l1", "l1_value"), ("lb", "lb_value")].into()) + .with_sample(6.0.into(), &[("l1", "l1_value"), ("lc", "lc_value")].into()) + .build(), + [("l1", 
"l1_value")].into(), + 4.0, // (2.0 + 4.0 + 6.0) / 3 = 4.0 + ), + // Metric with two samples, different label values, average by subkey + ( + MetricBuilder::default() + .with_sample(5.0.into(), &[("l1", "l1_value"), ("la", "la_value")].into()) + .with_sample(7.0.into(), &[("l1", "l1_value"), ("lb", "lb_value")].into()) + .build(), + [("la", "la_value")].into(), + 5.0, + ), + // Edge: Metric with no samples at all + (MetricBuilder::default().build(), LabelSet::empty(), 0.0), + // Edge: Metric with samples but no matching labels + ( + MetricBuilder::default() + .with_sample(5.0.into(), &[("foo", "bar")].into()) + .build(), + [("not", "present")].into(), + 0.0, + ), + // Edge: Metric with zero value + ( + MetricBuilder::default() + .with_sample(0.0.into(), &[("l3", "l3_value")].into()) + .build(), + [("l3", "l3_value")].into(), + 0.0, + ), + // Edge: Metric with negative values + ( + MetricBuilder::default() + .with_sample((-2.0).into(), &[("l4", "l4_value")].into()) + .with_sample(4.0.into(), &[("l5", "l5_value")].into()) + .build(), + LabelSet::empty(), + 1.0, // (-2.0 + 4.0) / 2 = 1.0 + ), + // Edge: Metric with decimal values + ( + MetricBuilder::default() + .with_sample(1.5.into(), &[("l6", "l6_value")].into()) + .with_sample(2.5.into(), &[("l7", "l7_value")].into()) + .build(), + LabelSet::empty(), + 2.0, // (1.5 + 2.5) / 2 = 2.0 + ), + ] + } + + #[test] + fn test_counter_cases() { + for (idx, (metric, criteria, expected_value)) in counter_cases().iter().enumerate() { + let avg = metric.avg(criteria); + + assert!( + (avg - expected_value).abs() <= f64::EPSILON, + "at case {idx}, expected avg to be {expected_value}, got {avg}" + ); + } + } + + #[test] + fn test_gauge_cases() { + for (idx, (metric, criteria, expected_value)) in gauge_cases().iter().enumerate() { + let avg = metric.avg(criteria); + + assert!( + (avg - expected_value).abs() <= f64::EPSILON, + "at case {idx}, expected avg to be {expected_value}, got {avg}" + ); + } + } +} diff --git a/packages/metrics/src/metric/aggregate/mod.rs b/packages/metrics/src/metric/aggregate/mod.rs index dce785d95..1224a1f52 100644 --- a/packages/metrics/src/metric/aggregate/mod.rs +++ b/packages/metrics/src/metric/aggregate/mod.rs @@ -1 +1,2 @@ +pub mod avg; pub mod sum; diff --git a/packages/metrics/src/metric/mod.rs b/packages/metrics/src/metric/mod.rs index d1aa01b94..6bc1a6075 100644 --- a/packages/metrics/src/metric/mod.rs +++ b/packages/metrics/src/metric/mod.rs @@ -78,6 +78,17 @@ impl Metric { pub fn is_empty(&self) -> bool { self.sample_collection.is_empty() } + + #[must_use] + pub fn collect_matching_samples( + &self, + label_set_criteria: &LabelSet, + ) -> Vec<(&crate::label::LabelSet, &crate::sample::Measurement)> { + self.sample_collection + .iter() + .filter(|(label_set, _measurement)| label_set.matches(label_set_criteria)) + .collect() + } } impl Metric { diff --git a/packages/metrics/src/metric_collection/aggregate/avg.rs b/packages/metrics/src/metric_collection/aggregate/avg.rs new file mode 100644 index 000000000..0aef4e325 --- /dev/null +++ b/packages/metrics/src/metric_collection/aggregate/avg.rs @@ -0,0 +1,212 @@ +use crate::counter::Counter; +use crate::gauge::Gauge; +use crate::label::LabelSet; +use crate::metric::aggregate::avg::Avg as MetricAvgTrait; +use crate::metric::MetricName; +use crate::metric_collection::{MetricCollection, MetricKindCollection}; + +pub trait Avg { + fn avg(&self, metric_name: &MetricName, label_set_criteria: &LabelSet) -> Option; +} + +impl Avg for MetricCollection { + fn avg(&self, metric_name: 
+
+impl Avg for MetricKindCollection<Counter> {
+    fn avg(&self, metric_name: &MetricName, label_set_criteria: &LabelSet) -> Option<f64> {
+        self.metrics.get(metric_name).map(|metric| metric.avg(label_set_criteria))
+    }
+}
+
+impl Avg for MetricKindCollection<Gauge> {
+    fn avg(&self, metric_name: &MetricName, label_set_criteria: &LabelSet) -> Option<f64> {
+        self.metrics.get(metric_name).map(|metric| metric.avg(label_set_criteria))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+
+    mod it_should_allow_averaging_all_metric_samples_containing_some_given_labels {
+
+        use torrust_tracker_primitives::DurationSinceUnixEpoch;
+
+        use crate::label::LabelValue;
+        use crate::label_name;
+        use crate::metric_collection::aggregate::avg::Avg;
+
+        #[test]
+        fn type_counter_with_two_samples() {
+            use crate::label::LabelSet;
+            use crate::metric_collection::MetricCollection;
+            use crate::metric_name;
+
+            let metric_name = metric_name!("test_counter");
+
+            let mut collection = MetricCollection::default();
+
+            collection
+                .increment_counter(
+                    &metric_name!("test_counter"),
+                    &(label_name!("label_1"), LabelValue::new("value_1")).into(),
+                    DurationSinceUnixEpoch::from_secs(1),
+                )
+                .unwrap();
+
+            collection
+                .increment_counter(
+                    &metric_name!("test_counter"),
+                    &(label_name!("label_2"), LabelValue::new("value_2")).into(),
+                    DurationSinceUnixEpoch::from_secs(1),
+                )
+                .unwrap();
+
+            // Two samples with value 1 each, average should be 1.0
+            assert_eq!(collection.avg(&metric_name, &LabelSet::empty()), Some(1.0));
+            assert_eq!(
+                collection.avg(&metric_name, &(label_name!("label_1"), LabelValue::new("value_1")).into()),
+                Some(1.0)
+            );
+        }
+
+        #[test]
+        fn type_counter_with_different_values() {
+            use crate::label::LabelSet;
+            use crate::metric_collection::MetricCollection;
+            use crate::metric_name;
+
+            let metric_name = metric_name!("test_counter");
+
+            let mut collection = MetricCollection::default();
+
+            // First increment: value goes from 0 to 1
+            collection
+                .increment_counter(
+                    &metric_name!("test_counter"),
+                    &(label_name!("label_1"), LabelValue::new("value_1")).into(),
+                    DurationSinceUnixEpoch::from_secs(1),
+                )
+                .unwrap();
+
+            // Second increment on the same label: value goes from 1 to 2
+            collection
+                .increment_counter(
+                    &metric_name!("test_counter"),
+                    &(label_name!("label_1"), LabelValue::new("value_1")).into(),
+                    DurationSinceUnixEpoch::from_secs(2),
+                )
+                .unwrap();
+
+            // Create another counter with a different value
+            collection
+                .set_counter(
+                    &metric_name!("test_counter"),
+                    &(label_name!("label_2"), LabelValue::new("value_2")).into(),
+                    4,
+                    DurationSinceUnixEpoch::from_secs(3),
+                )
+                .unwrap();
+
+            // Average of 2 and 4 should be 3.0
+            assert_eq!(collection.avg(&metric_name, &LabelSet::empty()), Some(3.0));
+            assert_eq!(
+                collection.avg(&metric_name, &(label_name!("label_1"), LabelValue::new("value_1")).into()),
+                Some(2.0)
+            );
+            assert_eq!(
+                collection.avg(&metric_name, &(label_name!("label_2"), LabelValue::new("value_2")).into()),
+                Some(4.0)
+            );
+        }
+
+        #[test]
+        fn type_gauge_with_two_samples() {
+            use crate::label::LabelSet;
+            use crate::metric_collection::MetricCollection;
+            use crate::metric_name;
+
+            let metric_name = metric_name!("test_gauge");
+
+            let mut collection = MetricCollection::default();
+
+            collection
+                .set_gauge(
+                    &metric_name!("test_gauge"),
+                    &(label_name!("label_1"), LabelValue::new("value_1")).into(),
+                    2.0,
+                    DurationSinceUnixEpoch::from_secs(1),
+                )
+                .unwrap();
+
+            collection
+                .set_gauge(
+                    &metric_name!("test_gauge"),
+                    &(label_name!("label_2"), LabelValue::new("value_2")).into(),
+                    4.0,
+                    DurationSinceUnixEpoch::from_secs(1),
+                )
+                .unwrap();
+
+            // Average of 2.0 and 4.0 should be 3.0
+            assert_eq!(collection.avg(&metric_name, &LabelSet::empty()), Some(3.0));
+            assert_eq!(
+                collection.avg(&metric_name, &(label_name!("label_1"), LabelValue::new("value_1")).into()),
+                Some(2.0)
+            );
+        }
+
+        #[test]
+        fn type_gauge_with_negative_values() {
+            use crate::label::LabelSet;
+            use crate::metric_collection::MetricCollection;
+            use crate::metric_name;
+
+            let metric_name = metric_name!("test_gauge");
+
+            let mut collection = MetricCollection::default();
+
+            collection
+                .set_gauge(
+                    &metric_name!("test_gauge"),
+                    &(label_name!("label_1"), LabelValue::new("value_1")).into(),
+                    -2.0,
+                    DurationSinceUnixEpoch::from_secs(1),
+                )
+                .unwrap();
+
+            collection
+                .set_gauge(
+                    &metric_name!("test_gauge"),
+                    &(label_name!("label_2"), LabelValue::new("value_2")).into(),
+                    6.0,
+                    DurationSinceUnixEpoch::from_secs(1),
+                )
+                .unwrap();
+
+            // Average of -2.0 and 6.0 should be 2.0
+            assert_eq!(collection.avg(&metric_name, &LabelSet::empty()), Some(2.0));
+        }
+
+        #[test]
+        fn nonexistent_metric() {
+            use crate::label::LabelSet;
+            use crate::metric_collection::MetricCollection;
+            use crate::metric_name;
+
+            let collection = MetricCollection::default();
+
+            assert_eq!(collection.avg(&metric_name!("nonexistent"), &LabelSet::empty()), None);
+        }
+    }
+}
diff --git a/packages/metrics/src/metric_collection/aggregate/mod.rs b/packages/metrics/src/metric_collection/aggregate/mod.rs
index dce785d95..1224a1f52 100644
--- a/packages/metrics/src/metric_collection/aggregate/mod.rs
+++ b/packages/metrics/src/metric_collection/aggregate/mod.rs
@@ -1 +1,2 @@
+pub mod avg;
 pub mod sum;
diff --git a/packages/rest-tracker-api-core/src/statistics/services.rs b/packages/rest-tracker-api-core/src/statistics/services.rs
index a8132d4fd..f87cb8c76 100644
--- a/packages/rest-tracker-api-core/src/statistics/services.rs
+++ b/packages/rest-tracker-api-core/src/statistics/services.rs
@@ -59,18 +59,6 @@ async fn get_protocol_metrics(
     let http_stats = http_stats_repository.get_stats().await;
     let udp_server_stats = udp_server_stats_repository.get_stats().await;
 
-    /*
-
-    todo: We have to delete the global metrics from Metric types:
-
-    - bittorrent_http_tracker_core::statistics::metrics::Metrics
-    - bittorrent_udp_tracker_core::statistics::metrics::Metrics
-    - torrust_udp_tracker_server::statistics::metrics::Metrics
-
-    Internally only the labeled metrics should be used.
-
-    */
-
     // TCPv4
 
     let tcp4_announces_handled = http_stats.tcp4_announces_handled();
@@ -83,30 +71,30 @@ async fn get_protocol_metrics(
 
     // UDP
 
-    let udp_requests_aborted = udp_server_stats.udp_requests_aborted();
-    let udp_requests_banned = udp_server_stats.udp_requests_banned();
+    let udp_requests_aborted = udp_server_stats.udp_requests_aborted_total();
+    let udp_requests_banned = udp_server_stats.udp_requests_banned_total();
     let udp_banned_ips_total = udp_server_stats.udp_banned_ips_total();
 
-    let udp_avg_connect_processing_time_ns = udp_server_stats.udp_avg_connect_processing_time_ns();
-    let udp_avg_announce_processing_time_ns = udp_server_stats.udp_avg_announce_processing_time_ns();
-    let udp_avg_scrape_processing_time_ns = udp_server_stats.udp_avg_scrape_processing_time_ns();
+    let udp_avg_connect_processing_time_ns = udp_server_stats.udp_avg_connect_processing_time_ns_averaged();
+    let udp_avg_announce_processing_time_ns = udp_server_stats.udp_avg_announce_processing_time_ns_averaged();
+    let udp_avg_scrape_processing_time_ns = udp_server_stats.udp_avg_scrape_processing_time_ns_averaged();
 
     // UDPv4
 
-    let udp4_requests = udp_server_stats.udp4_requests();
-    let udp4_connections_handled = udp_server_stats.udp4_connections_handled();
-    let udp4_announces_handled = udp_server_stats.udp4_announces_handled();
-    let udp4_scrapes_handled = udp_server_stats.udp4_scrapes_handled();
-    let udp4_responses = udp_server_stats.udp4_responses();
-    let udp4_errors_handled = udp_server_stats.udp4_errors_handled();
+    let udp4_requests = udp_server_stats.udp4_requests_received_total();
+    let udp4_connections_handled = udp_server_stats.udp4_connect_requests_accepted_total();
+    let udp4_announces_handled = udp_server_stats.udp4_announce_requests_accepted_total();
+    let udp4_scrapes_handled = udp_server_stats.udp4_scrape_requests_accepted_total();
+    let udp4_responses = udp_server_stats.udp4_responses_sent_total();
+    let udp4_errors_handled = udp_server_stats.udp4_errors_total();
 
     // UDPv6
 
-    let udp6_requests = udp_server_stats.udp6_requests();
-    let udp6_connections_handled = udp_server_stats.udp6_connections_handled();
-    let udp6_announces_handled = udp_server_stats.udp6_announces_handled();
-    let udp6_scrapes_handled = udp_server_stats.udp6_scrapes_handled();
-    let udp6_responses = udp_server_stats.udp6_responses();
-    let udp6_errors_handled = udp_server_stats.udp6_errors_handled();
+    let udp6_requests = udp_server_stats.udp6_requests_received_total();
+    let udp6_connections_handled = udp_server_stats.udp6_connect_requests_accepted_total();
+    let udp6_announces_handled = udp_server_stats.udp6_announce_requests_accepted_total();
+    let udp6_scrapes_handled = udp_server_stats.udp6_scrape_requests_accepted_total();
+    let udp6_responses = udp_server_stats.udp6_responses_sent_total();
+    let udp6_errors_handled = udp_server_stats.udp6_errors_total();
 
     // For backward compatibility we keep the `tcp4_connections_handled` and
     // `tcp6_connections_handled` metrics. They don't make sense for the HTTP
diff --git a/packages/udp-tracker-server/src/statistics/event/handler/error.rs b/packages/udp-tracker-server/src/statistics/event/handler/error.rs
index d83a0584d..63e480ca5 100644
--- a/packages/udp-tracker-server/src/statistics/event/handler/error.rs
+++ b/packages/udp-tracker-server/src/statistics/event/handler/error.rs
@@ -137,6 +137,6 @@ mod tests {
 
         let stats = stats_repository.get_stats().await;
 
-        assert_eq!(stats.udp4_errors_handled(), 1);
+        assert_eq!(stats.udp4_errors_total(), 1);
     }
 }
diff --git a/packages/udp-tracker-server/src/statistics/event/handler/request_aborted.rs b/packages/udp-tracker-server/src/statistics/event/handler/request_aborted.rs
index 19e410d5e..f340fe51a 100644
--- a/packages/udp-tracker-server/src/statistics/event/handler/request_aborted.rs
+++ b/packages/udp-tracker-server/src/statistics/event/handler/request_aborted.rs
@@ -54,7 +54,7 @@ mod tests {
 
         let stats = stats_repository.get_stats().await;
 
-        assert_eq!(stats.udp_requests_aborted(), 1);
+        assert_eq!(stats.udp_requests_aborted_total(), 1);
     }
 
     #[tokio::test]
@@ -77,6 +77,6 @@ mod tests {
         )
         .await;
         let stats = stats_repository.get_stats().await;
-        assert_eq!(stats.udp_requests_aborted(), 1);
+        assert_eq!(stats.udp_requests_aborted_total(), 1);
     }
 }
diff --git a/packages/udp-tracker-server/src/statistics/event/handler/request_accepted.rs b/packages/udp-tracker-server/src/statistics/event/handler/request_accepted.rs
index af92636df..33971926e 100644
--- a/packages/udp-tracker-server/src/statistics/event/handler/request_accepted.rs
+++ b/packages/udp-tracker-server/src/statistics/event/handler/request_accepted.rs
@@ -61,7 +61,7 @@ mod tests {
 
         let stats = stats_repository.get_stats().await;
 
-        assert_eq!(stats.udp4_connections_handled(), 1);
+        assert_eq!(stats.udp4_connect_requests_accepted_total(), 1);
     }
 
     #[tokio::test]
@@ -89,7 +89,7 @@ mod tests {
 
         let stats = stats_repository.get_stats().await;
 
-        assert_eq!(stats.udp4_announces_handled(), 1);
+        assert_eq!(stats.udp4_announce_requests_accepted_total(), 1);
     }
 
     #[tokio::test]
@@ -115,7 +115,7 @@ mod tests {
 
         let stats = stats_repository.get_stats().await;
 
-        assert_eq!(stats.udp4_scrapes_handled(), 1);
+        assert_eq!(stats.udp4_scrape_requests_accepted_total(), 1);
     }
 
     #[tokio::test]
@@ -141,7 +141,7 @@ mod tests {
 
         let stats = stats_repository.get_stats().await;
 
-        assert_eq!(stats.udp6_connections_handled(), 1);
+        assert_eq!(stats.udp6_connect_requests_accepted_total(), 1);
     }
 
     #[tokio::test]
@@ -169,7 +169,7 @@ mod tests {
 
         let stats = stats_repository.get_stats().await;
 
-        assert_eq!(stats.udp6_announces_handled(), 1);
+        assert_eq!(stats.udp6_announce_requests_accepted_total(), 1);
     }
 
     #[tokio::test]
@@ -195,6 +195,6 @@ mod tests {
 
         let stats = stats_repository.get_stats().await;
 
-        assert_eq!(stats.udp6_scrapes_handled(), 1);
+        assert_eq!(stats.udp6_scrape_requests_accepted_total(), 1);
     }
 }
diff --git a/packages/udp-tracker-server/src/statistics/event/handler/request_banned.rs b/packages/udp-tracker-server/src/statistics/event/handler/request_banned.rs
index 8badfa137..10f6cad88 100644
--- a/packages/udp-tracker-server/src/statistics/event/handler/request_banned.rs
+++ b/packages/udp-tracker-server/src/statistics/event/handler/request_banned.rs
@@ -54,7 +54,7 @@ mod tests {
 
         let stats = stats_repository.get_stats().await;
 
-        assert_eq!(stats.udp_requests_banned(), 1);
+        assert_eq!(stats.udp_requests_banned_total(), 1);
     }
 
     #[tokio::test]
@@ -77,6 +77,6 @@ mod tests {
         )
         .await;
         let stats = stats_repository.get_stats().await;
-        assert_eq!(stats.udp_requests_banned(), 1);
+        assert_eq!(stats.udp_requests_banned_total(), 1);
     }
 }
diff --git a/packages/udp-tracker-server/src/statistics/event/handler/request_received.rs b/packages/udp-tracker-server/src/statistics/event/handler/request_received.rs
index eced5a215..148b9d8da 100644
--- a/packages/udp-tracker-server/src/statistics/event/handler/request_received.rs
+++ b/packages/udp-tracker-server/src/statistics/event/handler/request_received.rs
@@ -54,6 +54,6 @@ mod tests {
 
         let stats = stats_repository.get_stats().await;
 
-        assert_eq!(stats.udp4_requests(), 1);
+        assert_eq!(stats.udp4_requests_received_total(), 1);
     }
 }
diff --git a/packages/udp-tracker-server/src/statistics/event/handler/response_sent.rs b/packages/udp-tracker-server/src/statistics/event/handler/response_sent.rs
index e76d67a4e..b1a046b5b 100644
--- a/packages/udp-tracker-server/src/statistics/event/handler/response_sent.rs
+++ b/packages/udp-tracker-server/src/statistics/event/handler/response_sent.rs
@@ -4,7 +4,7 @@ use torrust_tracker_primitives::DurationSinceUnixEpoch;
 
 use crate::event::{ConnectionContext, UdpRequestKind, UdpResponseKind};
 use crate::statistics::repository::Repository;
-use crate::statistics::{UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSING_TIME_NS, UDP_TRACKER_SERVER_RESPONSES_SENT_TOTAL};
+use crate::statistics::UDP_TRACKER_SERVER_RESPONSES_SENT_TOTAL;
 
 pub async fn handle_event(
     context: ConnectionContext,
@@ -16,82 +16,40 @@ pub async fn handle_event(
     let (result_label_value, kind_label_value) = match kind {
         UdpResponseKind::Ok { req_kind } => match req_kind {
             UdpRequestKind::Connect => {
-                let new_avg = stats_repository
-                    .recalculate_udp_avg_connect_processing_time_ns(req_processing_time)
-                    .await;
-
-                tracing::debug!("Updating average processing time metric for connect requests: {} ns", new_avg);
-
                 let mut label_set = LabelSet::from(context.clone());
                 label_set.upsert(label_name!("request_kind"), LabelValue::new(&req_kind.to_string()));
-                match stats_repository
-                    .set_gauge(
-                        &metric_name!(UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSING_TIME_NS),
-                        &label_set,
-                        new_avg,
-                        now,
-                    )
-                    .await
-                {
-                    Ok(()) => {}
-                    Err(err) => tracing::error!("Failed to set gauge: {}", err),
-                }
+
+                let _new_avg = stats_repository
+                    .recalculate_udp_avg_processing_time_ns(req_processing_time, &label_set, now)
+                    .await;
+
                 (LabelValue::new("ok"), UdpRequestKind::Connect.into())
             }
             UdpRequestKind::Announce { announce_request } => {
-                let new_avg = stats_repository
-                    .recalculate_udp_avg_announce_processing_time_ns(req_processing_time)
-                    .await;
-
-                tracing::debug!(
-                    "Updating average processing time metric for announce requests: {} ns",
-                    new_avg
-                );
-
                 let mut label_set = LabelSet::from(context.clone());
                 label_set.upsert(label_name!("request_kind"), LabelValue::new(&req_kind.to_string()));
-                match stats_repository
-                    .set_gauge(
-                        &metric_name!(UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSING_TIME_NS),
-                        &label_set,
-                        new_avg,
-                        now,
-                    )
-                    .await
-                {
-                    Ok(()) => {}
-                    Err(err) => tracing::error!("Failed to set gauge: {}", err),
-                }
+
+                let _new_avg = stats_repository
+                    .recalculate_udp_avg_processing_time_ns(req_processing_time, &label_set, now)
+                    .await;
+
                 (LabelValue::new("ok"), UdpRequestKind::Announce { announce_request }.into())
             }
             UdpRequestKind::Scrape => {
-                let new_avg = stats_repository
-                    .recalculate_udp_avg_scrape_processing_time_ns(req_processing_time)
-                    .await;
-
-                tracing::debug!("Updating average processing time metric for scrape requests: {} ns", new_avg);
-
                 let mut label_set = LabelSet::from(context.clone());
                 label_set.upsert(label_name!("request_kind"), LabelValue::new(&req_kind.to_string()));
-                match stats_repository
-                    .set_gauge(
-                        &metric_name!(UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSING_TIME_NS),
-                        &label_set,
-                        new_avg,
-                        now,
-                    )
-                    .await
-                {
-                    Ok(()) => {}
-                    Err(err) => tracing::error!("Failed to set gauge: {}", err),
-                }
+
+                let _new_avg = stats_repository
+                    .recalculate_udp_avg_processing_time_ns(req_processing_time, &label_set, now)
+                    .await;
+
                 (LabelValue::new("ok"), LabelValue::new(&UdpRequestKind::Scrape.to_string()))
             }
         },
         UdpResponseKind::Error { opt_req_kind: _ } => (LabelValue::new("error"), LabelValue::ignore()),
     };
 
-    // Extendable metrics
+    // Increase the number of responses sent
     let mut label_set = LabelSet::from(context);
     if result_label_value == LabelValue::new("ok") {
         label_set.upsert(label_name!("request_kind"), kind_label_value);
@@ -147,7 +105,7 @@ mod tests {
 
         let stats = stats_repository.get_stats().await;
 
-        assert_eq!(stats.udp4_responses(), 1);
+        assert_eq!(stats.udp4_responses_sent_total(), 1);
     }
 
     #[tokio::test]
@@ -178,6 +136,6 @@ mod tests {
 
         let stats = stats_repository.get_stats().await;
 
-        assert_eq!(stats.udp6_responses(), 1);
+        assert_eq!(stats.udp6_responses_sent_total(), 1);
    }
 }
diff --git a/packages/udp-tracker-server/src/statistics/metrics.rs b/packages/udp-tracker-server/src/statistics/metrics.rs
index 3c162ff02..e167dc5ae 100644
--- a/packages/udp-tracker-server/src/statistics/metrics.rs
+++ b/packages/udp-tracker-server/src/statistics/metrics.rs
@@ -1,13 +1,17 @@
+use std::time::Duration;
+
 use serde::Serialize;
 use torrust_tracker_metrics::label::LabelSet;
 use torrust_tracker_metrics::metric::MetricName;
+use torrust_tracker_metrics::metric_collection::aggregate::avg::Avg;
 use torrust_tracker_metrics::metric_collection::aggregate::sum::Sum;
 use torrust_tracker_metrics::metric_collection::{Error, MetricCollection};
 use torrust_tracker_metrics::metric_name;
 use torrust_tracker_primitives::DurationSinceUnixEpoch;
 
 use crate::statistics::{
-    UDP_TRACKER_SERVER_ERRORS_TOTAL, UDP_TRACKER_SERVER_IPS_BANNED_TOTAL, UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSING_TIME_NS,
+    UDP_TRACKER_SERVER_ERRORS_TOTAL, UDP_TRACKER_SERVER_IPS_BANNED_TOTAL,
+    UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSED_REQUESTS_TOTAL, UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSING_TIME_NS,
     UDP_TRACKER_SERVER_REQUESTS_ABORTED_TOTAL, UDP_TRACKER_SERVER_REQUESTS_ACCEPTED_TOTAL, UDP_TRACKER_SERVER_REQUESTS_BANNED_TOTAL,
     UDP_TRACKER_SERVER_REQUESTS_RECEIVED_TOTAL, UDP_TRACKER_SERVER_RESPONSES_SENT_TOTAL,
 };
@@ -48,12 +52,106 @@ impl Metrics {
 }
 
 impl Metrics {
+    #[allow(clippy::cast_precision_loss)]
+    pub fn recalculate_udp_avg_processing_time_ns(
+        &mut self,
+        req_processing_time: Duration,
+        label_set: &LabelSet,
+        now: DurationSinceUnixEpoch,
+    ) -> f64 {
+        self.increment_udp_processed_requests_total(label_set, now);
+
+        let processed_requests_total = self.udp_processed_requests_total(label_set) as f64;
+        let previous_avg = self.udp_avg_processing_time_ns(label_set);
+        let req_processing_time = req_processing_time.as_nanos() as f64;
+
+        // Moving average: https://en.wikipedia.org/wiki/Moving_average
+        let new_avg = previous_avg as f64 + (req_processing_time - previous_avg as f64) / processed_requests_total;
+
+        tracing::debug!(
+            "Recalculated UDP average processing time for labels {}: {} ns (previous: {} ns, req_processing_time: {} ns, processed_requests_total: {})",
+            label_set,
+            new_avg,
+            previous_avg,
+            req_processing_time,
+            processed_requests_total
+        );
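+
+        // Worked example (illustrative): with previous_avg = 1000 ns, a new
+        // sample of 2000 ns and processed_requests_total = 3 after the
+        // increment above: new_avg = 1000 + (2000 - 1000) / 3 ≈ 1333.33 ns.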
+        self.update_udp_avg_processing_time_ns(new_avg, label_set, now);
+
+        new_avg
+    }
+
+    #[must_use]
+    #[allow(clippy::cast_sign_loss)]
+    #[allow(clippy::cast_possible_truncation)]
+    fn udp_avg_processing_time_ns(&self, label_set: &LabelSet) -> u64 {
+        self.metric_collection
+            .sum(
+                &metric_name!(UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSING_TIME_NS),
+                label_set,
+            )
+            .unwrap_or_default() as u64
+    }
+
+    #[must_use]
+    #[allow(clippy::cast_sign_loss)]
+    #[allow(clippy::cast_possible_truncation)]
+    pub fn udp_request_accepted_total(&self, label_set: &LabelSet) -> u64 {
+        self.metric_collection
+            .sum(&metric_name!(UDP_TRACKER_SERVER_REQUESTS_ACCEPTED_TOTAL), label_set)
+            .unwrap_or_default() as u64
+    }
+
+    #[must_use]
+    #[allow(clippy::cast_sign_loss)]
+    #[allow(clippy::cast_possible_truncation)]
+    fn udp_processed_requests_total(&self, label_set: &LabelSet) -> u64 {
+        self.metric_collection
+            .sum(
+                &metric_name!(UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSED_REQUESTS_TOTAL),
+                label_set,
+            )
+            .unwrap_or_default() as u64
+    }
+
+    fn update_udp_avg_processing_time_ns(&mut self, new_avg: f64, label_set: &LabelSet, now: DurationSinceUnixEpoch) {
+        tracing::debug!(
+            "Updating average processing time metric to {} ns for label set {}",
+            new_avg,
+            label_set,
+        );
+
+        match self.set_gauge(
+            &metric_name!(UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSING_TIME_NS),
+            label_set,
+            new_avg,
+            now,
+        ) {
+            Ok(()) => {}
+            Err(err) => tracing::error!("Failed to set gauge: {}", err),
+        }
+    }
+
+    fn increment_udp_processed_requests_total(&mut self, label_set: &LabelSet, now: DurationSinceUnixEpoch) {
+        tracing::debug!("Incrementing processed requests total for label set {}", label_set);
+
+        match self.increase_counter(
+            &metric_name!(UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSED_REQUESTS_TOTAL),
+            label_set,
+            now,
+        ) {
+            Ok(()) => {}
+            Err(err) => tracing::error!("Failed to increment counter: {}", err),
+        }
+    }
+
     // UDP
 
     /// Total number of UDP (UDP tracker) requests aborted.
     #[must_use]
     #[allow(clippy::cast_sign_loss)]
     #[allow(clippy::cast_possible_truncation)]
-    pub fn udp_requests_aborted(&self) -> u64 {
+    pub fn udp_requests_aborted_total(&self) -> u64 {
         self.metric_collection
             .sum(&metric_name!(UDP_TRACKER_SERVER_REQUESTS_ABORTED_TOTAL), &LabelSet::empty())
             .unwrap_or_default() as u64
@@ -63,7 +161,7 @@ impl Metrics {
     #[must_use]
     #[allow(clippy::cast_sign_loss)]
     #[allow(clippy::cast_possible_truncation)]
-    pub fn udp_requests_banned(&self) -> u64 {
+    pub fn udp_requests_banned_total(&self) -> u64 {
         self.metric_collection
             .sum(&metric_name!(UDP_TRACKER_SERVER_REQUESTS_BANNED_TOTAL), &LabelSet::empty())
             .unwrap_or_default() as u64
@@ -79,43 +177,46 @@ impl Metrics {
             .unwrap_or_default() as u64
     }
 
-    /// Average rounded time spent processing UDP connect requests.
+    /// Average processing time for UDP connect requests across all servers (in nanoseconds).
+    /// This calculates the average of all gauge samples for connect requests.
     #[must_use]
     #[allow(clippy::cast_sign_loss)]
     #[allow(clippy::cast_possible_truncation)]
-    pub fn udp_avg_connect_processing_time_ns(&self) -> u64 {
+    pub fn udp_avg_connect_processing_time_ns_averaged(&self) -> u64 {
         self.metric_collection
-            .sum(
+            .avg(
                 &metric_name!(UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSING_TIME_NS),
                 &[("request_kind", "connect")].into(),
             )
-            .unwrap_or_default() as u64
+            .unwrap_or(0.0) as u64
     }
 
-    /// Average rounded time spent processing UDP announce requests.
+    /// Average processing time for UDP announce requests across all servers (in nanoseconds).
+    /// This calculates the average of all gauge samples for announce requests.
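+    /// For example (illustrative, mirroring the tests below): with gauge
+    /// samples of 1500, 2500 and 3000 ns recorded by three servers, this
+    /// returns (1500 + 2500 + 3000) / 3 = 2333 (truncated).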
     #[must_use]
     #[allow(clippy::cast_sign_loss)]
     #[allow(clippy::cast_possible_truncation)]
-    pub fn udp_avg_announce_processing_time_ns(&self) -> u64 {
+    pub fn udp_avg_announce_processing_time_ns_averaged(&self) -> u64 {
         self.metric_collection
-            .sum(
+            .avg(
                 &metric_name!(UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSING_TIME_NS),
                 &[("request_kind", "announce")].into(),
             )
-            .unwrap_or_default() as u64
+            .unwrap_or(0.0) as u64
     }
 
-    /// Average rounded time spent processing UDP scrape requests.
+    /// Average processing time for UDP scrape requests across all servers (in nanoseconds).
+    /// This calculates the average of all gauge samples for scrape requests.
     #[must_use]
     #[allow(clippy::cast_sign_loss)]
     #[allow(clippy::cast_possible_truncation)]
-    pub fn udp_avg_scrape_processing_time_ns(&self) -> u64 {
+    pub fn udp_avg_scrape_processing_time_ns_averaged(&self) -> u64 {
         self.metric_collection
-            .sum(
+            .avg(
                 &metric_name!(UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSING_TIME_NS),
                 &[("request_kind", "scrape")].into(),
             )
-            .unwrap_or_default() as u64
+            .unwrap_or(0.0) as u64
     }
 
     // UDPv4
@@ -123,7 +224,7 @@ impl Metrics {
     #[must_use]
     #[allow(clippy::cast_sign_loss)]
     #[allow(clippy::cast_possible_truncation)]
-    pub fn udp4_requests(&self) -> u64 {
+    pub fn udp4_requests_received_total(&self) -> u64 {
         self.metric_collection
             .sum(
                 &metric_name!(UDP_TRACKER_SERVER_REQUESTS_RECEIVED_TOTAL),
@@ -136,7 +237,7 @@ impl Metrics {
     #[must_use]
     #[allow(clippy::cast_sign_loss)]
     #[allow(clippy::cast_possible_truncation)]
-    pub fn udp4_connections_handled(&self) -> u64 {
+    pub fn udp4_connect_requests_accepted_total(&self) -> u64 {
         self.metric_collection
             .sum(
                 &metric_name!(UDP_TRACKER_SERVER_REQUESTS_ACCEPTED_TOTAL),
@@ -149,7 +250,7 @@ impl Metrics {
     #[must_use]
     #[allow(clippy::cast_sign_loss)]
     #[allow(clippy::cast_possible_truncation)]
-    pub fn udp4_announces_handled(&self) -> u64 {
+    pub fn udp4_announce_requests_accepted_total(&self) -> u64 {
         self.metric_collection
             .sum(
                 &metric_name!(UDP_TRACKER_SERVER_REQUESTS_ACCEPTED_TOTAL),
@@ -162,7 +263,7 @@ impl Metrics {
     #[must_use]
     #[allow(clippy::cast_sign_loss)]
     #[allow(clippy::cast_possible_truncation)]
-    pub fn udp4_scrapes_handled(&self) -> u64 {
+    pub fn udp4_scrape_requests_accepted_total(&self) -> u64 {
         self.metric_collection
             .sum(
                 &metric_name!(UDP_TRACKER_SERVER_REQUESTS_ACCEPTED_TOTAL),
@@ -175,7 +276,7 @@ impl Metrics {
     #[must_use]
     #[allow(clippy::cast_sign_loss)]
     #[allow(clippy::cast_possible_truncation)]
-    pub fn udp4_responses(&self) -> u64 {
+    pub fn udp4_responses_sent_total(&self) -> u64 {
         self.metric_collection
             .sum(
                 &metric_name!(UDP_TRACKER_SERVER_RESPONSES_SENT_TOTAL),
@@ -188,7 +289,7 @@ impl Metrics {
     #[must_use]
     #[allow(clippy::cast_sign_loss)]
     #[allow(clippy::cast_possible_truncation)]
-    pub fn udp4_errors_handled(&self) -> u64 {
+    pub fn udp4_errors_total(&self) -> u64 {
         self.metric_collection
             .sum(
                 &metric_name!(UDP_TRACKER_SERVER_ERRORS_TOTAL),
@@ -202,7 +303,7 @@ impl Metrics {
     #[must_use]
     #[allow(clippy::cast_sign_loss)]
     #[allow(clippy::cast_possible_truncation)]
-    pub fn udp6_requests(&self) -> u64 {
+    pub fn udp6_requests_received_total(&self) -> u64 {
         self.metric_collection
             .sum(
                 &metric_name!(UDP_TRACKER_SERVER_REQUESTS_RECEIVED_TOTAL),
@@ -215,7 +316,7 @@ impl Metrics {
     #[must_use]
     #[allow(clippy::cast_sign_loss)]
     #[allow(clippy::cast_possible_truncation)]
-    pub fn udp6_connections_handled(&self) -> u64 {
+    pub fn udp6_connect_requests_accepted_total(&self) -> u64 {
         self.metric_collection
             .sum(
                 &metric_name!(UDP_TRACKER_SERVER_REQUESTS_ACCEPTED_TOTAL),
@@ -228,7 +329,7 @@ impl Metrics {
     #[must_use]
     #[allow(clippy::cast_sign_loss)]
     #[allow(clippy::cast_possible_truncation)]
-    pub fn udp6_announces_handled(&self) -> u64 {
+    pub fn udp6_announce_requests_accepted_total(&self) -> u64 {
         self.metric_collection
             .sum(
                 &metric_name!(UDP_TRACKER_SERVER_REQUESTS_ACCEPTED_TOTAL),
@@ -241,7 +342,7 @@ impl Metrics {
     #[must_use]
     #[allow(clippy::cast_sign_loss)]
     #[allow(clippy::cast_possible_truncation)]
-    pub fn udp6_scrapes_handled(&self) -> u64 {
+    pub fn udp6_scrape_requests_accepted_total(&self) -> u64 {
         self.metric_collection
             .sum(
                 &metric_name!(UDP_TRACKER_SERVER_REQUESTS_ACCEPTED_TOTAL),
@@ -254,7 +355,7 @@ impl Metrics {
     #[must_use]
     #[allow(clippy::cast_sign_loss)]
     #[allow(clippy::cast_possible_truncation)]
-    pub fn udp6_responses(&self) -> u64 {
+    pub fn udp6_responses_sent_total(&self) -> u64 {
         self.metric_collection
             .sum(
                 &metric_name!(UDP_TRACKER_SERVER_RESPONSES_SENT_TOTAL),
@@ -267,7 +368,7 @@ impl Metrics {
     #[must_use]
     #[allow(clippy::cast_sign_loss)]
     #[allow(clippy::cast_possible_truncation)]
-    pub fn udp6_errors_handled(&self) -> u64 {
+    pub fn udp6_errors_total(&self) -> u64 {
         self.metric_collection
             .sum(
                 &metric_name!(UDP_TRACKER_SERVER_ERRORS_TOTAL),
@@ -285,9 +386,10 @@ mod tests {
     use super::*;
     use crate::statistics::{
         UDP_TRACKER_SERVER_ERRORS_TOTAL, UDP_TRACKER_SERVER_IPS_BANNED_TOTAL,
-        UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSING_TIME_NS, UDP_TRACKER_SERVER_REQUESTS_ABORTED_TOTAL,
-        UDP_TRACKER_SERVER_REQUESTS_ACCEPTED_TOTAL, UDP_TRACKER_SERVER_REQUESTS_BANNED_TOTAL,
-        UDP_TRACKER_SERVER_REQUESTS_RECEIVED_TOTAL, UDP_TRACKER_SERVER_RESPONSES_SENT_TOTAL,
+        UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSED_REQUESTS_TOTAL, UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSING_TIME_NS,
+        UDP_TRACKER_SERVER_REQUESTS_ABORTED_TOTAL, UDP_TRACKER_SERVER_REQUESTS_ACCEPTED_TOTAL,
+        UDP_TRACKER_SERVER_REQUESTS_BANNED_TOTAL, UDP_TRACKER_SERVER_REQUESTS_RECEIVED_TOTAL,
+        UDP_TRACKER_SERVER_RESPONSES_SENT_TOTAL,
     };
     use crate::CurrentClock;
 
@@ -362,13 +464,38 @@ mod tests {
         assert!(result.is_ok());
     }
 
+    #[test]
+    fn it_should_return_zero_for_udp_processed_requests_total_when_no_data() {
+        let metrics = Metrics::default();
+        let labels = LabelSet::from([("request_kind", "connect")]);
+        assert_eq!(metrics.udp_processed_requests_total(&labels), 0);
+    }
+
+    #[test]
+    fn it_should_increment_processed_requests_total() {
+        let mut metrics = Metrics::default();
+        let now = CurrentClock::now();
+        let labels = LabelSet::from([("request_kind", "connect")]);
+
+        // Directly increment the counter using the public method
+        metrics
+            .increase_counter(
+                &metric_name!(UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSED_REQUESTS_TOTAL),
+                &labels,
+                now,
+            )
+            .unwrap();
+
+        assert_eq!(metrics.udp_processed_requests_total(&labels), 1);
+    }
+
     mod udp_general_metrics {
         use super::*;
 
         #[test]
         fn it_should_return_zero_for_udp_requests_aborted_when_no_data() {
             let metrics = Metrics::default();
-            assert_eq!(metrics.udp_requests_aborted(), 0);
+            assert_eq!(metrics.udp_requests_aborted_total(), 0);
         }
 
         #[test]
@@ -384,13 +511,13 @@ mod tests {
                 .increase_counter(&metric_name!(UDP_TRACKER_SERVER_REQUESTS_ABORTED_TOTAL), &labels, now)
                 .unwrap();
 
-            assert_eq!(metrics.udp_requests_aborted(), 2);
+            assert_eq!(metrics.udp_requests_aborted_total(), 2);
         }
 
         #[test]
         fn it_should_return_zero_for_udp_requests_banned_when_no_data() {
             let metrics = Metrics::default();
-            assert_eq!(metrics.udp_requests_banned(), 0);
+            assert_eq!(metrics.udp_requests_banned_total(), 0);
         }
 
        #[test]
@@ -405,7 +532,7 @@ mod tests {
                     .unwrap();
             }
 
-            assert_eq!(metrics.udp_requests_banned(), 3);
+            assert_eq!(metrics.udp_requests_banned_total(), 3);
         }
 
         #[test]
@@ -428,89 +555,13 @@ mod tests {
         }
     }
 
-    mod udp_performance_metrics {
-        use super::*;
-
-        #[test]
-        fn it_should_return_zero_for_udp_avg_connect_processing_time_ns_when_no_data() {
-            let metrics = Metrics::default();
-            assert_eq!(metrics.udp_avg_connect_processing_time_ns(), 0);
-        }
-
-        #[test]
-        fn it_should_return_gauge_value_for_udp_avg_connect_processing_time_ns() {
-            let mut metrics = Metrics::default();
-            let now = CurrentClock::now();
-            let labels = LabelSet::from([("request_kind", "connect")]);
-
-            metrics
-                .set_gauge(
-                    &metric_name!(UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSING_TIME_NS),
-                    &labels,
-                    1500.0,
-                    now,
-                )
-                .unwrap();
-
-            assert_eq!(metrics.udp_avg_connect_processing_time_ns(), 1500);
-        }
-
-        #[test]
-        fn it_should_return_zero_for_udp_avg_announce_processing_time_ns_when_no_data() {
-            let metrics = Metrics::default();
-            assert_eq!(metrics.udp_avg_announce_processing_time_ns(), 0);
-        }
-
-        #[test]
-        fn it_should_return_gauge_value_for_udp_avg_announce_processing_time_ns() {
-            let mut metrics = Metrics::default();
-            let now = CurrentClock::now();
-            let labels = LabelSet::from([("request_kind", "announce")]);
-
-            metrics
-                .set_gauge(
-                    &metric_name!(UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSING_TIME_NS),
-                    &labels,
-                    2500.0,
-                    now,
-                )
-                .unwrap();
-
-            assert_eq!(metrics.udp_avg_announce_processing_time_ns(), 2500);
-        }
-
-        #[test]
-        fn it_should_return_zero_for_udp_avg_scrape_processing_time_ns_when_no_data() {
-            let metrics = Metrics::default();
-            assert_eq!(metrics.udp_avg_scrape_processing_time_ns(), 0);
-        }
-
-        #[test]
-        fn it_should_return_gauge_value_for_udp_avg_scrape_processing_time_ns() {
-            let mut metrics = Metrics::default();
-            let now = CurrentClock::now();
-            let labels = LabelSet::from([("request_kind", "scrape")]);
-
-            metrics
-                .set_gauge(
-                    &metric_name!(UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSING_TIME_NS),
-                    &labels,
-                    3500.0,
-                    now,
-                )
-                .unwrap();
-
-            assert_eq!(metrics.udp_avg_scrape_processing_time_ns(), 3500);
-        }
-    }
-
     mod udpv4_metrics {
         use super::*;
 
         #[test]
         fn it_should_return_zero_for_udp4_requests_when_no_data() {
             let metrics = Metrics::default();
-            assert_eq!(metrics.udp4_requests(), 0);
+            assert_eq!(metrics.udp4_requests_received_total(), 0);
         }
 
         #[test]
@@ -525,13 +576,13 @@ mod tests {
                     .unwrap();
             }
 
-            assert_eq!(metrics.udp4_requests(), 5);
+            assert_eq!(metrics.udp4_requests_received_total(), 5);
         }
 
         #[test]
         fn it_should_return_zero_for_udp4_connections_handled_when_no_data() {
             let metrics = Metrics::default();
-            assert_eq!(metrics.udp4_connections_handled(), 0);
+            assert_eq!(metrics.udp4_connect_requests_accepted_total(), 0);
         }
 
         #[test]
@@ -546,13 +597,13 @@ mod tests {
                     .unwrap();
             }
 
-            assert_eq!(metrics.udp4_connections_handled(), 3);
+            assert_eq!(metrics.udp4_connect_requests_accepted_total(), 3);
         }
 
         #[test]
         fn it_should_return_zero_for_udp4_announces_handled_when_no_data() {
             let metrics = Metrics::default();
-            assert_eq!(metrics.udp4_announces_handled(), 0);
+            assert_eq!(metrics.udp4_announce_requests_accepted_total(), 0);
         }
 
         #[test]
@@ -567,13 +618,13 @@ mod tests {
                     .unwrap();
             }
 
-            assert_eq!(metrics.udp4_announces_handled(), 7);
+            assert_eq!(metrics.udp4_announce_requests_accepted_total(), 7);
         }
 
         #[test]
         fn it_should_return_zero_for_udp4_scrapes_handled_when_no_data() {
             let metrics = Metrics::default();
-            assert_eq!(metrics.udp4_scrapes_handled(), 0);
+            assert_eq!(metrics.udp4_scrape_requests_accepted_total(), 0);
         }
 
         #[test]
@@ -588,13 +639,13 @@ mod tests {
                     .unwrap();
             }
 
-            assert_eq!(metrics.udp4_scrapes_handled(), 4);
+            assert_eq!(metrics.udp4_scrape_requests_accepted_total(), 4);
         }
 
         #[test]
         fn it_should_return_zero_for_udp4_responses_when_no_data() {
             let metrics = Metrics::default();
-            assert_eq!(metrics.udp4_responses(), 0);
+            assert_eq!(metrics.udp4_responses_sent_total(), 0);
        }
 
         #[test]
@@ -609,13 +660,13 @@ mod tests {
                     .unwrap();
             }
 
-            assert_eq!(metrics.udp4_responses(), 6);
+            assert_eq!(metrics.udp4_responses_sent_total(), 6);
         }
 
         #[test]
         fn it_should_return_zero_for_udp4_errors_handled_when_no_data() {
             let metrics = Metrics::default();
-            assert_eq!(metrics.udp4_errors_handled(), 0);
+            assert_eq!(metrics.udp4_errors_total(), 0);
         }
 
         #[test]
@@ -630,7 +681,7 @@ mod tests {
                     .unwrap();
             }
 
-            assert_eq!(metrics.udp4_errors_handled(), 2);
+            assert_eq!(metrics.udp4_errors_total(), 2);
         }
     }
 
@@ -640,7 +691,7 @@ mod tests {
     mod udpv6_metrics {
         use super::*;
 
         #[test]
         fn it_should_return_zero_for_udp6_requests_when_no_data() {
             let metrics = Metrics::default();
-            assert_eq!(metrics.udp6_requests(), 0);
+            assert_eq!(metrics.udp6_requests_received_total(), 0);
         }
 
         #[test]
@@ -655,13 +706,13 @@ mod tests {
                     .unwrap();
             }
 
-            assert_eq!(metrics.udp6_requests(), 8);
+            assert_eq!(metrics.udp6_requests_received_total(), 8);
         }
 
         #[test]
         fn it_should_return_zero_for_udp6_connections_handled_when_no_data() {
             let metrics = Metrics::default();
-            assert_eq!(metrics.udp6_connections_handled(), 0);
+            assert_eq!(metrics.udp6_connect_requests_accepted_total(), 0);
         }
 
         #[test]
@@ -676,13 +727,13 @@ mod tests {
                     .unwrap();
             }
 
-            assert_eq!(metrics.udp6_connections_handled(), 4);
+            assert_eq!(metrics.udp6_connect_requests_accepted_total(), 4);
         }
 
         #[test]
         fn it_should_return_zero_for_udp6_announces_handled_when_no_data() {
             let metrics = Metrics::default();
-            assert_eq!(metrics.udp6_announces_handled(), 0);
+            assert_eq!(metrics.udp6_announce_requests_accepted_total(), 0);
         }
 
         #[test]
@@ -697,13 +748,13 @@ mod tests {
                     .unwrap();
             }
 
-            assert_eq!(metrics.udp6_announces_handled(), 9);
+            assert_eq!(metrics.udp6_announce_requests_accepted_total(), 9);
         }
 
         #[test]
         fn it_should_return_zero_for_udp6_scrapes_handled_when_no_data() {
             let metrics = Metrics::default();
-            assert_eq!(metrics.udp6_scrapes_handled(), 0);
+            assert_eq!(metrics.udp6_scrape_requests_accepted_total(), 0);
         }
 
         #[test]
@@ -718,13 +769,13 @@ mod tests {
                     .unwrap();
             }
 
-            assert_eq!(metrics.udp6_scrapes_handled(), 6);
+            assert_eq!(metrics.udp6_scrape_requests_accepted_total(), 6);
         }
 
         #[test]
         fn it_should_return_zero_for_udp6_responses_when_no_data() {
             let metrics = Metrics::default();
-            assert_eq!(metrics.udp6_responses(), 0);
+            assert_eq!(metrics.udp6_responses_sent_total(), 0);
         }
 
         #[test]
@@ -739,13 +790,13 @@ mod tests {
                     .unwrap();
             }
 
-            assert_eq!(metrics.udp6_responses(), 11);
+            assert_eq!(metrics.udp6_responses_sent_total(), 11);
         }
 
         #[test]
         fn it_should_return_zero_for_udp6_errors_handled_when_no_data() {
             let metrics = Metrics::default();
-            assert_eq!(metrics.udp6_errors_handled(), 0);
+            assert_eq!(metrics.udp6_errors_total(), 0);
         }
 
         #[test]
@@ -760,7 +811,7 @@ mod tests {
                     .unwrap();
             }
 
-            assert_eq!(metrics.udp6_errors_handled(), 3);
+            assert_eq!(metrics.udp6_errors_total(), 3);
         }
     }
 
@@ -788,8 +839,8 @@ mod tests {
                 .unwrap();
         }
 
-        assert_eq!(metrics.udp4_requests(), 3);
-        assert_eq!(metrics.udp6_requests(), 7);
+        assert_eq!(metrics.udp4_requests_received_total(), 3);
+        assert_eq!(metrics.udp6_requests_received_total(), 7);
     }
 
     #[test]
@@ -828,9 +879,9 @@ mod tests {
                 .unwrap();
         }
 
-        assert_eq!(metrics.udp4_connections_handled(), 2);
-        assert_eq!(metrics.udp4_announces_handled(), 5);
-        assert_eq!(metrics.udp4_scrapes_handled(), 1);
+        assert_eq!(metrics.udp4_connect_requests_accepted_total(), 2);
+        assert_eq!(metrics.udp4_announce_requests_accepted_total(), 5);
+        assert_eq!(metrics.udp4_scrape_requests_accepted_total(), 1);
     }
 
     #[test]
@@ -887,10 +938,10 @@ mod tests {
                 .unwrap();
         }
 
-        assert_eq!(metrics.udp4_connections_handled(), 3);
-        assert_eq!(metrics.udp6_connections_handled(), 2);
-        assert_eq!(metrics.udp4_announces_handled(), 4);
-        assert_eq!(metrics.udp6_announces_handled(), 6);
+        assert_eq!(metrics.udp4_connect_requests_accepted_total(), 3);
+        assert_eq!(metrics.udp6_connect_requests_accepted_total(), 2);
+        assert_eq!(metrics.udp4_announce_requests_accepted_total(), 4);
+        assert_eq!(metrics.udp6_announce_requests_accepted_total(), 6);
     }
 }
@@ -910,7 +961,7 @@ mod tests {
                 .unwrap();
         }
 
-        assert_eq!(metrics.udp_requests_aborted(), 1000);
+        assert_eq!(metrics.udp_requests_aborted_total(), 1000);
     }
 
     #[test]
@@ -940,25 +991,6 @@ mod tests {
         assert_eq!(metrics.udp_banned_ips_total(), 0);
     }
 
-    #[test]
-    fn it_should_handle_fractional_gauge_values_with_truncation() {
-        let mut metrics = Metrics::default();
-        let now = CurrentClock::now();
-        let labels = LabelSet::from([("request_kind", "connect")]);
-
-        metrics
-            .set_gauge(
-                &metric_name!(UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSING_TIME_NS),
-                &labels,
-                1234.567,
-                now,
-            )
-            .unwrap();
-
-        // Should truncate to 1234
-        assert_eq!(metrics.udp_avg_connect_processing_time_ns(), 1234);
-    }
-
     #[test]
     fn it_should_overwrite_gauge_values_when_set_multiple_times() {
         let mut metrics = Metrics::default();
@@ -989,7 +1021,7 @@ mod tests {
         let result = metrics.increase_counter(&metric_name!(UDP_TRACKER_SERVER_REQUESTS_ABORTED_TOTAL), &empty_labels, now);
 
         assert!(result.is_ok());
-        assert_eq!(metrics.udp_requests_aborted(), 1);
+        assert_eq!(metrics.udp_requests_aborted_total(), 1);
     }
 
     #[test]
@@ -1014,8 +1046,8 @@ mod tests {
        }
 
         // Should return labeled sums correctly
-        assert_eq!(metrics.udp4_requests(), 3);
-        assert_eq!(metrics.udp6_requests(), 5);
+        assert_eq!(metrics.udp4_requests_received_total(), 3);
+        assert_eq!(metrics.udp6_requests_received_total(), 5);
     }
 }
@@ -1056,4 +1088,228 @@ mod tests {
             assert!(result.is_ok());
         }
     }
+
+    mod averaged_processing_time_metrics {
+        use super::*;
+
+        #[test]
+        fn it_should_return_zero_for_udp_avg_connect_processing_time_ns_averaged_when_no_data() {
+            let metrics = Metrics::default();
+            assert_eq!(metrics.udp_avg_connect_processing_time_ns_averaged(), 0);
+        }
+
+        #[test]
+        fn it_should_return_averaged_value_for_udp_avg_connect_processing_time_ns_averaged() {
+            let mut metrics = Metrics::default();
+            let now = CurrentClock::now();
+            let labels1 = LabelSet::from([("request_kind", "connect"), ("server_id", "server1")]);
+            let labels2 = LabelSet::from([("request_kind", "connect"), ("server_id", "server2")]);
+
+            // Set different gauge values for connect requests from different servers
+            metrics
+                .set_gauge(
+                    &metric_name!(UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSING_TIME_NS),
+                    &labels1,
+                    1000.0,
+                    now,
+                )
+                .unwrap();
+
+            metrics
+                .set_gauge(
+                    &metric_name!(UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSING_TIME_NS),
+                    &labels2,
+                    2000.0,
+                    now,
+                )
+                .unwrap();
+
+            // Should return the average: (1000 + 2000) / 2 = 1500
+            assert_eq!(metrics.udp_avg_connect_processing_time_ns_averaged(), 1500);
+        }
+
+        #[test]
+        fn it_should_return_zero_for_udp_avg_announce_processing_time_ns_averaged_when_no_data() {
+            let metrics = Metrics::default();
+            assert_eq!(metrics.udp_avg_announce_processing_time_ns_averaged(), 0);
+        }
+
+        #[test]
+        fn it_should_return_averaged_value_for_udp_avg_announce_processing_time_ns_averaged() {
+            let mut metrics = Metrics::default();
+            let now = CurrentClock::now();
+            let labels1 = LabelSet::from([("request_kind", "announce"), ("server_id", "server1")]);
+            let labels2 = LabelSet::from([("request_kind", "announce"), ("server_id", "server2")]);
+            let labels3 = LabelSet::from([("request_kind", "announce"), ("server_id", "server3")]);
+
+            // Set different gauge values for announce requests from different servers
+            metrics
+                .set_gauge(
+                    &metric_name!(UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSING_TIME_NS),
+                    &labels1,
+                    1500.0,
+                    now,
+                )
+                .unwrap();
+
+            metrics
+                .set_gauge(
+                    &metric_name!(UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSING_TIME_NS),
+                    &labels2,
+                    2500.0,
+                    now,
+                )
+                .unwrap();
+
+            metrics
+                .set_gauge(
+                    &metric_name!(UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSING_TIME_NS),
+                    &labels3,
+                    3000.0,
+                    now,
+                )
+                .unwrap();
+
+            // Should return the average: (1500 + 2500 + 3000) / 3 = 2333 (truncated)
+            assert_eq!(metrics.udp_avg_announce_processing_time_ns_averaged(), 2333);
+        }
+
+        #[test]
+        fn it_should_return_zero_for_udp_avg_scrape_processing_time_ns_averaged_when_no_data() {
+            let metrics = Metrics::default();
+            assert_eq!(metrics.udp_avg_scrape_processing_time_ns_averaged(), 0);
+        }
+
+        #[test]
+        fn it_should_return_averaged_value_for_udp_avg_scrape_processing_time_ns_averaged() {
+            let mut metrics = Metrics::default();
+            let now = CurrentClock::now();
+            let labels1 = LabelSet::from([("request_kind", "scrape"), ("server_id", "server1")]);
+            let labels2 = LabelSet::from([("request_kind", "scrape"), ("server_id", "server2")]);
+
+            // Set different gauge values for scrape requests from different servers
+            metrics
+                .set_gauge(
+                    &metric_name!(UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSING_TIME_NS),
+                    &labels1,
+                    500.0,
+                    now,
+                )
+                .unwrap();
+
+            metrics
+                .set_gauge(
+                    &metric_name!(UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSING_TIME_NS),
+                    &labels2,
+                    1500.0,
+                    now,
+                )
+                .unwrap();
+
+            // Should return the average: (500 + 1500) / 2 = 1000
+            assert_eq!(metrics.udp_avg_scrape_processing_time_ns_averaged(), 1000);
+        }
+
+        #[test]
+        fn it_should_handle_fractional_averages_with_truncation() {
+            let mut metrics = Metrics::default();
+            let now = CurrentClock::now();
+            let labels1 = LabelSet::from([("request_kind", "connect"), ("server_id", "server1")]);
+            let labels2 = LabelSet::from([("request_kind", "connect"), ("server_id", "server2")]);
+            let labels3 = LabelSet::from([("request_kind", "connect"), ("server_id", "server3")]);
+
+            // Set values that will result in a fractional average
+            metrics
+                .set_gauge(
+                    &metric_name!(UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSING_TIME_NS),
+                    &labels1,
+                    1000.0,
+                    now,
+                )
+                .unwrap();
+
+            metrics
+                .set_gauge(
+                    &metric_name!(UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSING_TIME_NS),
+                    &labels2,
+                    1001.0,
+                    now,
+                )
+                .unwrap();
+
+            metrics
+                .set_gauge(
+                    &metric_name!(UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSING_TIME_NS),
+                    &labels3,
+                    1001.0,
+                    now,
+                )
+                .unwrap();
+
+            // Should return the average: (1000 + 1001 + 1001) / 3 = 1000.666... → 1000 (truncated)
+            assert_eq!(metrics.udp_avg_connect_processing_time_ns_averaged(), 1000);
+        }
+
+        #[test]
+        fn it_should_only_average_matching_request_kinds() {
+            let mut metrics = Metrics::default();
+            let now = CurrentClock::now();
+
+            // Set values for different request kinds with the same server_id
+            let connect_labels = LabelSet::from([("request_kind", "connect"), ("server_id", "server1")]);
+            let announce_labels = LabelSet::from([("request_kind", "announce"), ("server_id", "server1")]);
+            let scrape_labels = LabelSet::from([("request_kind", "scrape"), ("server_id", "server1")]);
+
+            metrics
+                .set_gauge(
+                    &metric_name!(UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSING_TIME_NS),
+                    &connect_labels,
+                    1000.0,
+                    now,
+                )
+                .unwrap();
+
+            metrics
+                .set_gauge(
+                    &metric_name!(UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSING_TIME_NS),
+                    &announce_labels,
+                    2000.0,
+                    now,
+                )
+                .unwrap();
+
+            metrics
+                .set_gauge(
+                    &metric_name!(UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSING_TIME_NS),
+                    &scrape_labels,
+                    3000.0,
+                    now,
+                )
+                .unwrap();
+
+            // Each function should only return the value for its specific request kind
+            assert_eq!(metrics.udp_avg_connect_processing_time_ns_averaged(), 1000);
+            assert_eq!(metrics.udp_avg_announce_processing_time_ns_averaged(), 2000);
+            assert_eq!(metrics.udp_avg_scrape_processing_time_ns_averaged(), 3000);
+        }
+
+        #[test]
+        fn it_should_handle_single_server_averaged_metrics() {
+            let mut metrics = Metrics::default();
+            let now = CurrentClock::now();
+            let labels = LabelSet::from([("request_kind", "connect"), ("server_id", "single_server")]);
+
+            metrics
+                .set_gauge(
+                    &metric_name!(UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSING_TIME_NS),
+                    &labels,
+                    1234.0,
+                    now,
+                )
+                .unwrap();
+
+            // With only one server, the average should be the same as the single value
+            assert_eq!(metrics.udp_avg_connect_processing_time_ns_averaged(), 1234);
+        }
+    }
 }
diff --git a/packages/udp-tracker-server/src/statistics/mod.rs b/packages/udp-tracker-server/src/statistics/mod.rs
index 768722ba3..6bd35b9a1 100644
--- a/packages/udp-tracker-server/src/statistics/mod.rs
+++ b/packages/udp-tracker-server/src/statistics/mod.rs
@@ -17,6 +17,8 @@ pub const UDP_TRACKER_SERVER_REQUESTS_ACCEPTED_TOTAL: &str = "udp_tracker_server
 pub const UDP_TRACKER_SERVER_RESPONSES_SENT_TOTAL: &str = "udp_tracker_server_responses_sent_total";
 pub const UDP_TRACKER_SERVER_ERRORS_TOTAL: &str = "udp_tracker_server_errors_total";
 pub const UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSING_TIME_NS: &str = "udp_tracker_server_performance_avg_processing_time_ns";
+pub const UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSED_REQUESTS_TOTAL: &str =
+    "udp_tracker_server_performance_avg_processed_requests_total";
 
 #[must_use]
 pub fn describe_metrics() -> Metrics {
@@ -76,5 +78,13 @@ pub fn describe_metrics() -> Metrics {
         Some(MetricDescription::new("Average time to process a UDP request in nanoseconds")),
     );
 
+    metrics.metric_collection.describe_counter(
+        &metric_name!(UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSED_REQUESTS_TOTAL),
+        Some(Unit::Count),
+        Some(MetricDescription::new(
+            "Total number of UDP requests processed for the average performance metrics",
+        )),
+    );
+
     metrics
 }
diff --git a/packages/udp-tracker-server/src/statistics/repository.rs b/packages/udp-tracker-server/src/statistics/repository.rs
index eb0951614..94a86e3ab 100644
--- a/packages/udp-tracker-server/src/statistics/repository.rs
+++ b/packages/udp-tracker-server/src/statistics/repository.rs
@@ -73,85 +73,18 @@ impl Repository {
         result
     }
 
-    #[allow(clippy::cast_precision_loss)]
-    #[allow(clippy::cast_possible_truncation)]
-    #[allow(clippy::cast_sign_loss)]
-    pub async fn recalculate_udp_avg_connect_processing_time_ns(&self, req_processing_time: Duration) -> f64 {
-        let stats_lock = self.stats.write().await;
-
-        let req_processing_time = req_processing_time.as_nanos() as f64;
-        let udp_connections_handled = (stats_lock.udp4_connections_handled() + stats_lock.udp6_connections_handled()) as f64;
-
-        let previous_avg = stats_lock.udp_avg_connect_processing_time_ns();
-
-        // Moving average: https://en.wikipedia.org/wiki/Moving_average
-        let new_avg = previous_avg as f64 + (req_processing_time - previous_avg as f64) / udp_connections_handled;
-
-        drop(stats_lock);
-
-        tracing::debug!(
-            "Recalculated UDP average connect processing time: {} ns (previous: {} ns, req_processing_time: {} ns, udp_connections_handled: {})",
-            new_avg,
-            previous_avg,
-            req_processing_time,
-            udp_connections_handled
-        );
-
-        new_avg
-    }
-
-    #[allow(clippy::cast_precision_loss)]
-    #[allow(clippy::cast_possible_truncation)]
-    #[allow(clippy::cast_sign_loss)]
-    pub async fn recalculate_udp_avg_announce_processing_time_ns(&self, req_processing_time: Duration) -> f64 {
-        let stats_lock = self.stats.write().await;
-
-        let req_processing_time = req_processing_time.as_nanos() as f64;
-
-        let udp_announces_handled = (stats_lock.udp4_announces_handled() + stats_lock.udp6_announces_handled()) as f64;
-
-        let previous_avg = stats_lock.udp_avg_announce_processing_time_ns();
-
-        // Moving average: https://en.wikipedia.org/wiki/Moving_average
-        let new_avg = previous_avg as f64 + (req_processing_time - previous_avg as f64) / udp_announces_handled;
-
-        drop(stats_lock);
-
-        tracing::debug!(
-            "Recalculated UDP average announce processing time: {} ns (previous: {} ns, req_processing_time: {} ns, udp_announces_handled: {})",
-            new_avg,
-            previous_avg,
-            req_processing_time,
-            udp_announces_handled
-        );
-
-        new_avg
-    }
-
-    #[allow(clippy::cast_precision_loss)]
-    #[allow(clippy::cast_possible_truncation)]
-    #[allow(clippy::cast_sign_loss)]
-    pub async fn recalculate_udp_avg_scrape_processing_time_ns(&self, req_processing_time: Duration) -> f64 {
-        let stats_lock = self.stats.write().await;
-
-        let req_processing_time = req_processing_time.as_nanos() as f64;
-        let udp_scrapes_handled = (stats_lock.udp4_scrapes_handled() + stats_lock.udp6_scrapes_handled()) as f64;
-
-        let previous_avg = stats_lock.udp_avg_scrape_processing_time_ns();
+    pub async fn recalculate_udp_avg_processing_time_ns(
+        &self,
+        req_processing_time: Duration,
+        label_set: &LabelSet,
+        now: DurationSinceUnixEpoch,
+    ) -> f64 {
+        let mut stats_lock = self.stats.write().await;
 
-        // Moving average: https://en.wikipedia.org/wiki/Moving_average
-        let new_avg = previous_avg as f64 + (req_processing_time - previous_avg as f64) / udp_scrapes_handled;
read metrics through the guard - assert_eq!(stats.udp_requests_aborted(), 0); - assert_eq!(stats.udp_requests_banned(), 0); + assert_eq!(stats.udp_requests_aborted_total(), 0); + assert_eq!(stats.udp_requests_banned_total(), 0); } #[tokio::test] @@ -241,7 +175,7 @@ mod tests { // Verify the counter was incremented let stats = repo.get_stats().await; - assert_eq!(stats.udp_requests_aborted(), 1); + assert_eq!(stats.udp_requests_aborted_total(), 1); } #[tokio::test] @@ -259,7 +193,7 @@ mod tests { // Verify the counter was incremented correctly let stats = repo.get_stats().await; - assert_eq!(stats.udp_requests_aborted(), 5); + assert_eq!(stats.udp_requests_aborted_total(), 5); } #[tokio::test] @@ -281,8 +215,8 @@ mod tests { // Verify both labeled metrics let stats = repo.get_stats().await; - assert_eq!(stats.udp4_requests(), 1); - assert_eq!(stats.udp6_requests(), 1); + assert_eq!(stats.udp4_requests_received_total(), 1); + assert_eq!(stats.udp6_requests_received_total(), 1); } #[tokio::test] @@ -353,8 +287,29 @@ mod tests { // Verify both labeled metrics let stats = repo.get_stats().await; - assert_eq!(stats.udp_avg_connect_processing_time_ns(), 1000); - assert_eq!(stats.udp_avg_announce_processing_time_ns(), 2000); + + #[allow(clippy::cast_sign_loss)] + #[allow(clippy::cast_possible_truncation)] + let udp_avg_connect_processing_time_ns = stats + .metric_collection + .sum( + &metric_name!(UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSING_TIME_NS), + &[("request_kind", "connect")].into(), + ) + .unwrap_or_default() as u64; + + #[allow(clippy::cast_sign_loss)] + #[allow(clippy::cast_possible_truncation)] + let udp_avg_announce_processing_time_ns = stats + .metric_collection + .sum( + &metric_name!(UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSING_TIME_NS), + &[("request_kind", "announce")].into(), + ) + .unwrap_or_default() as u64; + + assert_eq!(udp_avg_connect_processing_time_ns, 1000); + assert_eq!(udp_avg_announce_processing_time_ns, 2000); } #[tokio::test] @@ -362,21 +317,6 @@ mod tests { let repo = Repository::new(); let now = CurrentClock::now(); - // Set up initial connections handled - let ipv4_labels = LabelSet::from([("server_binding_address_ip_family", "inet"), ("request_kind", "connect")]); - let ipv6_labels = LabelSet::from([("server_binding_address_ip_family", "inet6"), ("request_kind", "connect")]); - - // Simulate 2 IPv4 and 1 IPv6 connections - repo.increase_counter(&metric_name!(UDP_TRACKER_SERVER_REQUESTS_ACCEPTED_TOTAL), &ipv4_labels, now) - .await - .unwrap(); - repo.increase_counter(&metric_name!(UDP_TRACKER_SERVER_REQUESTS_ACCEPTED_TOTAL), &ipv4_labels, now) - .await - .unwrap(); - repo.increase_counter(&metric_name!(UDP_TRACKER_SERVER_REQUESTS_ACCEPTED_TOTAL), &ipv6_labels, now) - .await - .unwrap(); - // Set initial average to 1000ns let connect_labels = LabelSet::from([("request_kind", "connect")]); repo.set_gauge( @@ -389,12 +329,16 @@ mod tests { .unwrap(); // Calculate new average with processing time of 2000ns + // This will increment the processed requests counter from 0 to 1 let processing_time = Duration::from_nanos(2000); - let new_avg = repo.recalculate_udp_avg_connect_processing_time_ns(processing_time).await; + let new_avg = repo + .recalculate_udp_avg_processing_time_ns(processing_time, &connect_labels, now) + .await; - // Moving average: previous_avg + (new_value - previous_avg) / total_connections - // 1000 + (2000 - 1000) / 3 = 1000 + 333.33 = 1333.33 - let expected_avg = 1000.0 + (2000.0 - 1000.0) / 3.0; + // Moving average: previous_avg + (new_value - 
previous_avg) / processed_requests_total + // With processed_requests_total = 1 (incremented during the call): + // 1000 + (2000 - 1000) / 1 = 1000 + 1000 = 2000 + let expected_avg = 1000.0 + (2000.0 - 1000.0) / 1.0; assert!( (new_avg - expected_avg).abs() < 0.01, "Expected {expected_avg}, got {new_avg}" @@ -406,22 +350,6 @@ mod tests { let repo = Repository::new(); let now = CurrentClock::now(); - // Set up initial announces handled - let ipv4_labels = LabelSet::from([("server_binding_address_ip_family", "inet"), ("request_kind", "announce")]); - let ipv6_labels = LabelSet::from([("server_binding_address_ip_family", "inet6"), ("request_kind", "announce")]); - - // Simulate 3 IPv4 and 2 IPv6 announces - for _ in 0..3 { - repo.increase_counter(&metric_name!(UDP_TRACKER_SERVER_REQUESTS_ACCEPTED_TOTAL), &ipv4_labels, now) - .await - .unwrap(); - } - for _ in 0..2 { - repo.increase_counter(&metric_name!(UDP_TRACKER_SERVER_REQUESTS_ACCEPTED_TOTAL), &ipv6_labels, now) - .await - .unwrap(); - } - // Set initial average to 500ns let announce_labels = LabelSet::from([("request_kind", "announce")]); repo.set_gauge( @@ -434,12 +362,16 @@ mod tests { .unwrap(); // Calculate new average with processing time of 1500ns + // This will increment the processed requests counter from 0 to 1 let processing_time = Duration::from_nanos(1500); - let new_avg = repo.recalculate_udp_avg_announce_processing_time_ns(processing_time).await; + let new_avg = repo + .recalculate_udp_avg_processing_time_ns(processing_time, &announce_labels, now) + .await; - // Moving average: previous_avg + (new_value - previous_avg) / total_announces - // 500 + (1500 - 500) / 5 = 500 + 200 = 700 - let expected_avg = 500.0 + (1500.0 - 500.0) / 5.0; + // Moving average: previous_avg + (new_value - previous_avg) / processed_requests_total + // With processed_requests_total = 1 (incremented during the call): + // 500 + (1500 - 500) / 1 = 500 + 1000 = 1500 + let expected_avg = 500.0 + (1500.0 - 500.0) / 1.0; assert!( (new_avg - expected_avg).abs() < 0.01, "Expected {expected_avg}, got {new_avg}" @@ -451,16 +383,6 @@ mod tests { let repo = Repository::new(); let now = CurrentClock::now(); - // Set up initial scrapes handled - let ipv4_labels = LabelSet::from([("server_binding_address_ip_family", "inet"), ("request_kind", "scrape")]); - - // Simulate 4 IPv4 scrapes - for _ in 0..4 { - repo.increase_counter(&metric_name!(UDP_TRACKER_SERVER_REQUESTS_ACCEPTED_TOTAL), &ipv4_labels, now) - .await - .unwrap(); - } - // Set initial average to 800ns let scrape_labels = LabelSet::from([("request_kind", "scrape")]); repo.set_gauge( @@ -473,12 +395,16 @@ mod tests { .unwrap(); // Calculate new average with processing time of 1200ns + // This will increment the processed requests counter from 0 to 1 let processing_time = Duration::from_nanos(1200); - let new_avg = repo.recalculate_udp_avg_scrape_processing_time_ns(processing_time).await; + let new_avg = repo + .recalculate_udp_avg_processing_time_ns(processing_time, &scrape_labels, now) + .await; - // Moving average: previous_avg + (new_value - previous_avg) / total_scrapes - // 800 + (1200 - 800) / 4 = 800 + 100 = 900 - let expected_avg = 800.0 + (1200.0 - 800.0) / 4.0; + // Moving average: previous_avg + (new_value - previous_avg) / processed_requests_total + // With processed_requests_total = 1 (incremented during the call): + // 800 + (1200 - 800) / 1 = 800 + 400 = 1200 + let expected_avg = 800.0 + (1200.0 - 800.0) / 1.0; assert!( (new_avg - expected_avg).abs() < 0.01, "Expected {expected_avg}, got 
{new_avg}" @@ -488,19 +414,31 @@ mod tests { #[tokio::test] async fn recalculate_average_methods_should_handle_zero_connections_gracefully() { let repo = Repository::new(); + let now = CurrentClock::now(); // Test with zero connections (should not panic, should handle division by zero) let processing_time = Duration::from_nanos(1000); - let connect_avg = repo.recalculate_udp_avg_connect_processing_time_ns(processing_time).await; - let announce_avg = repo.recalculate_udp_avg_announce_processing_time_ns(processing_time).await; - let scrape_avg = repo.recalculate_udp_avg_scrape_processing_time_ns(processing_time).await; + let connect_labels = LabelSet::from([("request_kind", "connect")]); + let connect_avg = repo + .recalculate_udp_avg_processing_time_ns(processing_time, &connect_labels, now) + .await; + + let announce_labels = LabelSet::from([("request_kind", "announce")]); + let announce_avg = repo + .recalculate_udp_avg_processing_time_ns(processing_time, &announce_labels, now) + .await; + + let scrape_labels = LabelSet::from([("request_kind", "scrape")]); + let scrape_avg = repo + .recalculate_udp_avg_processing_time_ns(processing_time, &scrape_labels, now) + .await; // With 0 total connections, the formula becomes 0 + (1000 - 0) / 0 // This should handle the division by zero case gracefully - assert!(connect_avg.is_infinite() || connect_avg.is_nan()); - assert!(announce_avg.is_infinite() || announce_avg.is_nan()); - assert!(scrape_avg.is_infinite() || scrape_avg.is_nan()); + assert!((connect_avg - 1000.0).abs() < f64::EPSILON); + assert!((announce_avg - 1000.0).abs() < f64::EPSILON); + assert!((scrape_avg - 1000.0).abs() < f64::EPSILON); } #[tokio::test] @@ -536,7 +474,7 @@ mod tests { // Verify all increments were properly recorded let stats = repo.get_stats().await; - assert_eq!(stats.udp_requests_aborted(), 50); // 10 tasks * 5 increments each + assert_eq!(stats.udp_requests_aborted_total(), 50); // 10 tasks * 5 increments each } #[tokio::test] @@ -552,7 +490,10 @@ mod tests { // Test with very large processing time let large_duration = Duration::from_secs(1); // 1 second = 1,000,000,000 ns - let new_avg = repo.recalculate_udp_avg_connect_processing_time_ns(large_duration).await; + let connect_labels = LabelSet::from([("request_kind", "connect")]); + let new_avg = repo + .recalculate_udp_avg_processing_time_ns(large_duration, &connect_labels, now) + .await; // Should handle large numbers without overflow assert!(new_avg > 0.0); @@ -592,9 +533,9 @@ mod tests { // Check final state let stats = repo.get_stats().await; - assert_eq!(stats.udp_requests_aborted(), 1); + assert_eq!(stats.udp_requests_aborted_total(), 1); assert_eq!(stats.udp_banned_ips_total(), 10); - assert_eq!(stats.udp_requests_banned(), 1); + assert_eq!(stats.udp_requests_banned_total(), 1); } #[tokio::test] @@ -624,46 +565,211 @@ mod tests { assert!(result.is_ok()); } - #[tokio::test] - async fn it_should_handle_moving_average_calculation_before_any_connections_are_recorded() { - let repo = Repository::new(); - let now = CurrentClock::now(); + mod race_conditions { - // This test checks the behavior of `recalculate_udp_avg_connect_processing_time_ns`` - // when no connections have been recorded yet. The first call should - // handle division by zero gracefully and return an infinite average, - // which is the current behavior. + use core::f64; + use std::time::Duration; - // todo: the first average should be 2000ns, not infinity. 
- // This is because the first connection is not counted in the average - // calculation if the counter is increased after calculating the average. - // The problem is that we count requests when they are accepted, not - // when they are processed. And we calculate the average when the - // response is sent. + use tokio::task::JoinHandle; + use torrust_tracker_clock::clock::Time; + use torrust_tracker_metrics::metric_name; - // First calculation: no connections recorded yet, should result in infinity - let processing_time_1 = Duration::from_nanos(2000); - let avg_1 = repo.recalculate_udp_avg_connect_processing_time_ns(processing_time_1).await; + use super::*; + use crate::CurrentClock; - // Division by zero: 1000 + (2000 - 1000) / 0 = infinity - assert!( - avg_1.is_infinite(), - "First calculation should be infinite due to division by zero" - ); + #[tokio::test] + async fn it_should_handle_race_conditions_when_updating_udp_performance_metrics_in_parallel() { + const REQUESTS_PER_SERVER: usize = 100; - // Now add one connection and try again - let ipv4_labels = LabelSet::from([("server_binding_address_ip_family", "inet"), ("request_kind", "connect")]); - repo.increase_counter(&metric_name!(UDP_TRACKER_SERVER_REQUESTS_ACCEPTED_TOTAL), &ipv4_labels, now) - .await - .unwrap(); + // ** Set up test data and environment ** - // Second calculation: 1 connection, but previous average is infinity - let processing_time_2 = Duration::from_nanos(3000); - let avg_2 = repo.recalculate_udp_avg_connect_processing_time_ns(processing_time_2).await; + let repo = Repository::new(); + let now = CurrentClock::now(); - assert!( - (avg_2 - 3000.0).abs() < f64::EPSILON, - "Second calculation should be 3000ns, but got {avg_2}" + let server1_labels = create_server_metric_labels("6868"); + let server2_labels = create_server_metric_labels("6969"); + + // ** Execute concurrent metric updates ** + + // Spawn concurrent tasks for server 1 with processing times [1000, 2000, 3000, 4000, 5000] ns + let server1_handles = spawn_server_tasks(&repo, &server1_labels, 1000, now, REQUESTS_PER_SERVER); + + // Spawn concurrent tasks for server 2 with processing times [2000, 3000, 4000, 5000, 6000] ns + let server2_handles = spawn_server_tasks(&repo, &server2_labels, 2000, now, REQUESTS_PER_SERVER); + + // Wait for both servers' results + let (server1_results, server2_results) = tokio::join!( + collect_concurrent_task_results(server1_handles), + collect_concurrent_task_results(server2_handles) + ); + + // ** Verify results and metrics ** + + // Verify correctness of concurrent operations + assert_server_results_are_valid(&server1_results, "Server 1", REQUESTS_PER_SERVER); + assert_server_results_are_valid(&server2_results, "Server 2", REQUESTS_PER_SERVER); + + let stats = repo.get_stats().await; + + // Verify each server's metrics individually + let server1_avg = assert_server_metrics_are_correct(&stats, &server1_labels, "Server 1", REQUESTS_PER_SERVER, 3000.0); + let server2_avg = assert_server_metrics_are_correct(&stats, &server2_labels, "Server 2", REQUESTS_PER_SERVER, 4000.0); + + // Verify relationship between servers + assert_server_metrics_relationship(server1_avg, server2_avg); + + // Verify each server's result consistency individually + assert_server_result_matches_stored_average(&server1_results, &stats, &server1_labels, "Server 1"); + assert_server_result_matches_stored_average(&server2_results, &stats, &server2_labels, "Server 2"); + + // Verify metric collection integrity + assert_metric_collection_integrity(&stats); + } 
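+
+        /// Illustrative sketch of the update rule exercised above, using
+        /// hypothetical values: `recalculate_udp_avg_processing_time_ns` applies
+        /// `new_avg = previous_avg + (sample - previous_avg) / n`, where the
+        /// per-label processed requests counter `n` is incremented before the
+        /// division. When `n` counts every sample folded into the average, the
+        /// recurrence equals the plain arithmetic mean and can never divide by zero.
+        #[test]
+        fn moving_average_recurrence_should_match_the_arithmetic_mean() {
+            let samples = [1000.0_f64, 2000.0, 3000.0]; // hypothetical processing times in ns
+            let mut avg = 0.0;
+
+            for (i, sample) in samples.into_iter().enumerate() {
+                // `i + 1` plays the role of the processed requests counter,
+                // incremented before the division
+                avg += (sample - avg) / (i + 1) as f64;
+            }
+
+            // Matches the arithmetic mean: (1000 + 2000 + 3000) / 3 = 2000
+            assert!((avg - 2000.0).abs() < f64::EPSILON);
+        }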
+
+        // Test helper functions to hide implementation details
+
+        fn create_server_metric_labels(port: &str) -> LabelSet {
+            LabelSet::from([
+                ("request_kind", "connect"),
+                ("server_binding_address_ip_family", "inet"),
+                ("server_port", port),
+            ])
+        }
+
+        fn spawn_server_tasks(
+            repo: &Repository,
+            labels: &LabelSet,
+            base_processing_time_ns: usize,
+            now: DurationSinceUnixEpoch,
+            requests_per_server: usize,
+        ) -> Vec<JoinHandle<f64>> {
+            let mut handles = vec![];
+
+            for i in 0..requests_per_server {
+                let repo_clone = repo.clone();
+                let labels_clone = labels.clone();
+                let handle = tokio::spawn(async move {
+                    let processing_time_ns = base_processing_time_ns + (i % 5) * 1000;
+                    let processing_time = Duration::from_nanos(processing_time_ns as u64);
+                    repo_clone
+                        .recalculate_udp_avg_processing_time_ns(processing_time, &labels_clone, now)
+                        .await
+                });
+                handles.push(handle);
+            }
+
+            handles
+        }
+
+        async fn collect_concurrent_task_results(handles: Vec<JoinHandle<f64>>) -> Vec<f64> {
+            let mut server_results = Vec::new();
+
+            for handle in handles {
+                let result = handle.await.unwrap();
+                server_results.push(result);
+            }
+
+            server_results
+        }
+
+        fn assert_server_results_are_valid(results: &[f64], server_name: &str, expected_count: usize) {
+            // Verify all tasks completed
+            assert_eq!(
+                results.len(),
+                expected_count,
+                "{server_name} should have {expected_count} results"
+            );
+
+            // Verify all results are valid numbers
+            for result in results {
+                assert!(result.is_finite(), "{server_name} result should be finite: {result}");
+                assert!(*result > 0.0, "{server_name} result should be positive: {result}");
+            }
+        }
+
+        fn assert_server_metrics_are_correct(
+            stats: &Metrics,
+            labels: &LabelSet,
+            server_name: &str,
+            expected_request_count: usize,
+            expected_avg_ns: f64,
+        ) -> f64 {
+            // Verify request count
+            let processed_requests = get_processed_requests_count(stats, labels);
+            assert_eq!(
+                processed_requests, expected_request_count as u64,
+                "{server_name} should have processed {expected_request_count} requests"
+            );
+
+            // Verify average processing time is within expected range
+            let avg_processing_time = get_average_processing_time(stats, labels);
+            assert!(
+                (avg_processing_time - expected_avg_ns).abs() < 50.0,
+                "{server_name} average should be ~{expected_avg_ns}ns (±50ns), got {avg_processing_time}ns"
+            );
+
+            avg_processing_time
+        }
+
+        fn assert_server_metrics_relationship(server1_avg: f64, server2_avg: f64) {
+            const MIN_DIFFERENCE_NS: f64 = 950.0;
+
+            assert_averages_are_significantly_different(server1_avg, server2_avg, MIN_DIFFERENCE_NS);
+            assert_server_ordering_is_correct(server1_avg, server2_avg);
+        }
+
+        fn assert_averages_are_significantly_different(avg1: f64, avg2: f64, min_difference: f64) {
+            let difference = (avg1 - avg2).abs();
+            assert!(
+                difference > min_difference,
+                "Server averages should differ by more than {min_difference}ns, but difference was {difference}ns"
+            );
+        }
+
+        fn assert_server_ordering_is_correct(server1_avg: f64, server2_avg: f64) {
+            // Server 2 should have a higher average since it has higher processing times [2000-6000] vs [1000-5000]
+            assert!(
+                server2_avg > server1_avg,
+                "Server 2 average ({server2_avg}ns) should be higher than Server 1 ({server1_avg}ns) due to higher processing time ranges"
+            );
+        }
+
+        fn assert_server_result_matches_stored_average(results: &[f64], stats: &Metrics, labels: &LabelSet, server_name: &str) {
+            let final_avg = get_average_processing_time(stats, labels);
+            let last_result = results.last().copied().unwrap();
+
+            assert!(
+                (last_result -
final_avg).abs() <= f64::EPSILON, + "{server_name} last result ({last_result}) should match final average ({final_avg}) exactly" + ); + } + + fn assert_metric_collection_integrity(stats: &Metrics) { + assert!(stats + .metric_collection + .contains_gauge(&metric_name!(UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSING_TIME_NS))); + assert!(stats + .metric_collection + .contains_counter(&metric_name!(UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSED_REQUESTS_TOTAL))); + } + + fn get_processed_requests_count(stats: &Metrics, labels: &LabelSet) -> u64 { + stats + .metric_collection + .get_counter_value( + &metric_name!(UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSED_REQUESTS_TOTAL), + labels, + ) + .unwrap() + .value() + } + + fn get_average_processing_time(stats: &Metrics, labels: &LabelSet) -> f64 { + stats + .metric_collection + .get_gauge_value(&metric_name!(UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSING_TIME_NS), labels) + .unwrap() + .value() + } } } diff --git a/packages/udp-tracker-server/tests/server/contract.rs b/packages/udp-tracker-server/tests/server/contract.rs index 2745f3407..da08bc177 100644 --- a/packages/udp-tracker-server/tests/server/contract.rs +++ b/packages/udp-tracker-server/tests/server/contract.rs @@ -273,7 +273,7 @@ mod receiving_an_announce_request { .stats_repository .get_stats() .await - .udp_requests_banned(); + .udp_requests_banned_total(); // This should return a timeout error match client.send(announce_request.into()).await { @@ -289,7 +289,7 @@ mod receiving_an_announce_request { .stats_repository .get_stats() .await - .udp_requests_banned(); + .udp_requests_banned_total(); let udp_banned_ips_total_after = ban_service.read().await.get_banned_ips_total(); // UDP counter for banned requests should be increased by 1