@@ -39,8 +39,8 @@ use aligned_sdk::common::types::{
3939
4040use aws_sdk_s3:: client:: Client as S3Client ;
4141use eth:: payment_service:: { BatcherPaymentService , CreateNewTaskFeeParams , SignerMiddlewareT } ;
42- use ethers:: prelude:: { Middleware , Provider } ;
43- use ethers:: types:: { Address , Signature , TransactionReceipt , U256 } ;
42+ use ethers:: prelude:: { Middleware , Provider , Http } ;
43+ use ethers:: types:: { Address , Signature , TransactionReceipt , U256 , U64 } ;
4444use futures_util:: { future, join, SinkExt , StreamExt , TryStreamExt } ;
4545use lambdaworks_crypto:: merkle_tree:: merkle:: MerkleTree ;
4646use lambdaworks_crypto:: merkle_tree:: traits:: IsMerkleTreeBackend ;
@@ -86,6 +86,8 @@ pub struct Batcher {
8686 eth_ws_url_fallback : String ,
8787 batcher_signer : Arc < SignerMiddlewareT > ,
8888 batcher_signer_fallback : Arc < SignerMiddlewareT > ,
89+ eth_http_provider : Provider < Http > ,
90+ eth_http_provider_fallback : Provider < Http > ,
8991 chain_id : U256 ,
9092 payment_service : BatcherPaymentService ,
9193 payment_service_fallback : BatcherPaymentService ,
@@ -315,6 +317,8 @@ impl Batcher {
315317 eth_ws_url_fallback : config. eth_ws_url_fallback ,
316318 batcher_signer,
317319 batcher_signer_fallback,
320+ eth_http_provider,
321+ eth_http_provider_fallback,
318322 chain_id,
319323 payment_service,
320324 payment_service_fallback,
@@ -491,6 +495,160 @@ impl Batcher {
491495 . map_err ( |e| e. inner ( ) )
492496 }
493497
498+ /// Poll for BalanceUnlocked events from BatcherPaymentService contract.
499+ /// Runs every 10 minutes and checks the last 100 blocks for events.
500+ /// When an event is detected, removes user's proofs from queue and resets UserState.
501+ pub async fn poll_balance_unlocked_events ( self : Arc < Self > ) -> Result < ( ) , BatcherError > {
502+ let mut interval = tokio:: time:: interval ( tokio:: time:: Duration :: from_secs ( 20 ) ) ; // 10 minutes
503+
504+ loop {
505+ interval. tick ( ) . await ;
506+
507+ if let Err ( e) = self . process_balance_unlocked_events ( ) . await {
508+ error ! ( "Error processing BalanceUnlocked events: {:?}" , e) ;
509+ // Continue polling even if there's an error
510+ }
511+ }
512+ }
513+
514+ async fn process_balance_unlocked_events ( & self ) -> Result < ( ) , BatcherError > {
515+ // Get current block number using HTTP providers
516+ let current_block = match self . get_current_block_number ( ) . await {
517+ Ok ( block) => block,
518+ Err ( e) => {
519+ warn ! ( "Failed to get current block number: {:?}" , e) ;
520+ return Ok ( ( ) ) ;
521+ }
522+ } ;
523+
524+ // Calculate the block range (last 100 blocks)
525+ let from_block = current_block. saturating_sub ( U64 :: from ( 100 ) ) ;
526+
527+ // Create filter for BalanceUnlocked events
528+ let filter = self . payment_service
529+ . balance_unlocked_filter ( )
530+ . from_block ( from_block)
531+ . to_block ( current_block) ;
532+
533+ // Query events
534+ let events = match filter. query ( ) . await {
535+ Ok ( events) => events,
536+ Err ( e) => {
537+ warn ! ( "Failed to query BalanceUnlocked events: {:?}" , e) ;
538+ return Ok ( ( ) ) ;
539+ }
540+ } ;
541+
542+ info ! ( "Found {} BalanceUnlocked events in blocks {} to {}" ,
543+ events. len( ) , from_block, current_block) ;
544+
545+ // Process each event
546+ for event in events {
547+ let user_address = event. user ;
548+ info ! ( "Processing BalanceUnlocked event for user: {:?}" , user_address) ;
549+
550+ // Check if user has proofs in queue
551+ if self . user_has_proofs_in_queue ( user_address) . await {
552+ info ! ( "User {:?} has proofs in queue, removing them and resetting UserState" , user_address) ;
553+ self . remove_user_proofs_and_reset_state ( user_address) . await ;
554+ } else {
555+ info ! ( "User {:?} has no proofs in queue, ignoring event" , user_address) ;
556+ }
557+ }
558+
559+ Ok ( ( ) )
560+ }
561+
562+ async fn get_current_block_number ( & self ) -> Result < U64 , BatcherError > {
563+ // Try primary provider first
564+ match self . eth_http_provider . get_block_number ( ) . await {
565+ Ok ( block) => Ok ( block) ,
566+ Err ( _) => {
567+ // Fallback to secondary provider
568+ self . eth_http_provider_fallback . get_block_number ( ) . await
569+ . map_err ( |e| BatcherError :: EthereumProviderError ( e. to_string ( ) ) )
570+ }
571+ }
572+ }
573+
574+ async fn user_has_proofs_in_queue ( & self , user_address : Address ) -> bool {
575+ let user_states = self . user_states . read ( ) . await ;
576+ if let Some ( user_state) = user_states. get ( & user_address) {
577+ if let Some ( user_state_guard) = self . try_user_lock_with_timeout ( user_address, user_state. lock ( ) ) . await {
578+ user_state_guard. proofs_in_batch > 0
579+ } else {
580+ false
581+ }
582+ } else {
583+ false
584+ }
585+ }
586+
587+ async fn remove_user_proofs_and_reset_state ( & self , user_address : Address ) {
588+ // Follow locking rules: acquire user_states before batch_state to avoid deadlocks
589+ let user_states = self . user_states . write ( ) . await ;
590+
591+ // Use timeout for batch lock
592+ let batch_state_guard = match self . try_batch_lock_with_timeout ( self . batch_state . lock ( ) ) . await {
593+ Some ( guard) => guard,
594+ None => {
595+ warn ! ( "Failed to acquire batch lock for user {:?}, skipping removal" , user_address) ;
596+ return ;
597+ }
598+ } ;
599+
600+ let mut batch_state_guard = batch_state_guard;
601+ let mut proofs_to_remove = Vec :: new ( ) ;
602+ let mut websocket_sinks = Vec :: new ( ) ;
603+
604+ // Collect all entries for this user and their websocket connections
605+ for ( entry, _) in batch_state_guard. batch_queue . iter ( ) {
606+ if entry. sender == user_address {
607+ // Store websocket sink before removing the entry
608+ if let Some ( ws_sink) = entry. messaging_sink . as_ref ( ) {
609+ websocket_sinks. push ( ws_sink. clone ( ) ) ;
610+ }
611+ proofs_to_remove. push ( entry. clone ( ) ) ;
612+ }
613+ }
614+
615+ // Notify users via websocket before removing their proofs
616+ for ws_sink in & websocket_sinks {
617+ send_message (
618+ ws_sink. clone ( ) ,
619+ aligned_sdk:: common:: types:: SubmitProofResponseMessage :: UserFundsUnlocked ,
620+ ) . await ;
621+ }
622+
623+ // Remove collected entries
624+ for entry in proofs_to_remove {
625+ batch_state_guard. batch_queue . remove ( & entry) ;
626+ info ! ( "Removed proof for user {:?} from batch queue" , user_address) ;
627+ }
628+
629+ // Close websocket connections
630+ for ws_sink in websocket_sinks {
631+ let mut sink_guard = ws_sink. write ( ) . await ;
632+ if let Err ( e) = sink_guard. close ( ) . await {
633+ warn ! ( "Error closing websocket for user {:?}: {:?}" , user_address, e) ;
634+ } else {
635+ info ! ( "Closed websocket connection for user {:?}" , user_address) ;
636+ }
637+ }
638+
639+ // Reset UserState using timeout
640+ if let Some ( user_state) = user_states. get ( & user_address) {
641+ if let Some ( mut user_state_guard) = self . try_user_lock_with_timeout ( user_address, user_state. lock ( ) ) . await {
642+ user_state_guard. proofs_in_batch = 0 ;
643+ user_state_guard. total_fees_in_queue = U256 :: zero ( ) ;
644+ user_state_guard. last_max_fee_limit = U256 :: max_value ( ) ;
645+ info ! ( "Reset UserState for user {:?}" , user_address) ;
646+ } else {
647+ warn ! ( "Failed to acquire user lock for {:?}, skipping UserState reset" , user_address) ;
648+ }
649+ }
650+ }
651+
494652 pub async fn listen_new_blocks_retryable (
495653 self : Arc < Self > ,
496654 ) -> Result < ( ) , RetryError < BatcherError > > {
0 commit comments