Skip to content

Commit e60ad68

Browse files
committed
Restore proofs
1 parent 1c4a59e commit e60ad68

File tree

1 file changed

+113
-9
lines changed

1 file changed

+113
-9
lines changed

crates/batcher/src/lib.rs

Lines changed: 113 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1479,6 +1479,107 @@ impl Batcher {
14791479
}
14801480
}
14811481

1482+
/// Restores proofs to the queue after batch submission failure.
1483+
/// Uses similar logic to user proof submission, including handling queue capacity.
1484+
/// NOTE: Nonce ordering is preserved by the priority queue's eviction order:
1485+
/// - Lower fees get evicted first
1486+
/// - For same fees, higher nonces get evicted first
1487+
/// This ensures we never have nonce N+1 without nonce N in the queue.
1488+
async fn restore_proofs_after_batch_failure(&self, failed_batch: &[BatchQueueEntry]) {
1489+
info!("Restoring {} proofs to queue after batch failure", failed_batch.len());
1490+
1491+
let mut batch_state_lock = self.batch_state.lock().await;
1492+
let mut restored_entries = Vec::new();
1493+
1494+
for entry in failed_batch {
1495+
let priority = BatchQueueEntryPriority::new(
1496+
entry.nonced_verification_data.max_fee,
1497+
entry.nonced_verification_data.nonce,
1498+
);
1499+
1500+
// Check if queue is full
1501+
if batch_state_lock.is_queue_full() {
1502+
// Use same logic as user submission - evict lowest priority if this one is higher
1503+
if let Some((lowest_entry, _)) = batch_state_lock.batch_queue.peek() {
1504+
let lowest_fee = lowest_entry.nonced_verification_data.max_fee;
1505+
let restore_fee = entry.nonced_verification_data.max_fee;
1506+
1507+
if restore_fee > lowest_fee {
1508+
// Evict the lowest priority entry (preserves nonce ordering)
1509+
if let Some((evicted_entry, _)) = batch_state_lock.batch_queue.pop() {
1510+
warn!("Queue full during restoration, evicting proof from sender {} with nonce {} (fee: {})",
1511+
evicted_entry.sender, evicted_entry.nonced_verification_data.nonce, evicted_entry.nonced_verification_data.max_fee);
1512+
1513+
// Update user state for evicted entry
1514+
self.update_evicted_user_state(&evicted_entry, &batch_state_lock.batch_queue).await;
1515+
1516+
// Notify the evicted user via websocket
1517+
if let Some(evicted_ws_sink) = evicted_entry.messaging_sink {
1518+
connection::send_message(
1519+
evicted_ws_sink,
1520+
aligned_sdk::common::types::SubmitProofResponseMessage::UnderpricedProof,
1521+
)
1522+
.await;
1523+
}
1524+
}
1525+
} else {
1526+
warn!("Queue full and restored proof has lower priority, dropping proof from sender {} with nonce {} (fee: {})",
1527+
entry.sender, entry.nonced_verification_data.nonce, entry.nonced_verification_data.max_fee);
1528+
continue;
1529+
}
1530+
}
1531+
}
1532+
1533+
// Add the proof back to the queue
1534+
batch_state_lock.batch_queue.push(entry.clone(), priority);
1535+
restored_entries.push(entry);
1536+
}
1537+
1538+
info!("Restored {} proofs to queue, new queue length: {}", restored_entries.len(), batch_state_lock.batch_queue.len());
1539+
1540+
// Update user states for successfully restored proofs
1541+
self.update_user_states_for_restored_proofs(&restored_entries, &batch_state_lock).await;
1542+
}
1543+
1544+
/// Updates user states for proofs that were successfully restored to the queue.
1545+
/// This essentially undoes the optimistic user state updates that were made during extraction.
1546+
async fn update_user_states_for_restored_proofs(
1547+
&self,
1548+
restored_entries: &[&BatchQueueEntry],
1549+
batch_state_lock: &tokio::sync::MutexGuard<'_, crate::types::batch_state::BatchState>,
1550+
) {
1551+
use std::collections::HashMap;
1552+
1553+
// Group restored entries by user address
1554+
let mut users_restored_proofs: HashMap<Address, Vec<&BatchQueueEntry>> = HashMap::new();
1555+
for entry in restored_entries {
1556+
users_restored_proofs.entry(entry.sender).or_default().push(entry);
1557+
}
1558+
1559+
// Calculate new user states based on current queue (including restored proofs)
1560+
let new_user_states = self.calculate_new_user_states_data(&batch_state_lock.batch_queue);
1561+
1562+
// Update user states for each user who had proofs restored
1563+
for (user_addr, _user_entries) in users_restored_proofs {
1564+
if let Some(user_state_ref) = self.user_states.get(&user_addr) {
1565+
let mut user_state_guard = user_state_ref.lock().await;
1566+
1567+
// Update based on current queue state (which includes restored proofs)
1568+
if let Some((proof_count, max_fee_limit, total_fees_in_queue)) = new_user_states.get(&user_addr) {
1569+
user_state_guard.proofs_in_batch = *proof_count;
1570+
user_state_guard.last_max_fee_limit = *max_fee_limit;
1571+
user_state_guard.total_fees_in_queue = *total_fees_in_queue;
1572+
1573+
info!("Restored user state for {}: {} proofs, total fees: {}",
1574+
user_addr, proof_count, total_fees_in_queue);
1575+
} else {
1576+
// This shouldn't happen since we just added proofs for this user
1577+
warn!("User {} had proofs restored but not found in queue calculation", user_addr);
1578+
}
1579+
}
1580+
}
1581+
}
1582+
14821583
/// Takes the finalized batch as input and:
14831584
/// builds the merkle tree
14841585
/// posts verification data batch to s3
@@ -1489,16 +1590,15 @@ impl Batcher {
14891590
async fn finalize_batch(
14901591
&self,
14911592
block_number: u64,
1492-
finalized_batch: Vec<BatchQueueEntry>,
1593+
finalized_batch: &[BatchQueueEntry],
14931594
gas_price: U256,
14941595
) -> Result<(), BatcherError> {
14951596
// Acquire write lock to ensure exclusive access during batch creation (blocks all user processing)
14961597
let _batch_processing_guard = self.batch_processing_lock.write().await;
14971598

14981599
let nonced_batch_verifcation_data: Vec<NoncedVerificationData> = finalized_batch
1499-
.clone()
1500-
.into_iter()
1501-
.map(|entry| entry.nonced_verification_data)
1600+
.iter()
1601+
.map(|entry| entry.nonced_verification_data.clone())
15021602
.collect();
15031603

15041604
let batch_verification_data: Vec<VerificationData> = nonced_batch_verifcation_data
@@ -1511,9 +1611,8 @@ impl Batcher {
15111611

15121612
info!("Finalizing batch. Length: {}", finalized_batch.len());
15131613
let batch_data_comm: Vec<VerificationDataCommitment> = finalized_batch
1514-
.clone()
1515-
.into_iter()
1516-
.map(|entry| entry.verification_data_commitment)
1614+
.iter()
1615+
.map(|entry| entry.verification_data_commitment.clone())
15171616
.collect();
15181617

15191618
let batch_merkle_tree: MerkleTree<VerificationCommitmentBatch> =
@@ -1664,14 +1763,19 @@ impl Batcher {
16641763
{
16651764

16661765
let batch_finalization_result = self
1667-
.finalize_batch(block_number, finalized_batch, modified_gas_price)
1766+
.finalize_batch(block_number, &finalized_batch, modified_gas_price)
16681767
.await;
16691768

16701769
// Resetting this here to avoid doing it on every return path of `finalize_batch` function
16711770
let mut batch_posting = self.posting_batch.lock().await;
16721771
*batch_posting = false;
16731772

1674-
batch_finalization_result?;
1773+
// If batch finalization failed, restore the proofs to the queue
1774+
if let Err(e) = batch_finalization_result {
1775+
error!("Batch finalization failed, restoring proofs to queue: {:?}", e);
1776+
self.restore_proofs_after_batch_failure(&finalized_batch).await;
1777+
return Err(e);
1778+
}
16751779
}
16761780

16771781
Ok(())

0 commit comments

Comments
 (0)