Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions packages/torrent-repository/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ version.workspace = true
[dependencies]
aquatic_udp_protocol = "0"
bittorrent-primitives = "0.1.0"
chrono = { version = "0", default-features = false, features = ["clock"] }
crossbeam-skiplist = "0"
futures = "0"
serde = { version = "1.0.219", features = ["derive"] }
Expand Down
104 changes: 104 additions & 0 deletions packages/torrent-repository/src/statistics/activity_metrics_updater.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
//! Job that runs a task on intervals to update peers' activity metrics.
use std::sync::Arc;

use chrono::Utc;
use tokio::task::JoinHandle;
use torrust_tracker_clock::clock::Time;
use torrust_tracker_metrics::label::LabelSet;
use torrust_tracker_metrics::metric_name;
use torrust_tracker_primitives::DurationSinceUnixEpoch;
use tracing::instrument;

use super::repository::Repository;
use crate::statistics::{TORRENT_REPOSITORY_PEERS_INACTIVE_TOTAL, TORRENT_REPOSITORY_TORRENTS_INACTIVE_TOTAL};
use crate::{CurrentClock, Swarms};

#[must_use]
#[instrument(skip(swarms, stats_repository))]
pub fn start_job(
swarms: &Arc<Swarms>,
stats_repository: &Arc<Repository>,
inactivity_cutoff: DurationSinceUnixEpoch,
) -> JoinHandle<()> {
let weak_swarms = std::sync::Arc::downgrade(swarms);
let weak_stats_repository = std::sync::Arc::downgrade(stats_repository);

let interval_in_secs = 15; // todo: make this configurable

tokio::spawn(async move {
let interval = std::time::Duration::from_secs(interval_in_secs);
let mut interval = tokio::time::interval(interval);
interval.tick().await;

loop {
tokio::select! {
_ = tokio::signal::ctrl_c() => {
tracing::info!("Stopping peers activity metrics update job (ctrl-c signal received) ...");
break;
}
_ = interval.tick() => {
if let (Some(swarms), Some(stats_repository)) = (weak_swarms.upgrade(), weak_stats_repository.upgrade()) {
update_activity_metrics(interval_in_secs, &swarms, &stats_repository, inactivity_cutoff).await;
} else {
tracing::info!("Stopping peers activity metrics update job (can't upgrade weak pointers) ...");
break;
}
}
}
}
})
}

async fn update_activity_metrics(
interval_in_secs: u64,
swarms: &Arc<Swarms>,
stats_repository: &Arc<Repository>,
inactivity_cutoff: DurationSinceUnixEpoch,
) {
let start_time = Utc::now().time();

tracing::debug!(
"Updating peers and torrents activity metrics (executed every {} secs) ...",
interval_in_secs
);

let activity_metadata = swarms.get_activity_metadata(inactivity_cutoff).await;

activity_metadata.log();

update_inactive_peers_total(stats_repository, activity_metadata.inactive_peers_total).await;
update_inactive_torrents_total(stats_repository, activity_metadata.inactive_torrents_total).await;

tracing::debug!(
"Peers and torrents activity metrics updated in {} ms",
(Utc::now().time() - start_time).num_milliseconds()
);
}

async fn update_inactive_peers_total(stats_repository: &Arc<Repository>, inactive_peers_total: usize) {
#[allow(clippy::cast_precision_loss)]
let inactive_peers_total = inactive_peers_total as f64;

let _unused = stats_repository
.set_gauge(
&metric_name!(TORRENT_REPOSITORY_PEERS_INACTIVE_TOTAL),
&LabelSet::default(),
inactive_peers_total,
CurrentClock::now(),
)
.await;
}

async fn update_inactive_torrents_total(stats_repository: &Arc<Repository>, inactive_torrents_total: usize) {
#[allow(clippy::cast_precision_loss)]
let inactive_torrents_total = inactive_torrents_total as f64;

let _unused = stats_repository
.set_gauge(
&metric_name!(TORRENT_REPOSITORY_TORRENTS_INACTIVE_TOTAL),
&LabelSet::default(),
inactive_torrents_total,
CurrentClock::now(),
)
.await;
}
15 changes: 15 additions & 0 deletions packages/torrent-repository/src/statistics/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod activity_metrics_updater;
pub mod event;
pub mod metrics;
pub mod repository;
Expand All @@ -14,6 +15,7 @@ const TORRENT_REPOSITORY_TORRENTS_REMOVED_TOTAL: &str = "torrent_repository_torr

