@@ -95,16 +95,16 @@ pub struct Batcher {
9595 non_paying_config : Option < NonPayingConfig > ,
9696 aggregator_fee_percentage_multiplier : u128 ,
9797 aggregator_gas_cost : u128 ,
98-
98+
9999 // Shared state (Mutex)
100100 /// The general business rule is:
101101 /// - User processing can be done in parallel unless a batch creation is happening
102102 /// - Batch creation needs to be able to change all the states, so all processing
103103 /// needs to be stopped, and all user_states locks need to be taken
104104 batch_state : Mutex < BatchState > ,
105105 user_states : DashMap < Address , Arc < Mutex < UserState > > > ,
106- /// When posting a task, this is taken as a write to stop new threads to update
107- /// user_states, ideally we would want a bigger mutex on the whole user_states, but this can't be done
106+ /// When posting a task, this is taken as a write to stop new threads to update
107+ /// user_states, ideally we would want a bigger mutex on the whole user_states, but this can't be done
108108 batch_processing_lock : RwLock < ( ) > ,
109109
110110 last_uploaded_batch_block : Mutex < u64 > ,
@@ -114,10 +114,8 @@ pub struct Batcher {
114114 /// the batch creation task
115115 posting_batch : Mutex < bool > ,
116116
117-
118117 disabled_verifiers : Mutex < U256 > ,
119-
120-
118+
121119 // Observability and monitoring
122120 pub metrics : metrics:: BatcherMetrics ,
123121 pub telemetry : TelemetrySender ,
@@ -303,7 +301,6 @@ impl Batcher {
303301 }
304302 }
305303
306-
307304 async fn update_evicted_user_state (
308305 & self ,
309306 removed_entry : & types:: batch_queue:: BatchQueueEntry ,
@@ -690,7 +687,7 @@ impl Batcher {
690687 ) -> Result < ( ) , Error > {
691688 // Acquire read lock to allow concurrent user processing but block during batch creation
692689 let _batch_processing_guard = self . batch_processing_lock . read ( ) . await ;
693-
690+
694691 let msg_nonce = client_msg. verification_data . nonce ;
695692 debug ! ( "Received message with nonce: {msg_nonce:?}" ) ;
696693 self . metrics . received_proofs . inc ( ) ;
@@ -1122,8 +1119,7 @@ impl Batcher {
11221119 ) ;
11231120
11241121 // update max_fee_limit
1125- let updated_max_fee_limit_in_batch =
1126- batch_state_lock. get_user_min_fee_in_batch ( & addr) ;
1122+ let updated_max_fee_limit_in_batch = batch_state_lock. get_user_min_fee_in_batch ( & addr) ;
11271123 {
11281124 let user_state = self . user_states . get ( & addr) ;
11291125 match user_state {
@@ -1254,8 +1250,8 @@ impl Batcher {
12541250 /// an empty batch, even if the block interval has been reached.
12551251 /// Once the batch meets the conditions for submission, the finalized batch is then passed to the
12561252 /// `finalize_batch` function.
1257- /// This function doesn't remove the proofs from the queue.
1258- async fn is_batch_ready (
1253+ /// This function removes the proofs from the queue immediately to avoid race conditions.
1254+ async fn extract_batch_if_ready (
12591255 & self ,
12601256 block_number : u64 ,
12611257 gas_price : U256 ,
@@ -1291,9 +1287,12 @@ impl Batcher {
12911287
12921288 // Set the batch posting flag to true
12931289 * batch_posting = true ;
1294- let batch_queue_copy = batch_state_lock. batch_queue . clone ( ) ;
1295- let finalized_batch = batch_queue:: try_build_batch (
1296- batch_queue_copy,
1290+
1291+ // PHASE 1: Extract the batch directly from the queue to avoid race conditions
1292+ let mut batch_state_lock = batch_state_lock; // Make mutable
1293+
1294+ let finalized_batch = batch_queue:: extract_batch_directly (
1295+ & mut batch_state_lock. batch_queue ,
12971296 gas_price,
12981297 self . max_batch_byte_size ,
12991298 self . max_batch_proof_qty ,
@@ -1313,26 +1312,31 @@ impl Batcher {
13131312 } )
13141313 . ok ( ) ?;
13151314
1315+ info ! (
1316+ "Extracted {} proofs from queue for batch processing" ,
1317+ finalized_batch. len( )
1318+ ) ;
1319+
1320+ // PHASE 1.5: Update user states immediately after batch extraction to make the operation atomic
1321+ // We assume the batch posting will be successful, so we update user states now
1322+ if let Err ( e) = self . update_user_states_after_batch_extraction ( & batch_state_lock) . await {
1323+ error ! ( "Failed to update user states after batch extraction: {:?}" , e) ;
1324+ // We could potentially put the batch back in the queue here if needed
1325+ * batch_posting = false ;
1326+ return None ;
1327+ }
1328+
13161329 Some ( finalized_batch)
13171330 }
13181331
1319- /// Takes the submitted proofs and removes them from the queue.
1320- /// This function should be called only AFTER the submission was confirmed onchain
1321- async fn remove_proofs_from_queue (
1322- & self ,
1323- finalized_batch : Vec < BatchQueueEntry > ,
1324- ) -> Result < ( ) , BatcherError > {
1325- info ! ( "Removing proofs from queue..." ) ;
1326- let mut batch_state_lock = self . batch_state . lock ( ) . await ;
1327-
1328- finalized_batch. iter ( ) . for_each ( |entry| {
1329- if batch_state_lock. batch_queue . remove ( entry) . is_none ( ) {
1330- // If this happens, we have a bug in our code
1331- error ! ( "Some proofs were not found in the queue. This should not happen." ) ;
1332- }
1333- } ) ;
1332+ /// Updates user states after successful batch submission.
1333+ /// This function should be called only AFTER the submission was confirmed onchain.
1334+ /// Note: Proofs were already removed from the queue during the extraction phase.
1335+ async fn update_user_states_after_batch_submission ( & self ) -> Result < ( ) , BatcherError > {
1336+ info ! ( "Updating user states after batch submission..." ) ;
1337+ let batch_state_lock = self . batch_state . lock ( ) . await ;
13341338
1335- // now we calculate the new user_states
1339+ // Calculate the new user_states based on the current queue (proofs already removed)
13361340 let new_user_states = self . calculate_new_user_states_data ( & batch_state_lock. batch_queue ) ;
13371341
13381342 let user_addresses: Vec < Address > =
@@ -1376,6 +1380,105 @@ impl Batcher {
13761380 Ok ( ( ) )
13771381 }
13781382
1383+ /// Updates user states immediately after batch extraction to make the operation atomic.
1384+ /// This function should be called right after extracting proofs from the queue.
1385+ /// We assume the batch posting will be successful and update user states optimistically.
1386+ /// IMPORTANT: Preserves last_max_fee_limit when users have no proofs left in queue.
1387+ async fn update_user_states_after_batch_extraction (
1388+ & self ,
1389+ batch_state_lock : & tokio:: sync:: MutexGuard < ' _ , crate :: types:: batch_state:: BatchState > ,
1390+ ) -> Result < ( ) , BatcherError > {
1391+ info ! ( "Updating user states after batch extraction..." ) ;
1392+
1393+ // Calculate the new user_states based on the current queue (proofs already removed)
1394+ let new_user_states = self . calculate_new_user_states_data ( & batch_state_lock. batch_queue ) ;
1395+
1396+ let user_addresses: Vec < Address > =
1397+ self . user_states . iter ( ) . map ( |entry| * entry. key ( ) ) . collect ( ) ;
1398+
1399+ for addr in user_addresses. iter ( ) {
1400+ // FIXME: The update functions return `None` only when the user was not found in the `user_states`
1401+ // map, which should not happen here, but we do this check so that we don't unwrap.
1402+ // Once https://github.com/yetanotherco/aligned_layer/issues/1046 is done we could return a more
1403+ // informative error.
1404+
1405+ // Now we update the user states related to the batch (proof count in batch and min fee in batch)
1406+ {
1407+ let user_state = self . user_states . get ( addr) ;
1408+ match user_state {
1409+ Some ( user_state) => {
1410+ let mut user_state_guard = user_state. lock ( ) . await ;
1411+
1412+ if let Some ( ( proof_count, max_fee_limit, total_fees_in_queue) ) = new_user_states. get ( addr) {
1413+ // User still has proofs in the queue
1414+ user_state_guard. proofs_in_batch = * proof_count;
1415+ user_state_guard. last_max_fee_limit = * max_fee_limit;
1416+ user_state_guard. total_fees_in_queue = * total_fees_in_queue;
1417+ } else {
1418+ // User has no more proofs in the queue - only update count and total fees
1419+ // but preserve the last_max_fee_limit to avoid setting it to U256::MAX
1420+ // This is important for rollback scenarios where we need to restore proofs
1421+ user_state_guard. proofs_in_batch = 0 ;
1422+ user_state_guard. total_fees_in_queue = U256 :: zero ( ) ;
1423+ // Keep user_state_guard.last_max_fee_limit unchanged
1424+ }
1425+ }
1426+ None => {
1427+ return Err ( BatcherError :: QueueRemoveError (
1428+ "Could not update user state" . into ( ) ,
1429+ ) ) ;
1430+ }
1431+ }
1432+ }
1433+ }
1434+
1435+ // Update metrics
1436+ let queue_len = batch_state_lock. batch_queue . len ( ) ;
1437+ let queue_size_bytes = calculate_batch_size ( & batch_state_lock. batch_queue ) ?;
1438+
1439+ self . metrics
1440+ . update_queue_metrics ( queue_len as i64 , queue_size_bytes as i64 ) ;
1441+
1442+ Ok ( ( ) )
1443+ }
1444+
1445+ /// Cleans up user states after successful batch submission.
1446+ /// Resets last_max_fee_limit to U256::MAX for users who had proofs in the submitted batch
1447+ /// but now have no proofs left in the queue.
1448+ fn cleanup_user_states_after_successful_submission ( & self , finalized_batch : & [ BatchQueueEntry ] ) {
1449+ use std:: collections:: HashSet ;
1450+
1451+ // Get unique users from the submitted batch
1452+ let users_in_batch: HashSet < Address > = finalized_batch. iter ( )
1453+ . map ( |entry| entry. sender )
1454+ . collect ( ) ;
1455+
1456+ // Check current queue state to see which users still have proofs
1457+ let batch_state_lock = match self . batch_state . try_lock ( ) {
1458+ Ok ( lock) => lock,
1459+ Err ( _) => {
1460+ // If we can't get the lock, skip cleanup - it's not critical
1461+ warn ! ( "Could not acquire batch state lock for user state cleanup" ) ;
1462+ return ;
1463+ }
1464+ } ;
1465+
1466+ let current_user_states = self . calculate_new_user_states_data ( & batch_state_lock. batch_queue ) ;
1467+
1468+ // For each user in the batch, check if they now have no proofs left
1469+ for user_addr in users_in_batch {
1470+ if !current_user_states. contains_key ( & user_addr) {
1471+ // User has no proofs left in queue - reset their max_fee_limit
1472+ if let Some ( user_state_ref) = self . user_states . get ( & user_addr) {
1473+ if let Ok ( mut user_state_guard) = user_state_ref. try_lock ( ) {
1474+ user_state_guard. last_max_fee_limit = U256 :: max_value ( ) ;
1475+ }
1476+ // If we can't get the lock, skip this user - not critical
1477+ }
1478+ }
1479+ }
1480+ }
1481+
13791482 /// Takes the finalized batch as input and:
13801483 /// builds the merkle tree
13811484 /// posts verification data batch to s3
@@ -1391,7 +1494,7 @@ impl Batcher {
13911494 ) -> Result < ( ) , BatcherError > {
13921495 // Acquire write lock to ensure exclusive access during batch creation (blocks all user processing)
13931496 let _batch_processing_guard = self . batch_processing_lock . write ( ) . await ;
1394-
1497+
13951498 let nonced_batch_verifcation_data: Vec < NoncedVerificationData > = finalized_batch
13961499 . clone ( )
13971500 . into_iter ( )
@@ -1468,8 +1571,8 @@ impl Batcher {
14681571 BatcherError :: TransactionSendError (
14691572 TransactionSendError :: SubmissionInsufficientBalance ,
14701573 ) => {
1471- // TODO calling remove_proofs_from_queue here is a better solution, flushing only the failed batch
1472- // this would also need a message sent to the clients
1574+ // TODO: In the future, we should re-add the failed batch back to the queue
1575+ // For now, we flush everything as a safety measure
14731576 self . flush_queue_and_clear_nonce_cache ( ) . await ;
14741577 }
14751578 _ => {
@@ -1480,11 +1583,11 @@ impl Batcher {
14801583 return Err ( e) ;
14811584 } ;
14821585
1483- // Once the submit is succesfull, we remove the submitted proofs from the queue
1484- // TODO handle error case:
1485- if let Err ( e ) = self . remove_proofs_from_queue ( finalized_batch . clone ( ) ) . await {
1486- error ! ( "Unexpected error while updating queue: {:?}" , e ) ;
1487- }
1586+ // Note: Proofs were already removed from the queue during extraction phase
1587+ // User states were also already updated atomically during extraction
1588+
1589+ // Clean up user states for users who had proofs in this batch but now have no proofs left
1590+ self . cleanup_user_states_after_successful_submission ( & finalized_batch ) ;
14881591
14891592 connection:: send_batch_inclusion_data_responses ( finalized_batch, & batch_merkle_tree) . await
14901593 }
@@ -1554,8 +1657,11 @@ impl Batcher {
15541657 let modified_gas_price = gas_price * U256 :: from ( GAS_PRICE_PERCENTAGE_MULTIPLIER )
15551658 / U256 :: from ( PERCENTAGE_DIVIDER ) ;
15561659
1557- if let Some ( finalized_batch) = self . is_batch_ready ( block_number, modified_gas_price) . await {
1558- // TODO (Mauro): There is a race condition here,
1660+ // TODO (Mauro): Take all the user locks here
1661+ if let Some ( finalized_batch) = self
1662+ . extract_batch_if_ready ( block_number, modified_gas_price)
1663+ . await
1664+ {
15591665
15601666 let batch_finalization_result = self
15611667 . finalize_batch ( block_number, finalized_batch, modified_gas_price)
0 commit comments