@@ -11,18 +11,20 @@ use retry::batcher_retryables::{
1111 get_user_nonce_from_ethereum_retryable, user_balance_is_unlocked_retryable,
1212} ;
1313use retry:: { retry_function, RetryError } ;
14+ use tokio:: time:: timeout;
1415use types:: batch_state:: BatchState ;
1516use types:: user_state:: UserState ;
1617
1718use std:: collections:: HashMap ;
1819use std:: env;
1920use std:: net:: SocketAddr ;
2021use std:: sync:: Arc ;
22+ use std:: time:: Duration ;
2123
2224use aligned_sdk:: core:: constants:: {
2325 ADDITIONAL_SUBMISSION_GAS_COST_PER_PROOF , AGGREGATOR_GAS_COST , BUMP_BACKOFF_FACTOR ,
24- BUMP_MAX_RETRIES , BUMP_MAX_RETRY_DELAY , BUMP_MIN_RETRY_DELAY , CONSTANT_GAS_COST ,
25- DEFAULT_AGGREGATOR_FEE_PERCENTAGE_MULTIPLIER , DEFAULT_MAX_FEE_PER_PROOF ,
26+ BUMP_MAX_RETRIES , BUMP_MAX_RETRY_DELAY , BUMP_MIN_RETRY_DELAY , CONNECTION_TIMEOUT ,
27+ CONSTANT_GAS_COST , DEFAULT_AGGREGATOR_FEE_PERCENTAGE_MULTIPLIER , DEFAULT_MAX_FEE_PER_PROOF ,
2628 ETHEREUM_CALL_BACKOFF_FACTOR , ETHEREUM_CALL_MAX_RETRIES , ETHEREUM_CALL_MAX_RETRY_DELAY ,
2729 ETHEREUM_CALL_MIN_RETRY_DELAY , GAS_PRICE_PERCENTAGE_MULTIPLIER , PERCENTAGE_DIVIDER ,
2830 RESPOND_TO_TASK_FEE_LIMIT_PERCENTAGE_MULTIPLIER ,
@@ -79,7 +81,7 @@ pub struct Batcher {
7981 service_manager : ServiceManager ,
8082 service_manager_fallback : ServiceManager ,
8183 batch_state : Mutex < BatchState > ,
82- max_block_interval : u64 ,
84+ min_block_interval : u64 ,
8385 transaction_wait_timeout : u64 ,
8486 max_proof_size : usize ,
8587 max_batch_byte_size : usize ,
@@ -242,7 +244,7 @@ impl Batcher {
242244 payment_service_fallback,
243245 service_manager,
244246 service_manager_fallback,
245- max_block_interval : config. batcher . block_interval ,
247+ min_block_interval : config. batcher . block_interval ,
246248 transaction_wait_timeout : config. batcher . transaction_wait_timeout ,
247249 max_proof_size : config. batcher . max_proof_size ,
248250 max_batch_byte_size : config. batcher . max_batch_byte_size ,
@@ -265,13 +267,19 @@ impl Batcher {
265267 . map_err ( |e| BatcherError :: TcpListenerError ( e. to_string ( ) ) ) ?;
266268 info ! ( "Listening on: {}" , address) ;
267269
268- // Let's spawn the handling of each connection in a separate task.
269- while let Ok ( ( stream, addr) ) = listener. accept ( ) . await {
270- self . metrics . open_connections . inc ( ) ;
271- let batcher = self . clone ( ) ;
272- tokio:: spawn ( batcher. handle_connection ( stream, addr) ) ;
270+ loop {
271+ match listener. accept ( ) . await {
272+ Ok ( ( stream, addr) ) => {
273+ let batcher = self . clone ( ) ;
274+ // Let's spawn the handling of each connection in a separate task.
275+ tokio:: spawn ( batcher. handle_connection ( stream, addr) ) ;
276+ }
277+ Err ( e) => {
278+ self . metrics . user_error ( & [ "connection_accept_error" , "" ] ) ;
279+ error ! ( "Couldn't accept new connection: {}" , e) ;
280+ }
281+ }
273282 }
274- Ok ( ( ) )
275283 }
276284
277285 /// Listen for Ethereum new blocks.
@@ -360,7 +368,24 @@ impl Batcher {
360368 addr : SocketAddr ,
361369 ) -> Result < ( ) , BatcherError > {
362370 info ! ( "Incoming TCP connection from: {}" , addr) ;
363- let ws_stream = tokio_tungstenite:: accept_async ( raw_stream) . await ?;
371+ self . metrics . open_connections . inc ( ) ;
372+
373+ let ws_stream_future = tokio_tungstenite:: accept_async ( raw_stream) ;
374+ let ws_stream =
375+ match timeout ( Duration :: from_secs ( CONNECTION_TIMEOUT ) , ws_stream_future) . await {
376+ Ok ( Ok ( stream) ) => stream,
377+ Ok ( Err ( e) ) => {
378+ warn ! ( "Error while establishing websocket connection: {}" , e) ;
379+ self . metrics . open_connections . dec ( ) ;
380+ return Ok ( ( ) ) ;
381+ }
382+ Err ( e) => {
383+ warn ! ( "Error while establishing websocket connection: {}" , e) ;
384+ self . metrics . open_connections . dec ( ) ;
385+ self . metrics . user_error ( & [ "user_timeout" , "" ] ) ;
386+ return Ok ( ( ) ) ;
387+ }
388+ } ;
364389
365390 debug ! ( "WebSocket connection established: {}" , addr) ;
366391 let ( outgoing, incoming) = ws_stream. split ( ) ;
@@ -379,8 +404,33 @@ impl Batcher {
379404 . send ( Message :: binary ( serialized_protocol_version_msg) )
380405 . await ?;
381406
382- match incoming
383- . try_filter ( |msg| future:: ready ( msg. is_binary ( ) ) )
407+ let mut incoming_filter = incoming. try_filter ( |msg| future:: ready ( msg. is_binary ( ) ) ) ;
408+ let future_msg = incoming_filter. try_next ( ) ;
409+
410+ // timeout to prevent a DOS attack
411+ match timeout ( Duration :: from_secs ( CONNECTION_TIMEOUT ) , future_msg) . await {
412+ Ok ( Ok ( Some ( msg) ) ) => {
413+ self . clone ( ) . handle_message ( msg, outgoing. clone ( ) ) . await ?;
414+ }
415+ Err ( elapsed) => {
416+ warn ! ( "[{}] {}" , & addr, elapsed) ;
417+ self . metrics . user_error ( & [ "user_timeout" , "" ] ) ;
418+ self . metrics . open_connections . dec ( ) ;
419+ return Ok ( ( ) ) ;
420+ }
421+ Ok ( Ok ( None ) ) => {
422+ info ! ( "[{}] Connection closed by the other side" , & addr) ;
423+ self . metrics . open_connections . dec ( ) ;
424+ return Ok ( ( ) ) ;
425+ }
426+ Ok ( Err ( e) ) => {
427+ error ! ( "Unexpected error: {}" , e) ;
428+ self . metrics . open_connections . dec ( ) ;
429+ return Ok ( ( ) ) ;
430+ }
431+ } ;
432+
433+ match incoming_filter
384434 . try_for_each ( |msg| self . clone ( ) . handle_message ( msg, outgoing. clone ( ) ) )
385435 . await
386436 {
@@ -1082,9 +1132,9 @@ impl Batcher {
10821132 return None ;
10831133 }
10841134
1085- if block_number < * last_uploaded_batch_block_lock + self . max_block_interval {
1135+ if block_number < * last_uploaded_batch_block_lock + self . min_block_interval {
10861136 info ! (
1087- "Current batch not ready to be posted. Minimium amount of {} blocks have not passed. Block passed: {}" , self . max_block_interval ,
1137+ "Current batch not ready to be posted. Minimium amount of {} blocks have not passed. Block passed: {}" , self . min_block_interval ,
10881138 block_number - * last_uploaded_batch_block_lock,
10891139 ) ;
10901140 return None ;
@@ -1418,7 +1468,7 @@ impl Batcher {
14181468 }
14191469 }
14201470
1421- /// Sends a `create_new_task` transaction to Ethereum and waits for a maximum of 3 blocks for the receipt.
1471+ /// Sends a `create_new_task` transaction to Ethereum and waits for a maximum of 8 blocks for the receipt.
14221472 /// Retries up to `ETHEREUM_CALL_MAX_RETRIES` times using exponential backoff on recoverable errors while trying to send the transaction:
14231473 /// (0,5 secs - 1 secs - 2 secs - 4 secs - 8 secs).
14241474 /// `ReceiptNotFoundError` is treated as non-recoverable, and the transaction will be canceled using `cancel_create_new_task_tx` in that case.
@@ -1469,7 +1519,7 @@ impl Batcher {
14691519
14701520 /// Sends a transaction to Ethereum with the same nonce as the previous one to override it.
14711521 /// Retries on recoverable errors with exponential backoff.
1472- /// Bumps the fee if not included in 3 blocks, using `calculate_bumped_gas_price`.
1522+ /// Bumps the fee if not included in 6 blocks, using `calculate_bumped_gas_price`.
14731523 /// In the first 5 attemps, bumps the fee every 3 blocks. Then exponential backoff takes over.
14741524 /// After 2 hours (attempt 13), retries occur hourly for 1 day (33 retries).
14751525 pub async fn cancel_create_new_task_tx ( & self , old_tx_gas_price : U256 ) {
0 commit comments