@@ -11,18 +11,20 @@ use retry::batcher_retryables::{
1111 get_user_nonce_from_ethereum_retryable, user_balance_is_unlocked_retryable,
1212} ;
1313use retry:: { retry_function, RetryError } ;
14+ use tokio:: time:: timeout;
1415use types:: batch_state:: BatchState ;
1516use types:: user_state:: UserState ;
1617
1718use std:: collections:: HashMap ;
1819use std:: env;
1920use std:: net:: SocketAddr ;
2021use std:: sync:: Arc ;
22+ use std:: time:: Duration ;
2123
2224use aligned_sdk:: core:: constants:: {
2325 ADDITIONAL_SUBMISSION_GAS_COST_PER_PROOF , AGGREGATOR_GAS_COST , BUMP_BACKOFF_FACTOR ,
24- BUMP_MAX_RETRIES , BUMP_MAX_RETRY_DELAY , BUMP_MIN_RETRY_DELAY , CONSTANT_GAS_COST ,
25- DEFAULT_AGGREGATOR_FEE_PERCENTAGE_MULTIPLIER , DEFAULT_MAX_FEE_PER_PROOF ,
26+ BUMP_MAX_RETRIES , BUMP_MAX_RETRY_DELAY , BUMP_MIN_RETRY_DELAY , CONNECTION_TIMEOUT ,
27+ CONSTANT_GAS_COST , DEFAULT_AGGREGATOR_FEE_PERCENTAGE_MULTIPLIER , DEFAULT_MAX_FEE_PER_PROOF ,
2628 ETHEREUM_CALL_BACKOFF_FACTOR , ETHEREUM_CALL_MAX_RETRIES , ETHEREUM_CALL_MAX_RETRY_DELAY ,
2729 ETHEREUM_CALL_MIN_RETRY_DELAY , GAS_PRICE_PERCENTAGE_MULTIPLIER , PERCENTAGE_DIVIDER ,
2830 RESPOND_TO_TASK_FEE_LIMIT_PERCENTAGE_MULTIPLIER ,
@@ -265,13 +267,19 @@ impl Batcher {
265267 . map_err ( |e| BatcherError :: TcpListenerError ( e. to_string ( ) ) ) ?;
266268 info ! ( "Listening on: {}" , address) ;
267269
268- // Let's spawn the handling of each connection in a separate task.
269- while let Ok ( ( stream, addr) ) = listener. accept ( ) . await {
270- self . metrics . open_connections . inc ( ) ;
271- let batcher = self . clone ( ) ;
272- tokio:: spawn ( batcher. handle_connection ( stream, addr) ) ;
270+ loop {
271+ match listener. accept ( ) . await {
272+ Ok ( ( stream, addr) ) => {
273+ let batcher = self . clone ( ) ;
274+ // Let's spawn the handling of each connection in a separate task.
275+ tokio:: spawn ( batcher. handle_connection ( stream, addr) ) ;
276+ }
277+ Err ( e) => {
278+ self . metrics . user_error ( & [ "connection_accept_error" , "" ] ) ;
279+ error ! ( "Couldn't accept new connection: {}" , e) ;
280+ }
281+ }
273282 }
274- Ok ( ( ) )
275283 }
276284
277285 /// Listen for Ethereum new blocks.
@@ -360,7 +368,24 @@ impl Batcher {
360368 addr : SocketAddr ,
361369 ) -> Result < ( ) , BatcherError > {
362370 info ! ( "Incoming TCP connection from: {}" , addr) ;
363- let ws_stream = tokio_tungstenite:: accept_async ( raw_stream) . await ?;
371+ self . metrics . open_connections . inc ( ) ;
372+
373+ let ws_stream_future = tokio_tungstenite:: accept_async ( raw_stream) ;
374+ let ws_stream =
375+ match timeout ( Duration :: from_secs ( CONNECTION_TIMEOUT ) , ws_stream_future) . await {
376+ Ok ( Ok ( stream) ) => stream,
377+ Ok ( Err ( e) ) => {
378+ warn ! ( "Error while establishing websocket connection: {}" , e) ;
379+ self . metrics . open_connections . dec ( ) ;
380+ return Ok ( ( ) ) ;
381+ }
382+ Err ( e) => {
383+ warn ! ( "Error while establishing websocket connection: {}" , e) ;
384+ self . metrics . open_connections . dec ( ) ;
385+ self . metrics . user_error ( & [ "user_timeout" , "" ] ) ;
386+ return Ok ( ( ) ) ;
387+ }
388+ } ;
364389
365390 debug ! ( "WebSocket connection established: {}" , addr) ;
366391 let ( outgoing, incoming) = ws_stream. split ( ) ;
@@ -379,8 +404,33 @@ impl Batcher {
379404 . send ( Message :: binary ( serialized_protocol_version_msg) )
380405 . await ?;
381406
382- match incoming
383- . try_filter ( |msg| future:: ready ( msg. is_binary ( ) ) )
407+ let mut incoming_filter = incoming. try_filter ( |msg| future:: ready ( msg. is_binary ( ) ) ) ;
408+ let future_msg = incoming_filter. try_next ( ) ;
409+
410+ // timeout to prevent a DOS attack
411+ match timeout ( Duration :: from_secs ( CONNECTION_TIMEOUT ) , future_msg) . await {
412+ Ok ( Ok ( Some ( msg) ) ) => {
413+ self . clone ( ) . handle_message ( msg, outgoing. clone ( ) ) . await ?;
414+ }
415+ Err ( elapsed) => {
416+ warn ! ( "[{}] {}" , & addr, elapsed) ;
417+ self . metrics . user_error ( & [ "user_timeout" , "" ] ) ;
418+ self . metrics . open_connections . dec ( ) ;
419+ return Ok ( ( ) ) ;
420+ }
421+ Ok ( Ok ( None ) ) => {
422+ info ! ( "[{}] Connection closed by the other side" , & addr) ;
423+ self . metrics . open_connections . dec ( ) ;
424+ return Ok ( ( ) ) ;
425+ }
426+ Ok ( Err ( e) ) => {
427+ error ! ( "Unexpected error: {}" , e) ;
428+ self . metrics . open_connections . dec ( ) ;
429+ return Ok ( ( ) ) ;
430+ }
431+ } ;
432+
433+ match incoming_filter
384434 . try_for_each ( |msg| self . clone ( ) . handle_message ( msg, outgoing. clone ( ) ) )
385435 . await
386436 {
0 commit comments