const TORRENT_REPOSITORY_TORRENTS_TOTAL: &str = "torrent_repository_torrents_total";
const TORRENT_REPOSITORY_TORRENTS_DOWNLOADS_TOTAL: &str = "torrent_repository_torrents_downloads_total";
const TORRENT_REPOSITORY_TORRENTS_INACTIVE_TOTAL: &str = "torrent_repository_torrents_inactive_total";

// Peers metrics

Expand All @@ -23,6 +25,7 @@ const TORRENT_REPOSITORY_PEERS_UPDATED_TOTAL: &str = "torrent_repository_peers_u

const TORRENT_REPOSITORY_PEER_CONNECTIONS_TOTAL: &str = "torrent_repository_peer_connections_total";
const TORRENT_REPOSITORY_UNIQUE_PEERS_TOTAL: &str = "torrent_repository_unique_peers_total"; // todo: not implemented yet
const TORRENT_REPOSITORY_PEERS_INACTIVE_TOTAL: &str = "torrent_repository_peers_inactive_total";

#[must_use]
pub fn describe_metrics() -> Metrics {
Expand Down Expand Up @@ -54,6 +57,12 @@ pub fn describe_metrics() -> Metrics {
Some(&MetricDescription::new("The total number of torrent downloads.")),
);

metrics.metric_collection.describe_gauge(
&metric_name!(TORRENT_REPOSITORY_TORRENTS_INACTIVE_TOTAL),
Some(Unit::Count),
Some(&MetricDescription::new("The total number of inactive torrents.")),
);

// Peers metrics

metrics.metric_collection.describe_counter(
Expand Down Expand Up @@ -88,5 +97,11 @@ pub fn describe_metrics() -> Metrics {
Some(&MetricDescription::new("The total number of unique peers.")),
);

metrics.metric_collection.describe_gauge(
&metric_name!(TORRENT_REPOSITORY_PEERS_INACTIVE_TOTAL),
Some(Unit::Count),
Some(&MetricDescription::new("The total number of inactive peers.")),
);

metrics
}
25 changes: 25 additions & 0 deletions packages/torrent-repository/src/statistics/repository.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,31 @@ impl Repository {
result
}

/// # Errors
///
/// This function will return an error if the metric collection fails to
/// set the gauge.
pub async fn set_gauge(
&self,
metric_name: &MetricName,
labels: &LabelSet,
value: f64,
now: DurationSinceUnixEpoch,
) -> Result<(), Error> {
let mut stats_lock = self.stats.write().await;

let result = stats_lock.set_gauge(metric_name, labels, value, now);

drop(stats_lock);

match result {
Ok(()) => {}
Err(ref err) => tracing::error!("Failed to set the gauge: {}", err),
}

result
}

/// # Errors
///
/// This function will return an error if the metric collection fails to
Expand Down
59 changes: 59 additions & 0 deletions packages/torrent-repository/src/swarm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,25 @@ impl Swarm {
(seeders, leechers)
}

#[must_use]
pub fn count_inactive_peers(&self, current_cutoff: DurationSinceUnixEpoch) -> usize {
self.peers
.iter()
.filter(|(_, peer)| peer::ReadInfo::get_updated(&**peer) <= current_cutoff)
.count()
}

#[must_use]
pub fn get_activity_metadata(&self, current_cutoff: DurationSinceUnixEpoch) -> ActivityMetadata {
let inactive_peers_total = self.count_inactive_peers(current_cutoff);

let active_peers_total = self.len() - inactive_peers_total;

let is_active = active_peers_total > 0;

ActivityMetadata::new(is_active, active_peers_total, inactive_peers_total)
}

#[must_use]
pub fn len(&self) -> usize {
self.peers.len()
Expand Down Expand Up @@ -288,6 +307,30 @@ impl Swarm {
}
}

#[derive(Clone)]
pub struct ActivityMetadata {
/// Indicates if the swarm is active. It's inactive if there are no active
/// peers.
pub is_active: bool,

/// The number of active peers in the swarm.
pub active_peers_total: usize,

/// The number of inactive peers in the swarm.
pub inactive_peers_total: usize,
}

impl ActivityMetadata {
#[must_use]
pub fn new(is_active: bool, active_peers_total: usize, inactive_peers_total: usize) -> Self {
Self {
is_active,
active_peers_total,
inactive_peers_total,
}
}
}

#[cfg(test)]
mod tests {

Expand Down Expand Up @@ -435,6 +478,22 @@ mod tests {
assert_eq!(swarm.peers_excluding(&peer2.peer_addr, None), [Arc::new(peer1)]);
}

#[tokio::test]
async fn it_should_count_inactive_peers() {
let mut swarm = Swarm::new(&sample_info_hash(), 0, None);
let mut downloads_increased = false;
let one_second = DurationSinceUnixEpoch::new(1, 0);

// Insert the peer
let last_update_time = DurationSinceUnixEpoch::new(1_669_397_478_934, 0);
let peer = PeerBuilder::default().last_updated_on(last_update_time).build();
swarm.upsert_peer(peer.into(), &mut downloads_increased).await;

let inactive_peers_total = swarm.count_inactive_peers(last_update_time + one_second);

assert_eq!(inactive_peers_total, 1);
}

#[tokio::test]
async fn it_should_remove_inactive_peers() {
let mut swarm = Swarm::new(&sample_info_hash(), 0, None);
Expand Down
79 changes: 79 additions & 0 deletions packages/torrent-repository/src/swarms.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,44 @@ impl Swarms {
}
}

pub async fn get_activity_metadata(&self, current_cutoff: DurationSinceUnixEpoch) -> AggregateActivityMetadata {
let mut active_peers_total = 0;
let mut inactive_peers_total = 0;
let mut active_torrents_total = 0;

for swarm_handle in &self.swarms {
let swarm = swarm_handle.value().lock().await;

let activity_metadata = swarm.get_activity_metadata(current_cutoff);

if activity_metadata.is_active {
active_torrents_total += 1;
}

active_peers_total += activity_metadata.active_peers_total;
inactive_peers_total += activity_metadata.inactive_peers_total;
}

AggregateActivityMetadata {
active_peers_total,
inactive_peers_total,
active_torrents_total,
inactive_torrents_total: self.len() - active_torrents_total,
}
}

/// Counts the number of inactive peers across all torrents.
pub async fn count_inactive_peers(&self, current_cutoff: DurationSinceUnixEpoch) -> usize {
let mut inactive_peers_total = 0;

for swarm_handle in &self.swarms {
let swarm = swarm_handle.value().lock().await;
inactive_peers_total += swarm.count_inactive_peers(current_cutoff);
}

inactive_peers_total
}

/// Removes inactive peers from all torrent entries.
///
/// A peer is considered inactive if its last update timestamp is older than
Expand Down Expand Up @@ -434,6 +472,31 @@ impl Swarms {
#[derive(thiserror::Error, Debug, Clone)]
pub enum Error {}

#[derive(Clone, Debug, Default)]
pub struct AggregateActivityMetadata {
/// The number of active peers in all swarms.
pub active_peers_total: usize,

/// The number of inactive peers in all swarms.
pub inactive_peers_total: usize,

/// The number of active torrents.
pub active_torrents_total: usize,

/// The number of inactive torrents.
pub inactive_torrents_total: usize,
}

impl AggregateActivityMetadata {
pub fn log(&self) {
tracing::info!(
active_peers_total = self.active_peers_total,
inactive_peers_total = self.inactive_peers_total,
active_torrents_total = self.active_torrents_total,
inactive_torrents_total = self.inactive_torrents_total
);
}
}
#[cfg(test)]
mod tests {

Expand Down Expand Up @@ -705,6 +768,22 @@ mod tests {
assert!(swarms.get(&info_hash).is_none());
}

#[tokio::test]
async fn it_should_count_inactive_peers() {
let swarms = Arc::new(Swarms::default());

let info_hash = sample_info_hash();
let mut peer = sample_peer();
peer.updated = DurationSinceUnixEpoch::new(0, 0);

swarms.handle_announcement(&info_hash, &peer, None).await.unwrap();

// Cut off time is 1 second after the peer was updated
let inactive_peers_total = swarms.count_inactive_peers(peer.updated.add(Duration::from_secs(1))).await;

assert_eq!(inactive_peers_total, 1);
}

#[tokio::test]
async fn it_should_remove_peers_that_have_not_been_updated_after_a_cutoff_time() {
let swarms = Arc::new(Swarms::default());
Expand Down
Loading
Loading