@@ -598,42 +598,31 @@ impl Batcher {
598598 } ;
599599
600600 let mut batch_state_guard = batch_state_guard;
601- let mut proofs_to_remove = Vec :: new ( ) ;
602- let mut websocket_sinks = Vec :: new ( ) ;
603601
604- // Collect all entries for this user and their websocket connections
605- for ( entry, _) in batch_state_guard. batch_queue . iter ( ) {
606- if entry. sender == user_address {
607- // Store websocket sink before removing the entry
608- if let Some ( ws_sink) = entry. messaging_sink . as_ref ( ) {
609- websocket_sinks. push ( ws_sink. clone ( ) ) ;
602+ // Process all entries for this user directly
603+ while let Some ( entry) = batch_state_guard. batch_queue . iter ( )
604+ . find ( |( entry, _) | entry. sender == user_address)
605+ . map ( |( entry, _) | entry. clone ( ) )
606+ {
607+ // Notify user via websocket before removing the proof
608+ if let Some ( ws_sink) = entry. messaging_sink . as_ref ( ) {
609+ send_message (
610+ ws_sink. clone ( ) ,
611+ SubmitProofResponseMessage :: UserFundsUnlocked ,
612+ ) . await ;
613+
614+ // Close websocket connection
615+ let mut sink_guard = ws_sink. write ( ) . await ;
616+ if let Err ( e) = sink_guard. close ( ) . await {
617+ warn ! ( "Error closing websocket for user {:?}: {:?}" , user_address, e) ;
618+ } else {
619+ info ! ( "Closed websocket connection for user {:?}" , user_address) ;
610620 }
611- proofs_to_remove. push ( entry. clone ( ) ) ;
612621 }
613- }
614-
615- // Notify users via websocket before removing their proofs
616- for ws_sink in & websocket_sinks {
617- send_message (
618- ws_sink. clone ( ) ,
619- aligned_sdk:: common:: types:: SubmitProofResponseMessage :: UserFundsUnlocked ,
620- ) . await ;
621- }
622622
623- // Remove collected entries
624- for entry in proofs_to_remove {
623+ // Remove the entry from batch queue
625624 batch_state_guard. batch_queue . remove ( & entry) ;
626- info ! ( "Removed proof for user {:?} from batch queue" , user_address) ;
627- }
628-
629- // Close websocket connections
630- for ws_sink in websocket_sinks {
631- let mut sink_guard = ws_sink. write ( ) . await ;
632- if let Err ( e) = sink_guard. close ( ) . await {
633- warn ! ( "Error closing websocket for user {:?}: {:?}" , user_address, e) ;
634- } else {
635- info ! ( "Closed websocket connection for user {:?}" , user_address) ;
636- }
625+ info ! ( "Removed proof with nonce {} for user {:?} from batch queue" , entry. nonced_verification_data. nonce, user_address) ;
637626 }
638627
639628 // Reset UserState using timeout
0 commit comments