@@ -3,10 +3,11 @@ use std::sync::Arc;
33use std:: time:: Duration ;
44
55use bittorrent_tracker_client:: udp:: client:: check;
6+ use bloom:: CountingBloomFilter ;
67use derive_more:: Constructor ;
78use futures_util:: StreamExt ;
89use tokio:: select;
9- use tokio:: sync:: oneshot;
10+ use tokio:: sync:: { oneshot, RwLock } ;
1011use tracing:: instrument;
1112
1213use super :: request_buffer:: ActiveRequests ;
@@ -20,6 +21,10 @@ use crate::servers::udp::server::processor::Processor;
2021use crate :: servers:: udp:: server:: receiver:: Receiver ;
2122use crate :: servers:: udp:: UDP_TRACKER_LOG_TARGET ;
2223
24+ /// The maximum number of connection id errors per ip. Clients will be banned if
25+ /// they exceed this limit.
26+ const MAX_CONNECTION_ID_ERRORS_PER_IP : u32 = 10 ;
27+
2328/// A UDP server instance launcher.
2429#[ derive( Constructor ) ]
2530pub struct Launcher ;
@@ -114,6 +119,10 @@ impl Launcher {
114119 async fn run_udp_server_main ( mut receiver : Receiver , tracker : Arc < Tracker > , cookie_lifetime : Duration ) {
115120 let active_requests = & mut ActiveRequests :: default ( ) ;
116121
122+ // Create a counting bloom filter that uses 4 bits per element and has a
123+ // false positive rate of 0.01 when 100 items have been inserted
124+ let cbf = Arc :: new ( RwLock :: new ( CountingBloomFilter :: with_rate ( 4 , 0.01 , 100 ) ) ) ;
125+
117126 let addr = receiver. bound_socket_address ( ) ;
118127 let local_addr = format ! ( "udp://{addr}" ) ;
119128
@@ -140,6 +149,13 @@ impl Launcher {
140149 }
141150 } ;
142151
152+ let connection_id_errors_from_ip = cbf. read ( ) . await . estimate_count ( & req. from . ip ( ) . to_string ( ) ) ;
153+
154+ if connection_id_errors_from_ip > MAX_CONNECTION_ID_ERRORS_PER_IP {
155+ tracing:: debug!( target: UDP_TRACKER_LOG_TARGET , local_addr, "Udp::run_udp_server::loop continue: (banned ip)" ) ;
156+ continue ;
157+ }
158+
143159 // We spawn the new task even if there active requests buffer is
144160 // full. This could seem counterintuitive because we are accepting
145161 // more request and consuming more memory even if the server is
@@ -151,7 +167,8 @@ impl Launcher {
151167 // are only adding and removing tasks without given them the
152168 // chance to finish. However, the buffer is yielding before
153169 // aborting one tasks, giving it the chance to finish.
154- let abort_handle: tokio:: task:: AbortHandle = tokio:: task:: spawn ( processor. process_request ( req) ) . abort_handle ( ) ;
170+ let abort_handle: tokio:: task:: AbortHandle =
171+ tokio:: task:: spawn ( processor. process_request ( req, cbf. clone ( ) ) ) . abort_handle ( ) ;
155172
156173 if abort_handle. is_finished ( ) {
157174 continue ;
0 commit comments