Skip to content

Commit f25438a

Browse files
committed
refactor: [torrust#1478] decouple events from stats in http core keeper
1 parent 4bb7a5a commit f25438a

File tree

14 files changed

+69
-87
lines changed

14 files changed

+69
-87
lines changed

packages/axum-http-tracker-server/src/environment.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use std::sync::Arc;
22

33
use bittorrent_http_tracker_core::container::HttpTrackerCoreContainer;
4+
use bittorrent_http_tracker_core::statistics::event::listener::run_event_listener;
45
use bittorrent_primitives::info_hash::InfoHash;
56
use bittorrent_tracker_core::container::TrackerCoreContainer;
67
use futures::executor::block_on;
@@ -68,7 +69,10 @@ impl Environment<Stopped> {
6869
#[allow(dead_code)]
6970
pub async fn start(self) -> Environment<Running> {
7071
// Start the event listener
71-
let event_listener_job = self.container.http_tracker_core_container.stats_keeper.run_event_listener();
72+
let event_listener_job = run_event_listener(
73+
self.container.http_tracker_core_container.stats_keeper.receiver(),
74+
&self.container.http_tracker_core_container.stats_repository,
75+
);
7276

7377
// Start the server
7478
let server = self

packages/axum-http-tracker-server/src/server.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,7 @@ mod tests {
250250
use bittorrent_http_tracker_core::container::HttpTrackerCoreContainer;
251251
use bittorrent_http_tracker_core::services::announce::AnnounceService;
252252
use bittorrent_http_tracker_core::services::scrape::ScrapeService;
253+
use bittorrent_http_tracker_core::statistics::event::listener::run_event_listener;
253254
use bittorrent_tracker_core::container::TrackerCoreContainer;
254255
use torrust_axum_server::tsl::make_rust_tls;
255256
use torrust_server_lib::registar::Registar;
@@ -271,13 +272,12 @@ mod tests {
271272
let http_tracker_config = Arc::new(http_tracker_config.clone());
272273

273274
// HTTP core stats
274-
let http_stats_keeper =
275+
let (http_stats_keeper, http_stats_repository) =
275276
bittorrent_http_tracker_core::statistics::setup::factory(configuration.core.tracker_usage_statistics);
276277
let http_stats_event_sender = http_stats_keeper.sender();
277-
let http_stats_repository = http_stats_keeper.repository();
278278

279279
if configuration.core.tracker_usage_statistics {
280-
let _unused = http_stats_keeper.run_event_listener();
280+
let _unused = run_event_listener(http_stats_keeper.receiver(), &http_stats_repository);
281281
}
282282

283283
let tracker_core_container = Arc::new(TrackerCoreContainer::initialize(&core_config));

packages/axum-http-tracker-server/src/v1/handlers/announce.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ mod tests {
108108

109109
use aquatic_udp_protocol::PeerId;
110110
use bittorrent_http_tracker_core::services::announce::AnnounceService;
111+
use bittorrent_http_tracker_core::statistics::event::listener::run_event_listener;
111112
use bittorrent_http_tracker_protocol::v1::requests::announce::Announce;
112113
use bittorrent_http_tracker_protocol::v1::responses;
113114
use bittorrent_http_tracker_protocol::v1::services::peer_ip_resolver::ClientIpSources;
@@ -161,12 +162,12 @@ mod tests {
161162
));
162163

163164
// HTTP core stats
164-
let http_stats_keeper = bittorrent_http_tracker_core::statistics::setup::factory(config.core.tracker_usage_statistics);
165+
let (http_stats_keeper, http_stats_repository) =
166+
bittorrent_http_tracker_core::statistics::setup::factory(config.core.tracker_usage_statistics);
165167
let http_stats_event_sender = http_stats_keeper.sender();
166-
let _http_stats_repository = http_stats_keeper.repository();
167168

168169
if config.core.tracker_usage_statistics {
169-
let _unused = http_stats_keeper.run_event_listener();
170+
let _unused = run_event_listener(http_stats_keeper.receiver(), &http_stats_repository);
170171
}
171172

172173
let announce_service = Arc::new(AnnounceService::new(

packages/axum-http-tracker-server/src/v1/handlers/scrape.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ mod tests {
8383
use std::str::FromStr;
8484
use std::sync::Arc;
8585

86+
use bittorrent_http_tracker_core::statistics::event::listener::run_event_listener;
8687
use bittorrent_http_tracker_protocol::v1::requests::scrape::Scrape;
8788
use bittorrent_http_tracker_protocol::v1::responses;
8889
use bittorrent_http_tracker_protocol::v1::services::peer_ip_resolver::ClientIpSources;
@@ -132,12 +133,12 @@ mod tests {
132133
let scrape_handler = Arc::new(ScrapeHandler::new(&whitelist_authorization, &in_memory_torrent_repository));
133134

134135
// HTTP core stats
135-
let http_stats_keeper = bittorrent_http_tracker_core::statistics::setup::factory(config.core.tracker_usage_statistics);
136+
let (http_stats_keeper, http_stats_repository) =
137+
bittorrent_http_tracker_core::statistics::setup::factory(config.core.tracker_usage_statistics);
136138
let http_stats_event_sender = http_stats_keeper.sender();
137-
let _http_stats_repository = http_stats_keeper.repository();
138139

139140
if config.core.tracker_usage_statistics {
140-
let _unused = http_stats_keeper.run_event_listener();
141+
let _unused = run_event_listener(http_stats_keeper.receiver(), &http_stats_repository);
141142
}
142143

143144
(

packages/http-tracker-core/benches/helpers/util.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use std::sync::Arc;
33

44
use aquatic_udp_protocol::{AnnounceEvent, NumberOfBytes, PeerId};
55
use bittorrent_http_tracker_core::event::Event;
6+
use bittorrent_http_tracker_core::statistics::event::listener::run_event_listener;
67
use bittorrent_http_tracker_core::{event, statistics};
78
use bittorrent_http_tracker_protocol::v1::requests::announce::Announce;
89
use bittorrent_http_tracker_protocol::v1::services::peer_ip_resolver::ClientIpSources;
@@ -56,12 +57,11 @@ pub fn initialize_core_tracker_services_with_config(config: &Configuration) -> (
5657
));
5758

5859
// HTTP core stats
59-
let http_stats_keeper = statistics::setup::factory(config.core.tracker_usage_statistics);
60+
let (http_stats_keeper, http_stats_repository) = statistics::setup::factory(config.core.tracker_usage_statistics);
6061
let http_stats_event_sender = http_stats_keeper.sender();
61-
let _http_stats_repository = http_stats_keeper.repository();
6262

6363
if config.core.tracker_usage_statistics {
64-
let _unused = http_stats_keeper.run_event_listener();
64+
let _unused = run_event_listener(http_stats_keeper.receiver(), &http_stats_repository);
6565
}
6666

6767
(

packages/http-tracker-core/src/container.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,9 +66,9 @@ impl HttpTrackerCoreServices {
6666
#[must_use]
6767
pub fn initialize_from(tracker_core_container: &Arc<TrackerCoreContainer>) -> Arc<Self> {
6868
// HTTP core stats
69-
let http_stats_keeper = statistics::setup::factory(tracker_core_container.core_config.tracker_usage_statistics);
69+
let (http_stats_keeper, http_stats_repository) =
70+
statistics::setup::factory(tracker_core_container.core_config.tracker_usage_statistics);
7071
let http_stats_event_sender = http_stats_keeper.sender();
71-
let http_stats_repository = http_stats_keeper.repository();
7272

7373
let http_announce_service = Arc::new(AnnounceService::new(
7474
tracker_core_container.core_config.clone(),

packages/http-tracker-core/src/services/announce.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -253,12 +253,11 @@ mod tests {
253253
));
254254

255255
// HTTP core stats
256-
let http_stats_keeper = statistics::setup::factory(config.core.tracker_usage_statistics);
256+
let (http_stats_keeper, http_stats_repository) = statistics::setup::factory(config.core.tracker_usage_statistics);
257257
let http_stats_event_sender = http_stats_keeper.sender();
258-
let _http_stats_repository = http_stats_keeper.repository();
259258

260259
if config.core.tracker_usage_statistics {
261-
let _unused = http_stats_keeper.run_event_listener();
260+
let _unused = run_event_listener(http_stats_keeper.receiver(), &http_stats_repository);
262261
}
263262

264263
(
@@ -298,6 +297,7 @@ mod tests {
298297
use tokio::sync::broadcast::error::SendError;
299298

300299
use crate::event::Event;
300+
use crate::statistics::event::listener::run_event_listener;
301301
use crate::tests::sample_info_hash;
302302
use crate::{event, statistics};
303303

packages/http-tracker-core/src/services/scrape.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -273,9 +273,8 @@ mod tests {
273273
let core_config = Arc::new(configuration.core.clone());
274274

275275
// HTTP core stats
276-
let http_stats_keeper = statistics::setup::factory(false);
276+
let (http_stats_keeper, _http_stats_repository) = statistics::setup::factory(false);
277277
let http_stats_event_sender = http_stats_keeper.sender();
278-
let _http_stats_repository = http_stats_keeper.repository();
279278

280279
let container = initialize_services_with_configuration(&configuration);
281280

@@ -465,9 +464,8 @@ mod tests {
465464
let container = initialize_services_with_configuration(&config);
466465

467466
// HTTP core stats
468-
let http_stats_keeper = statistics::setup::factory(false);
467+
let (http_stats_keeper, _http_stats_repository) = statistics::setup::factory(false);
469468
let http_stats_event_sender = http_stats_keeper.sender();
470-
let _http_stats_repository = http_stats_keeper.repository();
471469

472470
let info_hash = sample_info_hash();
473471
let info_hashes = vec![info_hash];

packages/http-tracker-core/src/statistics/event/listener.rs

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,28 @@
11
use std::sync::Arc;
22

3-
use tokio::sync::broadcast;
3+
use tokio::sync::broadcast::{self, Receiver};
4+
use tokio::task::JoinHandle;
45
use torrust_tracker_clock::clock::Time;
56

67
use super::handler::handle_event;
78
use crate::event::Event;
89
use crate::statistics::repository::Repository;
910
use crate::{CurrentClock, HTTP_TRACKER_LOG_TARGET};
1011

11-
pub async fn dispatch_events(mut receiver: broadcast::Receiver<Event>, stats_repository: Arc<Repository>) {
12+
#[must_use]
13+
pub fn run_event_listener(receiver: Receiver<Event>, repository: &Arc<Repository>) -> JoinHandle<()> {
14+
let stats_repository = repository.clone();
15+
16+
tracing::info!(target: HTTP_TRACKER_LOG_TARGET, "Starting HTTP tracker core event listener");
17+
18+
tokio::spawn(async move {
19+
dispatch_events(receiver, stats_repository).await;
20+
21+
tracing::info!(target: HTTP_TRACKER_LOG_TARGET, "HTTP tracker core event listener finished");
22+
})
23+
}
24+
25+
async fn dispatch_events(mut receiver: broadcast::Receiver<Event>, stats_repository: Arc<Repository>) {
1226
loop {
1327
match receiver.recv().await {
1428
Ok(event) => handle_event(event, &stats_repository, CurrentClock::now()).await,
Lines changed: 6 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -1,40 +1,30 @@
11
use std::sync::Arc;
22

3-
use tokio::task::JoinHandle;
3+
use tokio::sync::broadcast::Receiver;
44

5-
use super::event::listener::dispatch_events;
6-
use super::repository::Repository;
75
use crate::event::sender::{self, Broadcaster};
8-
use crate::HTTP_TRACKER_LOG_TARGET;
6+
use crate::event::Event;
97

10-
/// The service responsible for keeping tracker metrics (listening to statistics events and handle them).
11-
///
12-
/// It actively listen to new statistics events. When it receives a new event
13-
/// it accordingly increases the counters.
148
pub struct Keeper {
159
pub enable_sender: bool,
1610
pub broadcaster: Broadcaster,
17-
pub repository: Arc<Repository>,
1811
}
1912

2013
impl Default for Keeper {
2114
fn default() -> Self {
2215
let enable_sender = true;
2316
let broadcaster = Broadcaster::default();
24-
let repository = Arc::new(Repository::new());
2517

26-
Self::new(enable_sender, broadcaster, repository)
18+
Self::new(enable_sender, broadcaster)
2719
}
2820
}
2921

3022
impl Keeper {
31-
/// Creates a new instance of [`Keeper`].
3223
#[must_use]
33-
pub fn new(enable_sender: bool, broadcaster: Broadcaster, repository: Arc<Repository>) -> Self {
24+
pub fn new(enable_sender: bool, broadcaster: Broadcaster) -> Self {
3425
Self {
3526
enable_sender,
3627
broadcaster,
37-
repository,
3828
}
3929
}
4030

@@ -48,37 +38,7 @@ impl Keeper {
4838
}
4939

5040
#[must_use]
51-
pub fn repository(&self) -> Arc<Repository> {
52-
self.repository.clone()
53-
}
54-
55-
#[must_use]
56-
pub fn run_event_listener(&self) -> JoinHandle<()> {
57-
let stats_repository = self.repository.clone();
58-
let receiver = self.broadcaster.subscribe();
59-
60-
tracing::info!(target: HTTP_TRACKER_LOG_TARGET, "Starting HTTP tracker core event listener");
61-
62-
tokio::spawn(async move {
63-
dispatch_events(receiver, stats_repository).await;
64-
65-
tracing::info!(target: HTTP_TRACKER_LOG_TARGET, "HTTP tracker core event listener finished");
66-
})
67-
}
68-
}
69-
70-
#[cfg(test)]
71-
mod tests {
72-
73-
use crate::statistics::keeper::Keeper;
74-
use crate::statistics::metrics::Metrics;
75-
76-
#[tokio::test]
77-
async fn should_contain_the_tracker_statistics() {
78-
let stats_tracker = Keeper::default();
79-
80-
let stats = stats_tracker.repository.get_stats().await;
81-
82-
assert_eq!(stats.tcp4_announces_handled, Metrics::default().tcp4_announces_handled);
41+
pub fn receiver(&self) -> Receiver<Event> {
42+
self.broadcaster.subscribe()
8343
}
8444
}

0 commit comments

Comments
 (0)