Skip to content

Commit d81e59e

Browse files
committed
fix: [#1565] ban service should work with stats disabled
1 parent d949e48 commit d81e59e

File tree

16 files changed

+137
-105
lines changed

16 files changed

+137
-105
lines changed
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
use std::sync::Arc;
2+
3+
use bittorrent_udp_tracker_core::services::banning::BanService;
4+
use tokio::sync::RwLock;
5+
use torrust_tracker_primitives::DurationSinceUnixEpoch;
6+
7+
use crate::event::{ErrorKind, Event};
8+
9+
pub async fn handle_event(event: Event, ban_service: &Arc<RwLock<BanService>>, _now: DurationSinceUnixEpoch) {
10+
if let Event::UdpError {
11+
context,
12+
kind: _,
13+
error: ErrorKind::ConnectionCookie(_msg),
14+
} = event
15+
{
16+
let mut ban_service = ban_service.write().await;
17+
ban_service.increase_counter(&context.client_socket_addr().ip());
18+
}
19+
}
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
use std::sync::Arc;
2+
3+
use bittorrent_udp_tracker_core::services::banning::BanService;
4+
use bittorrent_udp_tracker_core::UDP_TRACKER_LOG_TARGET;
5+
use tokio::sync::RwLock;
6+
use tokio::task::JoinHandle;
7+
use torrust_tracker_clock::clock::Time;
8+
use torrust_tracker_events::receiver::RecvError;
9+
10+
use super::handler::handle_event;
11+
use crate::event::receiver::Receiver;
12+
use crate::CurrentClock;
13+
14+
#[must_use]
15+
pub fn run_event_listener(receiver: Receiver, ban_service: &Arc<RwLock<BanService>>) -> JoinHandle<()> {
16+
let ban_service_clone = ban_service.clone();
17+
18+
tracing::info!(target: UDP_TRACKER_LOG_TARGET, "Starting UDP tracker server event listener (banning)");
19+
20+
tokio::spawn(async move {
21+
dispatch_events(receiver, ban_service_clone).await;
22+
23+
tracing::info!(target: UDP_TRACKER_LOG_TARGET, "UDP tracker server event listener (banning) finished");
24+
})
25+
}
26+
27+
async fn dispatch_events(mut receiver: Receiver, ban_service: Arc<RwLock<BanService>>) {
28+
let shutdown_signal = tokio::signal::ctrl_c();
29+
tokio::pin!(shutdown_signal);
30+
31+
loop {
32+
tokio::select! {
33+
biased;
34+
35+
_ = &mut shutdown_signal => {
36+
tracing::info!(target: UDP_TRACKER_LOG_TARGET, "Received Ctrl+C, shutting down UDP tracker server event listener (banning)");
37+
break;
38+
}
39+
40+
result = receiver.recv() => {
41+
match result {
42+
Ok(event) => handle_event(event, &ban_service, CurrentClock::now()).await,
43+
Err(e) => {
44+
match e {
45+
RecvError::Closed => {
46+
tracing::info!(target: UDP_TRACKER_LOG_TARGET, "Udp server receiver (banning) closed.");
47+
break;
48+
}
49+
RecvError::Lagged(n) => {
50+
tracing::warn!(target: UDP_TRACKER_LOG_TARGET, "Udp server receiver (banning) lagged by {} events.", n);
51+
}
52+
}
53+
}
54+
}
55+
}
56+
}
57+
}
58+
}
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
pub mod handler;
2+
pub mod listener;
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
pub mod event;

packages/udp-tracker-server/src/environment.rs

Lines changed: 25 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,11 @@
11
use std::net::SocketAddr;
22
use std::sync::Arc;
33

4-
use bittorrent_primitives::info_hash::InfoHash;
54
use bittorrent_tracker_core::container::TrackerCoreContainer;
65
use bittorrent_udp_tracker_core::container::UdpTrackerCoreContainer;
76
use tokio::task::JoinHandle;
87
use torrust_server_lib::registar::Registar;
98
use torrust_tracker_configuration::{logging, Configuration, DEFAULT_TIMEOUT};
10-
use torrust_tracker_primitives::peer;
119
use torrust_tracker_swarm_coordination_registry::container::SwarmCoordinationRegistryContainer;
1210

1311
use crate::container::UdpTrackerServerContainer;
@@ -25,22 +23,8 @@ where
2523
pub registar: Registar,
2624
pub server: Server<S>,
2725
pub udp_core_event_listener_job: Option<JoinHandle<()>>,
28-
pub udp_server_event_listener_job: Option<JoinHandle<()>>,
29-
}
30-
31-
impl<S> Environment<S>
32-
where
33-
S: std::fmt::Debug + std::fmt::Display,
34-
{
35-
/// Add a torrent to the tracker
36-
#[allow(dead_code)]
37-
pub async fn add_torrent(&self, info_hash: &InfoHash, peer: &peer::Peer) {
38-
self.container
39-
.tracker_core_container
40-
.in_memory_torrent_repository
41-
.handle_announcement(info_hash, peer, None)
42-
.await;
43-
}
26+
pub udp_server_stats_event_listener_job: Option<JoinHandle<()>>,
27+
pub udp_server_banning_event_listener_job: Option<JoinHandle<()>>,
4428
}
4529

