@@ -7,10 +7,10 @@ use eth::utils::{calculate_bumped_gas_price, get_batcher_signer, get_gas_price};
77use ethers:: contract:: ContractError ;
88use ethers:: signers:: Signer ;
99use retry:: batcher_retryables:: {
10- cancel_create_new_task_retryable, create_new_task_retryable, get_user_balance_retryable ,
11- get_user_nonce_from_ethereum_retryable , simulate_create_new_task_retryable ,
12- user_balance_is_unlocked_retryable , get_current_block_number_retryable ,
13- query_balance_unlocked_events_retryable ,
10+ cancel_create_new_task_retryable, create_new_task_retryable,
11+ get_current_block_number_retryable , get_user_balance_retryable ,
12+ get_user_nonce_from_ethereum_retryable , query_balance_unlocked_events_retryable ,
13+ simulate_create_new_task_retryable , user_balance_is_unlocked_retryable ,
1414} ;
1515use retry:: { retry_function, RetryError } ;
1616use tokio:: time:: { timeout, Instant } ;
@@ -40,7 +40,7 @@ use aligned_sdk::common::types::{
4040
4141use aws_sdk_s3:: client:: Client as S3Client ;
4242use eth:: payment_service:: { BatcherPaymentService , CreateNewTaskFeeParams , SignerMiddlewareT } ;
43- use ethers:: prelude:: { Middleware , Provider , Http } ;
43+ use ethers:: prelude:: { Http , Middleware , Provider } ;
4444use ethers:: types:: { Address , Signature , TransactionReceipt , U256 , U64 } ;
4545use futures_util:: { future, join, SinkExt , StreamExt , TryStreamExt } ;
4646use lambdaworks_crypto:: merkle_tree:: merkle:: MerkleTree ;
@@ -333,7 +333,9 @@ impl Batcher {
333333 max_batch_proof_qty : config. batcher . max_batch_proof_qty ,
334334 amount_of_proofs_for_min_max_fee : config. batcher . amount_of_proofs_for_min_max_fee ,
335335 min_bump_percentage : U256 :: from ( config. batcher . min_bump_percentage ) ,
336- balance_unlock_polling_interval_seconds : config. batcher . balance_unlock_polling_interval_seconds ,
336+ balance_unlock_polling_interval_seconds : config
337+ . batcher
338+ . balance_unlock_polling_interval_seconds ,
337339 last_uploaded_batch_block : Mutex :: new ( last_uploaded_batch_block) ,
338340 pre_verification_is_enabled : config. batcher . pre_verification_is_enabled ,
339341 non_paying_config,
@@ -502,11 +504,13 @@ impl Batcher {
502504 /// Runs at configurable intervals and checks recent blocks for events (2x the polling interval).
503505 /// When an event is detected, removes user's proofs from queue and resets UserState.
504506 pub async fn poll_balance_unlocked_events ( self : Arc < Self > ) -> Result < ( ) , BatcherError > {
505- let mut interval = tokio:: time:: interval ( tokio:: time:: Duration :: from_secs ( self . balance_unlock_polling_interval_seconds ) ) ;
506-
507+ let mut interval = tokio:: time:: interval ( tokio:: time:: Duration :: from_secs (
508+ self . balance_unlock_polling_interval_seconds ,
509+ ) ) ;
510+
507511 loop {
508512 interval. tick ( ) . await ;
509-
513+
510514 if let Err ( e) = self . process_balance_unlocked_events ( ) . await {
511515 error ! ( "Error processing BalanceUnlocked events: {:?}" , e) ;
512516 // Continue polling even if there's an error
@@ -528,37 +532,59 @@ impl Batcher {
528532 // Formula: interval / 12 * 2 (assuming 12-second block times, look back 2x the interval)
529533 let block_range = ( self . balance_unlock_polling_interval_seconds / 12 ) * 2 ;
530534 let from_block = current_block. saturating_sub ( U64 :: from ( block_range) ) ;
531-
535+
532536 // Query events with retry logic
533- let events = match self . query_balance_unlocked_events ( from_block, current_block) . await {
537+ let events = match self
538+ . query_balance_unlocked_events ( from_block, current_block)
539+ . await
540+ {
534541 Ok ( events) => events,
535542 Err ( e) => {
536- warn ! ( "Failed to query BalanceUnlocked events after retries: {:?}" , e) ;
543+ warn ! (
544+ "Failed to query BalanceUnlocked events after retries: {:?}" ,
545+ e
546+ ) ;
537547 return Ok ( ( ) ) ;
538548 }
539549 } ;
540550
541- info ! ( "Found {} BalanceUnlocked events in blocks {} to {}" ,
542- events. len( ) , from_block, current_block) ;
551+ info ! (
552+ "Found {} BalanceUnlocked events in blocks {} to {}" ,
553+ events. len( ) ,
554+ from_block,
555+ current_block
556+ ) ;
543557
544558 // Process each event
545559 for event in events {
546560 let user_address = event. user ;
547- info ! ( "Processing BalanceUnlocked event for user: {:?}" , user_address) ;
548-
561+ info ! (
562+ "Processing BalanceUnlocked event for user: {:?}" ,
563+ user_address
564+ ) ;
565+
549566 // Check if user has proofs in queue
550567 if self . user_has_proofs_in_queue ( user_address) . await {
551- info ! ( "User {:?} has proofs in queue, verifying funds are still unlocked" , user_address) ;
552-
568+ info ! (
569+ "User {:?} has proofs in queue, verifying funds are still unlocked" ,
570+ user_address
571+ ) ;
572+
553573 // Double-check that funds are still unlocked by calling the contract
554574 if self . user_balance_is_unlocked ( & user_address) . await {
555575 info ! ( "User {:?} funds confirmed unlocked, removing proofs and resetting UserState" , user_address) ;
556576 self . remove_user_proofs_and_reset_state ( user_address) . await ;
557577 } else {
558- info ! ( "User {:?} funds are now locked, ignoring stale unlock event" , user_address) ;
578+ info ! (
579+ "User {:?} funds are now locked, ignoring stale unlock event" ,
580+ user_address
581+ ) ;
559582 }
560583 } else {
561- info ! ( "User {:?} has no proofs in queue, ignoring event" , user_address) ;
584+ info ! (
585+ "User {:?} has no proofs in queue, ignoring event" ,
586+ user_address
587+ ) ;
562588 }
563589 }
564590
@@ -591,7 +617,12 @@ impl Batcher {
591617 /// Queries BalanceUnlocked events from the BatcherPaymentService contract.
592618 /// Retries on recoverable errors using exponential backoff up to `ETHEREUM_CALL_MAX_RETRIES` times:
593619 /// (0,5 secs - 1 secs - 2 secs - 4 secs - 8 secs).
594- async fn query_balance_unlocked_events ( & self , from_block : U64 , to_block : U64 ) -> Result < Vec < aligned_sdk:: eth:: batcher_payment_service:: BalanceUnlockedFilter > , BatcherError > {
620+ async fn query_balance_unlocked_events (
621+ & self ,
622+ from_block : U64 ,
623+ to_block : U64 ,
624+ ) -> Result < Vec < aligned_sdk:: eth:: batcher_payment_service:: BalanceUnlockedFilter > , BatcherError >
625+ {
595626 retry_function (
596627 || {
597628 query_balance_unlocked_events_retryable (
@@ -616,7 +647,10 @@ impl Batcher {
616647 async fn user_has_proofs_in_queue ( & self , user_address : Address ) -> bool {
617648 let user_states = self . user_states . read ( ) . await ;
618649 if let Some ( user_state) = user_states. get ( & user_address) {
619- if let Some ( user_state_guard) = self . try_user_lock_with_timeout ( user_address, user_state. lock ( ) ) . await {
650+ if let Some ( user_state_guard) = self
651+ . try_user_lock_with_timeout ( user_address, user_state. lock ( ) )
652+ . await
653+ {
620654 user_state_guard. proofs_in_batch > 0
621655 } else {
622656 false
@@ -629,20 +663,28 @@ impl Batcher {
629663 async fn remove_user_proofs_and_reset_state ( & self , user_address : Address ) {
630664 // Follow locking rules: acquire user_states before batch_state to avoid deadlocks
631665 let user_states = self . user_states . write ( ) . await ;
632-
666+
633667 // Use timeout for batch lock
634- let batch_state_guard = match self . try_batch_lock_with_timeout ( self . batch_state . lock ( ) ) . await {
668+ let batch_state_guard = match self
669+ . try_batch_lock_with_timeout ( self . batch_state . lock ( ) )
670+ . await
671+ {
635672 Some ( guard) => guard,
636673 None => {
637- warn ! ( "Failed to acquire batch lock for user {:?}, skipping removal" , user_address) ;
674+ warn ! (
675+ "Failed to acquire batch lock for user {:?}, skipping removal" ,
676+ user_address
677+ ) ;
638678 return ;
639679 }
640680 } ;
641-
681+
642682 let mut batch_state_guard = batch_state_guard;
643-
683+
644684 // Process all entries for this user
645- while let Some ( entry) = batch_state_guard. batch_queue . iter ( )
685+ while let Some ( entry) = batch_state_guard
686+ . batch_queue
687+ . iter ( )
646688 . find ( |( entry, _) | entry. sender == user_address)
647689 . map ( |( entry, _) | entry. clone ( ) )
648690 {
@@ -651,32 +693,46 @@ impl Batcher {
651693 send_message (
652694 ws_sink. clone ( ) ,
653695 SubmitProofResponseMessage :: UserFundsUnlocked ,
654- ) . await ;
696+ )
697+ . await ;
655698
656699 // Close websocket connection
657700 let mut sink_guard = ws_sink. write ( ) . await ;
658701 if let Err ( e) = sink_guard. close ( ) . await {
659- warn ! ( "Error closing websocket for user {:?}: {:?}" , user_address, e) ;
702+ warn ! (
703+ "Error closing websocket for user {:?}: {:?}" ,
704+ user_address, e
705+ ) ;
660706 } else {
661707 info ! ( "Closed websocket connection for user {:?}" , user_address) ;
662708 }
663709 }
664710
665711 // Remove the entry from batch queue
666712 batch_state_guard. batch_queue . remove ( & entry) ;
667- info ! ( "Removed proof with nonce {} for user {:?} from batch queue" , entry. nonced_verification_data. nonce, user_address) ;
713+ info ! (
714+ "Removed proof with nonce {} for user {:?} from batch queue" ,
715+ entry. nonced_verification_data. nonce, user_address
716+ ) ;
668717 }
669718
670719 // Reset UserState using timeout
671720 if let Some ( user_state) = user_states. get ( & user_address) {
672- if let Some ( mut user_state_guard) = self . try_user_lock_with_timeout ( user_address, user_state. lock ( ) ) . await {
673- user_state_guard. nonce -= U256 :: from ( user_state_guard. proofs_in_batch ) ;
721+ if let Some ( mut user_state_guard) = self
722+ . try_user_lock_with_timeout ( user_address, user_state. lock ( ) )
723+ . await
724+ {
725+ let proofs_count = user_state_guard. proofs_in_batch ;
726+ user_state_guard. nonce -= U256 :: from ( proofs_count) ;
674727 user_state_guard. proofs_in_batch = 0 ;
675728 user_state_guard. total_fees_in_queue = U256 :: zero ( ) ;
676729 user_state_guard. last_max_fee_limit = U256 :: max_value ( ) ;
677730 info ! ( "Reset UserState for user {:?}" , user_address) ;
678731 } else {
679- warn ! ( "Failed to acquire user lock for {:?}, skipping UserState reset" , user_address) ;
732+ warn ! (
733+ "Failed to acquire user lock for {:?}, skipping UserState reset" ,
734+ user_address
735+ ) ;
680736 }
681737 }
682738 }
0 commit comments