Skip to content

Commit 1508bfb

Browse files
committed
feat: [#1358] basic scaffolding for events in torrent-repository pkg
TODO: - Run the event listener for the torrent-repository package when the tracker starts. - Inject enven sender in `Swarms` and `Swarm` type to send events. - Trigger events and process them to update the metrics. - Expose the metrics via the `metrics` API endpoint. - ...
1 parent cb487f3 commit 1508bfb

File tree

10 files changed

+261
-0
lines changed

10 files changed

+261
-0
lines changed

Cargo.lock

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

packages/torrent-repository/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,13 @@ version.workspace = true
1919
aquatic_udp_protocol = "0"
2020
bittorrent-primitives = "0.1.0"
2121
crossbeam-skiplist = "0"
22+
serde = "1.0.219"
2223
thiserror = "2.0.12"
2324
tokio = { version = "1", features = ["macros", "net", "rt-multi-thread", "signal", "sync"] }
2425
torrust-tracker-clock = { version = "3.0.0-develop", path = "../clock" }
2526
torrust-tracker-configuration = { version = "3.0.0-develop", path = "../configuration" }
27+
torrust-tracker-events = { version = "3.0.0-develop", path = "../events" }
28+
torrust-tracker-metrics = { version = "3.0.0-develop", path = "../metrics" }
2629
torrust-tracker-primitives = { version = "3.0.0-develop", path = "../primitives" }
2730
tracing = "0"
2831

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
use std::net::SocketAddr;
2+
3+
use aquatic_udp_protocol::PeerId;
4+
use bittorrent_primitives::info_hash::InfoHash;
5+
use torrust_tracker_primitives::peer::PeerAnnouncement;
6+
7+
#[derive(Debug, PartialEq, Eq, Clone)]
8+
pub enum Event {
9+
TorrentAdded {
10+
info_hash: InfoHash,
11+
announcement: PeerAnnouncement,
12+
},
13+
TorrentRemoved {
14+
info_hash: InfoHash,
15+
},
16+
PeerAdded {
17+
announcement: PeerAnnouncement,
18+
},
19+
PeerRemoved {
20+
socket_addr: SocketAddr,
21+
peer_id: PeerId,
22+
},
23+
}
24+
25+
pub mod sender {
26+
use std::sync::Arc;
27+
28+
use super::Event;
29+
30+
pub type Sender = Option<Arc<dyn torrust_tracker_events::sender::Sender<Event = Event>>>;
31+
pub type Broadcaster = torrust_tracker_events::broadcaster::Broadcaster<Event>;
32+
}
33+
34+
pub mod receiver {
35+
use super::Event;
36+
37+
pub type Receiver = Box<dyn torrust_tracker_events::receiver::Receiver<Event = Event>>;
38+
}
39+
40+
pub mod bus {
41+
use crate::event::Event;
42+
43+
pub type EventBus = torrust_tracker_events::bus::EventBus<Event>;
44+
}

packages/torrent-repository/src/lib.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
pub mod event;
2+
pub mod statistics;
13
pub mod swarm;
24
pub mod swarms;
35

@@ -19,6 +21,8 @@ pub(crate) type CurrentClock = clock::Working;
1921
#[allow(dead_code)]
2022
pub(crate) type CurrentClock = clock::Stopped;
2123

24+
pub const TORRENT_REPOSITORY_LOG_TARGET: &str = "TORRENT_REPOSITORY";
25+
2226
pub trait LockTrackedTorrent {
2327
fn lock_or_panic(&self) -> MutexGuard<'_, Swarm>;
2428
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
use std::sync::Arc;
2+
3+
use torrust_tracker_primitives::DurationSinceUnixEpoch;
4+
5+
use crate::event::Event;
6+
use crate::statistics::repository::Repository;
7+
8+
/// # Panics
9+
///
10+
/// This function panics if the client IP address is not the same as the IP
11+
/// version of the event.
12+
pub async fn handle_event(_event: Event, stats_repository: &Arc<Repository>, _now: DurationSinceUnixEpoch) {
13+
/*match event {
14+
Event::TorrentAdded { .. } => {}
15+
Event::TorrentRemoved { .. } => {}
16+
Event::PeerAdded { .. } => {}
17+
Event::PeerRemoved { .. } => {}
18+
}*/
19+
20+
tracing::debug!("metrics: {:?}", stats_repository.get_metrics().await);
21+
}
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
use std::sync::Arc;
2+
3+
use tokio::task::JoinHandle;
4+
use torrust_tracker_clock::clock::Time;
5+
use torrust_tracker_events::receiver::RecvError;
6+
7+
use super::handler::handle_event;
8+
use crate::event::receiver::Receiver;
9+
use crate::statistics::repository::Repository;
10+
use crate::{CurrentClock, TORRENT_REPOSITORY_LOG_TARGET};
11+
12+
#[must_use]
13+
pub fn run_event_listener(receiver: Receiver, repository: &Arc<Repository>) -> JoinHandle<()> {
14+
let stats_repository = repository.clone();
15+
16+
tracing::info!(target: TORRENT_REPOSITORY_LOG_TARGET, "Starting torrent repository event listener");
17+
18+
tokio::spawn(async move {
19+
dispatch_events(receiver, stats_repository).await;
20+
21+
tracing::info!(target: TORRENT_REPOSITORY_LOG_TARGET, "Torrent repository listener finished");
22+
})
23+
}
24+
25+
async fn dispatch_events(mut receiver: Receiver, stats_repository: Arc<Repository>) {
26+
let shutdown_signal = tokio::signal::ctrl_c();
27+
28+
tokio::pin!(shutdown_signal);
29+
30+
loop {
31+
tokio::select! {
32+
biased;
33+
34+
_ = &mut shutdown_signal => {
35+
tracing::info!(target: TORRENT_REPOSITORY_LOG_TARGET, "Received Ctrl+C, shutting down torrent repository event listener.");
36+
break;
37+
}
38+
39+
result = receiver.recv() => {
40+
match result {
41+
Ok(event) => handle_event(event, &stats_repository, CurrentClock::now()).await,
42+
Err(e) => {
43+
match e {
44+
RecvError::Closed => {
45+
tracing::info!(target: TORRENT_REPOSITORY_LOG_TARGET, "Torrent repository event receiver closed.");
46+
break;
47+
}
48+
RecvError::Lagged(n) => {
49+
tracing::warn!(target: TORRENT_REPOSITORY_LOG_TARGET, "Torrent repository event receiver lagged by {} events.", n);
50+
}
51+
}
52+
}
53+
}
54+
}
55+
}
56+
}
57+
}
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
pub mod handler;
2+
pub mod listener;
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
use serde::Serialize;
2+
use torrust_tracker_metrics::label::LabelSet;
3+
use torrust_tracker_metrics::metric::MetricName;
4+
use torrust_tracker_metrics::metric_collection::{Error, MetricCollection};
5+
use torrust_tracker_primitives::DurationSinceUnixEpoch;
6+
7+
/// Metrics collected by the torrent repository.
8+
#[derive(Debug, Clone, PartialEq, Default, Serialize)]
9+
pub struct Metrics {
10+
/// A collection of metrics.
11+
pub metric_collection: MetricCollection,
12+
}
13+
14+
impl Metrics {
15+
/// # Errors
16+
///
17+
/// Returns an error if the metric does not exist and it cannot be created.
18+
pub fn increase_counter(
19+
&mut self,
20+
metric_name: &MetricName,
21+
labels: &LabelSet,
22+
now: DurationSinceUnixEpoch,
23+
) -> Result<(), Error> {
24+
self.metric_collection.increase_counter(metric_name, labels, now)
25+
}
26+
27+
/// # Errors
28+
///
29+
/// Returns an error if the metric does not exist and it cannot be created.
30+
pub fn set_gauge(
31+
&mut self,
32+
metric_name: &MetricName,
33+
labels: &LabelSet,
34+
value: f64,
35+
now: DurationSinceUnixEpoch,
36+
) -> Result<(), Error> {
37+
self.metric_collection.set_gauge(metric_name, labels, value, now)
38+
}
39+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
pub mod event;
2+
pub mod metrics;
3+
pub mod repository;
4+
5+
use metrics::Metrics;
6+
use torrust_tracker_metrics::metric::description::MetricDescription;
7+
use torrust_tracker_metrics::metric_name;
8+
use torrust_tracker_metrics::unit::Unit;
9+
10+
const TORRENT_REPOSITORY_RUNTIME_TORRENTS_DOWNLOADS_TOTAL: &str = "torrent_repository_runtime_torrents_downloads_total";
11+
const TORRENT_REPOSITORY_PERSISTENT_TORRENTS_DOWNLOADS_TOTAL: &str = "torrent_repository_persistent_torrents_downloads_total";
12+
13+
#[must_use]
14+
pub fn describe_metrics() -> Metrics {
15+
let mut metrics = Metrics::default();
16+
17+
metrics.metric_collection.describe_counter(
18+
&metric_name!(TORRENT_REPOSITORY_RUNTIME_TORRENTS_DOWNLOADS_TOTAL),
19+
Some(Unit::Count),
20+
Some(&MetricDescription::new(
21+
"The total number of torrent downloads since the tracker process started.",
22+
)),
23+
);
24+
25+
metrics.metric_collection.describe_counter(
26+
&metric_name!(TORRENT_REPOSITORY_PERSISTENT_TORRENTS_DOWNLOADS_TOTAL),
27+
Some(Unit::Count),
28+
Some(&MetricDescription::new(
29+
"The total number of torrent downloads since persistent statistics were enabled the first time.",
30+
)),
31+
);
32+
33+
metrics
34+
}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
use std::sync::Arc;
2+
3+
use tokio::sync::{RwLock, RwLockReadGuard};
4+
use torrust_tracker_metrics::label::LabelSet;
5+
use torrust_tracker_metrics::metric::MetricName;
6+
use torrust_tracker_metrics::metric_collection::Error;
7+
use torrust_tracker_primitives::DurationSinceUnixEpoch;
8+
9+
use super::describe_metrics;
10+
use super::metrics::Metrics;
11+
12+
/// A repository for the torrent repository metrics.
13+
#[derive(Clone)]
14+
pub struct Repository {
15+
pub stats: Arc<RwLock<Metrics>>,
16+
}
17+
18+
impl Default for Repository {
19+
fn default() -> Self {
20+
Self::new()
21+
}
22+
}
23+
24+
impl Repository {
25+
#[must_use]
26+
pub fn new() -> Self {
27+
let stats = Arc::new(RwLock::new(describe_metrics()));
28+
29+
Self { stats }
30+
}
31+
32+
pub async fn get_metrics(&self) -> RwLockReadGuard<'_, Metrics> {
33+
self.stats.read().await
34+
}
35+
36+
/// # Errors
37+
///
38+
/// This function will return an error if the metric collection fails to
39+
/// increase the counter.
40+
pub async fn increase_counter(
41+
&self,
42+
metric_name: &MetricName,
43+
labels: &LabelSet,
44+
now: DurationSinceUnixEpoch,
45+
) -> Result<(), Error> {
46+
let mut stats_lock = self.stats.write().await;
47+
48+
let result = stats_lock.increase_counter(metric_name, labels, now);
49+
50+
drop(stats_lock);
51+
52+
result
53+
}
54+
}

0 commit comments

Comments
 (0)