@@ -84,9 +84,23 @@ pub struct Batcher {
8484 payment_service_fallback : BatcherPaymentService ,
8585 service_manager : ServiceManager ,
8686 service_manager_fallback : ServiceManager ,
87+ /// Holds both the user state and the proofs queue.
88+ ///
89+ /// We should consider splitting the user state and the queue into separate mutexes
90+ /// to improve concurrency.
8791 batch_state : Mutex < BatchState > ,
88- batch_building_mutex : Mutex < ( ) > ,
89- user_mutexes : Mutex < HashMap < Address , Arc < Mutex < ( ) > > > > ,
92+ /// A mutex that signals an ongoing batch building process.
93+ /// It remains locked until the batch has been fully built.
94+ /// Used to synchronize the processing of proofs during batch construction.
95+ building_batch_mutex : Mutex < ( ) > ,
96+ /// A map of per-user mutexes used to synchronize proof processing.
97+ /// It allows us to mutate a user's state atomically,
98+ /// while avoiding the need to lock the entire [`batch_state`] structure.
99+ ///
100+ /// During batch building, the process also locks these per-user mutexes
101+ /// (after acquiring [`building_batch_mutex`]) to ensure that all ongoing
102+ /// proof messages complete and the state remains consistent.
103+ user_proof_processing_mutexes : Mutex < HashMap < Address , Arc < Mutex < ( ) > > > > ,
90104 min_block_interval : u64 ,
91105 transaction_wait_timeout : u64 ,
92106 max_proof_size : usize ,
@@ -95,7 +109,6 @@ pub struct Batcher {
95109 last_uploaded_batch_block : Mutex < u64 > ,
96110 pre_verification_is_enabled : bool ,
97111 non_paying_config : Option < NonPayingConfig > ,
98- posting_batch : Mutex < bool > ,
99112 disabled_verifiers : Mutex < U256 > ,
100113 aggregator_fee_percentage_multiplier : u128 ,
101114 aggregator_gas_cost : u128 ,
@@ -275,10 +288,9 @@ impl Batcher {
275288 . batcher
276289 . aggregator_fee_percentage_multiplier ,
277290 aggregator_gas_cost : config. batcher . aggregator_gas_cost ,
278- posting_batch : Mutex :: new ( false ) ,
279291 batch_state : Mutex :: new ( batch_state) ,
280- user_mutexes : Mutex :: new ( HashMap :: new ( ) ) ,
281- batch_building_mutex : Mutex :: new ( ( ) ) ,
292+ user_proof_processing_mutexes : Mutex :: new ( HashMap :: new ( ) ) ,
293+ building_batch_mutex : Mutex :: new ( ( ) ) ,
282294 disabled_verifiers : Mutex :: new ( disabled_verifiers) ,
283295 metrics,
284296 telemetry,
@@ -615,11 +627,10 @@ impl Batcher {
615627 debug ! ( "Received message with nonce: {msg_nonce:?}" ) ;
616628 self . metrics . received_proofs . inc ( ) ;
617629
618- // if this is locked, then it means that the a batch is being built
619- // so we need to stop the processing
620- debug ! ( "Checking if a batch is being built before proceeding with the message" ) ;
621- let _ = self . batch_building_mutex . lock ( ) . await ;
622- debug ! ( "Batch building has finished or did't started, proceeding with the message" ) ;
630+ // Make sure there are no batches being built before processing the message
631+ debug ! ( "Checking if there is an ongoing batch before processing the message..." ) ;
632+ let _ = self . building_batch_mutex . lock ( ) . await ;
633+ debug ! ( "Batch building mutex acquired. Proceeding with message processing." ) ;
623634
624635 // * ---------------------------------------------------*
625636 // * Perform validations over the message *
@@ -727,21 +738,21 @@ impl Batcher {
727738 return Ok ( ( ) ) ;
728739 } ;
729740
730- // For now on until the message is fully processed, the batch state is locked
741+ // From now on, until the message is fully processed, we hold the corresponding user mutex acquired from `user_proof_processing_mutexes`
731742 // This is needed because we need to query the user state to make validations and
732- // finally add the proof to the batch queue.
733-
743+ // finally add the proof to the batch queue which must be done individually .
744+ // This allows us to process each user without having to lock the whole `batch_state`
734745 debug ! ( "Trying to acquire user mutex for {:?}..." , addr_in_msg) ;
735746 let user_mutex = {
736- let mut map = self . user_mutexes . lock ( ) . await ;
747+ let mut map = self . user_proof_processing_mutexes . lock ( ) . await ;
737748 map. entry ( addr)
738749 . or_insert_with ( || Arc :: new ( Mutex :: new ( ( ) ) ) )
739750 . clone ( )
740751 } ;
741752
742753 // This looks very ugly but basically, we are doing the following:
743754 // 1. We try to acquire the `user_mutex`: this can take some time if there is another task with it
744- // 2. While that time that passes, the batcher might have tried to build a new batch, so we check the `batch_building_mutex `
755+ // 2. While that time passes, the batcher might have tried to build a new batch, so we check the `building_batch_mutex`
745756 // 3. If it is taken, then release the lock so the batcher can continue building (as it is waiting for all user mutex to finish)
746757 // 4. If it isn't building then continue with the message
747758 //
@@ -751,7 +762,7 @@ impl Batcher {
751762 // Leading to a decrease batch throughput
752763 let _user_mutex = loop {
753764 let _user_mutex = user_mutex. lock ( ) . await ;
754- let res = self . batch_building_mutex . try_lock ( ) ;
765+ let res = self . building_batch_mutex . try_lock ( ) ;
755766 if res. is_ok ( ) {
756767 break _user_mutex;
757768 } else {
@@ -833,13 +844,6 @@ impl Batcher {
833844 return Ok ( ( ) ) ;
834845 }
835846
836- // We check this after replacement logic because if user wants to replace a proof, their
837- // new_max_fee must be greater or equal than old_max_fee
838- //
839- // Note: we don't do this before the handle_replacement_message as this operation can block for some time
840- // this is run again in the handle_replacement_message
841- // by enforcing stricter rules in replacements (a min bump + min fee) we can be sure this is run on valid message that user will actually pay for it
842- // and so running the pre-verification isn't free
843847 if msg_max_fee > user_last_max_fee_limit {
844848 warn ! ( "Invalid max fee for address {addr}, had fee limit of {user_last_max_fee_limit:?}, sent {msg_max_fee:?}" ) ;
845849 send_message (
@@ -851,8 +855,20 @@ impl Batcher {
851855 return Ok ( ( ) ) ;
852856 }
853857
854- // When pre-verification is enabled, batcher will verify proofs for faster feedback with clients
855- if let Err ( e) = self . verify_proof ( & nonced_verification_data) . await {
858+ // * ---------------------------------------------------*
859+ // * Validate proof *
860+ // * ---------------------------------------------------*
861+
862+ // Note: While it may seem obvious to run this before `handle_replacement_message` and avoid repeating code
863+ // We intentionally do not run this verification before `handle_replacement_message`
864+ // because this function is "expensive" and it may block for a few milliseconds.
865+ //
866+ // When handling a replacement message, before running the verification we make sure
867+ // the user has bumped the fee enough so that running the pre-verification is justified.
868+ if let Err ( e) = self
869+ . verify_proof ( & nonced_verification_data. verification_data )
870+ . await
871+ {
856872 send_message (
857873 ws_conn_sink. clone ( ) ,
858874 SubmitProofResponseMessage :: InvalidProof ( e) ,
@@ -901,6 +917,7 @@ impl Batcher {
901917 batch_state_lock. update_user_state_on_entry_removal ( & removed_entry) ;
902918
903919 if let Some ( removed_entry_ws) = removed_entry. messaging_sink {
920+ std:: mem:: drop ( batch_state_lock) ;
904921 send_message (
905922 removed_entry_ws,
906923 SubmitProofResponseMessage :: UnderpricedProof ,
@@ -941,21 +958,67 @@ impl Batcher {
941958 return Ok ( ( ) ) ;
942959 } ;
943960
944- if self
961+ let Some ( user_proof_count) = self
962+ . batch_state
963+ . lock ( )
964+ . await
965+ . get_user_proof_count ( & addr)
966+ . await
967+ else {
968+ error ! ( "User state of address {addr} was not found when trying to update user state. This user state should have been present" ) ;
969+ send_message (
970+ ws_conn_sink. clone ( ) ,
971+ SubmitProofResponseMessage :: AddToBatchError ,
972+ )
973+ . await ;
974+ return Ok ( ( ) ) ;
975+ } ;
976+
977+ let Some ( current_total_fees_in_queue) = self
945978 . batch_state
946979 . lock ( )
947980 . await
948- . update_user_after_adding_proof ( addr, msg_nonce, msg_max_fee)
981+ . get_user_total_fees_in_queue ( & addr)
982+ . await
983+ else {
984+ error ! ( "User state of address {addr} was not found when trying to update user state. This user state should have been present" ) ;
985+ send_message (
986+ ws_conn_sink. clone ( ) ,
987+ SubmitProofResponseMessage :: AddToBatchError ,
988+ )
989+ . await ;
990+ return Ok ( ( ) ) ;
991+ } ;
992+
993+ // User state is updated
994+ if self
995+ . batch_state
996+ . lock ( )
949997 . await
950- . is_err ( )
998+ . update_user_state (
999+ & addr,
1000+ msg_nonce + U256 :: one ( ) ,
1001+ msg_max_fee,
1002+ user_proof_count + 1 ,
1003+ current_total_fees_in_queue + msg_max_fee,
1004+ )
1005+ . is_none ( )
9511006 {
1007+ error ! ( "User state of address {addr} was not found when trying to update user state. This user state should have been present" ) ;
9521008 send_message (
9531009 ws_conn_sink. clone ( ) ,
9541010 SubmitProofResponseMessage :: AddToBatchError ,
9551011 )
9561012 . await ;
9571013 return Ok ( ( ) ) ;
958- }
1014+ } ;
1015+
1016+ // Finally, we remove the mutex from the map
1017+ //
1018+ // Note: this removal is safe even if other processes are waiting on the lock
1019+ // This is because it is wrapped in an `Arc`, so the mutex will still live until all clones are dropped.
1020+ let mut user_mutexes = self . user_proof_processing_mutexes . lock ( ) . await ;
1021+ user_mutexes. remove ( & addr) ;
9591022
9601023 info ! ( "Verification data message handled" ) ;
9611024 Ok ( ( ) )
@@ -1023,8 +1086,11 @@ impl Batcher {
10231086 return ;
10241087 }
10251088
1026- // if all went well, verify the proof
1027- if let Err ( e) = self . verify_proof ( & nonced_verification_data) . await {
1089+ // If all went well, verify the proof
1090+ if let Err ( e) = self
1091+ . verify_proof ( & nonced_verification_data. verification_data )
1092+ . await
1093+ {
10281094 send_message (
10291095 ws_conn_sink. clone ( ) ,
10301096 SubmitProofResponseMessage :: InvalidProof ( e) ,
@@ -1201,11 +1267,16 @@ impl Batcher {
12011267 }
12021268
12031269 /// Given a new block number listened from the blockchain, checks if the current batch is ready to be posted.
1270+ ///
12041271 /// There are essentially two conditions to be checked:
12051272 /// * Has the current batch reached the minimum size to be posted?
12061273 /// * Has the received block number surpassed the maximum interval with respect to the last posted batch block?
12071274 ///
1208- /// Then the batch will be made as big as possible given this two conditions:
1275+ /// If both are met then:
1276+ /// * We acquire the building batch mutex to stop processing new proof messages
1277+ /// * We acquire all the user locks to wait until all current proof messages are processed
1278+ ///
1279+ /// Once we hold them, the biggest possible batch will be built, making sure that:
12091280 /// * The serialized batch size needs to be smaller than the maximum batch size
12101281 /// * The batch submission fee is less than the lowest `max fee` included the batch,
12111282 /// * And the batch submission fee is more than the highest `max fee` not included the batch.
@@ -1220,18 +1291,7 @@ impl Batcher {
12201291 block_number : u64 ,
12211292 gas_price : U256 ,
12221293 ) -> Option < Vec < BatchQueueEntry > > {
1223- info ! ( "Batch building: started, acquiring lock to stop processing new messages..." ) ;
1224- let _batch_building_mutex = self . batch_building_mutex . lock ( ) . await ;
1225-
1226- info ! ( "Batch building: waiting until all the ongoing messages finish" ) ;
1227- // acquire all the user locks to make sure all the ongoing message have been processed
1228- for user_mutex in self . user_mutexes . lock ( ) . await . values ( ) {
1229- let _ = user_mutex. lock ( ) . await ;
1230- }
1231- info ! ( "Batch building: all locks acquired, proceeding to build batch" ) ;
1232-
1233- let batch_state_lock = self . batch_state . lock ( ) . await ;
1234- let current_batch_len = batch_state_lock. batch_queue . len ( ) ;
1294+ let current_batch_len = self . batch_state . lock ( ) . await . batch_queue . len ( ) ;
12351295 if current_batch_len < 1 {
12361296 info ! (
12371297 "Current batch has {} proofs. Waiting for more proofs..." ,
@@ -1249,17 +1309,17 @@ impl Batcher {
12491309 return None ;
12501310 }
12511311
1252- // Check if a batch is currently being posted
1253- let mut batch_posting = self . posting_batch . lock ( ) . await ;
1254- if * batch_posting {
1255- info ! (
1256- "Batch is currently being posted. Waiting for the current batch to be finalized..."
1257- ) ;
1258- return None ;
1312+ info ! ( "Batch building: started, acquiring lock to stop processing new messages..." ) ;
1313+ let _building_batch_mutex = self . building_batch_mutex . lock ( ) . await ;
1314+
1315+ info ! ( "Batch building: waiting until all the ongoing messages finish" ) ;
1316+ // Acquire all the user locks to make sure all the ongoing messages have been processed
1317+ for user_mutex in self . user_proof_processing_mutexes . lock ( ) . await . values ( ) {
1318+ let _ = user_mutex . lock ( ) . await ;
12591319 }
1320+ info ! ( "Batch building: all user locks acquired, proceeding to build batch" ) ;
1321+ let batch_state_lock = self . batch_state . lock ( ) . await ;
12601322
1261- // Set the batch posting flag to true
1262- * batch_posting = true ;
12631323 let batch_queue_copy = batch_state_lock. batch_queue . clone ( ) ;
12641324 let finalized_batch = batch_queue:: try_build_batch (
12651325 batch_queue_copy,
@@ -1269,7 +1329,6 @@ impl Batcher {
12691329 self . constant_gas_cost ( ) ,
12701330 )
12711331 . inspect_err ( |e| {
1272- * batch_posting = false ;
12731332 match e {
12741333 // We can't post a batch since users are not willing to pay the needed fee, wait for more proofs
12751334 BatcherError :: BatchCostTooHigh => {
@@ -1523,10 +1582,6 @@ impl Batcher {
15231582 . finalize_batch ( finalized_batch, modified_gas_price)
15241583 . await ;
15251584
1526- // Resetting this here to avoid doing it on every return path of `finalize_batch` function
1527- let mut batch_posting = self . posting_batch . lock ( ) . await ;
1528- * batch_posting = false ;
1529-
15301585 batch_finalization_result?;
15311586 }
15321587
@@ -2045,14 +2100,14 @@ impl Batcher {
20452100 true
20462101 }
20472102
2103+ /// Takes [`VerificationData`] and spawns a blocking task to verify the proof
20482104 async fn verify_proof (
20492105 & self ,
2050- nonced_verification_data : & NoncedVerificationData ,
2106+ verification_data : & VerificationData ,
20512107 ) -> Result < ( ) , ProofInvalidReason > {
20522108 if !self . pre_verification_is_enabled {
20532109 return Ok ( ( ) ) ;
20542110 }
2055- let verification_data = & nonced_verification_data. verification_data ;
20562111 if self
20572112 . is_verifier_disabled ( verification_data. proving_system )
20582113 . await
0 commit comments