Skip to content

Commit 734a23c

Browse files
committed
refactor: [torrust#1444] http core event listener start in app start. Step 1
This is the first step in a bigger refactor to move the start of event listeners from app container instantiation to app start (jobs creation).
1 parent d80bfc0 commit 734a23c

File tree

9 files changed

+67
-32
lines changed

9 files changed

+67
-32
lines changed

packages/axum-http-tracker-server/src/server.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -274,7 +274,6 @@ mod tests {
274274
let (http_stats_event_sender, http_stats_repository) =
275275
bittorrent_http_tracker_core::statistics::setup::factory(configuration.core.tracker_usage_statistics);
276276
let http_stats_event_sender = Arc::new(http_stats_event_sender);
277-
let http_stats_repository = Arc::new(http_stats_repository);
278277

279278
let tracker_core_container = Arc::new(TrackerCoreContainer::initialize(&core_config));
280279

packages/http-tracker-core/src/container.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,6 @@ impl HttpTrackerCoreServices {
6565
let (http_stats_event_sender, http_stats_repository) =
6666
statistics::setup::factory(tracker_core_container.core_config.tracker_usage_statistics);
6767
let http_stats_event_sender = Arc::new(http_stats_event_sender);
68-
let http_stats_repository = Arc::new(http_stats_repository);
6968
let http_announce_service = Arc::new(AnnounceService::new(
7069
tracker_core_container.core_config.clone(),
7170
tracker_core_container.announce_handler.clone(),

packages/http-tracker-core/src/event/sender.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ pub trait Sender: Sync + Send {
1616
}
1717

1818
/// An event sender implementation using a broadcast channel.
19+
#[derive(Clone)]
1920
pub struct Broadcaster {
2021
pub(crate) sender: broadcast::Sender<Event>,
2122
}

packages/http-tracker-core/src/statistics/event/handler.rs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use std::net::IpAddr;
2+
use std::sync::Arc;
23

34
use torrust_tracker_metrics::label::{LabelSet, LabelValue};
45
use torrust_tracker_metrics::{label_name, metric_name};
@@ -12,7 +13,7 @@ use crate::statistics::HTTP_TRACKER_CORE_REQUESTS_RECEIVED_TOTAL;
1213
///
1314
/// This function panics if the client IP address is not the same as the IP
1415
/// version of the event.
15-
pub async fn handle_event(event: Event, stats_repository: &Repository, now: DurationSinceUnixEpoch) {
16+
pub async fn handle_event(event: Event, stats_repository: &Arc<Repository>, now: DurationSinceUnixEpoch) {
1617
match event {
1718
Event::TcpAnnounce { connection, .. } => {
1819
// Global fixed metrics
@@ -72,6 +73,7 @@ pub async fn handle_event(event: Event, stats_repository: &Repository, now: Dura
7273
#[cfg(test)]
7374
mod tests {
7475
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
76+
use std::sync::Arc;
7577

7678
use bittorrent_http_tracker_protocol::v1::services::peer_ip_resolver::{RemoteClientAddr, ResolvedIp};
7779
use torrust_tracker_clock::clock::Time;
@@ -85,7 +87,7 @@ mod tests {
8587

8688
#[tokio::test]
8789
async fn should_increase_the_tcp4_announces_counter_when_it_receives_a_tcp4_announce_event() {
88-
let stats_repository = Repository::new();
90+
let stats_repository = Arc::new(Repository::new());
8991
let peer = sample_peer_using_ipv4();
9092
let remote_client_ip = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 2));
9193

@@ -110,7 +112,7 @@ mod tests {
110112

111113
#[tokio::test]
112114
async fn should_increase_the_tcp4_scrapes_counter_when_it_receives_a_tcp4_scrape_event() {
113-
let stats_repository = Repository::new();
115+
let stats_repository = Arc::new(Repository::new());
114116

115117
handle_event(
116118
Event::TcpScrape {
@@ -134,7 +136,7 @@ mod tests {
134136

135137
#[tokio::test]
136138
async fn should_increase_the_tcp6_announces_counter_when_it_receives_a_tcp6_announce_event() {
137-
let stats_repository = Repository::new();
139+
let stats_repository = Arc::new(Repository::new());
138140
let peer = sample_peer_using_ipv6();
139141
let remote_client_ip = IpAddr::V6(Ipv6Addr::new(0x6969, 0x6969, 0x6969, 0x6969, 0x6969, 0x6969, 0x6969, 0x6969));
140142

@@ -159,7 +161,7 @@ mod tests {
159161

160162
#[tokio::test]
161163
async fn should_increase_the_tcp6_scrapes_counter_when_it_receives_a_tcp6_scrape_event() {
162-
let stats_repository = Repository::new();
164+
let stats_repository = Arc::new(Repository::new());
163165

164166
handle_event(
165167
Event::TcpScrape {

packages/http-tracker-core/src/statistics/event/listener.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
use std::sync::Arc;
2+
13
use tokio::sync::broadcast;
24
use torrust_tracker_clock::clock::Time;
35

@@ -6,7 +8,7 @@ use crate::event::Event;
68
use crate::statistics::repository::Repository;
79
use crate::{CurrentClock, HTTP_TRACKER_LOG_TARGET};
810

9-
pub async fn dispatch_events(mut receiver: broadcast::Receiver<Event>, stats_repository: Repository) {
11+
pub async fn dispatch_events(mut receiver: broadcast::Receiver<Event>, stats_repository: Arc<Repository>) {
1012
loop {
1113
match receiver.recv().await {
1214
Ok(event) => handle_event(event, &stats_repository, CurrentClock::now()).await,

packages/http-tracker-core/src/statistics/keeper.rs

Lines changed: 36 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,42 +1,69 @@
1-
use tokio::sync::broadcast::Receiver;
1+
use std::sync::Arc;
2+
3+
use tokio::task::JoinHandle;
24

35
use super::event::listener::dispatch_events;
46
use super::repository::Repository;
5-
use crate::event::Event;
7+
use crate::event::sender::{self, Broadcaster};
68
use crate::HTTP_TRACKER_LOG_TARGET;
79

810
/// The service responsible for keeping tracker metrics (listening to statistics events and handle them).
911
///
1012
/// It actively listen to new statistics events. When it receives a new event
1113
/// it accordingly increases the counters.
1214
pub struct Keeper {
13-
pub repository: Repository,
15+
pub enable_sender: bool,
16+
pub broadcaster: Broadcaster,
17+
pub repository: Arc<Repository>,
1418
}
1519

1620
impl Default for Keeper {
1721
fn default() -> Self {
18-
Self::new()
22+
let enable_sender = true;
23+
let broadcaster = Broadcaster::default();
24+
let repository = Arc::new(Repository::new());
25+
26+
Self::new(enable_sender, broadcaster, repository)
1927
}
2028
}
2129

2230
impl Keeper {
31+
/// Creates a new instance of [`Keeper`].
2332
#[must_use]
24-
pub fn new() -> Self {
33+
pub fn new(enable_sender: bool, broadcaster: Broadcaster, repository: Arc<Repository>) -> Self {
2534
Self {
26-
repository: Repository::new(),
35+
enable_sender,
36+
broadcaster,
37+
repository,
38+
}
39+
}
40+
41+
#[must_use]
42+
pub fn sender(&self) -> Option<Box<dyn sender::Sender>> {
43+
if self.enable_sender {
44+
Some(Box::new(self.broadcaster.clone()))
45+
} else {
46+
None
2747
}
2848
}
2949

30-
pub fn run_event_listener(&mut self, receiver: Receiver<Event>) {
50+
#[must_use]
51+
pub fn repository(&self) -> Arc<Repository> {
52+
self.repository.clone()
53+
}
54+
55+
#[must_use]
56+
pub fn run_event_listener(&self) -> JoinHandle<()> {
3157
let stats_repository = self.repository.clone();
58+
let receiver = self.broadcaster.subscribe();
3259

3360
tracing::info!(target: HTTP_TRACKER_LOG_TARGET, "Starting HTTP tracker core event listener");
3461

3562
tokio::spawn(async move {
3663
dispatch_events(receiver, stats_repository).await;
3764

3865
tracing::info!(target: HTTP_TRACKER_LOG_TARGET, "HTTP tracker core event listener finished");
39-
});
66+
})
4067
}
4168
}
4269

@@ -48,7 +75,7 @@ mod tests {
4875

4976
#[tokio::test]
5077
async fn should_contain_the_tracker_statistics() {
51-
let stats_tracker = Keeper::new();
78+
let stats_tracker = Keeper::default();
5279

5380
let stats = stats_tracker.repository.get_stats().await;
5481

packages/http-tracker-core/src/statistics/services.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,9 +89,8 @@ mod tests {
8989
let in_memory_torrent_repository = Arc::new(InMemoryTorrentRepository::default());
9090

9191
let (_http_stats_event_sender, http_stats_repository) = statistics::setup::factory(config.core.tracker_usage_statistics);
92-
let http_stats_repository = Arc::new(http_stats_repository);
9392

94-
let tracker_metrics = get_metrics(in_memory_torrent_repository.clone(), http_stats_repository.clone()).await;
93+
let tracker_metrics = get_metrics(in_memory_torrent_repository.clone(), http_stats_repository).await;
9594

9695
assert_eq!(
9796
tracker_metrics,

packages/http-tracker-core/src/statistics/setup.rs

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,12 @@
11
//! Setup for the tracker statistics.
22
//!
33
//! The [`factory`] function builds the structs needed for handling the tracker metrics.
4+
use std::sync::Arc;
5+
6+
use super::keeper::Keeper;
7+
use super::repository::Repository;
8+
use crate::event;
49
use crate::event::sender::Broadcaster;
5-
use crate::{event, statistics};
610

711
/// It builds the structs needed for handling the tracker metrics.
812
///
@@ -17,20 +21,23 @@ use crate::{event, statistics};
1721
/// not run the event listeners, consequently the statistics events are sent are
1822
/// received but not dispatched to the handler.
1923
#[must_use]
20-
pub fn factory(tracker_usage_statistics: bool) -> (Option<Box<dyn event::sender::Sender>>, statistics::repository::Repository) {
21-
let mut keeper = statistics::keeper::Keeper::new();
22-
23-
let opt_event_sender: Option<Box<dyn event::sender::Sender>> = if tracker_usage_statistics {
24-
let broadcaster = Broadcaster::default();
24+
pub fn factory(tracker_usage_statistics: bool) -> (Option<Box<dyn event::sender::Sender>>, Arc<Repository>) {
25+
let keeper = keeper_factory(tracker_usage_statistics);
2526

26-
keeper.run_event_listener(broadcaster.subscribe());
27+
if tracker_usage_statistics {
28+
// todo: this should be started like the other jobs during `app::start`
29+
// and keep the join handle in a list of jobs.
30+
let _unused = keeper.run_event_listener();
31+
}
2732

28-
Some(Box::new(broadcaster))
29-
} else {
30-
None
31-
};
33+
(keeper.sender(), keeper.repository())
34+
}
3235

33-
(opt_event_sender, keeper.repository)
36+
#[must_use]
37+
pub fn keeper_factory(tracker_usage_statistics: bool) -> Arc<Keeper> {
38+
let broadcaster = Broadcaster::default();
39+
let repository = Arc::new(Repository::new());
40+
Arc::new(Keeper::new(tracker_usage_statistics, broadcaster.clone(), repository.clone()))
3441
}
3542

3643
#[cfg(test)]

packages/rest-tracker-api-core/src/statistics/services.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,6 @@ mod tests {
149149
// HTTP core stats
150150
let (_http_stats_event_sender, http_stats_repository) =
151151
bittorrent_http_tracker_core::statistics::setup::factory(config.core.tracker_usage_statistics);
152-
let http_stats_repository = Arc::new(http_stats_repository);
153152

154153
// UDP core stats
155154
let (_udp_stats_event_sender, _udp_stats_repository) =

0 commit comments

Comments
 (0)