Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions packages/udp-tracker-server/src/banning/event/handler.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
use std::sync::Arc;

use bittorrent_udp_tracker_core::services::banning::BanService;
use tokio::sync::RwLock;
use torrust_tracker_primitives::DurationSinceUnixEpoch;

use crate::event::{ErrorKind, Event};

pub async fn handle_event(event: Event, ban_service: &Arc<RwLock<BanService>>, _now: DurationSinceUnixEpoch) {
if let Event::UdpError {
context,
kind: _,
error: ErrorKind::ConnectionCookie(_msg),
} = event
{
let mut ban_service = ban_service.write().await;
ban_service.increase_counter(&context.client_socket_addr().ip());
}
}
58 changes: 58 additions & 0 deletions packages/udp-tracker-server/src/banning/event/listener.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
use std::sync::Arc;

use bittorrent_udp_tracker_core::services::banning::BanService;
use bittorrent_udp_tracker_core::UDP_TRACKER_LOG_TARGET;
use tokio::sync::RwLock;
use tokio::task::JoinHandle;
use torrust_tracker_clock::clock::Time;
use torrust_tracker_events::receiver::RecvError;

use super::handler::handle_event;
use crate::event::receiver::Receiver;
use crate::CurrentClock;

#[must_use]
pub fn run_event_listener(receiver: Receiver, ban_service: &Arc<RwLock<BanService>>) -> JoinHandle<()> {
let ban_service_clone = ban_service.clone();

tracing::info!(target: UDP_TRACKER_LOG_TARGET, "Starting UDP tracker server event listener (banning)");

tokio::spawn(async move {
dispatch_events(receiver, ban_service_clone).await;

tracing::info!(target: UDP_TRACKER_LOG_TARGET, "UDP tracker server event listener (banning) finished");
})
}

async fn dispatch_events(mut receiver: Receiver, ban_service: Arc<RwLock<BanService>>) {
let shutdown_signal = tokio::signal::ctrl_c();
tokio::pin!(shutdown_signal);

loop {
tokio::select! {
biased;

_ = &mut shutdown_signal => {
tracing::info!(target: UDP_TRACKER_LOG_TARGET, "Received Ctrl+C, shutting down UDP tracker server event listener (banning)");
break;
}

result = receiver.recv() => {
match result {
Ok(event) => handle_event(event, &ban_service, CurrentClock::now()).await,
Err(e) => {
match e {
RecvError::Closed => {
tracing::info!(target: UDP_TRACKER_LOG_TARGET, "Udp server receiver (banning) closed.");
break;
}
RecvError::Lagged(n) => {
tracing::warn!(target: UDP_TRACKER_LOG_TARGET, "Udp server receiver (banning) lagged by {} events.", n);
}
}
}
}
}
}
}
}
2 changes: 2 additions & 0 deletions packages/udp-tracker-server/src/banning/event/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
pub mod handler;
pub mod listener;
1 change: 1 addition & 0 deletions packages/udp-tracker-server/src/banning/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod event;
51 changes: 25 additions & 26 deletions packages/udp-tracker-server/src/environment.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
use std::net::SocketAddr;
use std::sync::Arc;

use bittorrent_primitives::info_hash::InfoHash;
use bittorrent_tracker_core::container::TrackerCoreContainer;
use bittorrent_udp_tracker_core::container::UdpTrackerCoreContainer;
use tokio::task::JoinHandle;
use torrust_server_lib::registar::Registar;
use torrust_tracker_configuration::{logging, Configuration, DEFAULT_TIMEOUT};
use torrust_tracker_primitives::peer;
use torrust_tracker_swarm_coordination_registry::container::SwarmCoordinationRegistryContainer;

use crate::container::UdpTrackerServerContainer;
Expand All @@ -25,22 +23,8 @@ where
pub registar: Registar,
pub server: Server<S>,
pub udp_core_event_listener_job: Option<JoinHandle<()>>,
pub udp_server_event_listener_job: Option<JoinHandle<()>>,
}

