Skip to content

Commit 26328ec

Browse files
committed
Simplify batch queue creation algorithm
1 parent 9f0fd15 commit 26328ec

File tree

2 files changed

+83
-51
lines changed

2 files changed

+83
-51
lines changed

crates/batcher/src/lib.rs

Lines changed: 83 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -896,7 +896,6 @@ impl Batcher {
896896
return Ok(());
897897
}
898898

899-
900899
// When pre-verification is enabled, batcher will verify proofs for faster feedback with clients
901900
if self.pre_verification_is_enabled {
902901
let verification_data = &nonced_verification_data.verification_data;
@@ -936,11 +935,10 @@ impl Batcher {
936935
return Ok(());
937936
}
938937
}
939-
938+
940939
// * ---------------------------------------------------------------------*
941940
// * Perform validation over batcher queue *
942941
// * ---------------------------------------------------------------------*
943-
944942

945943
let mut batch_state_lock = self.batch_state.lock().await;
946944
if batch_state_lock.is_queue_full() {
@@ -970,13 +968,19 @@ impl Batcher {
970968
let entries_to_check: Vec<_> = batch_state_lock
971969
.batch_queue
972970
.iter()
973-
.filter(|(entry, _)| entry.sender == candidate_addr && new_proof_fee > entry.nonced_verification_data.max_fee)
971+
.filter(|(entry, _)| {
972+
entry.sender == candidate_addr
973+
&& new_proof_fee > entry.nonced_verification_data.max_fee
974+
})
974975
.map(|(entry, _)| entry.clone())
975976
.collect();
976-
977+
977978
if let Some(target_entry) = entries_to_check.into_iter().next() {
978-
let removed_entry = batch_state_lock.batch_queue.remove(&target_entry).map(|(e, _)| e);
979-
979+
let removed_entry = batch_state_lock
980+
.batch_queue
981+
.remove(&target_entry)
982+
.map(|(e, _)| e);
983+
980984
if let Some(removed) = removed_entry {
981985
info!(
982986
"Incoming proof (nonce: {}, fee: {}) replacing proof from sender {} with nonce {} (fee: {})",
@@ -988,8 +992,12 @@ impl Batcher {
988992
);
989993

990994
// Update the evicted user's state immediately
991-
self.update_evicted_user_state_with_lock(&removed, &batch_state_lock.batch_queue, &mut user_guard);
992-
995+
self.update_evicted_user_state_with_lock(
996+
&removed,
997+
&batch_state_lock.batch_queue,
998+
&mut user_guard,
999+
);
1000+
9931001
// Notify the evicted user
9941002
if let Some(ref removed_entry_ws) = removed.messaging_sink {
9951003
send_message(
@@ -1089,7 +1097,7 @@ impl Batcher {
10891097
) {
10901098
let replacement_max_fee = nonced_verification_data.max_fee;
10911099
let nonce = nonced_verification_data.nonce;
1092-
1100+
10931101
// Take user state lock first to maintain proper lock ordering
10941102
let user_state = match self.user_states.get(&addr) {
10951103
Some(user_state) => user_state,
@@ -1192,7 +1200,7 @@ impl Batcher {
11921200
// update max_fee_limit and total_fees_in_queue using already held user_state_guard
11931201
let updated_max_fee_limit_in_batch = batch_state_guard.get_user_min_fee_in_batch(&addr);
11941202
user_state_guard.last_max_fee_limit = updated_max_fee_limit_in_batch;
1195-
1203+
11961204
let fee_difference = replacement_max_fee - original_max_fee;
11971205
user_state_guard.total_fees_in_queue += fee_difference;
11981206
}
@@ -1325,7 +1333,7 @@ impl Batcher {
13251333

13261334
// PHASE 1: Extract the batch directly from the queue to avoid race conditions
13271335
let mut batch_state_lock = batch_state_lock; // Make mutable
1328-
1336+
13291337
let finalized_batch = batch_queue::extract_batch_directly(
13301338
&mut batch_state_lock.batch_queue,
13311339
gas_price,
@@ -1355,7 +1363,6 @@ impl Batcher {
13551363
Some(finalized_batch)
13561364
}
13571365

1358-
13591366
/// Updates user states based on current queue state after batch operations.
13601367
/// Used for both successful batch confirmation and failed batch restoration.
13611368
/// Updates proofs_in_batch, total_fees_in_queue, and last_max_fee_limit based on current queue state.
@@ -1369,11 +1376,14 @@ impl Batcher {
13691376
if let Some(user_state) = self.user_states.get(&addr) {
13701377
let mut user_state_guard = user_state.lock().await; // First: user lock
13711378
let batch_state_lock = self.batch_state.lock().await; // Second: batch lock
1372-
1379+
13731380
// Calculate what each user's state should be based on current queue contents
1374-
let current_queue_user_states = self.calculate_new_user_states_data(&batch_state_lock.batch_queue);
1375-
1376-
if let Some((proof_count, min_max_fee_in_queue, total_fees_in_queue)) = current_queue_user_states.get(&addr) {
1381+
let current_queue_user_states =
1382+
self.calculate_new_user_states_data(&batch_state_lock.batch_queue);
1383+
1384+
if let Some((proof_count, min_max_fee_in_queue, total_fees_in_queue)) =
1385+
current_queue_user_states.get(&addr)
1386+
{
13771387
// User has proofs in queue - use calculated values
13781388
user_state_guard.proofs_in_batch = *proof_count;
13791389
user_state_guard.total_fees_in_queue = *total_fees_in_queue;
@@ -1384,7 +1394,7 @@ impl Batcher {
13841394
user_state_guard.total_fees_in_queue = U256::zero();
13851395
user_state_guard.last_max_fee_limit = U256::MAX;
13861396
}
1387-
1397+
13881398
drop(batch_state_lock); // Release batch lock
13891399
drop(user_state_guard); // Release user lock
13901400
} else {
@@ -1400,12 +1410,11 @@ impl Batcher {
14001410
/// but now have no proofs left in the queue.
14011411
fn cleanup_user_states_after_successful_submission(&self, finalized_batch: &[BatchQueueEntry]) {
14021412
use std::collections::HashSet;
1403-
1413+
14041414
// Get unique users from the submitted batch
1405-
let users_in_batch: HashSet<Address> = finalized_batch.iter()
1406-
.map(|entry| entry.sender)
1407-
.collect();
1408-
1415+
let users_in_batch: HashSet<Address> =
1416+
finalized_batch.iter().map(|entry| entry.sender).collect();
1417+
14091418
// Check current queue state to see which users still have proofs
14101419
let batch_state_lock = match self.batch_state.try_lock() {
14111420
Ok(lock) => lock,
@@ -1415,9 +1424,10 @@ impl Batcher {
14151424
return;
14161425
}
14171426
};
1418-
1419-
let current_user_states = self.calculate_new_user_states_data(&batch_state_lock.batch_queue);
1420-
1427+
1428+
let current_user_states =
1429+
self.calculate_new_user_states_data(&batch_state_lock.batch_queue);
1430+
14211431
// For each user in the batch, check if they now have no proofs left
14221432
for user_addr in users_in_batch {
14231433
if !current_user_states.contains_key(&user_addr) {
@@ -1439,8 +1449,11 @@ impl Batcher {
14391449
/// - For same fees, higher nonces get evicted first
14401450
/// This ensures we never have nonce N+1 without nonce N in the queue.
14411451
async fn restore_proofs_after_batch_failure(&self, failed_batch: &[BatchQueueEntry]) {
1442-
info!("Restoring {} proofs to queue after batch failure", failed_batch.len());
1443-
1452+
info!(
1453+
"Restoring {} proofs to queue after batch failure",
1454+
failed_batch.len()
1455+
);
1456+
14441457
let mut batch_state_lock = self.batch_state.lock().await;
14451458
let mut restored_entries = Vec::new();
14461459

@@ -1462,10 +1475,14 @@ impl Batcher {
14621475
if let Some((evicted_entry, _)) = batch_state_lock.batch_queue.pop() {
14631476
warn!("Queue full during restoration, evicting proof from sender {} with nonce {} (fee: {})",
14641477
evicted_entry.sender, evicted_entry.nonced_verification_data.nonce, evicted_entry.nonced_verification_data.max_fee);
1465-
1478+
14661479
// Update user state for evicted entry
1467-
self.update_evicted_user_state_async(&evicted_entry, &batch_state_lock.batch_queue).await;
1468-
1480+
self.update_evicted_user_state_async(
1481+
&evicted_entry,
1482+
&batch_state_lock.batch_queue,
1483+
)
1484+
.await;
1485+
14691486
// Notify the evicted user via websocket
14701487
if let Some(evicted_ws_sink) = evicted_entry.messaging_sink {
14711488
connection::send_message(
@@ -1488,23 +1505,31 @@ impl Batcher {
14881505
restored_entries.push(entry);
14891506
}
14901507

1491-
info!("Restored {} proofs to queue, new queue length: {}", restored_entries.len(), batch_state_lock.batch_queue.len());
1492-
1508+
info!(
1509+
"Restored {} proofs to queue, new queue length: {}",
1510+
restored_entries.len(),
1511+
batch_state_lock.batch_queue.len()
1512+
);
1513+
14931514
// Get unique users from restored entries
1494-
let users_with_restored_proofs: std::collections::HashSet<Address> = restored_entries.iter()
1495-
.map(|entry| entry.sender)
1496-
.collect();
1497-
1515+
let users_with_restored_proofs: std::collections::HashSet<Address> =
1516+
restored_entries.iter().map(|entry| entry.sender).collect();
1517+
14981518
drop(batch_state_lock); // Release batch lock before user state updates
1499-
1519+
15001520
// Update user states for successfully restored proofs
15011521
info!("Updating user states after proof restoration...");
1502-
if let Err(e) = self.update_user_states_from_queue_state(users_with_restored_proofs).await {
1503-
error!("Failed to update user states after proof restoration: {:?}", e);
1522+
if let Err(e) = self
1523+
.update_user_states_from_queue_state(users_with_restored_proofs)
1524+
.await
1525+
{
1526+
error!(
1527+
"Failed to update user states after proof restoration: {:?}",
1528+
e
1529+
);
15041530
}
15051531
}
15061532

1507-
15081533
/// Takes the finalized batch as input and:
15091534
/// builds the merkle tree
15101535
/// posts verification data batch to s3
@@ -1610,14 +1635,19 @@ impl Batcher {
16101635
// Note: Proofs were already removed from the queue during extraction phase
16111636
// Now update user states based on current queue state after successful submission
16121637
info!("Updating user states after batch confirmation...");
1613-
let users_in_batch: std::collections::HashSet<Address> = finalized_batch.iter()
1614-
.map(|entry| entry.sender)
1615-
.collect();
1616-
if let Err(e) = self.update_user_states_from_queue_state(users_in_batch).await {
1617-
error!("Failed to update user states after batch confirmation: {:?}", e);
1638+
let users_in_batch: std::collections::HashSet<Address> =
1639+
finalized_batch.iter().map(|entry| entry.sender).collect();
1640+
if let Err(e) = self
1641+
.update_user_states_from_queue_state(users_in_batch)
1642+
.await
1643+
{
1644+
error!(
1645+
"Failed to update user states after batch confirmation: {:?}",
1646+
e
1647+
);
16181648
// Continue with the rest of the process since batch was already submitted successfully
16191649
}
1620-
1650+
16211651
// Clean up user states for users who had proofs in this batch but now have no proofs left
16221652
self.cleanup_user_states_after_successful_submission(finalized_batch);
16231653

@@ -1694,7 +1724,6 @@ impl Batcher {
16941724
.extract_batch_if_ready(block_number, modified_gas_price)
16951725
.await
16961726
{
1697-
16981727
let batch_finalization_result = self
16991728
.finalize_batch(block_number, &finalized_batch, modified_gas_price)
17001729
.await;
@@ -1705,8 +1734,12 @@ impl Batcher {
17051734

17061735
// If batch finalization failed, restore the proofs to the queue
17071736
if let Err(e) = batch_finalization_result {
1708-
error!("Batch finalization failed, restoring proofs to queue: {:?}", e);
1709-
self.restore_proofs_after_batch_failure(&finalized_batch).await;
1737+
error!(
1738+
"Batch finalization failed, restoring proofs to queue: {:?}",
1739+
e
1740+
);
1741+
self.restore_proofs_after_batch_failure(&finalized_batch)
1742+
.await;
17101743
return Err(e);
17111744
}
17121745
}

crates/batcher/src/types/batch_queue.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,6 @@ pub(crate) fn calculate_batch_size(batch_queue: &BatchQueue) -> Result<usize, Ba
146146
}
147147
}
148148

149-
150149
/// Directly extracts a batch from the given queue, modifying the queue in place.
151150
/// This avoids the inefficiency of cloning the queue and then removing entries individually.
152151
/// Uses the same logic as try_build_batch but works directly on the original queue.

0 commit comments

Comments
 (0)