@@ -90,7 +90,12 @@ pub struct Batcher {
9090 /// to improve concurrency.
9191 batch_state : Mutex < BatchState > ,
9292 /// A mutex that signals an ongoing batch building process.
93- /// It remains locked until the batch has been fully built.
93+ /// It remains locked until the batch has been fully built and is ready to be submitted.
94+ ///
95+ /// When a new proof message arrives, before processing it
96+ /// we check that this mutex isn't locked and if it is we wait until unlocked
97+ ///
98+ /// This check covers the case where a new user submits a message while a batch is in construction
9499 /// Used to synchronize the processing of proofs during batch construction.
95100 building_batch_mutex : Mutex < ( ) > ,
96101 /// A map of per-user mutexes used to synchronize proof processing.
@@ -742,7 +747,7 @@ impl Batcher {
742747 // This is needed because we need to query the user state to make validations and
743748 // finally add the proof to the batch queue which must be done individually.
744749 // This allows us to process each user without having to lock the whole `batch_state`
745- debug ! ( "Trying to acquire user mutex for {:?}..." , addr_in_msg ) ;
750+ debug ! ( "Trying to acquire user mutex for {:?}..." , addr ) ;
746751 let user_mutex = {
747752 let mut map = self . user_proof_processing_mutexes . lock ( ) . await ;
748753 map. entry ( addr)
@@ -757,9 +762,9 @@ impl Batcher {
757762 // 4. If it isn't building then continue with the message
758763 //
759764 // This is done to give the batcher builder process priority
760- // and prevent a situation where we are the batcher wants to build a new batch
765+ // and prevent a situation where the batcher wants to build a new batch
761766 // but it has to wait for a ton of messages to be processed first
762- // Leading to a decrease batch throughput
767+ // Leading to a decrease in batch throughput
763768 let _user_mutex = loop {
764769 let _user_mutex = user_mutex. lock ( ) . await ;
765770 let res = self . building_batch_mutex . try_lock ( ) ;
@@ -778,9 +783,11 @@ impl Batcher {
778783 let batch_state_lock = self . batch_state . lock ( ) . await ;
779784 let last_max_fee = batch_state_lock. get_user_last_max_fee_limit ( & addr) . await ;
780785 let accumulated_fee = batch_state_lock. get_user_total_fees_in_queue ( & addr) . await ;
781- drop ( batch_state_lock) ;
786+ ( last_max_fee, accumulated_fee)
787+ } ;
782788
783- match ( last_max_fee, accumulated_fee) {
789+ let ( user_last_max_fee_limit, user_accumulated_fee) =
790+ match ( user_last_max_fee_limit, user_accumulated_fee) {
784791 ( Some ( last_max) , Some ( accumulated) ) => ( last_max, accumulated) ,
785792 _ => {
786793 send_message (
@@ -792,8 +799,7 @@ impl Batcher {
792799 self . metrics . user_error ( & [ "batcher_state_error" , "" ] ) ;
793800 return Ok ( ( ) ) ;
794801 }
795- }
796- } ;
802+ } ;
797803
798804 if !self . verify_user_has_enough_balance ( user_balance, user_accumulated_fee, msg_max_fee) {
799805 send_message (
@@ -844,6 +850,8 @@ impl Batcher {
844850 return Ok ( ( ) ) ;
845851 }
846852
853+ // We check this after replacement logic because if user wants to replace a proof, their
854+ // new_max_fee must be greater or equal than old_max_fee
847855 if msg_max_fee > user_last_max_fee_limit {
848856 warn ! ( "Invalid max fee for address {addr}, had fee limit of {user_last_max_fee_limit:?}, sent {msg_max_fee:?}" ) ;
849857 send_message (
@@ -958,61 +966,6 @@ impl Batcher {
958966 return Ok ( ( ) ) ;
959967 } ;
960968
961- let Some ( user_proof_count) = self
962- . batch_state
963- . lock ( )
964- . await
965- . get_user_proof_count ( & addr)
966- . await
967- else {
968- error ! ( "User state of address {addr} was not found when trying to update user state. This user state should have been present" ) ;
969- send_message (
970- ws_conn_sink. clone ( ) ,
971- SubmitProofResponseMessage :: AddToBatchError ,
972- )
973- . await ;
974- return Ok ( ( ) ) ;
975- } ;
976-
977- let Some ( current_total_fees_in_queue) = self
978- . batch_state
979- . lock ( )
980- . await
981- . get_user_total_fees_in_queue ( & addr)
982- . await
983- else {
984- error ! ( "User state of address {addr} was not found when trying to update user state. This user state should have been present" ) ;
985- send_message (
986- ws_conn_sink. clone ( ) ,
987- SubmitProofResponseMessage :: AddToBatchError ,
988- )
989- . await ;
990- return Ok ( ( ) ) ;
991- } ;
992-
993- // User state is updated
994- if self
995- . batch_state
996- . lock ( )
997- . await
998- . update_user_state (
999- & addr,
1000- msg_nonce + U256 :: one ( ) ,
1001- msg_max_fee,
1002- user_proof_count + 1 ,
1003- current_total_fees_in_queue + msg_max_fee,
1004- )
1005- . is_none ( )
1006- {
1007- error ! ( "User state of address {addr} was not found when trying to update user state. This user state should have been present" ) ;
1008- send_message (
1009- ws_conn_sink. clone ( ) ,
1010- SubmitProofResponseMessage :: AddToBatchError ,
1011- )
1012- . await ;
1013- return Ok ( ( ) ) ;
1014- } ;
1015-
1016969 // Finally, we remove the mutex from the map
1017970 //
1018971 // Note: this removal is safe even if other processes are waiting on the lock
@@ -1153,7 +1106,21 @@ impl Batcher {
11531106 // if they have the same nonce and sender, so we can remove the old entry
11541107 // by calling remove with the new entry
11551108 let mut batch_state_lock = self . batch_state . lock ( ) . await ;
1156- batch_state_lock. batch_queue . remove ( & replacement_entry) ;
1109+ if batch_state_lock
1110+ . batch_queue
1111+ . remove ( & replacement_entry)
1112+ . is_none ( )
1113+ {
1114+ std:: mem:: drop ( batch_state_lock) ;
1115+ warn ! ( "Replacement entry for {addr:?} was not present in batcher queue" ) ;
1116+ send_message (
1117+ ws_conn_sink. clone ( ) ,
1118+ SubmitProofResponseMessage :: AddToBatchError ,
1119+ )
1120+ . await ;
1121+ return ;
1122+ } ;
1123+
11571124 batch_state_lock. batch_queue . push (
11581125 replacement_entry. clone ( ) ,
11591126 BatchQueueEntryPriority :: new ( replacement_max_fee, nonce) ,
@@ -1229,7 +1196,7 @@ impl Batcher {
12291196 . await
12301197 }
12311198
1232- /// Adds verification data to the current batch queue.
1199+ /// Adds verification data to the current batch queue and updates the user state
12331200 async fn add_to_batch (
12341201 & self ,
12351202 verification_data : NoncedVerificationData ,
@@ -1263,6 +1230,46 @@ impl Batcher {
12631230
12641231 info ! ( "Current batch queue length: {}" , queue_len) ;
12651232
1233+ let Some ( user_proof_count) = batch_state_lock
1234+ . get_user_proof_count ( & proof_submitter_addr)
1235+ . await
1236+ else {
1237+ error ! ( "User state of address {proof_submitter_addr} was not found when trying to update user state. This user state should have been present" ) ;
1238+ std:: mem:: drop ( batch_state_lock) ;
1239+ return Err ( BatcherError :: AddressNotFoundInUserStates (
1240+ proof_submitter_addr,
1241+ ) ) ;
1242+ } ;
1243+
1244+ let Some ( current_total_fees_in_queue) = batch_state_lock
1245+ . get_user_total_fees_in_queue ( & proof_submitter_addr)
1246+ . await
1247+ else {
1248+ error ! ( "User state of address {proof_submitter_addr} was not found when trying to update user state. This user state should have been present" ) ;
1249+ std:: mem:: drop ( batch_state_lock) ;
1250+ return Err ( BatcherError :: AddressNotFoundInUserStates (
1251+ proof_submitter_addr,
1252+ ) ) ;
1253+ } ;
1254+
1255+ // User state is updated
1256+ if batch_state_lock
1257+ . update_user_state (
1258+ & proof_submitter_addr,
1259+ nonce + U256 :: one ( ) ,
1260+ max_fee,
1261+ user_proof_count + 1 ,
1262+ current_total_fees_in_queue + max_fee,
1263+ )
1264+ . is_none ( )
1265+ {
1266+ error ! ( "User state of address {proof_submitter_addr} was not found when trying to update user state. This user state should have been present" ) ;
1267+ std:: mem:: drop ( batch_state_lock) ;
1268+ return Err ( BatcherError :: AddressNotFoundInUserStates (
1269+ proof_submitter_addr,
1270+ ) ) ;
1271+ } ;
1272+
12661273 Ok ( ( ) )
12671274 }
12681275
@@ -1312,9 +1319,7 @@ impl Batcher {
13121319 info ! ( "Batch building: started, acquiring lock to stop processing new messages..." ) ;
13131320 let _building_batch_mutex = self . building_batch_mutex . lock ( ) . await ;
13141321
1315- info ! ( "Batch building: waiting until all the ongoing messages finish" ) ;
1316-
1317- // acquire all the user locks to make sure all the ongoing message have been processed
1322+ info ! ( "Batch building: waiting until all the user messages and proofs get processed" ) ;
13181323 let mutexes: Vec < Arc < Mutex < ( ) > > > = {
13191324 let user_proofs_lock = self . user_proof_processing_mutexes . lock ( ) . await ;
13201325 user_proofs_lock. values ( ) . cloned ( ) . collect ( )
@@ -1323,9 +1328,11 @@ impl Batcher {
13231328 let _ = user_mutex. lock ( ) . await ;
13241329 }
13251330 info ! ( "Batch building: all user locks acquired, proceeding to build batch" ) ;
1326- let batch_state_lock = self . batch_state . lock ( ) . await ;
13271331
1328- let batch_queue_copy = batch_state_lock. batch_queue . clone ( ) ;
1332+ let batch_queue_copy = {
1333+ let batch_state_lock = self . batch_state . lock ( ) . await ;
1334+ batch_state_lock. batch_queue . clone ( )
1335+ } ;
13291336 let finalized_batch = batch_queue:: try_build_batch (
13301337 batch_queue_copy,
13311338 gas_price,
0 commit comments