impl<S> Environment<S>
where
S: std::fmt::Debug + std::fmt::Display,
{
/// Add a torrent to the tracker
#[allow(dead_code)]
pub async fn add_torrent(&self, info_hash: &InfoHash, peer: &peer::Peer) {
self.container
.tracker_core_container
.in_memory_torrent_repository
.handle_announcement(info_hash, peer, None)
.await;
}
pub udp_server_stats_event_listener_job: Option<JoinHandle<()>>,
pub udp_server_banning_event_listener_job: Option<JoinHandle<()>>,
}

impl Environment<Stopped> {
Expand All @@ -60,7 +44,8 @@ impl Environment<Stopped> {
registar: Registar::default(),
server,
udp_core_event_listener_job: None,
udp_server_event_listener_job: None,
udp_server_stats_event_listener_job: None,
udp_server_banning_event_listener_job: None,
}
}

Expand All @@ -78,10 +63,15 @@ impl Environment<Stopped> {
&self.container.udp_tracker_core_container.stats_repository,
));

// Start the UDP tracker server event listener
let udp_server_event_listener_job = Some(crate::statistics::event::listener::run_event_listener(
// Start the UDP tracker server event listener (statistics)
let udp_server_stats_event_listener_job = Some(crate::statistics::event::listener::run_event_listener(
self.container.udp_tracker_server_container.event_bus.receiver(),
&self.container.udp_tracker_server_container.stats_repository,
));

// Start the UDP tracker server event listener (banning)
let udp_server_banning_event_listener_job = Some(crate::banning::event::listener::run_event_listener(
self.container.udp_tracker_server_container.event_bus.receiver(),
&self.container.udp_tracker_core_container.ban_service,
));

Expand All @@ -102,7 +92,8 @@ impl Environment<Stopped> {
registar: self.registar.clone(),
server,
udp_core_event_listener_job,
udp_server_event_listener_job,
udp_server_stats_event_listener_job,
udp_server_banning_event_listener_job,
}
}
}
Expand Down Expand Up @@ -131,11 +122,18 @@ impl Environment<Running> {
udp_core_event_listener_job.abort();
}

// Stop the UDP tracker server event listener
if let Some(udp_server_event_listener_job) = self.udp_server_event_listener_job {
// Stop the UDP tracker server event listener (statistics)
if let Some(udp_server_stats_event_listener_job) = self.udp_server_stats_event_listener_job {
// todo: send a message to the event listener to stop and wait for
// it to finish
udp_server_stats_event_listener_job.abort();
}

// Stop the UDP tracker server event listener (banning)
if let Some(udp_server_banning_event_listener_job) = self.udp_server_banning_event_listener_job {
// todo: send a message to the event listener to stop and wait for
// it to finish
udp_server_event_listener_job.abort();
udp_server_banning_event_listener_job.abort();
}

// Stop the UDP tracker server
Expand All @@ -149,7 +147,8 @@ impl Environment<Running> {
registar: Registar::default(),
server,
udp_core_event_listener_job: None,
udp_server_event_listener_job: None,
udp_server_stats_event_listener_job: None,
udp_server_banning_event_listener_job: None,
}
}

Expand Down
1 change: 1 addition & 0 deletions packages/udp-tracker-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -634,6 +634,7 @@
//! documentation by [Arvid Norberg](https://github.com/arvidn) was very
//! supportive in the development of this documentation. Some descriptions were
//! taken from the [libtorrent](https://www.rasterbar.com/products/libtorrent/udp_tracker_protocol.html).
pub mod banning;
pub mod container;
pub mod environment;
pub mod error;
Expand Down
15 changes: 0 additions & 15 deletions packages/udp-tracker-server/src/statistics/event/handler/error.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,4 @@
use std::sync::Arc;

