diff --git a/Cargo.lock b/Cargo.lock index ab898e327..6e4ab415f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4832,6 +4832,7 @@ dependencies = [ "aquatic_udp_protocol", "async-std", "bittorrent-primitives", + "chrono", "criterion", "crossbeam-skiplist", "futures", diff --git a/packages/torrent-repository/Cargo.toml b/packages/torrent-repository/Cargo.toml index 98ae5817d..510a59e9d 100644 --- a/packages/torrent-repository/Cargo.toml +++ b/packages/torrent-repository/Cargo.toml @@ -18,6 +18,7 @@ version.workspace = true [dependencies] aquatic_udp_protocol = "0" bittorrent-primitives = "0.1.0" +chrono = { version = "0", default-features = false, features = ["clock"] } crossbeam-skiplist = "0" futures = "0" serde = { version = "1.0.219", features = ["derive"] } diff --git a/packages/torrent-repository/src/statistics/activity_metrics_updater.rs b/packages/torrent-repository/src/statistics/activity_metrics_updater.rs new file mode 100644 index 000000000..2dfa5fb4e --- /dev/null +++ b/packages/torrent-repository/src/statistics/activity_metrics_updater.rs @@ -0,0 +1,104 @@ +//! Job that runs a task on intervals to update peers' activity metrics. +use std::sync::Arc; + +use chrono::Utc; +use tokio::task::JoinHandle; +use torrust_tracker_clock::clock::Time; +use torrust_tracker_metrics::label::LabelSet; +use torrust_tracker_metrics::metric_name; +use torrust_tracker_primitives::DurationSinceUnixEpoch; +use tracing::instrument; + +use super::repository::Repository; +use crate::statistics::{TORRENT_REPOSITORY_PEERS_INACTIVE_TOTAL, TORRENT_REPOSITORY_TORRENTS_INACTIVE_TOTAL}; +use crate::{CurrentClock, Swarms}; + +#[must_use] +#[instrument(skip(swarms, stats_repository))] +pub fn start_job( + swarms: &Arc, + stats_repository: &Arc, + inactivity_cutoff: DurationSinceUnixEpoch, +) -> JoinHandle<()> { + let weak_swarms = std::sync::Arc::downgrade(swarms); + let weak_stats_repository = std::sync::Arc::downgrade(stats_repository); + + let interval_in_secs = 15; // todo: make this configurable + + tokio::spawn(async move { + let interval = std::time::Duration::from_secs(interval_in_secs); + let mut interval = tokio::time::interval(interval); + interval.tick().await; + + loop { + tokio::select! { + _ = tokio::signal::ctrl_c() => { + tracing::info!("Stopping peers activity metrics update job (ctrl-c signal received) ..."); + break; + } + _ = interval.tick() => { + if let (Some(swarms), Some(stats_repository)) = (weak_swarms.upgrade(), weak_stats_repository.upgrade()) { + update_activity_metrics(interval_in_secs, &swarms, &stats_repository, inactivity_cutoff).await; + } else { + tracing::info!("Stopping peers activity metrics update job (can't upgrade weak pointers) ..."); + break; + } + } + } + } + }) +} + +async fn update_activity_metrics( + interval_in_secs: u64, + swarms: &Arc, + stats_repository: &Arc, + inactivity_cutoff: DurationSinceUnixEpoch, +) { + let start_time = Utc::now().time(); + + tracing::debug!( + "Updating peers and torrents activity metrics (executed every {} secs) ...", + interval_in_secs + ); + + let activity_metadata = swarms.get_activity_metadata(inactivity_cutoff).await; + + activity_metadata.log(); + + update_inactive_peers_total(stats_repository, activity_metadata.inactive_peers_total).await; + update_inactive_torrents_total(stats_repository, activity_metadata.inactive_torrents_total).await; + + tracing::debug!( + "Peers and torrents activity metrics updated in {} ms", + (Utc::now().time() - start_time).num_milliseconds() + ); +} + +async fn update_inactive_peers_total(stats_repository: &Arc, inactive_peers_total: usize) { + #[allow(clippy::cast_precision_loss)] + let inactive_peers_total = inactive_peers_total as f64; + + let _unused = stats_repository + .set_gauge( + &metric_name!(TORRENT_REPOSITORY_PEERS_INACTIVE_TOTAL), + &LabelSet::default(), + inactive_peers_total, + CurrentClock::now(), + ) + .await; +} + +async fn update_inactive_torrents_total(stats_repository: &Arc, inactive_torrents_total: usize) { + #[allow(clippy::cast_precision_loss)] + let inactive_torrents_total = inactive_torrents_total as f64; + + let _unused = stats_repository + .set_gauge( + &metric_name!(TORRENT_REPOSITORY_TORRENTS_INACTIVE_TOTAL), + &LabelSet::default(), + inactive_torrents_total, + CurrentClock::now(), + ) + .await; +} diff --git a/packages/torrent-repository/src/statistics/mod.rs b/packages/torrent-repository/src/statistics/mod.rs index 7d3ad85ce..cfc252e34 100644 --- a/packages/torrent-repository/src/statistics/mod.rs +++ b/packages/torrent-repository/src/statistics/mod.rs @@ -1,3 +1,4 @@ +pub mod activity_metrics_updater; pub mod event; pub mod metrics; pub mod repository; @@ -14,6 +15,7 @@ const TORRENT_REPOSITORY_TORRENTS_REMOVED_TOTAL: &str = "torrent_repository_torr const TORRENT_REPOSITORY_TORRENTS_TOTAL: &str = "torrent_repository_torrents_total"; const TORRENT_REPOSITORY_TORRENTS_DOWNLOADS_TOTAL: &str = "torrent_repository_torrents_downloads_total"; +const TORRENT_REPOSITORY_TORRENTS_INACTIVE_TOTAL: &str = "torrent_repository_torrents_inactive_total"; // Peers metrics @@ -23,6 +25,7 @@ const TORRENT_REPOSITORY_PEERS_UPDATED_TOTAL: &str = "torrent_repository_peers_u const TORRENT_REPOSITORY_PEER_CONNECTIONS_TOTAL: &str = "torrent_repository_peer_connections_total"; const TORRENT_REPOSITORY_UNIQUE_PEERS_TOTAL: &str = "torrent_repository_unique_peers_total"; // todo: not implemented yet +const TORRENT_REPOSITORY_PEERS_INACTIVE_TOTAL: &str = "torrent_repository_peers_inactive_total"; #[must_use] pub fn describe_metrics() -> Metrics { @@ -54,6 +57,12 @@ pub fn describe_metrics() -> Metrics { Some(&MetricDescription::new("The total number of torrent downloads.")), ); + metrics.metric_collection.describe_gauge( + &metric_name!(TORRENT_REPOSITORY_TORRENTS_INACTIVE_TOTAL), + Some(Unit::Count), + Some(&MetricDescription::new("The total number of inactive torrents.")), + ); + // Peers metrics metrics.metric_collection.describe_counter( @@ -88,5 +97,11 @@ pub fn describe_metrics() -> Metrics { Some(&MetricDescription::new("The total number of unique peers.")), ); + metrics.metric_collection.describe_gauge( + &metric_name!(TORRENT_REPOSITORY_PEERS_INACTIVE_TOTAL), + Some(Unit::Count), + Some(&MetricDescription::new("The total number of inactive peers.")), + ); + metrics } diff --git a/packages/torrent-repository/src/statistics/repository.rs b/packages/torrent-repository/src/statistics/repository.rs index 1e376faf7..fe1292d00 100644 --- a/packages/torrent-repository/src/statistics/repository.rs +++ b/packages/torrent-repository/src/statistics/repository.rs @@ -57,6 +57,31 @@ impl Repository { result } + /// # Errors + /// + /// This function will return an error if the metric collection fails to + /// set the gauge. + pub async fn set_gauge( + &self, + metric_name: &MetricName, + labels: &LabelSet, + value: f64, + now: DurationSinceUnixEpoch, + ) -> Result<(), Error> { + let mut stats_lock = self.stats.write().await; + + let result = stats_lock.set_gauge(metric_name, labels, value, now); + + drop(stats_lock); + + match result { + Ok(()) => {} + Err(ref err) => tracing::error!("Failed to set the gauge: {}", err), + } + + result + } + /// # Errors /// /// This function will return an error if the metric collection fails to diff --git a/packages/torrent-repository/src/swarm.rs b/packages/torrent-repository/src/swarm.rs index f25304979..b9076289b 100644 --- a/packages/torrent-repository/src/swarm.rs +++ b/packages/torrent-repository/src/swarm.rs @@ -118,6 +118,25 @@ impl Swarm { (seeders, leechers) } + #[must_use] + pub fn count_inactive_peers(&self, current_cutoff: DurationSinceUnixEpoch) -> usize { + self.peers + .iter() + .filter(|(_, peer)| peer::ReadInfo::get_updated(&**peer) <= current_cutoff) + .count() + } + + #[must_use] + pub fn get_activity_metadata(&self, current_cutoff: DurationSinceUnixEpoch) -> ActivityMetadata { + let inactive_peers_total = self.count_inactive_peers(current_cutoff); + + let active_peers_total = self.len() - inactive_peers_total; + + let is_active = active_peers_total > 0; + + ActivityMetadata::new(is_active, active_peers_total, inactive_peers_total) + } + #[must_use] pub fn len(&self) -> usize { self.peers.len() @@ -288,6 +307,30 @@ impl Swarm { } } +#[derive(Clone)] +pub struct ActivityMetadata { + /// Indicates if the swarm is active. It's inactive if there are no active + /// peers. + pub is_active: bool, + + /// The number of active peers in the swarm. + pub active_peers_total: usize, + + /// The number of inactive peers in the swarm. + pub inactive_peers_total: usize, +} + +impl ActivityMetadata { + #[must_use] + pub fn new(is_active: bool, active_peers_total: usize, inactive_peers_total: usize) -> Self { + Self { + is_active, + active_peers_total, + inactive_peers_total, + } + } +} + #[cfg(test)] mod tests { @@ -435,6 +478,22 @@ mod tests { assert_eq!(swarm.peers_excluding(&peer2.peer_addr, None), [Arc::new(peer1)]); } + #[tokio::test] + async fn it_should_count_inactive_peers() { + let mut swarm = Swarm::new(&sample_info_hash(), 0, None); + let mut downloads_increased = false; + let one_second = DurationSinceUnixEpoch::new(1, 0); + + // Insert the peer + let last_update_time = DurationSinceUnixEpoch::new(1_669_397_478_934, 0); + let peer = PeerBuilder::default().last_updated_on(last_update_time).build(); + swarm.upsert_peer(peer.into(), &mut downloads_increased).await; + + let inactive_peers_total = swarm.count_inactive_peers(last_update_time + one_second); + + assert_eq!(inactive_peers_total, 1); + } + #[tokio::test] async fn it_should_remove_inactive_peers() { let mut swarm = Swarm::new(&sample_info_hash(), 0, None); diff --git a/packages/torrent-repository/src/swarms.rs b/packages/torrent-repository/src/swarms.rs index ac2490853..36f83070d 100644 --- a/packages/torrent-repository/src/swarms.rs +++ b/packages/torrent-repository/src/swarms.rs @@ -248,6 +248,44 @@ impl Swarms { } } + pub async fn get_activity_metadata(&self, current_cutoff: DurationSinceUnixEpoch) -> AggregateActivityMetadata { + let mut active_peers_total = 0; + let mut inactive_peers_total = 0; + let mut active_torrents_total = 0; + + for swarm_handle in &self.swarms { + let swarm = swarm_handle.value().lock().await; + + let activity_metadata = swarm.get_activity_metadata(current_cutoff); + + if activity_metadata.is_active { + active_torrents_total += 1; + } + + active_peers_total += activity_metadata.active_peers_total; + inactive_peers_total += activity_metadata.inactive_peers_total; + } + + AggregateActivityMetadata { + active_peers_total, + inactive_peers_total, + active_torrents_total, + inactive_torrents_total: self.len() - active_torrents_total, + } + } + + /// Counts the number of inactive peers across all torrents. + pub async fn count_inactive_peers(&self, current_cutoff: DurationSinceUnixEpoch) -> usize { + let mut inactive_peers_total = 0; + + for swarm_handle in &self.swarms { + let swarm = swarm_handle.value().lock().await; + inactive_peers_total += swarm.count_inactive_peers(current_cutoff); + } + + inactive_peers_total + } + /// Removes inactive peers from all torrent entries. /// /// A peer is considered inactive if its last update timestamp is older than @@ -434,6 +472,31 @@ impl Swarms { #[derive(thiserror::Error, Debug, Clone)] pub enum Error {} +#[derive(Clone, Debug, Default)] +pub struct AggregateActivityMetadata { + /// The number of active peers in all swarms. + pub active_peers_total: usize, + + /// The number of inactive peers in all swarms. + pub inactive_peers_total: usize, + + /// The number of active torrents. + pub active_torrents_total: usize, + + /// The number of inactive torrents. + pub inactive_torrents_total: usize, +} + +impl AggregateActivityMetadata { + pub fn log(&self) { + tracing::info!( + active_peers_total = self.active_peers_total, + inactive_peers_total = self.inactive_peers_total, + active_torrents_total = self.active_torrents_total, + inactive_torrents_total = self.inactive_torrents_total + ); + } +} #[cfg(test)] mod tests { @@ -705,6 +768,22 @@ mod tests { assert!(swarms.get(&info_hash).is_none()); } + #[tokio::test] + async fn it_should_count_inactive_peers() { + let swarms = Arc::new(Swarms::default()); + + let info_hash = sample_info_hash(); + let mut peer = sample_peer(); + peer.updated = DurationSinceUnixEpoch::new(0, 0); + + swarms.handle_announcement(&info_hash, &peer, None).await.unwrap(); + + // Cut off time is 1 second after the peer was updated + let inactive_peers_total = swarms.count_inactive_peers(peer.updated.add(Duration::from_secs(1))).await; + + assert_eq!(inactive_peers_total, 1); + } + #[tokio::test] async fn it_should_remove_peers_that_have_not_been_updated_after_a_cutoff_time() { let swarms = Arc::new(Swarms::default()); diff --git a/packages/tracker-core/src/torrent/manager.rs b/packages/tracker-core/src/torrent/manager.rs index bc193bd4f..bf73f7e8b 100644 --- a/packages/tracker-core/src/torrent/manager.rs +++ b/packages/tracker-core/src/torrent/manager.rs @@ -4,6 +4,7 @@ use std::time::Duration; use torrust_tracker_clock::clock::Time; use torrust_tracker_configuration::Core; +use torrust_tracker_primitives::DurationSinceUnixEpoch; use super::repository::in_memory::InMemoryTorrentRepository; use super::repository::persisted::DatabasePersistentTorrentRepository; @@ -103,10 +104,13 @@ impl TorrentsManager { } async fn remove_inactive_peers(&self) { - let current_cutoff = CurrentClock::now_sub(&Duration::from_secs(u64::from(self.config.tracker_policy.max_peer_timeout))) - .unwrap_or_default(); + self.in_memory_torrent_repository + .remove_inactive_peers(self.current_cutoff()) + .await; + } - self.in_memory_torrent_repository.remove_inactive_peers(current_cutoff).await; + fn current_cutoff(&self) -> DurationSinceUnixEpoch { + CurrentClock::now_sub(&Duration::from_secs(u64::from(self.config.tracker_policy.max_peer_timeout))).unwrap_or_default() } async fn remove_peerless_torrents(&self) { diff --git a/src/app.rs b/src/app.rs index ca8b7a5c3..5180e4583 100644 --- a/src/app.rs +++ b/src/app.rs @@ -27,7 +27,9 @@ use torrust_tracker_configuration::{Configuration, HttpTracker, UdpTracker}; use tracing::instrument; use crate::bootstrap::jobs::manager::JobManager; -use crate::bootstrap::jobs::{self, health_check_api, http_tracker, torrent_cleanup, tracker_apis, udp_tracker}; +use crate::bootstrap::jobs::{ + self, activity_metrics_updater, health_check_api, http_tracker, torrent_cleanup, tracker_apis, udp_tracker, +}; use crate::bootstrap::{self}; use crate::container::AppContainer; @@ -79,8 +81,11 @@ async fn start_jobs(config: &Configuration, app_container: &Arc) - start_the_udp_instances(config, app_container, &mut job_manager).await; start_the_http_instances(config, app_container, &mut job_manager).await; - start_the_http_api(config, app_container, &mut job_manager).await; + start_torrent_cleanup(config, app_container, &mut job_manager); + start_peers_inactivity_update(config, app_container, &mut job_manager); + + start_the_http_api(config, app_container, &mut job_manager).await; start_health_check_api(config, app_container, &mut job_manager).await; job_manager @@ -260,6 +265,16 @@ fn start_torrent_cleanup(config: &Configuration, app_container: &Arc, job_manager: &mut JobManager) { + if config.core.tracker_usage_statistics { + let handle = activity_metrics_updater::start_job(config, app_container); + + job_manager.push("peers_inactivity_update", handle); + } else { + tracing::info!("Peers inactivity update job is disabled."); + } +} + async fn start_health_check_api(config: &Configuration, app_container: &Arc, job_manager: &mut JobManager) { let handle = health_check_api::start_job(&config.health_check_api, app_container.registar.entries()).await; diff --git a/src/bootstrap/jobs/activity_metrics_updater.rs b/src/bootstrap/jobs/activity_metrics_updater.rs new file mode 100644 index 000000000..7411c05cf --- /dev/null +++ b/src/bootstrap/jobs/activity_metrics_updater.rs @@ -0,0 +1,27 @@ +//! Job that runs a task on intervals to update peers' activity metrics. +use std::sync::Arc; +use std::time::Duration; + +use tokio::task::JoinHandle; +use torrust_tracker_clock::clock::Time; +use torrust_tracker_configuration::Configuration; + +use crate::container::AppContainer; +use crate::CurrentClock; + +#[must_use] +pub fn start_job(config: &Configuration, app_container: &Arc) -> JoinHandle<()> { + torrust_tracker_torrent_repository::statistics::activity_metrics_updater::start_job( + &app_container.torrent_repository_container.swarms.clone(), + &app_container.torrent_repository_container.stats_repository.clone(), + peer_inactivity_cutoff_timestamp(config.core.tracker_policy.max_peer_timeout), + ) +} + +/// Returns the timestamp of the cutoff for inactive peers. +/// +/// Peers that has not been updated for more than `max_peer_timeout` seconds are +/// considered inactive. +fn peer_inactivity_cutoff_timestamp(max_peer_timeout: u32) -> Duration { + CurrentClock::now_sub(&Duration::from_secs(u64::from(max_peer_timeout))).unwrap_or_default() +} diff --git a/src/bootstrap/jobs/mod.rs b/src/bootstrap/jobs/mod.rs index b311c6da6..c8d7a8598 100644 --- a/src/bootstrap/jobs/mod.rs +++ b/src/bootstrap/jobs/mod.rs @@ -6,6 +6,7 @@ //! 2. Launch all the application services as concurrent jobs. //! //! This modules contains all the functions needed to start those jobs. +pub mod activity_metrics_updater; pub mod health_check_api; pub mod http_tracker; pub mod http_tracker_core;