Skip to content

Commit 6aaa4e6

Browse files
committed
fix: [#1589] add dedicated metric for UDP request processing in moving average calculation
Add a new metric `UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSED_REQUESTS_TOTAL` to track requests processed specifically for performance metrics, eliminating race conditions in the moving average calculation. **Changes:** - Add new metric constant `UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSED_REQUESTS_TOTAL` - Update `recalculate_udp_avg_processing_time_ns()` to use the dedicated counter instead of accepted requests total - Add `udp_processed_requests_total()` method to retrieve the new metric value - Add `increment_udp_processed_requests_total()` helper method - Update metric descriptions to include the new counter **Problem Fixed:** Previously, the moving average calculation used the accepted requests counter that could be updated independently, causing race conditions where the same request count was used for multiple calculations. The new implementation increments its own dedicated counter atomically during the calculation, ensuring consistency. **Behavior Change:** The counter now starts at 0 and gets incremented to 1 on the first calculation call, then uses the proper moving average formula for subsequent calls. This eliminates division by zero issues and provides more accurate moving averages. **Tests Updated:** Updated repository tests to reflect the new atomic behavior where the processed requests counter is managed specifically for moving average calculations. Fixes race conditions in UDP request processing time metrics while maintaining backward compatibility of all public APIs.
1 parent 164de92 commit 6aaa4e6

File tree

3 files changed

+103
-91
lines changed

3 files changed

+103
-91
lines changed

packages/udp-tracker-server/src/statistics/metrics.rs

Lines changed: 63 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,8 @@ use torrust_tracker_metrics::metric_name;
99
use torrust_tracker_primitives::DurationSinceUnixEpoch;
1010

1111
use crate::statistics::{
12-
UDP_TRACKER_SERVER_ERRORS_TOTAL, UDP_TRACKER_SERVER_IPS_BANNED_TOTAL, UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSING_TIME_NS,
12+
UDP_TRACKER_SERVER_ERRORS_TOTAL, UDP_TRACKER_SERVER_IPS_BANNED_TOTAL,
13+
UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSED_REQUESTS_TOTAL, UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSING_TIME_NS,
1314
UDP_TRACKER_SERVER_REQUESTS_ABORTED_TOTAL, UDP_TRACKER_SERVER_REQUESTS_ACCEPTED_TOTAL,
1415
UDP_TRACKER_SERVER_REQUESTS_BANNED_TOTAL, UDP_TRACKER_SERVER_REQUESTS_RECEIVED_TOTAL,
1516
UDP_TRACKER_SERVER_RESPONSES_SENT_TOTAL,
@@ -57,26 +58,22 @@ impl Metrics {
5758
label_set: &LabelSet,
5859
now: DurationSinceUnixEpoch,
5960
) -> f64 {
60-
let req_processing_time = req_processing_time.as_nanos() as f64;
61-
62-
let request_accepted_total = self.udp_request_accepted(label_set) as f64;
61+
self.increment_udp_processed_requests_total(label_set, now);
6362

63+
let processed_requests_total = self.udp_processed_requests_total(label_set) as f64;
6464
let previous_avg = self.udp_avg_processing_time_ns(label_set);
65+
let req_processing_time = req_processing_time.as_nanos() as f64;
6566

66-
let new_avg = if request_accepted_total == 0.0 {
67-
req_processing_time
68-
} else {
69-
// Moving average: https://en.wikipedia.org/wiki/Moving_average
70-
previous_avg as f64 + (req_processing_time - previous_avg as f64) / request_accepted_total
71-
};
67+
// Moving average: https://en.wikipedia.org/wiki/Moving_average
68+
let new_avg = previous_avg as f64 + (req_processing_time - previous_avg as f64) / processed_requests_total;
7269

7370
tracing::debug!(
74-
"Recalculated UDP average processing time for labels {}: {} ns (previous: {} ns, req_processing_time: {} ns, request_accepted_total: {})",
71+
"Recalculated UDP average processing time for labels {}: {} ns (previous: {} ns, req_processing_time: {} ns, request_processed_total: {})",
7572
label_set,
7673
new_avg,
7774
previous_avg,
7875
req_processing_time,
79-
request_accepted_total
76+
processed_requests_total
8077
);
8178

8279
self.update_udp_avg_processing_time_ns(new_avg, label_set, now);
@@ -105,6 +102,18 @@ impl Metrics {
105102
.unwrap_or_default() as u64
106103
}
107104

105+
#[must_use]
106+
#[allow(clippy::cast_sign_loss)]
107+
#[allow(clippy::cast_possible_truncation)]
108+
pub fn udp_processed_requests_total(&self, label_set: &LabelSet) -> u64 {
109+
self.metric_collection
110+
.sum(
111+
&metric_name!(UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSED_REQUESTS_TOTAL),
112+
label_set,
113+
)
114+
.unwrap_or_default() as u64
115+
}
116+
108117
fn update_udp_avg_processing_time_ns(&mut self, new_avg: f64, label_set: &LabelSet, now: DurationSinceUnixEpoch) {
109118
tracing::debug!(
110119
"Updating average processing time metric to {} ns for label set {}",
@@ -123,6 +132,19 @@ impl Metrics {
123132
}
124133
}
125134

135+
fn increment_udp_processed_requests_total(&mut self, label_set: &LabelSet, now: DurationSinceUnixEpoch) {
136+
tracing::debug!("Incrementing processed requests total for label set {}", label_set,);
137+
138+
match self.increase_counter(
139+
&metric_name!(UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSED_REQUESTS_TOTAL),
140+
label_set,
141+
now,
142+
) {
143+
Ok(()) => {}
144+
Err(err) => tracing::error!("Failed to increment counter: {}", err),
145+
}
146+
}
147+
126148
// UDP
127149
/// Total number of UDP (UDP tracker) requests aborted.
128150
#[must_use]
@@ -360,9 +382,10 @@ mod tests {
360382
use super::*;
361383
use crate::statistics::{
362384
UDP_TRACKER_SERVER_ERRORS_TOTAL, UDP_TRACKER_SERVER_IPS_BANNED_TOTAL,
363-
UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSING_TIME_NS, UDP_TRACKER_SERVER_REQUESTS_ABORTED_TOTAL,
364-
UDP_TRACKER_SERVER_REQUESTS_ACCEPTED_TOTAL, UDP_TRACKER_SERVER_REQUESTS_BANNED_TOTAL,
365-
UDP_TRACKER_SERVER_REQUESTS_RECEIVED_TOTAL, UDP_TRACKER_SERVER_RESPONSES_SENT_TOTAL,
385+
UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSED_REQUESTS_TOTAL, UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSING_TIME_NS,
386+
UDP_TRACKER_SERVER_REQUESTS_ABORTED_TOTAL, UDP_TRACKER_SERVER_REQUESTS_ACCEPTED_TOTAL,
387+
UDP_TRACKER_SERVER_REQUESTS_BANNED_TOTAL, UDP_TRACKER_SERVER_REQUESTS_RECEIVED_TOTAL,
388+
UDP_TRACKER_SERVER_RESPONSES_SENT_TOTAL,
366389
};
367390
use crate::CurrentClock;
368391

@@ -437,6 +460,31 @@ mod tests {
437460
assert!(result.is_ok());
438461
}
439462

463+
#[test]
464+
fn it_should_return_zero_for_udp_processed_requests_total_when_no_data() {
465+
let metrics = Metrics::default();
466+
let labels = LabelSet::from([("request_kind", "connect")]);
467+
assert_eq!(metrics.udp_processed_requests_total(&labels), 0);
468+
}
469+
470+
#[test]
471+
fn it_should_increment_processed_requests_total() {
472+
let mut metrics = Metrics::default();
473+
let now = CurrentClock::now();
474+
let labels = LabelSet::from([("request_kind", "connect")]);
475+
476+
// Directly increment the counter using the public method
477+
metrics
478+
.increase_counter(
479+
&metric_name!(UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSED_REQUESTS_TOTAL),
480+
&labels,
481+
now,
482+
)
483+
.unwrap();
484+
485+
assert_eq!(metrics.udp_processed_requests_total(&labels), 1);
486+
}
487+
440488
mod udp_general_metrics {
441489
use super::*;
442490

packages/udp-tracker-server/src/statistics/mod.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ pub const UDP_TRACKER_SERVER_REQUESTS_ACCEPTED_TOTAL: &str = "udp_tracker_server
1717
pub const UDP_TRACKER_SERVER_RESPONSES_SENT_TOTAL: &str = "udp_tracker_server_responses_sent_total";
1818
pub const UDP_TRACKER_SERVER_ERRORS_TOTAL: &str = "udp_tracker_server_errors_total";
1919
pub const UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSING_TIME_NS: &str = "udp_tracker_server_performance_avg_processing_time_ns";
20+
pub const UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSED_REQUESTS_TOTAL: &str =
21+
"udp_tracker_server_performance_avg_processed_requests_total";
2022

2123
#[must_use]
2224
pub fn describe_metrics() -> Metrics {
@@ -76,5 +78,13 @@ pub fn describe_metrics() -> Metrics {
7678
Some(MetricDescription::new("Average time to process a UDP request in nanoseconds")),
7779
);
7880

81+
metrics.metric_collection.describe_counter(
82+
&metric_name!(UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSED_REQUESTS_TOTAL),
83+
Some(Unit::Count),
84+
Some(MetricDescription::new(
85+
"Total number of UDP requests processed for the average performance metrics",
86+
)),
87+
);
88+
7989
metrics
8090
}

packages/udp-tracker-server/src/statistics/repository.rs

Lines changed: 30 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -295,21 +295,6 @@ mod tests {
295295
let repo = Repository::new();
296296
let now = CurrentClock::now();
297297

298-
// Set up initial connections handled
299-
let ipv4_labels = LabelSet::from([("server_binding_address_ip_family", "inet"), ("request_kind", "connect")]);
300-
let ipv6_labels = LabelSet::from([("server_binding_address_ip_family", "inet6"), ("request_kind", "connect")]);
301-
302-
// Simulate 2 IPv4 and 1 IPv6 connections
303-
repo.increase_counter(&metric_name!(UDP_TRACKER_SERVER_REQUESTS_ACCEPTED_TOTAL), &ipv4_labels, now)
304-
.await
305-
.unwrap();
306-
repo.increase_counter(&metric_name!(UDP_TRACKER_SERVER_REQUESTS_ACCEPTED_TOTAL), &ipv4_labels, now)
307-
.await
308-
.unwrap();
309-
repo.increase_counter(&metric_name!(UDP_TRACKER_SERVER_REQUESTS_ACCEPTED_TOTAL), &ipv6_labels, now)
310-
.await
311-
.unwrap();
312-
313298
// Set initial average to 1000ns
314299
let connect_labels = LabelSet::from([("request_kind", "connect")]);
315300
repo.set_gauge(
@@ -322,14 +307,16 @@ mod tests {
322307
.unwrap();
323308

324309
// Calculate new average with processing time of 2000ns
310+
// This will increment the processed requests counter from 0 to 1
325311
let processing_time = Duration::from_nanos(2000);
326312
let new_avg = repo
327313
.recalculate_udp_avg_processing_time_ns(processing_time, &connect_labels, now)
328314
.await;
329315

330-
// Moving average: previous_avg + (new_value - previous_avg) / total_connections
331-
// 1000 + (2000 - 1000) / 3 = 1000 + 333.33 = 1333.33
332-
let expected_avg = 1000.0 + (2000.0 - 1000.0) / 3.0;
316+
// Moving average: previous_avg + (new_value - previous_avg) / processed_requests_total
317+
// With processed_requests_total = 1 (incremented during the call):
318+
// 1000 + (2000 - 1000) / 1 = 1000 + 1000 = 2000
319+
let expected_avg = 1000.0 + (2000.0 - 1000.0) / 1.0;
333320
assert!(
334321
(new_avg - expected_avg).abs() < 0.01,
335322
"Expected {expected_avg}, got {new_avg}"
@@ -341,22 +328,6 @@ mod tests {
341328
let repo = Repository::new();
342329
let now = CurrentClock::now();
343330

344-
// Set up initial announces handled
345-
let ipv4_labels = LabelSet::from([("server_binding_address_ip_family", "inet"), ("request_kind", "announce")]);
346-
let ipv6_labels = LabelSet::from([("server_binding_address_ip_family", "inet6"), ("request_kind", "announce")]);
347-
348-
// Simulate 3 IPv4 and 2 IPv6 announces
349-
for _ in 0..3 {
350-
repo.increase_counter(&metric_name!(UDP_TRACKER_SERVER_REQUESTS_ACCEPTED_TOTAL), &ipv4_labels, now)
351-
.await
352-
.unwrap();
353-
}
354-
for _ in 0..2 {
355-
repo.increase_counter(&metric_name!(UDP_TRACKER_SERVER_REQUESTS_ACCEPTED_TOTAL), &ipv6_labels, now)
356-
.await
357-
.unwrap();
358-
}
359-
360331
// Set initial average to 500ns
361332
let announce_labels = LabelSet::from([("request_kind", "announce")]);
362333
repo.set_gauge(
@@ -369,14 +340,16 @@ mod tests {
369340
.unwrap();
370341

371342
// Calculate new average with processing time of 1500ns
343+
// This will increment the processed requests counter from 0 to 1
372344
let processing_time = Duration::from_nanos(1500);
373345
let new_avg = repo
374346
.recalculate_udp_avg_processing_time_ns(processing_time, &announce_labels, now)
375347
.await;
376348

377-
// Moving average: previous_avg + (new_value - previous_avg) / total_announces
378-
// 500 + (1500 - 500) / 5 = 500 + 200 = 700
379-
let expected_avg = 500.0 + (1500.0 - 500.0) / 5.0;
349+
// Moving average: previous_avg + (new_value - previous_avg) / processed_requests_total
350+
// With processed_requests_total = 1 (incremented during the call):
351+
// 500 + (1500 - 500) / 1 = 500 + 1000 = 1500
352+
let expected_avg = 500.0 + (1500.0 - 500.0) / 1.0;
380353
assert!(
381354
(new_avg - expected_avg).abs() < 0.01,
382355
"Expected {expected_avg}, got {new_avg}"
@@ -388,16 +361,6 @@ mod tests {
388361
let repo = Repository::new();
389362
let now = CurrentClock::now();
390363

391-
// Set up initial scrapes handled
392-
let ipv4_labels = LabelSet::from([("server_binding_address_ip_family", "inet"), ("request_kind", "scrape")]);
393-
394-
// Simulate 4 IPv4 scrapes
395-
for _ in 0..4 {
396-
repo.increase_counter(&metric_name!(UDP_TRACKER_SERVER_REQUESTS_ACCEPTED_TOTAL), &ipv4_labels, now)
397-
.await
398-
.unwrap();
399-
}
400-
401364
// Set initial average to 800ns
402365
let scrape_labels = LabelSet::from([("request_kind", "scrape")]);
403366
repo.set_gauge(
@@ -410,14 +373,16 @@ mod tests {
410373
.unwrap();
411374

412375
// Calculate new average with processing time of 1200ns
376+
// This will increment the processed requests counter from 0 to 1
413377
let processing_time = Duration::from_nanos(1200);
414378
let new_avg = repo
415379
.recalculate_udp_avg_processing_time_ns(processing_time, &scrape_labels, now)
416380
.await;
417381

418-
// Moving average: previous_avg + (new_value - previous_avg) / total_scrapes
419-
// 800 + (1200 - 800) / 4 = 800 + 100 = 900
420-
let expected_avg = 800.0 + (1200.0 - 800.0) / 4.0;
382+
// Moving average: previous_avg + (new_value - previous_avg) / processed_requests_total
383+
// With processed_requests_total = 1 (incremented during the call):
384+
// 800 + (1200 - 800) / 1 = 800 + 400 = 1200
385+
let expected_avg = 800.0 + (1200.0 - 800.0) / 1.0;
421386
assert!(
422387
(new_avg - expected_avg).abs() < 0.01,
423388
"Expected {expected_avg}, got {new_avg}"
@@ -584,49 +549,38 @@ mod tests {
584549
let connect_labels = LabelSet::from([("request_kind", "connect")]);
585550
let now = CurrentClock::now();
586551

587-
// This test checks the behavior of `recalculate_udp_avg_connect_processing_time_ns``
588-
// when no connections have been recorded yet. The first call should
589-
// handle division by zero gracefully and return an infinite average,
590-
// which is the current behavior.
552+
// This test checks the behavior of `recalculate_udp_avg_processing_time_ns`
553+
// when no processed requests have been recorded yet. The first call must
554+
// not divide by zero (the counter is incremented to 1 before the division)
555+
// and must set the first average to the processing time of the first request.
591556

592-
// todo: the first average should be 2000ns, not infinity.
593-
// This is because the first connection is not counted in the average
594-
// calculation if the counter is increased after calculating the average.
595-
// The problem is that we count requests when they are accepted, not
596-
// when they are processed. And we calculate the average when the
597-
// response is sent.
598-
599-
// First calculation: no connections recorded yet, should result in infinity
557+
// First calculation: no processed requests recorded yet
600558
let processing_time_1 = Duration::from_nanos(2000);
601559
let avg_1 = repo
602560
.recalculate_udp_avg_processing_time_ns(processing_time_1, &connect_labels, now)
603561
.await;
604562

563+
// The first average should be the first processing time: processed_requests_total is incremented from 0 to 1 before the division
564+
// With processed_requests_total == 1.0, new_avg = previous_avg + (req_processing_time - previous_avg) / 1 = req_processing_time
605565
assert!(
606566
(avg_1 - 2000.0).abs() < f64::EPSILON,
607567
"First calculation should be 2000, but got {avg_1}"
608568
);
609569

610-
// Now add one connection and try again
611-
let ipv4_labels = LabelSet::from([("server_binding_address_ip_family", "inet"), ("request_kind", "connect")]);
612-
repo.increase_counter(&metric_name!(UDP_TRACKER_SERVER_REQUESTS_ACCEPTED_TOTAL), &ipv4_labels, now)
613-
.await
614-
.unwrap();
615-
616-
// Second calculation: 1 connection
570+
// Second calculation: now we have one processed request (incremented during first call)
617571
let processing_time_2 = Duration::from_nanos(3000);
618-
let connect_labels = LabelSet::from([("request_kind", "connect")]);
619572
let avg_2 = repo
620573
.recalculate_udp_avg_processing_time_ns(processing_time_2, &connect_labels, now)
621574
.await;
622575

623-
// There is one connection, so the average should be:
624-
// 2000 + (3000 - 2000) / 1 = 2000 + 1000 = 3000
625-
// This is because one connection is not counted yet in the average calculation,
626-
// so the average is simply the processing time of the second connection.
576+
// Moving average calculation: previous_avg + (new_value - previous_avg) / processed_requests_total
577+
// After first call: processed_requests_total = 1, avg = 2000
578+
// During second call: processed_requests_total incremented to 2
579+
// new_avg = 2000 + (3000 - 2000) / 2 = 2000 + 500 = 2500
580+
let expected_avg_2 = 2000.0 + (3000.0 - 2000.0) / 2.0;
627581
assert!(
628-
(avg_2 - 3000.0).abs() < f64::EPSILON,
629-
"Second calculation should be 3000ns, but got {avg_2}"
582+
(avg_2 - expected_avg_2).abs() < f64::EPSILON,
583+
"Second calculation should be {expected_avg_2}ns, but got {avg_2}"
630584
);
631585
}
632586
}

0 commit comments

Comments
 (0)