use aquatic_udp_protocol::PeerClient;
use bittorrent_udp_tracker_core::services::banning::BanService;
use tokio::sync::RwLock;
use torrust_tracker_metrics::label::LabelSet;
use torrust_tracker_metrics::{label_name, metric_name};
use torrust_tracker_primitives::DurationSinceUnixEpoch;
Expand All @@ -16,16 +12,9 @@ pub async fn handle_event(
opt_udp_request_kind: Option<UdpRequestKind>,
error_kind: ErrorKind,
repository: &Repository,
ban_service: &Arc<RwLock<BanService>>,
now: DurationSinceUnixEpoch,
) {
if let ErrorKind::ConnectionCookie(_msg) = error_kind.clone() {
let mut ban_service = ban_service.write().await;
ban_service.increase_counter(&connection_context.client_socket_addr().ip());
}

update_global_fixed_metrics(&connection_context, repository).await;

update_extendable_metrics(&connection_context, opt_udp_request_kind, error_kind, repository, now).await;
}

Expand Down Expand Up @@ -126,9 +115,7 @@ fn extract_name_and_version(peer_client: &PeerClient) -> (String, String) {
#[cfg(test)]
mod tests {
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::Arc;

use bittorrent_udp_tracker_core::services::banning::BanService;
use torrust_tracker_clock::clock::Time;
use torrust_tracker_primitives::service_binding::{Protocol, ServiceBinding};

Expand All @@ -141,7 +128,6 @@ mod tests {
#[tokio::test]
async fn should_increase_the_udp4_errors_counter_when_it_receives_a_udp4_error_event() {
let stats_repository = Repository::new();
let ban_service = Arc::new(tokio::sync::RwLock::new(BanService::new(1)));

handle_event(
Event::UdpError {
Expand All @@ -157,7 +143,6 @@ mod tests {
error: ErrorKind::RequestParse("Invalid request format".to_string()),
},
&stats_repository,
&ban_service,
CurrentClock::now(),
)
.await;
Expand Down
13 changes: 2 additions & 11 deletions packages/udp-tracker-server/src/statistics/event/handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,12 @@ mod request_banned;
mod request_received;
mod response_sent;

use std::sync::Arc;

use bittorrent_udp_tracker_core::services::banning::BanService;
use tokio::sync::RwLock;
use torrust_tracker_primitives::DurationSinceUnixEpoch;

use crate::event::Event;
use crate::statistics::repository::Repository;

pub async fn handle_event(
event: Event,
stats_repository: &Repository,
ban_service: &Arc<RwLock<BanService>>,
now: DurationSinceUnixEpoch,
) {
pub async fn handle_event(event: Event, stats_repository: &Repository, now: DurationSinceUnixEpoch) {
match event {
Event::UdpRequestAborted { context } => {
request_aborted::handle_event(context, stats_repository, now).await;
Expand All @@ -41,7 +32,7 @@ pub async fn handle_event(
response_sent::handle_event(context, kind, req_processing_time, stats_repository, now).await;
}
Event::UdpError { context, kind, error } => {
error::handle_event(context, kind, error, stats_repository, ban_service, now).await;
error::handle_event(context, kind, error, stats_repository, now).await;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,7 @@ pub async fn handle_event(context: ConnectionContext, stats_repository: &Reposit
#[cfg(test)]
mod tests {
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::Arc;

use bittorrent_udp_tracker_core::services::banning::BanService;
use torrust_tracker_clock::clock::Time;
use torrust_tracker_primitives::service_binding::{Protocol, ServiceBinding};

Expand All @@ -41,7 +39,6 @@ mod tests {
#[tokio::test]
async fn should_increase_the_number_of_aborted_requests_when_it_receives_a_udp_request_aborted_event() {
let stats_repository = Repository::new();
let ban_service = Arc::new(tokio::sync::RwLock::new(BanService::new(1)));

handle_event(
Event::UdpRequestAborted {
Expand All @@ -55,7 +52,6 @@ mod tests {
),
},
&stats_repository,
&ban_service,
CurrentClock::now(),
)
.await;
Expand All @@ -68,7 +64,6 @@ mod tests {
#[tokio::test]
async fn should_increase_the_udp_abort_counter_when_it_receives_a_udp_abort_event() {
let stats_repository = Repository::new();
let ban_service = Arc::new(tokio::sync::RwLock::new(BanService::new(1)));

handle_event(
Event::UdpRequestAborted {
Expand All @@ -82,7 +77,6 @@ mod tests {
),
},
&stats_repository,
&ban_service,
CurrentClock::now(),
)
.await;
Expand Down
Loading
Loading