Skip to content

Commit 260f7ff

Browse files
committed
feat: [torrust#1523] add new metric: number of inactive peers
The metric is added to the `torrent-repository` package. The metric in Prometheus format: ``` torrent_repository_peers_inactive_total{} 0 ``` It was not included as a new label in the number of peers because it can't be calculated from current events. New inactivity events could have been added but the solution was much more complex than this and having two metrics counting peers is not so bad. The discarded alternative was addinga new label por satte (`active`, `inactive`).
1 parent fbefc88 commit 260f7ff

File tree

11 files changed

+211
-5
lines changed

11 files changed

+211
-5
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

packages/torrent-repository/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ version.workspace = true
1818
[dependencies]
1919
aquatic_udp_protocol = "0"
2020
bittorrent-primitives = "0.1.0"
21+
chrono = { version = "0", default-features = false, features = ["clock"] }
2122
crossbeam-skiplist = "0"
2223
futures = "0"
2324
serde = { version = "1.0.219", features = ["derive"] }

packages/torrent-repository/src/statistics/mod.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
pub mod event;
22
pub mod metrics;
3+
pub mod peers_inactivity_update;
34
pub mod repository;
45

56
use metrics::Metrics;
@@ -23,6 +24,7 @@ const TORRENT_REPOSITORY_PEERS_UPDATED_TOTAL: &str = "torrent_repository_peers_u
2324

2425
const TORRENT_REPOSITORY_PEER_CONNECTIONS_TOTAL: &str = "torrent_repository_peer_connections_total";
2526
const TORRENT_REPOSITORY_UNIQUE_PEERS_TOTAL: &str = "torrent_repository_unique_peers_total"; // todo: not implemented yet
27+
const TORRENT_REPOSITORY_PEERS_INACTIVE_TOTAL: &str = "torrent_repository_peers_inactive_total";
2628

2729
#[must_use]
2830
pub fn describe_metrics() -> Metrics {
@@ -88,5 +90,11 @@ pub fn describe_metrics() -> Metrics {
8890
Some(&MetricDescription::new("The total number of unique peers.")),
8991
);
9092

93+
metrics.metric_collection.describe_gauge(
94+
&metric_name!(TORRENT_REPOSITORY_PEERS_INACTIVE_TOTAL),
95+
Some(Unit::Count),
96+
Some(&MetricDescription::new("The total number of inactive peers.")),
97+
);
98+
9199
metrics
92100
}
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
//! Job that runs a task on intervals to update peers' inactivity metrics.
2+
use std::sync::Arc;
3+
4+
use chrono::Utc;
5+
use tokio::task::JoinHandle;
6+
use torrust_tracker_clock::clock::Time;
7+
use torrust_tracker_metrics::label::LabelSet;
8+
use torrust_tracker_metrics::metric_name;
9+
use torrust_tracker_primitives::DurationSinceUnixEpoch;
10+
use tracing::instrument;
11+
12+
use super::repository::Repository;
13+
use crate::statistics::TORRENT_REPOSITORY_PEERS_INACTIVE_TOTAL;
14+
use crate::{CurrentClock, Swarms};
15+
16+
#[must_use]
17+
#[instrument(skip(swarms, stats_repository))]
18+
pub fn start_job(
19+
swarms: &Arc<Swarms>,
20+
stats_repository: &Arc<Repository>,
21+
inactivity_cutoff: DurationSinceUnixEpoch,
22+
) -> JoinHandle<()> {
23+
let weak_swarms = std::sync::Arc::downgrade(swarms);
24+
let weak_stats_repository = std::sync::Arc::downgrade(stats_repository);
25+
26+
let interval_in_secs = 15; // todo: make this configurable
27+
28+
tokio::spawn(async move {
29+
let interval = std::time::Duration::from_secs(interval_in_secs);
30+
let mut interval = tokio::time::interval(interval);
31+
interval.tick().await;
32+
33+
loop {
34+
tokio::select! {
35+
_ = tokio::signal::ctrl_c() => {
36+
tracing::info!("Stopping peers inactivity metrics update job ...");
37+
break;
38+
}
39+
_ = interval.tick() => {
40+
if let (Some(swarms), Some(stats_repository)) = (weak_swarms.upgrade(), weak_stats_repository.upgrade()) {
41+
let start_time = Utc::now().time();
42+
43+
tracing::debug!("Updating peers inactivity metrics (executed every {} secs) ...", interval_in_secs);
44+
45+
let inactive_peers_total = swarms.count_inactive_peers(inactivity_cutoff).await;
46+
47+
tracing::info!(inactive_peers_total = inactive_peers_total);
48+
49+
#[allow(clippy::cast_precision_loss)]
50+
let inactive_peers_total = inactive_peers_total as f64;
51+
52+
let _unused = stats_repository
53+
.set_gauge(
54+
&metric_name!(TORRENT_REPOSITORY_PEERS_INACTIVE_TOTAL),
55+
&LabelSet::default(),
56+
inactive_peers_total,
57+
CurrentClock::now(),
58+
)
59+
.await;
60+
61+
tracing::debug!(
62+
"Peers inactivity metrics updated in {} ms",
63+
(Utc::now().time() - start_time).num_milliseconds()
64+
);
65+
} else {
66+
break;
67+
}
68+
}
69+
}
70+
}
71+
})
72+
}

packages/torrent-repository/src/statistics/repository.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,31 @@ impl Repository {
5757
result
5858
}
5959

60+
/// # Errors
61+
///
62+
/// This function will return an error if the metric collection fails to
63+
/// set the gauge.
64+
pub async fn set_gauge(
65+
&self,
66+
metric_name: &MetricName,
67+
labels: &LabelSet,
68+
value: f64,
69+
now: DurationSinceUnixEpoch,
70+
) -> Result<(), Error> {
71+
let mut stats_lock = self.stats.write().await;
72+
73+
let result = stats_lock.set_gauge(metric_name, labels, value, now);
74+
75+
drop(stats_lock);
76+
77+
match result {
78+
Ok(()) => {}
79+
Err(ref err) => tracing::error!("Failed to set the gauge: {}", err),
80+
}
81+
82+
result
83+
}
84+
6085
/// # Errors
6186
///
6287
/// This function will return an error if the metric collection fails to

packages/torrent-repository/src/swarm.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,14 @@ impl Swarm {
118118
(seeders, leechers)
119119
}
120120

121+
#[must_use]
122+
pub fn count_inactive_peers(&self, current_cutoff: DurationSinceUnixEpoch) -> usize {
123+
self.peers
124+
.iter()
125+
.filter(|(_, peer)| peer::ReadInfo::get_updated(&**peer) <= current_cutoff)
126+
.count()
127+
}
128+
121129
#[must_use]
122130
pub fn len(&self) -> usize {
123131
self.peers.len()
@@ -435,6 +443,22 @@ mod tests {
435443
assert_eq!(swarm.peers_excluding(&peer2.peer_addr, None), [Arc::new(peer1)]);
436444
}
437445

446+
#[tokio::test]
447+
async fn it_should_count_inactive_peers() {
448+
let mut swarm = Swarm::new(&sample_info_hash(), 0, None);
449+
let mut downloads_increased = false;
450+
let one_second = DurationSinceUnixEpoch::new(1, 0);
451+
452+
// Insert the peer
453+
let last_update_time = DurationSinceUnixEpoch::new(1_669_397_478_934, 0);
454+
let peer = PeerBuilder::default().last_updated_on(last_update_time).build();
455+
swarm.upsert_peer(peer.into(), &mut downloads_increased).await;
456+
457+
let inactive_peers_total = swarm.count_inactive_peers(last_update_time + one_second);
458+
459+
assert_eq!(inactive_peers_total, 1);
460+
}
461+
438462
#[tokio::test]
439463
async fn it_should_remove_inactive_peers() {
440464
let mut swarm = Swarm::new(&sample_info_hash(), 0, None);

packages/torrent-repository/src/swarms.rs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -248,6 +248,18 @@ impl Swarms {
248248
}
249249
}
250250

251+
/// Counts the number of inactive peers across all torrents.
252+
pub async fn count_inactive_peers(&self, current_cutoff: DurationSinceUnixEpoch) -> usize {
253+
let mut inactive_peers_total = 0;
254+
255+
for swarm_handle in &self.swarms {
256+
let swarm = swarm_handle.value().lock().await;
257+
inactive_peers_total += swarm.count_inactive_peers(current_cutoff);
258+
}
259+
260+
inactive_peers_total
261+
}
262+
251263
/// Removes inactive peers from all torrent entries.
252264
///
253265
/// A peer is considered inactive if its last update timestamp is older than
@@ -705,6 +717,22 @@ mod tests {
705717
assert!(swarms.get(&info_hash).is_none());
706718
}
707719

720+
#[tokio::test]
721+
async fn it_should_count_inactive_peers() {
722+
let swarms = Arc::new(Swarms::default());
723+
724+
let info_hash = sample_info_hash();
725+
let mut peer = sample_peer();
726+
peer.updated = DurationSinceUnixEpoch::new(0, 0);
727+
728+
swarms.handle_announcement(&info_hash, &peer, None).await.unwrap();
729+
730+
// Cut off time is 1 second after the peer was updated
731+
let inactive_peers_total = swarms.count_inactive_peers(peer.updated.add(Duration::from_secs(1))).await;
732+
733+
assert_eq!(inactive_peers_total, 1);
734+
}
735+
708736
#[tokio::test]
709737
async fn it_should_remove_peers_that_have_not_been_updated_after_a_cutoff_time() {
710738
let swarms = Arc::new(Swarms::default());

packages/tracker-core/src/torrent/manager.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use std::time::Duration;
44

55
use torrust_tracker_clock::clock::Time;
66
use torrust_tracker_configuration::Core;
7+
use torrust_tracker_primitives::DurationSinceUnixEpoch;
78

89
use super::repository::in_memory::InMemoryTorrentRepository;
910
use super::repository::persisted::DatabasePersistentTorrentRepository;
@@ -103,10 +104,13 @@ impl TorrentsManager {
103104
}
104105

105106
async fn remove_inactive_peers(&self) {
106-
let current_cutoff = CurrentClock::now_sub(&Duration::from_secs(u64::from(self.config.tracker_policy.max_peer_timeout)))
107-
.unwrap_or_default();
107+
self.in_memory_torrent_repository
108+
.remove_inactive_peers(self.current_cutoff())
109+
.await;
110+
}
108111

109-
self.in_memory_torrent_repository.remove_inactive_peers(current_cutoff).await;
112+
fn current_cutoff(&self) -> DurationSinceUnixEpoch {
113+
CurrentClock::now_sub(&Duration::from_secs(u64::from(self.config.tracker_policy.max_peer_timeout))).unwrap_or_default()
110114
}
111115

112116
async fn remove_peerless_torrents(&self) {

src/app.rs

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,9 @@ use torrust_tracker_configuration::{Configuration, HttpTracker, UdpTracker};
2727
use tracing::instrument;
2828

2929
use crate::bootstrap::jobs::manager::JobManager;
30-
use crate::bootstrap::jobs::{self, health_check_api, http_tracker, torrent_cleanup, tracker_apis, udp_tracker};
30+
use crate::bootstrap::jobs::{
31+
self, health_check_api, http_tracker, peers_inactivity_update, torrent_cleanup, tracker_apis, udp_tracker,
32+
};
3133
use crate::bootstrap::{self};
3234
use crate::container::AppContainer;
3335

@@ -79,8 +81,11 @@ async fn start_jobs(config: &Configuration, app_container: &Arc<AppContainer>) -
7981

8082
start_the_udp_instances(config, app_container, &mut job_manager).await;
8183
start_the_http_instances(config, app_container, &mut job_manager).await;
82-
start_the_http_api(config, app_container, &mut job_manager).await;
84+
8385
start_torrent_cleanup(config, app_container, &mut job_manager);
86+
start_peers_inactivity_update(config, app_container, &mut job_manager);
87+
88+
start_the_http_api(config, app_container, &mut job_manager).await;
8489
start_health_check_api(config, app_container, &mut job_manager).await;
8590

8691
job_manager
@@ -260,6 +265,16 @@ fn start_torrent_cleanup(config: &Configuration, app_container: &Arc<AppContaine
260265
}
261266
}
262267

268+
fn start_peers_inactivity_update(config: &Configuration, app_container: &Arc<AppContainer>, job_manager: &mut JobManager) {
269+
if config.core.tracker_usage_statistics {
270+
let handle = peers_inactivity_update::start_job(config, app_container);
271+
272+
job_manager.push("peers_inactivity_update", handle);
273+
} else {
274+
tracing::info!("Peers inactivity update job is disabled.");
275+
}
276+
}
277+
263278
async fn start_health_check_api(config: &Configuration, app_container: &Arc<AppContainer>, job_manager: &mut JobManager) {
264279
let handle = health_check_api::start_job(&config.health_check_api, app_container.registar.entries()).await;
265280

src/bootstrap/jobs/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ pub mod health_check_api;
1010
pub mod http_tracker;
1111
pub mod http_tracker_core;
1212
pub mod manager;
13+
pub mod peers_inactivity_update;
1314
pub mod torrent_cleanup;
1415
pub mod torrent_repository;
1516
pub mod tracker_apis;

0 commit comments

Comments
 (0)