4630
impl Environment<Stopped> {
@@ -60,7 +44,8 @@ impl Environment<Stopped> {
6044
registar: Registar::default(),
6145
server,
6246
udp_core_event_listener_job: None,
63-
udp_server_event_listener_job: None,
47+
udp_server_stats_event_listener_job: None,
48+
udp_server_banning_event_listener_job: None,
6449
}
6550
}
6651

@@ -78,10 +63,15 @@ impl Environment<Stopped> {
7863
&self.container.udp_tracker_core_container.stats_repository,
7964
));
8065

81-
// Start the UDP tracker server event listener
82-
let udp_server_event_listener_job = Some(crate::statistics::event::listener::run_event_listener(
66+
// Start the UDP tracker server event listener (statistics)
67+
let udp_server_stats_event_listener_job = Some(crate::statistics::event::listener::run_event_listener(
8368
self.container.udp_tracker_server_container.event_bus.receiver(),
8469
&self.container.udp_tracker_server_container.stats_repository,
70+
));
71+
72+
// Start the UDP tracker server event listener (banning)
73+
let udp_server_banning_event_listener_job = Some(crate::banning::event::listener::run_event_listener(
74+
self.container.udp_tracker_server_container.event_bus.receiver(),
8575
&self.container.udp_tracker_core_container.ban_service,
8676
));
8777

@@ -102,7 +92,8 @@ impl Environment<Stopped> {
10292
registar: self.registar.clone(),
10393
server,
10494
udp_core_event_listener_job,
105-
udp_server_event_listener_job,
95+
udp_server_stats_event_listener_job,
96+
udp_server_banning_event_listener_job,
10697
}
10798
}
10899
}
@@ -131,11 +122,18 @@ impl Environment<Running> {
131122
udp_core_event_listener_job.abort();
132123
}
133124

134-
// Stop the UDP tracker server event listener
135-
if let Some(udp_server_event_listener_job) = self.udp_server_event_listener_job {
125+
// Stop the UDP tracker server event listener (statistics)
126+
if let Some(udp_server_stats_event_listener_job) = self.udp_server_stats_event_listener_job {
127+
// todo: send a message to the event listener to stop and wait for
128+
// it to finish
129+
udp_server_stats_event_listener_job.abort();
130+
}
131+
132+
// Stop the UDP tracker server event listener (banning)
133+
if let Some(udp_server_banning_event_listener_job) = self.udp_server_banning_event_listener_job {
136134
// todo: send a message to the event listener to stop and wait for
137135
// it to finish
138-
udp_server_event_listener_job.abort();
136+
udp_server_banning_event_listener_job.abort();
139137
}
140138

141139
// Stop the UDP tracker server
@@ -149,7 +147,8 @@ impl Environment<Running> {
149147
registar: Registar::default(),
150148
server,
151149
udp_core_event_listener_job: None,
152-
udp_server_event_listener_job: None,
150+
udp_server_stats_event_listener_job: None,
151+
udp_server_banning_event_listener_job: None,
153152
}
154153
}
155154

packages/udp-tracker-server/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -634,6 +634,7 @@
634634
//! documentation by [Arvid Norberg](https://github.com/arvidn) was very
635635
//! supportive in the development of this documentation. Some descriptions were
636636
//! taken from the [libtorrent](https://www.rasterbar.com/products/libtorrent/udp_tracker_protocol.html).
637+
pub mod banning;
637638
pub mod container;
638639
pub mod environment;
639640
pub mod error;

packages/udp-tracker-server/src/statistics/event/handler/error.rs

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,4 @@
1-
use std::sync::Arc;
2-
31
use aquatic_udp_protocol::PeerClient;
4-
use bittorrent_udp_tracker_core::services::banning::BanService;
5-
use tokio::sync::RwLock;
62
use torrust_tracker_metrics::label::LabelSet;
73
use torrust_tracker_metrics::{label_name, metric_name};
84
use torrust_tracker_primitives::DurationSinceUnixEpoch;
@@ -16,16 +12,9 @@ pub async fn handle_event(
1612
opt_udp_request_kind: Option<UdpRequestKind>,
1713
error_kind: ErrorKind,
1814
repository: &Repository,
19-
ban_service: &Arc<RwLock<BanService>>,
2015
now: DurationSinceUnixEpoch,
2116
) {
22-
if let ErrorKind::ConnectionCookie(_msg) = error_kind.clone() {
23-
let mut ban_service = ban_service.write().await;
24-
ban_service.increase_counter(&connection_context.client_socket_addr().ip());
25-
}
26-
2717
update_global_fixed_metrics(&connection_context, repository).await;
28-
2918
update_extendable_metrics(&connection_context, opt_udp_request_kind, error_kind, repository, now).await;
3019
}
3120

