@@ -17,7 +17,6 @@ use types::batch_state::BatchState;
1717use types:: user_state:: UserState ;
1818
1919use batch_queue:: calculate_batch_size;
20- use dashmap:: DashMap ;
2120use std:: collections:: HashMap ;
2221use std:: env;
2322use std:: net:: SocketAddr ;
@@ -109,9 +108,9 @@ pub struct Batcher {
109108 /// Flag to indicate when recovery is in progress
110109 /// When true, message handlers will return ServerBusy responses
111110 /// It's used a way to "lock" all the user_states at the same time
112- /// If one needed is taken in the handle message it will timeout
111+ /// If one needed is taken in the handle message it will time out
113112 is_recovering_from_submission_failure : RwLock < bool > ,
114- user_states : DashMap < Address , Arc < Mutex < UserState > > > ,
113+ user_states : Arc < RwLock < HashMap < Address , Arc < Mutex < UserState > > > > > ,
115114
116115 last_uploaded_batch_block : Mutex < u64 > ,
117116
@@ -181,7 +180,7 @@ impl Batcher {
181180 let deployment_output =
182181 ContractDeploymentOutput :: new ( config. aligned_layer_deployment_config_file_path ) ;
183182
184- log :: info!(
183+ info ! (
185184 "Starting metrics server on port {}" ,
186185 config. batcher. metrics_port
187186 ) ;
@@ -262,7 +261,7 @@ impl Batcher {
262261 . await
263262 . expect ( "Failed to get fallback Service Manager contract" ) ;
264263
265- let user_states = DashMap :: new ( ) ;
264+ let user_states = Arc :: new ( RwLock :: new ( HashMap :: new ( ) ) ) ;
266265 let batch_state = BatchState :: new ( config. batcher . max_queue_size ) ;
267266 let non_paying_config = if let Some ( non_paying_config) = config. batcher . non_paying {
268267 warn ! ( "Non-paying address configuration detected. Will replace non-paying address {} with configured address." ,
@@ -276,7 +275,7 @@ impl Batcher {
276275 . expect ( "Could not get non-paying nonce from Ethereum" ) ;
277276
278277 let non_paying_user_state = UserState :: new ( nonpaying_nonce) ;
279- user_states. insert (
278+ user_states. write ( ) . await . insert (
280279 non_paying_config. replacement . address ( ) ,
281280 Arc :: new ( Mutex :: new ( non_paying_user_state) ) ,
282281 ) ;
@@ -335,7 +334,7 @@ impl Batcher {
335334 }
336335 }
337336
338- fn update_evicted_user_state_with_lock (
337+ async fn update_evicted_user_state_with_lock (
339338 & self ,
340339 removed_entry : & types:: batch_queue:: BatchQueueEntry ,
341340 batch_queue : & types:: batch_queue:: BatchQueue ,
@@ -350,7 +349,7 @@ impl Batcher {
350349 {
351350 Some ( ( last_entry, _) ) => last_entry. nonced_verification_data . max_fee ,
352351 None => {
353- self . user_states . remove ( & addr) ;
352+ self . user_states . write ( ) . await . remove ( & addr) ;
354353 return ;
355354 }
356355 } ;
@@ -376,12 +375,12 @@ impl Batcher {
376375 {
377376 Some ( ( last_entry, _) ) => last_entry. nonced_verification_data . max_fee ,
378377 None => {
379- self . user_states . remove ( & addr) ;
378+ self . user_states . write ( ) . await . remove ( & addr) ;
380379 return Some ( ( ) ) ;
381380 }
382381 } ;
383382
384- let user_state = self . user_states . get ( & addr) ?;
383+ let user_state = self . user_states . read ( ) . await . get ( & addr) ?. clone ( ) ;
385384 let mut user_state_guard = user_state. lock ( ) . await ;
386385 user_state_guard. proofs_in_batch -= 1 ;
387386 user_state_guard. nonce -= U256 :: one ( ) ;
@@ -392,7 +391,7 @@ impl Batcher {
392391
393392 fn calculate_new_user_states_data (
394393 & self ,
395- batch_queue : & types :: batch_queue:: BatchQueue ,
394+ batch_queue : & batch_queue:: BatchQueue ,
396395 ) -> HashMap < Address , ( usize , U256 , U256 ) > {
397396 let mut updated_user_states = HashMap :: new ( ) ;
398397 for ( entry, _) in batch_queue. iter ( ) {
@@ -735,7 +734,7 @@ impl Batcher {
735734 }
736735
737736 let cached_user_nonce = {
738- let user_state_ref = self . user_states . get ( & address) ;
737+ let user_state_ref = self . user_states . read ( ) . await . get ( & address) . cloned ( ) ;
739738 match user_state_ref {
740739 Some ( user_state_ref) => {
741740 let Some ( user_state_guard) = self
@@ -875,16 +874,20 @@ impl Batcher {
875874 // If it was not present, then the user nonce is queried to the Aligned contract.
876875 // Lastly, we get a lock of the batch state again and insert the user state if it was still missing.
877876
878- let is_user_in_state = self . user_states . contains_key ( & addr) ;
877+ let is_user_in_state = self . user_states . read ( ) . await . contains_key ( & addr) ;
879878
880879 if !is_user_in_state {
880+ debug ! ( "User state for address {addr:?} not found, creating a new one" ) ;
881881 // We add a dummy user state to grab a lock on the user state
882882 let dummy_user_state = UserState :: new ( U256 :: zero ( ) ) ;
883883 self . user_states
884+ . write ( )
885+ . await
884886 . insert ( addr, Arc :: new ( Mutex :: new ( dummy_user_state) ) ) ;
887+ debug ! ( "Dummy user state for address {addr:?} created" ) ;
885888 }
886889
887- let Some ( user_state_ref) = self . user_states . get ( & addr) else {
890+ let Some ( user_state_ref) = self . user_states . read ( ) . await . get ( & addr) . cloned ( ) else {
888891 error ! ( "This should never happen, user state has previously been inserted if it didn't exist" ) ;
889892 send_message (
890893 ws_conn_sink. clone ( ) ,
@@ -1042,7 +1045,9 @@ impl Batcher {
10421045
10431046 // Try to find any candidate whose lock we can acquire and immediately process them
10441047 for candidate_addr in eviction_candidates {
1045- if let Some ( user_state_arc) = self . user_states . get ( & candidate_addr) {
1048+ if let Some ( user_state_arc) =
1049+ self . user_states . read ( ) . await . get ( & candidate_addr) . cloned ( )
1050+ {
10461051 if let Ok ( mut user_guard) = user_state_arc. try_lock ( ) {
10471052 // Found someone whose lock we can get - now find and remove their entry
10481053 let entries_to_check: Vec < _ > = batch_state_lock
@@ -1076,7 +1081,8 @@ impl Batcher {
10761081 & removed,
10771082 & batch_state_lock. batch_queue ,
10781083 & mut user_guard,
1079- ) ;
1084+ )
1085+ . await ;
10801086
10811087 if let Some ( ref removed_entry_ws) = removed. messaging_sink {
10821088 let ws_sink = removed_entry_ws. clone ( ) ;
@@ -1145,8 +1151,6 @@ impl Batcher {
11451151 user_state_guard. last_max_fee_limit = max_fee;
11461152 user_state_guard. proofs_in_batch += 1 ;
11471153 user_state_guard. total_fees_in_queue += max_fee;
1148-
1149- info ! ( "Verification data message handled" ) ;
11501154 Ok ( ( ) )
11511155 }
11521156
@@ -1530,7 +1534,7 @@ impl Batcher {
15301534 ) -> Result < ( ) , BatcherError > {
15311535 // Update each user's state with proper lock ordering
15321536 for addr in affected_users {
1533- if let Some ( user_state) = self . user_states . get ( & addr) {
1537+ if let Some ( user_state) = self . user_states . read ( ) . await . get ( & addr) . cloned ( ) {
15341538 let mut user_state_guard = user_state. lock ( ) . await ; // First: user lock
15351539 let batch_state_lock = self . batch_state . lock ( ) . await ; // Second: batch lock
15361540
@@ -1565,7 +1569,10 @@ impl Batcher {
15651569 /// Cleans up user states after successful batch submission.
15661570 /// Resets last_max_fee_limit to U256::MAX for users who had proofs in the submitted batch
15671571 /// but now have no proofs left in the queue.
1568- fn cleanup_user_states_after_successful_submission ( & self , finalized_batch : & [ BatchQueueEntry ] ) {
1572+ async fn cleanup_user_states_after_successful_submission (
1573+ & self ,
1574+ finalized_batch : & [ BatchQueueEntry ] ,
1575+ ) {
15691576 use std:: collections:: HashSet ;
15701577
15711578 // Get unique users from the submitted batch
@@ -1589,7 +1596,8 @@ impl Batcher {
15891596 for user_addr in users_in_batch {
15901597 if !current_user_states. contains_key ( & user_addr) {
15911598 // User has no proofs left in queue - reset their max_fee_limit
1592- if let Some ( user_state_ref) = self . user_states . get ( & user_addr) {
1599+ if let Some ( user_state_ref) = self . user_states . read ( ) . await . get ( & user_addr) . cloned ( )
1600+ {
15931601 if let Ok ( mut user_state_guard) = user_state_ref. try_lock ( ) {
15941602 user_state_guard. last_max_fee_limit = U256 :: max_value ( ) ;
15951603 }
@@ -1810,7 +1818,8 @@ impl Batcher {
18101818 }
18111819
18121820 // Clean up user states for users who had proofs in this batch but now have no proofs left
1813- self . cleanup_user_states_after_successful_submission ( finalized_batch) ;
1821+ self . cleanup_user_states_after_successful_submission ( finalized_batch)
1822+ . await ;
18141823
18151824 connection:: send_batch_inclusion_data_responses ( finalized_batch, & batch_merkle_tree) . await
18161825 }
@@ -1828,7 +1837,7 @@ impl Batcher {
18281837
18291838 let Some ( nonpaying_replacement_addr) = self . get_nonpaying_replacement_addr ( ) else {
18301839 batch_state_lock. batch_queue . clear ( ) ;
1831- self . user_states . clear ( ) ;
1840+ self . user_states . write ( ) . await . clear ( ) ;
18321841 return ;
18331842 } ;
18341843
@@ -1840,13 +1849,13 @@ impl Batcher {
18401849 . await
18411850 else {
18421851 batch_state_lock. batch_queue . clear ( ) ;
1843- self . user_states . clear ( ) ;
1852+ self . user_states . write ( ) . await . clear ( ) ;
18441853 return ;
18451854 } ;
18461855 batch_state_lock. batch_queue . clear ( ) ;
1847- self . user_states . clear ( ) ;
1856+ self . user_states . write ( ) . await . clear ( ) ;
18481857 let nonpaying_user_state = UserState :: new ( nonpaying_replacement_addr_nonce) ;
1849- self . user_states . insert (
1858+ self . user_states . write ( ) . await . insert (
18501859 nonpaying_replacement_addr,
18511860 Arc :: new ( Mutex :: new ( nonpaying_user_state) ) ,
18521861 ) ;
0 commit comments