@@ -86,7 +86,7 @@ pub struct Batcher {
8686 service_manager : ServiceManager ,
8787 service_manager_fallback : ServiceManager ,
8888 batch_state : Mutex < BatchState > ,
89- user_states : DashMap < Address , Mutex < UserState > > ,
89+ user_states : DashMap < Address , Arc < Mutex < UserState > > > ,
9090 min_block_interval : u64 ,
9191 transaction_wait_timeout : u64 ,
9292 max_proof_size : usize ,
@@ -229,7 +229,7 @@ impl Batcher {
229229 let non_paying_user_state = UserState :: new ( nonpaying_nonce) ;
230230 user_states. insert (
231231 non_paying_config. replacement . address ( ) ,
232- Mutex :: new ( non_paying_user_state) ,
232+ Arc :: new ( Mutex :: new ( non_paying_user_state) ) ,
233233 ) ;
234234
235235 Some ( non_paying_config)
@@ -762,7 +762,6 @@ impl Batcher {
762762 }
763763 }
764764
765- info ! ( "Handling message" ) ;
766765
767766 // We don't need a batch state lock here, since if the user locks its funds
768767 // after the check, some blocks should pass until he can withdraw.
@@ -771,12 +770,37 @@ impl Batcher {
771770 return Ok ( ( ) ) ;
772771 }
773772
773+ info ! ( "Handling message, locking user state" ) ;
774+
775+
776+
774777 // We acquire the lock first only to query if the user is already present and the lock is dropped.
775778 // If it was not present, then the user nonce is queried to the Aligned contract.
776779 // Lastly, we get a lock of the batch state again and insert the user state if it was still missing.
777780
778781 let is_user_in_state = self . user_states . contains_key ( & addr) ;
782+
783+ if !is_user_in_state {
784+ // We add a dummy user state to grab a lock on the user state
785+ let dummy_user_state = UserState :: new ( U256 :: zero ( ) ) ;
786+ self . user_states . insert ( addr, Arc :: new ( Mutex :: new ( dummy_user_state) ) ) ;
787+ }
788+
789+ let Some ( user_state_ref) = self . user_states . get ( & addr) else {
790+ error ! ( "This should never happen, user state has previously been inserted if it didn't exist" ) ;
791+ send_message (
792+ ws_conn_sink. clone ( ) ,
793+ SubmitProofResponseMessage :: AddToBatchError ,
794+ )
795+ . await ;
796+ self . metrics . user_error ( & [ "batcher_state_error" , "" ] ) ;
797+ return Ok ( ( ) ) ;
798+ } ;
799+
800+ // We acquire the lock on the user state, now everything will be processed sequentially
801+ let _user_state_guard = user_state_ref. lock ( ) . await ;
779802
803+ // If the user state was not present, we need to get the nonce from the Ethereum contract and update the dummy user state
780804 if !is_user_in_state {
781805 let ethereum_user_nonce = match self . get_user_nonce_from_ethereum ( addr) . await {
782806 Ok ( ethereum_user_nonce) => ethereum_user_nonce,
@@ -794,7 +818,7 @@ impl Batcher {
794818 }
795819 } ;
796820 let user_state = UserState :: new ( ethereum_user_nonce) ;
797- self . user_states . entry ( addr) . or_insert ( Mutex :: new ( user_state) ) ;
821+ self . user_states . entry ( addr) . or_insert ( Arc :: new ( Mutex :: new ( user_state) ) ) ;
798822 }
799823
800824 // * ---------------------------------------------------*
@@ -823,6 +847,7 @@ impl Batcher {
823847 None => None ,
824848 }
825849 } ;
850+
826851 let Some ( user_last_max_fee_limit) = user_last_max_fee_limit else {
827852 send_message (
828853 ws_conn_sink. clone ( ) ,
@@ -1538,7 +1563,7 @@ impl Batcher {
15381563 self . user_states . clear ( ) ;
15391564 let nonpaying_user_state = UserState :: new ( nonpaying_replacement_addr_nonce) ;
15401565 self . user_states
1541- . insert ( nonpaying_replacement_addr, Mutex :: new ( nonpaying_user_state) ) ;
1566+ . insert ( nonpaying_replacement_addr, Arc :: new ( Mutex :: new ( nonpaying_user_state) ) ) ;
15421567
15431568 self . metrics . update_queue_metrics ( 0 , 0 ) ;
15441569 }
0 commit comments