@@ -126,9 +115,7 @@ fn extract_name_and_version(peer_client: &PeerClient) -> (String, String) {
126115
#[cfg(test)]
127116
mod tests {
128117
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
129-
use std::sync::Arc;
130118

131-
use bittorrent_udp_tracker_core::services::banning::BanService;
132119
use torrust_tracker_clock::clock::Time;
133120
use torrust_tracker_primitives::service_binding::{Protocol, ServiceBinding};
134121

@@ -141,7 +128,6 @@ mod tests {
141128
#[tokio::test]
142129
async fn should_increase_the_udp4_errors_counter_when_it_receives_a_udp4_error_event() {
143130
let stats_repository = Repository::new();
144-
let ban_service = Arc::new(tokio::sync::RwLock::new(BanService::new(1)));
145131

146132
handle_event(
147133
Event::UdpError {
@@ -157,7 +143,6 @@ mod tests {
157143
error: ErrorKind::RequestParse("Invalid request format".to_string()),
158144
},
159145
&stats_repository,
160-
&ban_service,
161146
CurrentClock::now(),
162147
)
163148
.await;

packages/udp-tracker-server/src/statistics/event/handler/mod.rs

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,21 +5,12 @@ mod request_banned;
55
mod request_received;
66
mod response_sent;
77

8-
use std::sync::Arc;
9-
10-
use bittorrent_udp_tracker_core::services::banning::BanService;
11-
use tokio::sync::RwLock;
128
use torrust_tracker_primitives::DurationSinceUnixEpoch;
139

1410
use crate::event::Event;
1511
use crate::statistics::repository::Repository;
1612

17-
pub async fn handle_event(
18-
event: Event,
19-
stats_repository: &Repository,
20-
ban_service: &Arc<RwLock<BanService>>,
21-
now: DurationSinceUnixEpoch,
22-
) {
13+
pub async fn handle_event(event: Event, stats_repository: &Repository, now: DurationSinceUnixEpoch) {
2314
match event {
2415
Event::UdpRequestAborted { context } => {
2516
request_aborted::handle_event(context, stats_repository, now).await;
@@ -41,7 +32,7 @@ pub async fn handle_event(
4132
response_sent::handle_event(context, kind, req_processing_time, stats_repository, now).await;
4233
}
4334
Event::UdpError { context, kind, error } => {
44-
error::handle_event(context, kind, error, stats_repository, ban_service, now).await;
35+
error::handle_event(context, kind, error, stats_repository, now).await;
4536
}
4637
}
4738

packages/udp-tracker-server/src/statistics/event/handler/request_aborted.rs

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,7 @@ pub async fn handle_event(context: ConnectionContext, stats_repository: &Reposit
2727
#[cfg(test)]
2828
mod tests {
2929
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
30-
use std::sync::Arc;
3130

32-
use bittorrent_udp_tracker_core::services::banning::BanService;
3331
use torrust_tracker_clock::clock::Time;
3432
use torrust_tracker_primitives::service_binding::{Protocol, ServiceBinding};
3533

@@ -41,7 +39,6 @@ mod tests {
4139
#[tokio::test]
4240
async fn should_increase_the_number_of_aborted_requests_when_it_receives_a_udp_request_aborted_event() {
4341
let stats_repository = Repository::new();
44-
let ban_service = Arc::new(tokio::sync::RwLock::new(BanService::new(1)));
4542

4643
handle_event(
4744
Event::UdpRequestAborted {
@@ -55,7 +52,6 @@ mod tests {
5552
),
5653
},
5754
&stats_repository,
58-
&ban_service,
5955
CurrentClock::now(),
6056
)
6157
.await;
@@ -68,7 +64,6 @@ mod tests {
6864
#[tokio::test]
6965
async fn should_increase_the_udp_abort_counter_when_it_receives_a_udp_abort_event() {
7066
let stats_repository = Repository::new();
71-
let ban_service = Arc::new(tokio::sync::RwLock::new(BanService::new(1)));
7267

7368
handle_event(
7469
Event::UdpRequestAborted {
@@ -82,7 +77,6 @@ mod tests {
8277
),
8378
},
8479
&stats_repository,
85-
&ban_service,
8680
CurrentClock::now(),
8781
)
8882
.await;

0 commit comments

Comments
 (0)