Skip to content

Commit c07f366

Browse files
committed
feat: [#1539] load global downloads counter from DB
When the tracker starts.
1 parent 9301e58 commit c07f366

File tree

12 files changed

+214
-18
lines changed

12 files changed

+214
-18
lines changed

packages/metrics/src/counter.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,10 @@ impl Counter {
2020
pub fn increment(&mut self, value: u64) {
2121
self.0 += value;
2222
}
23+
24+
pub fn absolute(&mut self, value: u64) {
25+
self.0 = value;
26+
}
2327
}
2428

2529
impl From<u64> for Counter {
@@ -73,6 +77,13 @@ mod tests {
7377
assert_eq!(counter.value(), 3);
7478
}
7579

80+
#[test]
81+
fn it_could_set_to_an_absolute_value() {
82+
let mut counter = Counter::new(0);
83+
counter.absolute(1);
84+
assert_eq!(counter.value(), 1);
85+
}
86+
7687
#[test]
7788
fn it_serializes_to_prometheus() {
7889
let counter = Counter::new(42);

packages/metrics/src/metric/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,10 @@ impl Metric<Counter> {
5555
pub fn increment(&mut self, label_set: &LabelSet, time: DurationSinceUnixEpoch) {
5656
self.sample_collection.increment(label_set, time);
5757
}
58+
59+
pub fn absolute(&mut self, label_set: &LabelSet, value: u64, time: DurationSinceUnixEpoch) {
60+
self.sample_collection.absolute(label_set, value, time);
61+
}
5862
}
5963

6064
impl Metric<Gauge> {

packages/metrics/src/metric_collection.rs

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,8 @@ impl MetricCollection {
7272
self.counters.get_value(name, label_set)
7373
}
7474

75+
/// Increases the counter for the given metric name and labels.
76+
///
7577
/// # Errors
7678
///
7779
/// Return an error if a metrics of a different type with the same name
@@ -93,6 +95,30 @@ impl MetricCollection {
9395
Ok(())
9496
}
9597

98+
/// Sets the counter for the given metric name and labels.
99+
///
100+
/// # Errors
101+
///
102+
/// Return an error if a metrics of a different type with the same name
103+
/// already exists.
104+
pub fn set_counter(
105+
&mut self,
106+
name: &MetricName,
107+
label_set: &LabelSet,
108+
value: u64,
109+
time: DurationSinceUnixEpoch,
110+
) -> Result<(), Error> {
111+
if self.gauges.metrics.contains_key(name) {
112+
return Err(Error::MetricNameCollisionAdding {
113+
metric_name: name.clone(),
114+
});
115+
}
116+
117+
self.counters.absolute(name, label_set, value, time);
118+
119+
Ok(())
120+
}
121+
96122
pub fn ensure_counter_exists(&mut self, name: &MetricName) {
97123
self.counters.ensure_metric_exists(name);
98124
}
@@ -361,7 +387,7 @@ impl MetricKindCollection<Counter> {
361387
///
362388
/// # Panics
363389
///
364-
/// Panics if the metric does not exist and it could not be created.
390+
/// Panics if the metric does not exist.
365391
pub fn increment(&mut self, name: &MetricName, label_set: &LabelSet, time: DurationSinceUnixEpoch) {
366392
self.ensure_metric_exists(name);
367393

@@ -370,6 +396,21 @@ impl MetricKindCollection<Counter> {
370396
metric.increment(label_set, time);
371397
}
372398

399+
/// Sets the counter to an absolute value for the given metric name and labels.
400+
///
401+
/// If the metric name does not exist, it will be created.
402+
///
403+
/// # Panics
404+
///
405+
/// Panics if the metric does not exist.
406+
pub fn absolute(&mut self, name: &MetricName, label_set: &LabelSet, value: u64, time: DurationSinceUnixEpoch) {
407+
self.ensure_metric_exists(name);
408+
409+
let metric = self.metrics.get_mut(name).expect("Counter metric should exist");
410+
411+
metric.absolute(label_set, value, time);
412+
}
413+
373414
#[must_use]
374415
pub fn get_value(&self, name: &MetricName, label_set: &LabelSet) -> Option<Counter> {
375416
self.metrics

packages/metrics/src/sample.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,11 @@ impl Measurement<Counter> {
122122
self.value.increment(1);
123123
self.set_recorded_at(time);
124124
}
125+
126+
pub fn absolute(&mut self, value: u64, time: DurationSinceUnixEpoch) {
127+
self.value.absolute(value);
128+
self.set_recorded_at(time);
129+
}
125130
}
126131

127132
impl Measurement<Gauge> {

packages/metrics/src/sample_collection.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,15 @@ impl SampleCollection<Counter> {
7979

8080
sample.increment(time);
8181
}
82+
83+
pub fn absolute(&mut self, label_set: &LabelSet, value: u64, time: DurationSinceUnixEpoch) {
84+
let sample = self
85+
.samples
86+
.entry(label_set.clone())
87+
.or_insert_with(|| Measurement::new(Counter::default(), time));
88+
89+
sample.absolute(value, time);
90+
}
8291
}
8392

8493
impl SampleCollection<Gauge> {

packages/tracker-core/src/statistics/metrics.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,19 @@ impl Metrics {
2424
self.metric_collection.increase_counter(metric_name, labels, now)
2525
}
2626

27+
/// # Errors
28+
///
29+
/// Returns an error if the metric does not exist and it cannot be created.
30+
pub fn set_counter(
31+
&mut self,
32+
metric_name: &MetricName,
33+
labels: &LabelSet,
34+
value: u64,
35+
now: DurationSinceUnixEpoch,
36+
) -> Result<(), Error> {
37+
self.metric_collection.set_counter(metric_name, labels, value, now)
38+
}
39+
2740
/// # Errors
2841
///
2942
/// Returns an error if the metric does not exist and it cannot be created.

packages/tracker-core/src/statistics/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
pub mod event;
22
pub mod metrics;
3+
pub mod persisted_metrics;
34
pub mod repository;
45

56
use metrics::Metrics;
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
use std::sync::Arc;
2+
3+
use thiserror::Error;
4+
use torrust_tracker_metrics::label::LabelSet;
5+
use torrust_tracker_metrics::{metric_collection, metric_name};
6+
use torrust_tracker_primitives::DurationSinceUnixEpoch;
7+
8+
use super::repository::Repository;
9+
use super::TRACKER_CORE_PERSISTENT_TORRENTS_DOWNLOADS_TOTAL;
10+
use crate::databases;
11+
use crate::torrent::repository::persisted::DatabasePersistentTorrentRepository;
12+
13+
/// Loads persisted metrics from the database and sets them in the stats repository.
14+
///
15+
/// # Errors
16+
///
17+
/// This function will return an error if the database query fails or if the
18+
/// metric collection fails to set the initial metric values.
19+
pub async fn load_persisted_metrics(
20+
stats_repository: &Arc<Repository>,
21+
db_torrent_repository: &Arc<DatabasePersistentTorrentRepository>,
22+
now: DurationSinceUnixEpoch,
23+
) -> Result<(), Error> {
24+
if let Some(downloads) = db_torrent_repository.load_global_number_of_downloads()? {
25+
stats_repository
26+
.set_counter(
27+
&metric_name!(TRACKER_CORE_PERSISTENT_TORRENTS_DOWNLOADS_TOTAL),
28+
&LabelSet::default(),
29+
u64::from(downloads),
30+
now,
31+
)
32+
.await?;
33+
}
34+
35+
Ok(())
36+
}
37+
38+
#[derive(Error, Debug, Clone)]
39+
pub enum Error {
40+
#[error("Database error: {err}")]
41+
DatabaseError { err: databases::error::Error },
42+
43+
#[error("Metrics error: {err}")]
44+
MetricsError { err: metric_collection::Error },
45+
}
46+
47+
impl From<databases::error::Error> for Error {
48+
fn from(err: databases::error::Error) -> Self {
49+
Self::DatabaseError { err }
50+
}
51+
}
52+
53+
impl From<metric_collection::Error> for Error {
54+
fn from(err: metric_collection::Error) -> Self {
55+
Self::MetricsError { err }
56+
}
57+
}

packages/tracker-core/src/statistics/repository.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,31 @@ impl Repository {
5757
result
5858
}
5959

60+
/// # Errors
61+
///
62+
/// This function will return an error if the metric collection fails to
63+
/// increment the counter.
64+
pub async fn set_counter(
65+
&self,
66+
metric_name: &MetricName,
67+
labels: &LabelSet,
68+
value: u64,
69+
now: DurationSinceUnixEpoch,
70+
) -> Result<(), Error> {
71+
let mut stats_lock = self.stats.write().await;
72+
73+
let result = stats_lock.set_counter(metric_name, labels, value, now);
74+
75+
drop(stats_lock);
76+
77+
match result {
78+
Ok(()) => {}
79+
Err(ref err) => tracing::error!("Failed to set the counter: {}", err),
80+
}
81+
82+
result
83+
}
84+
6085
/// # Errors
6186
///
6287
/// This function will return an error if the metric collection fails to

packages/tracker-core/src/torrent/repository/persisted.rs

Lines changed: 29 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ impl DatabasePersistentTorrentRepository {
4747
}
4848
}
4949

50+
// Single Torrent Metrics
51+
5052
/// Increases the number of downloads for a given torrent.
5153
///
5254
/// If the torrent is not found, it creates a new entry.
@@ -67,22 +69,6 @@ impl DatabasePersistentTorrentRepository {
6769
}
6870
}
6971

70-
/// Increases the global number of downloads for all torrent.
71-
///
72-
/// If the metric is not found, it creates it.
73-
///
74-
/// # Errors
75-
///
76-
/// Returns an [`Error`] if the database operation fails.
77-
pub(crate) fn increase_global_number_of_downloads(&self) -> Result<(), Error> {
78-
let torrent = self.database.load_global_number_of_downloads()?;
79-
80-
match torrent {
81-
Some(_number_of_downloads) => self.database.increase_global_number_of_downloads(),
82-
None => self.database.save_global_number_of_downloads(1),
83-
}
84-
}
85-
8672
/// Loads all persistent torrent metrics from the database.
8773
///
8874
/// This function retrieves the torrent metrics (e.g., download counts) from the persistent store
@@ -123,6 +109,33 @@ impl DatabasePersistentTorrentRepository {
123109
pub(crate) fn save(&self, info_hash: &InfoHash, downloaded: u32) -> Result<(), Error> {
124110
self.database.save_persistent_torrent(info_hash, downloaded)
125111
}
112+
113+
// Aggregate Metrics
114+
115+
/// Increases the global number of downloads for all torrent.
116+
///
117+
/// If the metric is not found, it creates it.
118+
///
119+
/// # Errors
120+
///
121+
/// Returns an [`Error`] if the database operation fails.
122+
pub(crate) fn increase_global_number_of_downloads(&self) -> Result<(), Error> {
123+
let torrent = self.database.load_global_number_of_downloads()?;
124+
125+
match torrent {
126+
Some(_number_of_downloads) => self.database.increase_global_number_of_downloads(),
127+
None => self.database.save_global_number_of_downloads(1),
128+
}
129+
}
130+
131+
/// Loads the global number of downloads for all torrents from the database.
132+
///
133+
/// # Errors
134+
///
135+
/// Returns an [`Error`] if the underlying database query fails.
136+
pub(crate) fn load_global_number_of_downloads(&self) -> Result<Option<PersistentTorrent>, Error> {
137+
self.database.load_global_number_of_downloads()
138+
}
126139
}
127140

128141
#[cfg(test)]

0 commit comments

Comments
 (0)