diff --git a/crates/Cargo.lock b/crates/Cargo.lock index 744392b748..1d03e6005a 100644 --- a/crates/Cargo.lock +++ b/crates/Cargo.lock @@ -103,7 +103,6 @@ dependencies = [ "bytes", "ciborium", "clap", - "dashmap", "dotenvy", "env_logger", "ethers", @@ -139,6 +138,7 @@ dependencies = [ "log", "reqwest 0.12.15", "serde", + "serde_bytes", "serde_json", "serde_repr", "sha3", @@ -2040,20 +2040,6 @@ dependencies = [ "syn 2.0.100", ] -[[package]] -name = "dashmap" -version = "6.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf" -dependencies = [ - "cfg-if", - "crossbeam-utils", - "hashbrown 0.14.5", - "lock_api", - "once_cell", - "parking_lot_core", -] - [[package]] name = "dashu" version = "0.4.2" @@ -6712,6 +6698,15 @@ dependencies = [ "serde_derive", ] +[[package]] +name = "serde_bytes" +version = "0.11.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8437fd221bde2d4ca316d61b90e337e9e702b3820b87d63caa9ba6c02bd06d96" +dependencies = [ + "serde", +] + [[package]] name = "serde_derive" version = "1.0.219" diff --git a/crates/batcher/src/connection.rs b/crates/batcher/src/connection.rs index b32e511c52..946922db30 100644 --- a/crates/batcher/src/connection.rs +++ b/crates/batcher/src/connection.rs @@ -18,7 +18,7 @@ use tokio_tungstenite::{ pub(crate) type WsMessageSink = Arc<RwLock<SplitSink<WebSocketStream<TcpStream>, Message>>>; pub(crate) async fn send_batch_inclusion_data_responses( - finalized_batch: &[BatchQueueEntry], + finalized_batch: Vec<BatchQueueEntry>, batch_merkle_tree: &MerkleTree<VerificationCommitmentBatch>, ) -> Result<(), BatcherError> { // Finalized_batch is ordered as the PriorityQueue, ordered by: ascending max_fee && if max_fee is equal, by descending nonce. diff --git a/crates/batcher/src/lib.rs b/crates/batcher/src/lib.rs index 7a1bf78718..fc8bb44b26 100644 --- a/crates/batcher/src/lib.rs +++ b/crates/batcher/src/lib.rs @@ -47,9 +47,6 @@ use lambdaworks_crypto::merkle_tree::traits::IsMerkleTreeBackend; use log::{debug, error, info, warn}; use tokio::net::{TcpListener, TcpStream}; use tokio::sync::{Mutex, MutexGuard, RwLock}; - -// Message handler lock timeout -const MESSAGE_HANDLER_LOCK_TIMEOUT: Duration = Duration::from_secs(10); use tokio_tungstenite::tungstenite::{Error, Message}; use types::batch_queue::{self, BatchQueueEntry, BatchQueueEntryPriority}; use types::errors::{BatcherError, TransactionSendError}; @@ -75,7 +72,6 @@ mod zk_utils; pub const LISTEN_NEW_BLOCKS_MAX_TIMES: usize = usize::MAX; pub struct Batcher { - // Configuration parameters s3_client: S3Client, s3_bucket_name: String, download_endpoint: String, @@ -91,42 +87,19 @@ pub struct Batcher { payment_service_fallback: BatcherPaymentService, service_manager: ServiceManager, service_manager_fallback: ServiceManager, + batch_state: Mutex<BatchState>, min_block_interval: u64, transaction_wait_timeout: u64, max_proof_size: usize, max_batch_byte_size: usize, max_batch_proof_qty: usize, + last_uploaded_batch_block: Mutex<u64>, pre_verification_is_enabled: bool, non_paying_config: Option<NonPayingConfig>, - aggregator_fee_percentage_multiplier: u128, - aggregator_gas_cost: u128, - - // Shared state access: - // Two kinds of threads interact with the shared state: - // 1. User message processing threads (run in parallel) - // 2. Batch creation thread (runs sequentially, includes failure recovery) - // - // Locking rules: - // - To avoid deadlocks, always acquire `user_states` before `batch_state`. 
- // - During failure recovery, restoring a valid state may require breaking this rule: - // additional user locks might be acquired *after* the batch lock. - // (See the `restore` algorithm in the `batch_queue` module.) - // - // Because of this exception, user message handling uses lock acquisition with timeouts. - batch_state: Mutex<BatchState>, - - user_states: Arc<RwLock<HashMap<Address, Arc<Mutex<UserState>>>>>, - - last_uploaded_batch_block: Mutex<u64>, - - /// This is used to avoid multiple batches being submitted at the same time /// It could be removed in the future by changing how we spawn /// the batch creation task posting_batch: Mutex<bool>, - disabled_verifiers: Mutex<U256>, - - // Observability and monitoring + aggregator_fee_percentage_multiplier: u128, + aggregator_gas_cost: u128, pub metrics: metrics::BatcherMetrics, pub telemetry: TelemetrySender, } @@ -185,7 +158,7 @@ impl Batcher { let deployment_output = ContractDeploymentOutput::new(config.aligned_layer_deployment_config_file_path); - info!( + log::info!( "Starting metrics server on port {}", config.batcher.metrics_port ); @@ -266,8 +239,8 @@ impl Batcher { .await .expect("Failed to get fallback Service Manager contract"); - let user_states = Arc::new(RwLock::new(HashMap::new())); - let batch_state = BatchState::new(config.batcher.max_queue_size); + let mut user_states = HashMap::new(); + let mut batch_state = BatchState::new(config.batcher.max_queue_size); let non_paying_config = if let Some(non_paying_config) = config.batcher.non_paying { warn!("Non-paying address configuration detected. Will replace non-paying address {} with configured address.", non_paying_config.address); @@ -280,11 +253,13 @@ impl Batcher { .expect("Could not get non-paying nonce from Ethereum"); let non_paying_user_state = UserState::new(nonpaying_nonce); - user_states.write().await.insert( + user_states.insert( non_paying_config.replacement.address(), - Arc::new(Mutex::new(non_paying_user_state)), + non_paying_user_state, ); + batch_state = + BatchState::new_with_user_states(user_states, config.batcher.max_queue_size); Some(non_paying_config) } else { None } @@ -331,120 +306,12 @@ impl Batcher { aggregator_gas_cost: config.batcher.aggregator_gas_cost, posting_batch: Mutex::new(false), batch_state: Mutex::new(batch_state), - user_states, disabled_verifiers: Mutex::new(disabled_verifiers), metrics, telemetry, } } - async fn update_evicted_user_state_with_lock( - &self, - removed_entry: &types::batch_queue::BatchQueueEntry, - batch_queue: &types::batch_queue::BatchQueue, - user_state_guard: &mut tokio::sync::MutexGuard<'_, crate::types::user_state::UserState>, - ) { - let addr = removed_entry.sender; - - let new_last_max_fee_limit = match batch_queue - .iter() - .filter(|(e, _)| e.sender == addr) - .next_back() { - Some((last_entry, _)) => last_entry.nonced_verification_data.max_fee, - None => { - self.user_states.write().await.remove(&addr); - return; - } - }; - - user_state_guard.proofs_in_batch -= 1; - user_state_guard.nonce -= U256::one(); - user_state_guard.total_fees_in_queue -= removed_entry.nonced_verification_data.max_fee; - user_state_guard.last_max_fee_limit = new_last_max_fee_limit; - } - - // Fallback async version for restoration path where we don't have pre-held locks - async fn update_evicted_user_state_async( - &self, - removed_entry: &types::batch_queue::BatchQueueEntry, - batch_queue: &types::batch_queue::BatchQueue, - ) -> Option<()> { - let addr = removed_entry.sender; - - let new_last_max_fee_limit = match batch_queue - .iter() - .filter(|(e, _)| e.sender == addr) - .next_back() - { - 
Some((last_entry, _)) => last_entry.nonced_verification_data.max_fee, - None => { - self.user_states.write().await.remove(&addr); - return Some(()); - } - }; - - let user_state = self.user_states.read().await.get(&addr)?.clone(); - let mut user_state_guard = user_state.lock().await; - user_state_guard.proofs_in_batch -= 1; - user_state_guard.nonce -= U256::one(); - user_state_guard.total_fees_in_queue -= removed_entry.nonced_verification_data.max_fee; - user_state_guard.last_max_fee_limit = new_last_max_fee_limit; - Some(()) - } - - fn calculate_new_user_states_data( - &self, - batch_queue: &batch_queue::BatchQueue, - ) -> HashMap<Address, (usize, U256, U256)> { - let mut updated_user_states = HashMap::new(); - for (entry, _) in batch_queue.iter() { - let addr = entry.sender; - let max_fee = entry.nonced_verification_data.max_fee; - - let (proof_count, max_fee_limit, total_fees_in_queue) = updated_user_states - .entry(addr) - .or_insert((0, max_fee, U256::zero())); - - *proof_count += 1; - *total_fees_in_queue += max_fee; - if max_fee < *max_fee_limit { - *max_fee_limit = max_fee; - } - } - updated_user_states - } - - /// Helper to apply 15-second timeout to user lock acquisition with consistent logging and metrics - async fn try_user_lock_with_timeout<T, F>(&self, addr: Address, lock_future: F) -> Option<T> - where - F: std::future::Future<Output = T>, - { - match timeout(MESSAGE_HANDLER_LOCK_TIMEOUT, lock_future).await { - Ok(result) => Some(result), - Err(_) => { - warn!("User lock acquisition timed out for address {}", addr); - self.metrics.inc_message_handler_user_lock_timeout(); - None - } - } - } - - /// Helper to apply 15-second timeout to batch lock acquisition with consistent logging and metrics - async fn try_batch_lock_with_timeout<T, F>(&self, lock_future: F) -> Option<T> - where - F: std::future::Future<Output = T>, - { - match timeout(MESSAGE_HANDLER_LOCK_TIMEOUT, lock_future).await { - Ok(result) => Some(result), - Err(_) => { - warn!("Batch lock acquisition timed out"); - self.metrics.inc_message_handler_batch_lock_timeout(); - None - } - } - } - pub async fn listen_connections(self: Arc<Self>, address: &str) -> Result<(), BatcherError> { // Create the event loop and TCP listener we'll accept connections on. 
let listener = TcpListener::bind(address) @@ -727,35 +594,8 @@ impl Batcher { } let cached_user_nonce = { - let user_states_guard = match timeout( - MESSAGE_HANDLER_LOCK_TIMEOUT, - self.user_states.read(), - ) - .await - { - Ok(guard) => guard, - Err(_) => { - warn!("User states read lock acquisition timed out in handle_get_nonce_for_address_msg"); - self.metrics.inc_message_handler_user_states_lock_timeouts(); - send_message(ws_conn_sink, GetNonceResponseMessage::ServerBusy).await; - return Ok(()); - } - }; - let user_state_ref = user_states_guard.get(&address).cloned(); - match user_state_ref { - Some(user_state_ref) => { - let Some(user_state_guard) = self - .try_user_lock_with_timeout(address, user_state_ref.lock()) - .await - else { - send_message(ws_conn_sink.clone(), GetNonceResponseMessage::ServerBusy) - .await; - return Ok(()); - }; - Some(user_state_guard.nonce) - } - None => None, - } + let batch_state_lock = self.batch_state.lock().await; + batch_state_lock.get_user_nonce(&address).await }; let user_nonce = if let Some(user_nonce) = cached_user_nonce { @@ -853,6 +693,48 @@ impl Batcher { nonced_verification_data = aux_verification_data } + // When pre-verification is enabled, batcher will verify proofs for faster feedback with clients + if self.pre_verification_is_enabled { + let verification_data = &nonced_verification_data.verification_data; + if self + .is_verifier_disabled(verification_data.proving_system) + .await + { + warn!( + "Verifier for proving system {} is disabled, skipping verification", + verification_data.proving_system + ); + send_message( + ws_conn_sink.clone(), + SubmitProofResponseMessage::InvalidProof(ProofInvalidReason::DisabledVerifier( + verification_data.proving_system, + )), + ) + .await; + self.metrics.user_error(&[ + "disabled_verifier", + &format!("{}", verification_data.proving_system), + ]); + return Ok(()); + } + + if !zk_utils::verify(verification_data).await { + error!("Invalid proof detected. Verification failed"); + send_message( + ws_conn_sink.clone(), + SubmitProofResponseMessage::InvalidProof(ProofInvalidReason::RejectedProof), + ) + .await; + self.metrics.user_error(&[ + "rejected_proof", + &format!("{}", verification_data.proving_system), + ]); + return Ok(()); + } + } + + info!("Handling message"); + // We don't need a batch state lock here, since if the user locks its funds // after the check, some blocks should pass until he can withdraw. // It is safe to do just do this here. @@ -860,26 +742,17 @@ impl Batcher { return Ok(()); } - info!("Handling message, locking user state"); - // We acquire the lock first only to query if the user is already present and the lock is dropped. // If it was not present, then the user nonce is queried to the Aligned contract. // Lastly, we get a lock of the batch state again and insert the user state if it was still missing. 
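// ---------------------------------------------------------------------------
// [Editor's sketch — illustration only, not part of this patch] The comment
// above describes a check/drop/recheck pattern: the on-chain nonce query is
// presumably kept outside the batch-state lock to avoid holding it across a
// network call, so a racing task may insert the same user in between. The
// `entry(..).or_insert(..)` used below makes the second insert a no-op, which
// is why a possibly stale on-chain nonce can never overwrite fresher in-queue
// state. Minimal standalone model, assuming the `BatchState`/`UserState`
// types this PR introduces (`ensure_user_state` is a hypothetical name):
async fn ensure_user_state(
    batch_state: &tokio::sync::Mutex<BatchState>,
    addr: ethers::types::Address,
    nonce_from_chain: ethers::types::U256, // fetched with no lock held
) {
    let mut state = batch_state.lock().await;
    // A no-op if a concurrent handler won the race and inserted the user first.
    state
        .user_states
        .entry(addr)
        .or_insert(UserState::new(nonce_from_chain));
}
// ---------------------------------------------------------------------------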
- let is_user_in_state = match timeout(MESSAGE_HANDLER_LOCK_TIMEOUT, self.user_states.read()) - .await + let is_user_in_state: bool; { - Ok(user_states_guard) => user_states_guard.contains_key(&addr), - Err(_) => { - warn!("User states read lock acquisition timed out in handle_submit_proof_msg (user check)"); - self.metrics.inc_message_handler_user_states_lock_timeouts(); - send_message(ws_conn_sink, SubmitProofResponseMessage::ServerBusy).await; - return Ok(()); - } - }; + let batch_state_lock = self.batch_state.lock().await; + is_user_in_state = batch_state_lock.user_states.contains_key(&addr); + } if !is_user_in_state { - // If the user state was not present, we need to get the nonce from the Ethereum contract let ethereum_user_nonce = match self.get_user_nonce_from_ethereum(addr).await { Ok(ethereum_user_nonce) => ethereum_user_nonce, Err(e) => { @@ -895,54 +768,14 @@ impl Batcher { return Ok(()); } }; - debug!("User state for address {addr:?} not found, creating a new one"); - // We add a dummy user state to grab a lock on the user state - let dummy_user_state = UserState::new(ethereum_user_nonce); - match timeout(MESSAGE_HANDLER_LOCK_TIMEOUT, self.user_states.write()).await { - Ok(mut user_states_guard) => { - user_states_guard.insert(addr, Arc::new(Mutex::new(dummy_user_state))); - } - Err(_) => { - warn!("User states write lock acquisition timed out in handle_submit_proof_msg (user creation)"); - self.metrics.inc_message_handler_user_states_lock_timeouts(); - send_message(ws_conn_sink, SubmitProofResponseMessage::ServerBusy).await; - return Ok(()); - } - }; - debug!("Dummy user state for address {addr:?} created"); + let user_state = UserState::new(ethereum_user_nonce); + let mut batch_state_lock = self.batch_state.lock().await; + batch_state_lock + .user_states + .entry(addr) + .or_insert(user_state); } - let user_state_ref = match timeout(MESSAGE_HANDLER_LOCK_TIMEOUT, self.user_states.read()) - .await - { - Ok(user_states_guard) => user_states_guard.get(&addr).cloned(), - Err(_) => { - warn!("User states read lock acquisition timed out in handle_submit_proof_msg (user retrieval)"); - self.metrics.inc_message_handler_user_states_lock_timeouts(); - send_message(ws_conn_sink, SubmitProofResponseMessage::ServerBusy).await; - return Ok(()); - } - }; - let Some(user_state_ref) = user_state_ref else { - error!("This should never happen, user state has previously been inserted if it didn't exist"); - send_message( - ws_conn_sink.clone(), - SubmitProofResponseMessage::AddToBatchError, - ) - .await; - self.metrics.user_error(&["batcher_state_error", ""]); - return Ok(()); - }; - - // We acquire the lock on the user state, now everything will be processed sequentially - let Some(mut user_state_guard) = self - .try_user_lock_with_timeout(addr, user_state_ref.lock()) - .await - else { - send_message(ws_conn_sink.clone(), SubmitProofResponseMessage::ServerBusy).await; - return Ok(()); - }; - // * ---------------------------------------------------* // * Perform validations over user state * // * ---------------------------------------------------* @@ -958,12 +791,40 @@ impl Batcher { return Ok(()); }; + // For now on until the message is fully processed, the batch state is locked + // This is needed because we need to query the user state to make validations and + // finally add the proof to the batch queue. 
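// ---------------------------------------------------------------------------
// [Editor's note — not part of this patch] From the lock acquisition below
// until the proof is enqueued, the handler holds a single
// `MutexGuard<'_, BatchState>`, so the nonce, max-fee and balance checks are
// atomic with respect to concurrent submissions; note that every early-return
// path drops the guard explicitly (`std::mem::drop`) before replying over the
// websocket. Shape of the critical section (`validate` and `apply` are
// illustrative names, not functions from this PR):
//
//     let mut state = self.batch_state.lock().await; // one coarse lock
//     validate(&state, &msg)?;                       // reads user_states + queue
//     state.batch_queue.push(entry, priority);       // mutates the queue
//     apply(&mut state, &msg);                       // mutates the user state
//     drop(state);                                   // release before ws reply
// ---------------------------------------------------------------------------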
+ + let mut batch_state_lock = self.batch_state.lock().await; + let msg_max_fee = nonced_verification_data.max_fee; - let user_last_max_fee_limit = user_state_guard.last_max_fee_limit; + let Some(user_last_max_fee_limit) = + batch_state_lock.get_user_last_max_fee_limit(&addr).await + else { + std::mem::drop(batch_state_lock); + send_message( + ws_conn_sink.clone(), + SubmitProofResponseMessage::AddToBatchError, + ) + .await; + self.metrics.user_error(&["batcher_state_error", ""]); + return Ok(()); + }; - let user_accumulated_fee = user_state_guard.total_fees_in_queue; + let Some(user_accumulated_fee) = batch_state_lock.get_user_total_fees_in_queue(&addr).await + else { + std::mem::drop(batch_state_lock); + send_message( + ws_conn_sink.clone(), + SubmitProofResponseMessage::AddToBatchError, + ) + .await; + self.metrics.user_error(&["batcher_state_error", ""]); + return Ok(()); + }; if !self.verify_user_has_enough_balance(user_balance, user_accumulated_fee, msg_max_fee) { + std::mem::drop(batch_state_lock); send_message( ws_conn_sink.clone(), SubmitProofResponseMessage::InsufficientBalance(addr), @@ -973,9 +834,22 @@ impl Batcher { return Ok(()); } - let expected_nonce = user_state_guard.nonce; + let cached_user_nonce = batch_state_lock.get_user_nonce(&addr).await; + + let Some(expected_nonce) = cached_user_nonce else { + error!("Failed to get cached user nonce: User not found in user states, but it should have been already inserted"); + std::mem::drop(batch_state_lock); + send_message( + ws_conn_sink.clone(), + SubmitProofResponseMessage::AddToBatchError, + ) + .await; + self.metrics.user_error(&["batcher_state_error", ""]); + return Ok(()); + }; if expected_nonce < msg_nonce { + std::mem::drop(batch_state_lock); warn!("Invalid nonce for address {addr}, expected nonce: {expected_nonce:?}, received nonce: {msg_nonce:?}"); send_message( ws_conn_sink.clone(), @@ -988,17 +862,14 @@ impl Batcher { // In this case, the message might be a replacement one. If it is valid, // we replace the old entry with the new from the replacement message. - // Notice this stops the normal flow of the handle_submit_proof. 
- // We pass the already-held user_state_guard to avoid double-locking - // This will take the batch lock internally if expected_nonce > msg_nonce { info!("Possible replacement message received: Expected nonce {expected_nonce:?} - message nonce: {msg_nonce:?}"); self.handle_replacement_message( + batch_state_lock, nonced_verification_data, ws_conn_sink.clone(), client_msg.signature, addr, - user_state_guard, ) .await; @@ -1008,6 +879,7 @@ impl Batcher { // We check this after replacement logic because if user wants to replace a proof, their // new_max_fee must be greater or equal than old_max_fee if msg_max_fee > user_last_max_fee_limit { + std::mem::drop(batch_state_lock); warn!("Invalid max fee for address {addr}, had fee limit of {user_last_max_fee_limit:?}, sent {msg_max_fee:?}"); send_message( ws_conn_sink.clone(), @@ -1018,116 +890,56 @@ impl Batcher { return Ok(()); } - if !self - .verify_proof_if_enabled( - &nonced_verification_data.verification_data, - ws_conn_sink.clone(), - ) - .await - { - return Ok(()); - } - // * ---------------------------------------------------------------------* // * Perform validation over batcher queue * // * ---------------------------------------------------------------------* - let Some(mut batch_state_lock) = self - .try_batch_lock_with_timeout(self.batch_state.lock()) - .await - else { - send_message(ws_conn_sink.clone(), SubmitProofResponseMessage::ServerBusy).await; - return Ok(()); - }; if batch_state_lock.is_queue_full() { debug!("Batch queue is full. Evaluating if the incoming proof can replace a lower-priority entry."); + // This cannot panic, if the batch queue is full it has at least one item + let (lowest_priority_entry, _) = batch_state_lock + .batch_queue + .peek() + .expect("Batch queue was expected to be full, but somehow no item was inside"); + + let lowest_fee_in_queue = lowest_priority_entry.nonced_verification_data.max_fee; + let new_proof_fee = nonced_verification_data.max_fee; - let mut evicted_entry = None; - // Collect addresses of potential candidates (lightweight) - let eviction_candidates: Vec
= batch_state_lock - .batch_queue - .iter() - .filter_map(|(entry, _)| { - if new_proof_fee > entry.nonced_verification_data.max_fee { - Some(entry.sender) - } else { - None - } - }) - .collect(); + // We will keep the proof with the highest fee + // Note: we previously checked that if it's a new proof from the same user the fee is the same or lower + // So this will never eject a proof of the same user with a lower nonce + // which is the expected behaviour + if new_proof_fee > lowest_fee_in_queue { + // This cannot panic, if the batch queue is full it has at least one item + let (removed_entry, _) = batch_state_lock + .batch_queue + .pop() + .expect("Batch queue was expected to be full, but somehow no item was inside"); - // Try to find any candidate whose lock we can acquire and immediately process them - for candidate_addr in eviction_candidates { - if let Some(user_state_arc) = - self.user_states.read().await.get(&candidate_addr).cloned() - { - if let Ok(mut user_guard) = user_state_arc.try_lock() { - // Found someone whose lock we can get - now find and remove their entry - let entries_to_check: Vec<_> = batch_state_lock - .batch_queue - .iter() - .filter(|(entry, _)| { - entry.sender == candidate_addr - && new_proof_fee > entry.nonced_verification_data.max_fee - }) - .map(|(entry, _)| entry.clone()) - .collect(); - - if let Some(target_entry) = entries_to_check.into_iter().next() { - let removed_entry = batch_state_lock - .batch_queue - .remove(&target_entry) - .map(|(e, _)| e); - - if let Some(removed) = removed_entry { - info!( - "Incoming proof (nonce: {}, fee: {}) replacing proof from sender {} with nonce {} (fee: {})", - nonced_verification_data.nonce, - new_proof_fee, - removed.sender, - removed.nonced_verification_data.nonce, - removed.nonced_verification_data.max_fee - ); - - // Update the evicted user's state immediately - self.update_evicted_user_state_with_lock( - &removed, - &batch_state_lock.batch_queue, - &mut user_guard, - ) - .await; - - if let Some(ref removed_entry_ws) = removed.messaging_sink { - let ws_sink = removed_entry_ws.clone(); - // Usually we just drop the locks, but this time - // We still need to keep them since we are doing more work - // So we send the message in an async manner - tokio::spawn(async move { - send_message( - ws_sink, - SubmitProofResponseMessage::UnderpricedProof, - ) - .await; - }); - } - - evicted_entry = Some(removed); - break; - } - } - } - } - } + info!( + "Incoming proof (nonce: {}, fee: {}) has higher fee. Replacing lowest fee proof from sender {} with nonce {}.", + nonced_verification_data.nonce, + nonced_verification_data.max_fee, + removed_entry.sender, + removed_entry.nonced_verification_data.nonce + ); - // Check if we successfully evicted someone - if evicted_entry.is_none() { - // No lock could be acquired or no evictable entry found - reject this proof + batch_state_lock.update_user_state_on_entry_removal(&removed_entry); + + if let Some(removed_entry_ws) = removed_entry.messaging_sink { + send_message( + removed_entry_ws, + SubmitProofResponseMessage::UnderpricedProof, + ) + .await; + }; + } else { info!( - "Incoming proof (nonce: {}, fee: {}) rejected - queue is full and no evictable entries found.", + "Incoming proof (nonce: {}, fee: {}) has lower priority than all entries in the full queue. 
Rejecting submission.", nonced_verification_data.nonce, - new_proof_fee + nonced_verification_data.max_fee ); std::mem::drop(batch_state_lock); send_message( @@ -1146,7 +958,7 @@ impl Batcher { if let Err(e) = self .add_to_batch( batch_state_lock, - &nonced_verification_data, + nonced_verification_data, ws_conn_sink.clone(), signature, addr, @@ -1159,13 +971,7 @@ impl Batcher { return Ok(()); }; - // Update user state now that entry has been successfully added to batch - let max_fee = nonced_verification_data.max_fee; - let nonce = nonced_verification_data.nonce; - user_state_guard.nonce = nonce + U256::one(); - user_state_guard.last_max_fee_limit = max_fee; - user_state_guard.proofs_in_batch += 1; - user_state_guard.total_fees_in_queue += max_fee; + info!("Verification data message handled"); Ok(()) } @@ -1194,25 +1000,16 @@ impl Batcher { /// Returns true if the message was replaced in the batch, false otherwise async fn handle_replacement_message( &self, + mut batch_state_lock: MutexGuard<'_, BatchState>, nonced_verification_data: NoncedVerificationData, ws_conn_sink: WsMessageSink, signature: Signature, addr: Address, - mut user_state_guard: tokio::sync::MutexGuard<'_, UserState>, ) { let replacement_max_fee = nonced_verification_data.max_fee; let nonce = nonced_verification_data.nonce; - let Some(mut batch_state_guard) = self - .try_batch_lock_with_timeout(self.batch_state.lock()) - .await - else { - drop(user_state_guard); - send_message(ws_conn_sink.clone(), SubmitProofResponseMessage::ServerBusy).await; - return; - }; - let Some(entry) = batch_state_guard.get_entry(addr, nonce) else { - drop(batch_state_guard); - drop(user_state_guard); + let Some(entry) = batch_state_lock.get_entry(addr, nonce) else { + std::mem::drop(batch_state_lock); warn!("Invalid nonce for address {addr}. Queue entry with nonce {nonce} not found"); send_message( ws_conn_sink.clone(), @@ -1224,36 +1021,21 @@ impl Batcher { }; let original_max_fee = entry.nonced_verification_data.max_fee; - // Require 10% fee increase to prevent DoS attacks. While this could theoretically overflow, - // it would require an attacker to have an impractical amount of Ethereum to reach U256::MAX - let min_required_fee = original_max_fee + (original_max_fee / U256::from(10)); // 10% increase (1.1x) - if replacement_max_fee < min_required_fee { - drop(batch_state_guard); - drop(user_state_guard); - info!("Replacement message fee increase too small for address {addr}. 
Original: {original_max_fee:?}, received: {replacement_max_fee:?}, minimum required: {min_required_fee:?}"); + if original_max_fee > replacement_max_fee { + std::mem::drop(batch_state_lock); + warn!("Invalid replacement message for address {addr}, had max fee: {original_max_fee:?}, received fee: {replacement_max_fee:?}"); send_message( ws_conn_sink.clone(), SubmitProofResponseMessage::InvalidReplacementMessage, ) .await; - self.metrics.user_error(&["insufficient_fee_increase", ""]); + self.metrics + .user_error(&["invalid_replacement_message", ""]); return; } info!("Replacing message for address {addr} with nonce {nonce} and max fee {replacement_max_fee}"); - if !self - .verify_proof_if_enabled( - &nonced_verification_data.verification_data, - ws_conn_sink.clone(), - ) - .await - { - drop(batch_state_guard); - drop(user_state_guard); - return; - } - // The replacement entry is built from the old entry and validated for then to be replaced let mut replacement_entry = entry.clone(); replacement_entry.signature = signature; @@ -1279,9 +1061,8 @@ impl Batcher { } replacement_entry.messaging_sink = Some(ws_conn_sink.clone()); - if !batch_state_guard.replacement_entry_is_valid(&replacement_entry) { - drop(batch_state_guard); - drop(user_state_guard); + if !batch_state_lock.replacement_entry_is_valid(&replacement_entry) { + std::mem::drop(batch_state_lock); warn!("Invalid replacement message"); send_message( ws_conn_sink.clone(), @@ -1302,66 +1083,45 @@ impl Batcher { // note that the entries are considered equal for the priority queue // if they have the same nonce and sender, so we can remove the old entry // by calling remove with the new entry - batch_state_guard.batch_queue.remove(&replacement_entry); - batch_state_guard.batch_queue.push( + batch_state_lock.batch_queue.remove(&replacement_entry); + batch_state_lock.batch_queue.push( replacement_entry.clone(), BatchQueueEntryPriority::new(replacement_max_fee, nonce), ); - // update max_fee_limit and total_fees_in_queue using already held user_state_guard - let updated_max_fee_limit_in_batch = batch_state_guard.get_user_min_fee_in_batch(&addr); - user_state_guard.last_max_fee_limit = updated_max_fee_limit_in_batch; - - let fee_difference = replacement_max_fee - original_max_fee; - user_state_guard.total_fees_in_queue += fee_difference; - } - - async fn verify_proof_if_enabled( - &self, - verification_data: &aligned_sdk::common::types::VerificationData, - ws_conn_sink: WsMessageSink, - ) -> bool { - if !self.pre_verification_is_enabled { - return true; - } - - if self - .is_verifier_disabled(verification_data.proving_system) - .await + // update max_fee_limit + let updated_max_fee_limit_in_batch = batch_state_lock.get_user_min_fee_in_batch(&addr); + if batch_state_lock + .update_user_max_fee_limit(&addr, updated_max_fee_limit_in_batch) + .is_none() { - warn!( - "Verifier for proving system {} is disabled", - verification_data.proving_system - ); + std::mem::drop(batch_state_lock); + warn!("User state for address {addr:?} was not present in batcher user states, but it should be"); send_message( - ws_conn_sink, - SubmitProofResponseMessage::InvalidProof(ProofInvalidReason::DisabledVerifier( - verification_data.proving_system, - )), + ws_conn_sink.clone(), + SubmitProofResponseMessage::AddToBatchError, ) .await; - self.metrics.user_error(&[ - "disabled_verifier", - &format!("{}", verification_data.proving_system), - ]); - return false; - } + return; + }; - if !zk_utils::verify(verification_data).await { - error!("Invalid proof detected. 
Verification failed"); + // update total_fees_in_queue + if batch_state_lock + .update_user_total_fees_in_queue_of_replacement_message( + &addr, + original_max_fee, + replacement_max_fee, + ) + .is_none() + { + std::mem::drop(batch_state_lock); + warn!("User state for address {addr:?} was not present in batcher user states, but it should be"); send_message( - ws_conn_sink, - SubmitProofResponseMessage::InvalidProof(ProofInvalidReason::RejectedProof), + ws_conn_sink.clone(), + SubmitProofResponseMessage::AddToBatchError, ) .await; - self.metrics.user_error(&[ - "rejected_proof", - &format!("{}", verification_data.proving_system), - ]); - return false; - } - - true + }; } async fn disabled_verifiers(&self) -> Result> { @@ -1403,7 +1163,7 @@ impl Batcher { async fn add_to_batch( &self, mut batch_state_lock: MutexGuard<'_, BatchState>, - verification_data: &NoncedVerificationData, + verification_data: NoncedVerificationData, ws_conn_sink: WsMessageSink, proof_submitter_sig: Signature, proof_submitter_addr: Address, @@ -1416,7 +1176,7 @@ impl Batcher { let nonce = verification_data.nonce; batch_state_lock.batch_queue.push( BatchQueueEntry::new( - verification_data.clone(), + verification_data, verification_data_comm, ws_conn_sink, proof_submitter_sig, @@ -1433,7 +1193,45 @@ impl Batcher { info!("Current batch queue length: {}", queue_len); - // User state will be updated by the caller who already has the lock + let Some(user_proof_count) = batch_state_lock + .get_user_proof_count(&proof_submitter_addr) + .await + else { + error!("User state of address {proof_submitter_addr} was not found when trying to update user state. This user state should have been present"); + std::mem::drop(batch_state_lock); + return Err(BatcherError::AddressNotFoundInUserStates( + proof_submitter_addr, + )); + }; + + let Some(current_total_fees_in_queue) = batch_state_lock + .get_user_total_fees_in_queue(&proof_submitter_addr) + .await + else { + error!("User state of address {proof_submitter_addr} was not found when trying to update user state. This user state should have been present"); + std::mem::drop(batch_state_lock); + return Err(BatcherError::AddressNotFoundInUserStates( + proof_submitter_addr, + )); + }; + + // User state is updated + if batch_state_lock + .update_user_state( + &proof_submitter_addr, + nonce + U256::one(), + max_fee, + user_proof_count + 1, + current_total_fees_in_queue + max_fee, + ) + .is_none() + { + error!("User state of address {proof_submitter_addr} was not found when trying to update user state. This user state should have been present"); + std::mem::drop(batch_state_lock); + return Err(BatcherError::AddressNotFoundInUserStates( + proof_submitter_addr, + )); + }; Ok(()) } @@ -1452,8 +1250,8 @@ impl Batcher { /// an empty batch, even if the block interval has been reached. /// Once the batch meets the conditions for submission, the finalized batch is then passed to the /// `finalize_batch` function. - /// This function removes the proofs from the queue immediately to avoid race conditions. - async fn extract_batch_if_ready( + /// This function doesn't remove the proofs from the queue. 
+ async fn is_batch_ready( &self, block_number: u64, gas_price: U256, @@ -1489,12 +1287,9 @@ impl Batcher { // Set the batch posting flag to true *batch_posting = true; - - // PHASE 1: Extract the batch directly from the queue to avoid race conditions - let mut batch_state_lock = batch_state_lock; // Make mutable - - let finalized_batch = batch_queue::extract_batch_directly( - &mut batch_state_lock.batch_queue, + let batch_queue_copy = batch_state_lock.batch_queue.clone(); + let finalized_batch = batch_queue::try_build_batch( + batch_queue_copy, gas_price, self.max_batch_byte_size, self.max_batch_proof_qty, @@ -1514,205 +1309,66 @@ impl Batcher { }) .ok()?; - info!( - "Extracted {} proofs from queue for batch processing", - finalized_batch.len() - ); - - // Update queue metrics after successful batch extraction - let queue_len = batch_state_lock.batch_queue.len(); - match calculate_batch_size(&batch_state_lock.batch_queue) { - Ok(queue_size_bytes) => { - self.metrics - .update_queue_metrics(queue_len as i64, queue_size_bytes as i64); - } - Err(e) => { - error!( - "Failed to calculate batch size for queue metrics update: {:?}", - e - ); - // Still update queue length metric, set size to 0 due to calculation error - self.metrics.update_queue_metrics(queue_len as i64, 0); - } - } - Some(finalized_batch) } - /// Updates user states based on current queue state after batch operations. - /// Used for both successful batch confirmation and failed batch restoration. - /// Updates proofs_in_batch, total_fees_in_queue, and last_max_fee_limit based on current queue state. - /// Uses proper lock ordering: user_state -> batch_state to avoid deadlocks. - async fn update_user_states_from_queue_state( + /// Takes the submitted proofs and removes them from the queue. + /// This function should be called only AFTER the submission was confirmed onchain + async fn remove_proofs_from_queue( &self, - affected_users: std::collections::HashSet
, + finalized_batch: Vec, ) -> Result<(), BatcherError> { - // Update each user's state with proper lock ordering - for addr in affected_users { - if let Some(user_state) = self.user_states.read().await.get(&addr).cloned() { - let mut user_state_guard = user_state.lock().await; // First: user lock - let batch_state_lock = self.batch_state.lock().await; // Second: batch lock - - // Calculate what each user's state should be based on current queue contents - let current_queue_user_states = - self.calculate_new_user_states_data(&batch_state_lock.batch_queue); - - if let Some((proof_count, min_max_fee_in_queue, total_fees_in_queue)) = - current_queue_user_states.get(&addr) - { - // User has proofs in queue - use calculated values - user_state_guard.proofs_in_batch = *proof_count; - user_state_guard.total_fees_in_queue = *total_fees_in_queue; - user_state_guard.last_max_fee_limit = *min_max_fee_in_queue; - } else { - // User not found in queue - reset to defaults - user_state_guard.proofs_in_batch = 0; - user_state_guard.total_fees_in_queue = U256::zero(); - user_state_guard.last_max_fee_limit = U256::MAX; - } - - drop(batch_state_lock); // Release batch lock - drop(user_state_guard); // Release user lock - } else { - warn!("User state not found for address {}", addr); - } - } - - Ok(()) - } - - /// Cleans up user states after successful batch submission. - /// Resets last_max_fee_limit to U256::MAX for users who had proofs in the submitted batch - /// but now have no proofs left in the queue. - async fn cleanup_user_states_after_successful_submission( - &self, - finalized_batch: &[BatchQueueEntry], - ) { - use std::collections::HashSet; - - // Get unique users from the submitted batch - let users_in_batch: HashSet
= - finalized_batch.iter().map(|entry| entry.sender).collect(); - - // Check current queue state to see which users still have proofs - let batch_state_lock = match self.batch_state.try_lock() { - Ok(lock) => lock, - Err(_) => { - // If we can't get the lock, skip cleanup - it's not critical - warn!("Could not acquire batch state lock for user state cleanup"); - return; - } - }; - - let current_user_states = - self.calculate_new_user_states_data(&batch_state_lock.batch_queue); - - // For each user in the batch, check if they now have no proofs left - for user_addr in users_in_batch { - if !current_user_states.contains_key(&user_addr) { - // User has no proofs left in queue - reset their max_fee_limit - if let Some(user_state_ref) = self.user_states.read().await.get(&user_addr).cloned() - { - if let Ok(mut user_state_guard) = user_state_ref.try_lock() { - user_state_guard.last_max_fee_limit = U256::max_value(); - } - // If we can't get the lock, skip this user - not critical - } - } - } - } - - /// Restores proofs to the queue after batch submission failure. - /// Uses similar logic to user proof submission, including handling queue capacity. - /// NOTE: Nonce ordering is preserved by the priority queue's eviction order: - /// - Lower fees get evicted first - /// - For same fees, higher nonces get evicted first - /// This ensures we never have nonce N+1 without nonce N in the queue. - async fn restore_proofs_after_batch_failure(&self, failed_batch: &[BatchQueueEntry]) { - info!( - "Restoring {} proofs to queue after batch failure", - failed_batch.len() - ); - - let user_states_lock = self.user_states.write().await; + info!("Removing proofs from queue..."); let mut batch_state_lock = self.batch_state.lock().await; - let mut restored_entries = Vec::new(); - - for entry in failed_batch { - let priority = BatchQueueEntryPriority::new( - entry.nonced_verification_data.max_fee, - entry.nonced_verification_data.nonce, - ); - // Check if queue is full - if batch_state_lock.is_queue_full() { - // Use same logic as user submission - evict lowest priority if this one is higher - if let Some((lowest_entry, _)) = batch_state_lock.batch_queue.peek() { - let lowest_fee = lowest_entry.nonced_verification_data.max_fee; - let restore_fee = entry.nonced_verification_data.max_fee; - - if restore_fee > lowest_fee { - // Evict the lowest priority entry (preserves nonce ordering) - if let Some((evicted_entry, _)) = batch_state_lock.batch_queue.pop() { - warn!("Queue full during restoration, evicting proof from sender {} with nonce {} (fee: {})", - evicted_entry.sender, evicted_entry.nonced_verification_data.nonce, evicted_entry.nonced_verification_data.max_fee); - - // Update user state for evicted entry - self.update_evicted_user_state_async( - &evicted_entry, - &batch_state_lock.batch_queue, - ) - .await; - - // Notify the evicted user via websocket - if let Some(evicted_ws_sink) = evicted_entry.messaging_sink { - connection::send_message( - evicted_ws_sink, - aligned_sdk::common::types::SubmitProofResponseMessage::UnderpricedProof, - ) - .await; - } - } - } else { - warn!("Queue full and restored proof has lower priority, dropping proof from sender {} with nonce {} (fee: {})", - entry.sender, entry.nonced_verification_data.nonce, entry.nonced_verification_data.max_fee); - continue; - } - } + finalized_batch.iter().for_each(|entry| { + if batch_state_lock.batch_queue.remove(entry).is_none() { + // If this happens, we have a bug in our code + error!("Some proofs were not found in the queue. 
This should not happen."); + } + }); + + // now we calculate the new user_states + let new_user_states = // proofs, max_fee_limit, total_fees_in_queue + batch_state_lock.calculate_new_user_states_data(); + + let user_addresses: Vec<Address>
= batch_state_lock.user_states.keys().cloned().collect(); + let default_value = (0, U256::MAX, U256::zero()); + for addr in user_addresses.iter() { + let (proof_count, max_fee_limit, total_fees_in_queue) = + new_user_states.get(addr).unwrap_or(&default_value); + + // FIXME: The update functions below can only return `None` when the user was not found + // in the `user_states` map, which should not really happen here, but we do this check so that we don't unwrap. + // Once https://github.com/yetanotherco/aligned_layer/issues/1046 is done we could return a more + // informative error. + + // Now we update the user states related to the batch (proof count in batch and min fee in batch) + batch_state_lock + .update_user_proof_count(addr, *proof_count) + .ok_or(BatcherError::QueueRemoveError( + "Could not update_user_proof_count".into(), + ))?; + batch_state_lock + .update_user_max_fee_limit(addr, *max_fee_limit) + .ok_or(BatcherError::QueueRemoveError( + "Could not update_user_max_fee_limit".into(), + ))?; + batch_state_lock + .update_user_total_fees_in_queue(addr, *total_fees_in_queue) + .ok_or(BatcherError::QueueRemoveError( + "Could not update_user_total_fees_in_queue".into(), + ))?; } - info!( "Restored {} proofs to queue, new queue length: {}", restored_entries.len(), batch_state_lock.batch_queue.len() ); - - // Get unique users from restored entries - let users_with_restored_proofs: std::collections::HashSet<Address>
= restored_entries.iter().map(|entry| entry.sender).collect(); - - // At this point we have a valid queue with updated evicted users states - // Only auxiliary user data (max_min_fee) can be "inconsistent" - // but we can keep updating it without locking the queue - info!("Queue recovered from submission failure, resuming user processing and updating user states metadata"); - std::mem::drop(user_states_lock); - std::mem::drop(batch_state_lock); - - info!("Updating user states after proof restoration..."); - if let Err(e) = self - .update_user_states_from_queue_state(users_with_restored_proofs) - .await - { - error!( - "Failed to update user states after proof restoration: {:?}", - e - ); - } + // Update metrics + let queue_len = batch_state_lock.batch_queue.len(); + let queue_size_bytes = calculate_batch_size(&batch_state_lock.batch_queue)?; + self.metrics + .update_queue_metrics(queue_len as i64, queue_size_bytes as i64); + Ok(()) } /// Takes the finalized batch as input and: @@ -1725,12 +1381,13 @@ impl Batcher { async fn finalize_batch( &self, block_number: u64, - finalized_batch: &[BatchQueueEntry], + finalized_batch: Vec<BatchQueueEntry>, gas_price: U256, ) -> Result<(), BatcherError> { let nonced_batch_verifcation_data: Vec<NoncedVerificationData> = finalized_batch - .iter() - .map(|entry| entry.nonced_verification_data.clone()) + .clone() + .into_iter() + .map(|entry| entry.nonced_verification_data) .collect(); let batch_verification_data: Vec<VerificationData> = nonced_batch_verifcation_data @@ -1743,8 +1400,9 @@ impl Batcher { info!("Finalizing batch. Length: {}", finalized_batch.len()); let batch_data_comm: Vec<VerificationDataCommitment> = finalized_batch - .iter() - .map(|entry| entry.verification_data_commitment.clone()) + .clone() + .into_iter() + .map(|entry| entry.verification_data_commitment) .collect(); let batch_merkle_tree: MerkleTree<VerificationCommitmentBatch> = @@ -1783,7 +1441,7 @@ impl Batcher { &batch_bytes, &batch_merkle_tree.root, leaves, - finalized_batch, + &finalized_batch, gas_price, ) .await @@ -1802,8 +1460,8 @@ impl Batcher { BatcherError::TransactionSendError( TransactionSendError::SubmissionInsufficientBalance, ) => { - // TODO: In the future, we should re-add the failed batch back to the queue - // For now, we flush everything as a safety measure + // TODO calling remove_proofs_from_queue here is a better solution, flushing only the failed batch + // this would also need a message sent to the clients self.flush_queue_and_clear_nonce_cache().await; } _ => { @@ -1814,26 +1472,12 @@ impl Batcher { return Err(e); }; - // Note: Proofs were already removed from the queue during extraction phase - // Now update user states based on current queue state after successful submission - info!("Updating user states after batch confirmation..."); - let users_in_batch: std::collections::HashSet<Address>
= finalized_batch.iter().map(|entry| entry.sender).collect(); - if let Err(e) = self - .update_user_states_from_queue_state(users_in_batch) - .await - { - error!( - "Failed to update user states after batch confirmation: {:?}", - e - ); - // Continue with the rest of the process since batch was already submitted successfully + // Once the submit is successful, we remove the submitted proofs from the queue + // TODO handle error case: + if let Err(e) = self.remove_proofs_from_queue(finalized_batch.clone()).await { + error!("Unexpected error while updating queue: {:?}", e); } - // Clean up user states for users who had proofs in this batch but now have no proofs left - self.cleanup_user_states_after_successful_submission(finalized_batch) - .await; - connection::send_batch_inclusion_data_responses(finalized_batch, &batch_merkle_tree).await } @@ -1850,7 +1494,7 @@ let Some(nonpaying_replacement_addr) = self.get_nonpaying_replacement_addr() else { batch_state_lock.batch_queue.clear(); - self.user_states.write().await.clear(); + batch_state_lock.user_states.clear(); return; }; @@ -1862,16 +1506,15 @@ .await else { batch_state_lock.batch_queue.clear(); - self.user_states.write().await.clear(); + batch_state_lock.user_states.clear(); return; }; batch_state_lock.batch_queue.clear(); - self.user_states.write().await.clear(); + batch_state_lock.user_states.clear(); let nonpaying_user_state = UserState::new(nonpaying_replacement_addr_nonce); - self.user_states.write().await.insert( - nonpaying_replacement_addr, - Arc::new(Mutex::new(nonpaying_user_state)), - ); + batch_state_lock + .user_states + .insert(nonpaying_replacement_addr, nonpaying_user_state); self.metrics.update_queue_metrics(0, 0); } @@ -1902,28 +1545,16 @@ let modified_gas_price = gas_price * U256::from(GAS_PRICE_PERCENTAGE_MULTIPLIER) / U256::from(PERCENTAGE_DIVIDER); - if let Some(finalized_batch) = self - .extract_batch_if_ready(block_number, modified_gas_price) - .await - { + if let Some(finalized_batch) = self.is_batch_ready(block_number, modified_gas_price).await { let batch_finalization_result = self - .finalize_batch(block_number, &finalized_batch, modified_gas_price) + .finalize_batch(block_number, finalized_batch, modified_gas_price) .await; // Resetting this here to avoid doing it on every return path of `finalize_batch` function let mut batch_posting = self.posting_batch.lock().await; *batch_posting = false; - // If batch finalization failed, restore the proofs to the queue - if let Err(e) = batch_finalization_result { - error!( - "Batch finalization failed, restoring proofs to queue: {:?}", - e - ); - self.restore_proofs_after_batch_failure(&finalized_batch) - .await; - return Err(e); - } + batch_finalization_result?; } Ok(()) diff --git a/crates/batcher/src/metrics.rs b/crates/batcher/src/metrics.rs index b68a5dc16b..8ee61c27b1 100644 --- a/crates/batcher/src/metrics.rs +++ b/crates/batcher/src/metrics.rs @@ -27,9 +27,6 @@ pub struct BatcherMetrics { pub cancel_create_new_task_duration: IntGauge, pub batcher_gas_cost_create_task_total: GenericCounter<AtomicU64>, pub batcher_gas_cost_cancel_task_total: GenericCounter<AtomicU64>, - pub message_handler_user_lock_timeouts: IntCounter, - pub message_handler_batch_lock_timeouts: IntCounter, - pub message_handler_user_states_lock_timeouts: IntCounter, pub available_data_services: IntGauge, } @@ -88,21 +85,6 @@ impl BatcherMetrics { "Number of available data services (0-2)" ))?; - let message_handler_user_lock_timeouts = register_int_counter!(opts!( 
"message_handler_user_lock_timeouts_count", - "Message Handler User Lock Timeouts" - ))?; - - let message_handler_batch_lock_timeouts = register_int_counter!(opts!( - "message_handler_batch_lock_timeouts_count", - "Message Handler Batch Lock Timeouts" - ))?; - - let message_handler_user_states_lock_timeouts = register_int_counter!(opts!( - "message_handler_user_states_lock_timeouts_count", - "Message Handler User States Lock Timeouts" - ))?; - registry.register(Box::new(open_connections.clone()))?; registry.register(Box::new(received_proofs.clone()))?; registry.register(Box::new(sent_batches.clone()))?; @@ -119,9 +101,6 @@ impl BatcherMetrics { registry.register(Box::new(cancel_create_new_task_duration.clone()))?; registry.register(Box::new(batcher_gas_cost_create_task_total.clone()))?; registry.register(Box::new(batcher_gas_cost_cancel_task_total.clone()))?; - registry.register(Box::new(message_handler_user_lock_timeouts.clone()))?; - registry.register(Box::new(message_handler_batch_lock_timeouts.clone()))?; - registry.register(Box::new(message_handler_user_states_lock_timeouts.clone()))?; registry.register(Box::new(available_data_services.clone()))?; let metrics_route = warp::path!("metrics") @@ -151,12 +130,10 @@ impl BatcherMetrics { cancel_create_new_task_duration, batcher_gas_cost_create_task_total, batcher_gas_cost_cancel_task_total, - message_handler_user_lock_timeouts, - message_handler_batch_lock_timeouts, - message_handler_user_states_lock_timeouts, available_data_services, }) } + pub async fn metrics_handler(registry: prometheus::Registry) -> Result { use prometheus::Encoder; let encoder = prometheus::TextEncoder::new(); @@ -188,16 +165,4 @@ impl BatcherMetrics { self.queue_len.set(queue_len); self.queue_size_bytes.set(queue_size); } - - pub fn inc_message_handler_user_lock_timeout(&self) { - self.message_handler_user_lock_timeouts.inc(); - } - - pub fn inc_message_handler_batch_lock_timeout(&self) { - self.message_handler_batch_lock_timeouts.inc(); - } - - pub fn inc_message_handler_user_states_lock_timeouts(&self) { - self.message_handler_user_states_lock_timeouts.inc(); - } } diff --git a/crates/batcher/src/types/batch_queue.rs b/crates/batcher/src/types/batch_queue.rs index 7461b4ac3d..35ea3ce0c4 100644 --- a/crates/batcher/src/types/batch_queue.rs +++ b/crates/batcher/src/types/batch_queue.rs @@ -103,7 +103,7 @@ impl Ord for BatchQueueEntryPriority { // Implementation of lowest-first: let ord: std::cmp::Ordering = other.max_fee.cmp(&self.max_fee); // This means, less max_fee will go first - // We want this because we will .pop() to remove unwanted elements, low fee submissions. + // We want this because we will .pop() to remove unwanted elements, low fee submitions. 
if ord == std::cmp::Ordering::Equal { // Case of same max_fee: @@ -138,71 +138,57 @@ pub(crate) fn calculate_batch_size(batch_queue: &BatchQueue) -> Result Result, BatcherError> { - let mut batch_size = calculate_batch_size(batch_queue)?; - let mut rejected_entries = Vec::new(); - - // Remove entries that won't pay enough, or that makes a queue that is too big - loop { - let should_remove = if let Some((entry, _)) = batch_queue.peek() { - let batch_len = batch_queue.len(); - let fee_per_proof = calculate_fee_per_proof(batch_len, gas_price, constant_gas_cost); - - // if batch is not acceptable: - batch_size > max_batch_byte_size - || fee_per_proof > entry.nonced_verification_data.max_fee - || batch_len > max_batch_proof_qty - } else { - false - }; - - if should_remove { - // Remove this entry (it won't pay enough) and save it - let (rejected_entry, rejected_priority) = batch_queue.pop().unwrap(); - - // Update batch size - let verification_data_size = rejected_entry - .nonced_verification_data - .cbor_size_upper_bound(); + let mut finalized_batch = batch_queue; + let mut batch_size = calculate_batch_size(&finalized_batch)?; + + while let Some((entry, _)) = finalized_batch.peek() { + let batch_len = finalized_batch.len(); + let fee_per_proof = calculate_fee_per_proof(batch_len, gas_price, constant_gas_cost); + + // if batch is not acceptable: + if batch_size > max_batch_byte_size + || fee_per_proof > entry.nonced_verification_data.max_fee + || batch_len > max_batch_proof_qty + { + // Update the state for the next iteration: + // * Subtract this entry size to the size of the batch size. + // * Push the current entry to the resulting batch queue. + + let verification_data_size = entry.nonced_verification_data.cbor_size_upper_bound(); batch_size -= verification_data_size; - rejected_entries.push((rejected_entry, rejected_priority)); - } else { - // At this point, we found a viable batch - break - break; - } - } + finalized_batch.pop(); - // Check if we have a viable batch - if batch_queue.is_empty() { - // No viable batch found - put back the rejected entries - for (entry, priority) in rejected_entries { - batch_queue.push(entry, priority); + continue; } - return Err(BatcherError::BatchCostTooHigh); - } - // Extract remaining entries - let mut batch_for_posting = Vec::new(); - while let Some((entry, _)) = batch_queue.pop() { - batch_for_posting.push(entry); + // At this point, we break since we found a batch that can be submitted + break; } - // Put back the rejected entries (they stay in the queue for later) - for (entry, priority) in rejected_entries { - batch_queue.push(entry, priority); + // If `finalized_batch` is empty, this means that all the batch queue was traversed and we didn't find + // any user willing to pay fot the fee per proof. 
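// ---------------------------------------------------------------------------
// [Editor's sketch — not part of this patch] The loop above is a greedy trim
// with a moving target: each pop shrinks the batch, which raises the per-proof
// share of the fixed submission cost for the survivors, so the viability check
// must be re-run until the cheapest remaining entry can pay. Equivalent
// standalone logic over fees sorted ascending, assuming a simplified
// fee_per_proof = fixed_cost / batch_len model (the real
// `calculate_fee_per_proof` derives it from `gas_price` and
// `constant_gas_cost`; the byte-size and proof-count caps are elided here):
fn trim_until_viable(mut fees: Vec<u128>, fixed_cost: u128) -> Option<Vec<u128>> {
    while let Some(&cheapest) = fees.first() {
        let fee_per_proof = fixed_cost / fees.len() as u128;
        if cheapest >= fee_per_proof {
            return Some(fees); // cheapest survivor can pay: batch is viable
        }
        fees.remove(0); // drop it and re-price the smaller batch
    }
    None // analogue of BatcherError::BatchCostTooHigh
}
// ---------------------------------------------------------------------------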
+ if finalized_batch.is_empty() { + return Err(BatcherError::BatchCostTooHigh); } - Ok(batch_for_posting) + Ok(finalized_batch.clone().into_sorted_vec()) } fn calculate_fee_per_proof(batch_len: usize, gas_price: U256, constant_gas_cost: u128) -> U256 { @@ -312,9 +298,8 @@ mod test { batch_queue.push(entry_3, batch_priority_3); let gas_price = U256::from(1); - let mut test_queue = batch_queue.clone(); - let finalized_batch = extract_batch_directly( - &mut test_queue, + let finalized_batch = try_build_batch( + batch_queue.clone(), gas_price, 5000000, 50, @@ -425,9 +410,8 @@ mod test { batch_queue.push(entry_3, batch_priority_3); let gas_price = U256::from(1); - let mut test_queue = batch_queue.clone(); - let finalized_batch = extract_batch_directly( - &mut test_queue, + let finalized_batch = try_build_batch( + batch_queue.clone(), gas_price, 5000000, 50, @@ -536,9 +520,8 @@ mod test { batch_queue.push(entry_3.clone(), batch_priority_3.clone()); let gas_price = U256::from(1); - let mut test_queue = batch_queue.clone(); - let finalized_batch = extract_batch_directly( - &mut test_queue, + let finalized_batch = try_build_batch( + batch_queue.clone(), gas_price, 5000000, 2, @@ -647,9 +630,8 @@ mod test { batch_queue.push(entry_3, batch_priority_3); let gas_price = U256::from(1); - let mut test_queue = batch_queue.clone(); - let finalized_batch = extract_batch_directly( - &mut test_queue, + let finalized_batch = try_build_batch( + batch_queue.clone(), gas_price, 5000000, 50, @@ -764,9 +746,8 @@ mod test { batch_queue.push(entry_3, batch_priority_3); let gas_price = U256::from(1); - let mut test_queue = batch_queue.clone(); - let finalized_batch = extract_batch_directly( - &mut test_queue, + let finalized_batch = try_build_batch( + batch_queue.clone(), gas_price, 5000000, 50, @@ -787,77 +768,6 @@ mod test { ); } - #[test] - fn batch_finalization_algorithm_works_single_high_fee_proof() { - // Test the scenario: 1 proof with high fee that should be viable - let proof_generator_addr = Address::random(); - let payment_service_addr = Address::random(); - let sender_addr = Address::random(); - let bytes_for_verification_data = vec![42_u8; 10]; - let dummy_signature = Signature { - r: U256::from(1), - s: U256::from(2), - v: 3, - }; - let verification_data = VerificationData { - proving_system: ProvingSystemId::Risc0, - proof: bytes_for_verification_data.clone(), - pub_input: Some(bytes_for_verification_data.clone()), - verification_key: Some(bytes_for_verification_data.clone()), - vm_program_code: Some(bytes_for_verification_data), - proof_generator_addr, - }; - let chain_id = U256::from(42); - - // Single entry with very high fee - should definitely be viable - let nonce = U256::from(1); - let high_max_fee = U256::from(1_000_000_000_000_000_000u128); // Very high fee - 1 ETH - let nonced_verification_data = NoncedVerificationData::new( - verification_data, - nonce, - high_max_fee, - chain_id, - payment_service_addr, - ); - let vd_commitment: VerificationDataCommitment = nonced_verification_data.clone().into(); - let entry = BatchQueueEntry::new_for_testing( - nonced_verification_data, - vd_commitment, - dummy_signature, - sender_addr, - ); - let batch_priority = BatchQueueEntryPriority::new(high_max_fee, nonce); - - let mut batch_queue = BatchQueue::new(); - batch_queue.push(entry, batch_priority); - - let gas_price = U256::from(10_000_000_000u64); // 10 gwei gas price - let mut test_queue = batch_queue.clone(); - let finalized_batch = extract_batch_directly( - &mut test_queue, - gas_price, - 5000000, // 
Large byte size limit - 50, // Large proof quantity limit - DEFAULT_CONSTANT_GAS_COST, - ); - - // This should succeed and return the single proof - assert!( - finalized_batch.is_ok(), - "Should successfully extract batch with single high-fee proof" - ); - let batch = finalized_batch.unwrap(); - assert_eq!(batch.len(), 1, "Batch should contain exactly 1 proof"); - assert_eq!(batch[0].nonced_verification_data.max_fee, high_max_fee); - - // The queue should now be empty (no rejected entries to put back) - assert_eq!( - test_queue.len(), - 0, - "Queue should be empty after extracting the single viable proof" - ); - } - #[test] fn batch_finalization_algorithm_works_not_bigger_than_max_batch_proof_qty() { // The following information will be the same for each entry, it is just some dummy data to see @@ -952,9 +862,8 @@ mod test { // The max batch len is 2, so the algorithm should stop at the second entry. let max_batch_proof_qty = 2; - let mut test_queue = batch_queue.clone(); - let finalized_batch = extract_batch_directly( - &mut test_queue, + let finalized_batch = try_build_batch( + batch_queue.clone(), gas_price, 5000000, max_batch_proof_qty, @@ -973,4 +882,208 @@ mod test { max_fee_1 ); } + + #[test] + fn test_batch_size_limit_enforcement_with_real_sp1_proofs() { + use aligned_sdk::common::types::VerificationData; + use aligned_sdk::communication::serialization::cbor_serialize; + use std::fs; + + let proof_generator_addr = Address::random(); + let payment_service_addr = Address::random(); + let chain_id = U256::from(42); + let dummy_signature = Signature { + r: U256::from(1), + s: U256::from(2), + v: 3, + }; + + // Load actual SP1 proof files + let proof_path = "../../scripts/test_files/sp1/sp1_fibonacci_5_0_0.proof"; + let elf_path = "../../scripts/test_files/sp1/sp1_fibonacci_5_0_0.elf"; + let pub_input_path = "../../scripts/test_files/sp1/sp1_fibonacci_5_0_0.pub"; + + let proof_data = match fs::read(proof_path) { + Ok(data) => data, + Err(_) => return, // Skip test if files not found + }; + + let elf_data = match fs::read(elf_path) { + Ok(data) => data, + Err(_) => return, + }; + + let pub_input_data = match fs::read(pub_input_path) { + Ok(data) => data, + Err(_) => return, + }; + + let verification_data = VerificationData { + proving_system: ProvingSystemId::SP1, + proof: proof_data, + pub_input: Some(pub_input_data), + verification_key: None, + vm_program_code: Some(elf_data), + proof_generator_addr, + }; + + // Create 10 entries using the same SP1 proof data + let mut batch_queue = BatchQueue::new(); + let max_fee = U256::from(1_000_000_000_000_000u128); + + for i in 0..10 { + let sender_addr = Address::random(); + let nonce = U256::from(i + 1); + + let nonced_verification_data = NoncedVerificationData::new( + verification_data.clone(), + nonce, + max_fee, + chain_id, + payment_service_addr, + ); + + let vd_commitment: VerificationDataCommitment = nonced_verification_data.clone().into(); + let entry = BatchQueueEntry::new_for_testing( + nonced_verification_data, + vd_commitment, + dummy_signature, + sender_addr, + ); + let batch_priority = BatchQueueEntryPriority::new(max_fee, nonce); + batch_queue.push(entry, batch_priority); + } + + // Test with a 5MB batch size limit + let batch_size_limit = 5_000_000; // 5MB + let gas_price = U256::from(1); + + let finalized_batch = try_build_batch( + batch_queue.clone(), + gas_price, + batch_size_limit, + 50, // max proof qty + DEFAULT_CONSTANT_GAS_COST, + ) + .unwrap(); + + // Verify the finalized batch respects the size limit + let 
+        let finalized_verification_data: Vec<VerificationData> = finalized_batch
+            .iter()
+            .map(|entry| entry.nonced_verification_data.verification_data.clone())
+            .collect();
+
+        let finalized_serialized = cbor_serialize(&finalized_verification_data).unwrap();
+        let finalized_actual_size = finalized_serialized.len();
+
+        // Assert the batch respects the limit
+        assert!(
+            finalized_actual_size <= batch_size_limit,
+            "Finalized batch size {} exceeds limit {}",
+            finalized_actual_size,
+            batch_size_limit
+        );
+
+        // Verify some entries were included (not an empty batch)
+        assert!(!finalized_batch.is_empty(), "Batch should not be empty");
+
+        // Verify not all entries were included (some should be rejected due to the size limit)
+        assert!(
+            finalized_batch.len() < 10,
+            "Batch should not include all entries due to size limit"
+        );
+    }
+
+    #[test]
+    fn test_cbor_size_upper_bound_accuracy() {
+        use aligned_sdk::common::types::VerificationData;
+        use aligned_sdk::communication::serialization::cbor_serialize;
+        use std::fs;
+
+        let proof_generator_addr = Address::random();
+        let payment_service_addr = Address::random();
+        let chain_id = U256::from(42);
+
+        // Load actual SP1 proof files
+        let proof_path = "../../scripts/test_files/sp1/sp1_fibonacci_5_0_0.proof";
+        let elf_path = "../../scripts/test_files/sp1/sp1_fibonacci_5_0_0.elf";
+        let pub_input_path = "../../scripts/test_files/sp1/sp1_fibonacci_5_0_0.pub";
+
+        let proof_data = match fs::read(proof_path) {
+            Ok(data) => data,
+            Err(_) => return, // Skip test if files not found
+        };
+
+        let elf_data = match fs::read(elf_path) {
+            Ok(data) => data,
+            Err(_) => return,
+        };
+
+        let pub_input_data = match fs::read(pub_input_path) {
+            Ok(data) => data,
+            Err(_) => return,
+        };
+
+        let verification_data = VerificationData {
+            proving_system: ProvingSystemId::SP1,
+            proof: proof_data,
+            pub_input: Some(pub_input_data),
+            verification_key: None,
+            vm_program_code: Some(elf_data),
+            proof_generator_addr,
+        };
+
+        let nonced_verification_data = NoncedVerificationData::new(
+            verification_data.clone(),
+            U256::from(1),
+            U256::from(1_000_000_000_000_000u128),
+            chain_id,
+            payment_service_addr,
+        );
+
+        // Test cbor_size_upper_bound() accuracy
+        let estimated_size = nonced_verification_data.cbor_size_upper_bound();
+
+        // Compare with the actual CBOR serialization of the full NoncedVerificationData
+        let actual_nonced_serialized = cbor_serialize(&nonced_verification_data).unwrap();
+        let actual_nonced_size = actual_nonced_serialized.len();
+
+        // Also test the inner VerificationData for additional validation
+        let actual_inner_serialized = cbor_serialize(&verification_data).unwrap();
+        let actual_inner_size = actual_inner_serialized.len();
+
+        // Verify CBOR encodes binary data efficiently (with the serde_bytes fix). This ignores
+        // some encoding overhead, but the proof is large enough that it does not matter.
+        let raw_total = verification_data.proof.len()
+            + verification_data.vm_program_code.as_ref().unwrap().len()
+            + verification_data.pub_input.as_ref().unwrap().len();
+
+        let cbor_efficiency_ratio = actual_inner_size as f64 / raw_total as f64;
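+
+        // Background on the expected ratio: without serde_bytes, serde's CBOR encoding of a
+        // Vec<u8> is an array of integers, where every byte value above 23 costs two bytes
+        // on the wire, so random proof bytes can nearly double in size. With
+        // `#[serde(with = "serde_bytes")]` the field is a single CBOR byte string: the raw
+        // bytes plus a few bytes of length header.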
Ratio: {:.3}x", + cbor_efficiency_ratio + ); + + // Verify CBOR encodes binary data efficiently with serde_bytes + // Should be close to 1.0x overhead (raw data size vs CBOR size) + + // The estimation should be an upper bound + assert!( + estimated_size >= actual_nonced_size, + "cbor_size_upper_bound() should be an upper bound. Estimated: {}, Actual: {}", + estimated_size, + actual_nonced_size + ); + + // The estimation should also be reasonable (not wildly over-estimated) + let estimation_overhead = estimated_size as f64 / actual_nonced_size as f64; + assert!( + estimation_overhead < 1.1, + "Estimation should be reasonable, not wildly over-estimated. Overhead: {:.3}x", + estimation_overhead + ); + } } diff --git a/crates/batcher/src/types/batch_state.rs b/crates/batcher/src/types/batch_state.rs index 1530195192..481ca44f74 100644 --- a/crates/batcher/src/types/batch_state.rs +++ b/crates/batcher/src/types/batch_state.rs @@ -1,9 +1,15 @@ -use super::batch_queue::{BatchQueue, BatchQueueEntry}; +use std::collections::{hash_map::Entry, HashMap}; + +use super::{ + batch_queue::{BatchQueue, BatchQueueEntry}, + user_state::UserState, +}; use ethers::types::{Address, U256}; use log::debug; pub(crate) struct BatchState { pub(crate) batch_queue: BatchQueue, + pub(crate) user_states: HashMap, pub(crate) max_size: usize, } @@ -13,6 +19,18 @@ impl BatchState { pub(crate) fn new(max_size: usize) -> Self { Self { batch_queue: BatchQueue::new(), + user_states: HashMap::new(), + max_size, + } + } + + pub(crate) fn new_with_user_states( + user_states: HashMap, + max_size: usize, + ) -> Self { + Self { + batch_queue: BatchQueue::new(), + user_states, max_size, } } @@ -26,6 +44,30 @@ impl BatchState { .find(|entry| entry.sender == sender && entry.nonced_verification_data.nonce == nonce) } + pub(crate) fn get_user_state(&self, addr: &Address) -> Option<&UserState> { + self.user_states.get(addr) + } + + pub(crate) async fn get_user_nonce(&self, addr: &Address) -> Option { + let user_state = self.get_user_state(addr)?; + Some(user_state.nonce) + } + + pub(crate) async fn get_user_last_max_fee_limit(&self, addr: &Address) -> Option { + let user_state = self.get_user_state(addr)?; + Some(user_state.last_max_fee_limit) + } + + pub(crate) async fn get_user_total_fees_in_queue(&self, addr: &Address) -> Option { + let user_state = self.get_user_state(addr)?; + Some(user_state.total_fees_in_queue) + } + + pub(crate) async fn get_user_proof_count(&self, addr: &Address) -> Option { + let user_state = self.get_user_state(addr)?; + Some(user_state.proofs_in_batch) + } + pub(crate) fn get_user_min_fee_in_batch(&self, addr: &Address) -> U256 { self.batch_queue .iter() @@ -35,8 +77,127 @@ impl BatchState { .unwrap_or(U256::max_value()) } + // SETTERS: + + pub(crate) fn update_user_max_fee_limit( + &mut self, + addr: &Address, + new_max_fee_limit: U256, + ) -> Option { + // TODO refactor to return Result, or something less redundant + if let Entry::Occupied(mut user_state) = self.user_states.entry(*addr) { + user_state.get_mut().last_max_fee_limit = new_max_fee_limit; + return Some(new_max_fee_limit); + } + None + } + + pub(crate) fn update_user_proof_count( + &mut self, + addr: &Address, + new_proof_count: usize, + ) -> Option { + // TODO refactor to return Result, or something less redundant + if let Entry::Occupied(mut user_state) = self.user_states.entry(*addr) { + user_state.get_mut().proofs_in_batch = new_proof_count; + return Some(new_proof_count); + } + None + } + + pub(crate) fn update_user_nonce(&mut self, addr: 
&Address, new_nonce: U256) -> Option { + // TODO refactor to return Result, or something less redundant + if let Entry::Occupied(mut user_state) = self.user_states.entry(*addr) { + user_state.get_mut().nonce = new_nonce; + return Some(new_nonce); + } + None + } + + pub(crate) fn update_user_total_fees_in_queue( + &mut self, + addr: &Address, + new_total_fees_in_queue: U256, + ) -> Option { + // TODO refactor to return Result, or something less redundant + if let Entry::Occupied(mut user_state) = self.user_states.entry(*addr) { + user_state.get_mut().total_fees_in_queue = new_total_fees_in_queue; + return Some(new_total_fees_in_queue); + } + None + } + + pub(crate) fn update_user_total_fees_in_queue_of_replacement_message( + &mut self, + addr: &Address, + original_max_fee: U256, + new_max_fee: U256, + ) -> Option { + // TODO refactor to return Result, or something less redundant + let fee_difference = new_max_fee - original_max_fee; //here we already know new_max_fee > original_max_fee + if let Entry::Occupied(mut user_state) = self.user_states.entry(*addr) { + user_state.get_mut().total_fees_in_queue += fee_difference; + return Some(user_state.get().total_fees_in_queue); + } + None + } + + /// Updates the user with address `addr` with the provided values of + /// `new_nonce`, `new_max_fee_limit`, `new_proof_count` and `new_total_fees_in_queue` + /// If state is updated successfully, returns the updated values inside a `Some()` + /// If the address was not found in the user states, returns `None` + pub(crate) fn update_user_state( + &mut self, + addr: &Address, + new_nonce: U256, + new_max_fee_limit: U256, + new_proof_count: usize, + new_total_fees_in_queue: U256, + ) -> Option<(U256, U256, usize, U256)> { + // TODO refactor to return Result, or something less redundant + let updated_nonce = self.update_user_nonce(addr, new_nonce); + let updated_max_fee_limit = self.update_user_max_fee_limit(addr, new_max_fee_limit); + let updated_proof_count = self.update_user_proof_count(addr, new_proof_count); + let updated_total_fees_in_queue = + self.update_user_total_fees_in_queue(addr, new_total_fees_in_queue); + + if updated_nonce.is_some() + && updated_max_fee_limit.is_some() + && updated_proof_count.is_some() + && updated_total_fees_in_queue.is_some() + { + return Some(( + new_nonce, + new_max_fee_limit, + new_proof_count, + new_total_fees_in_queue, + )); + } + None + } + // LOGIC: + pub(crate) fn calculate_new_user_states_data(&self) -> HashMap { + let mut updated_user_states = HashMap::new(); // address -> (proof_count, max_fee_limit, total_fees_in_queue) + for (entry, _) in self.batch_queue.iter() { + let addr = entry.sender; + let max_fee = entry.nonced_verification_data.max_fee; + + let (proof_count, max_fee_limit, total_fees_in_queue) = updated_user_states + .entry(addr) + .or_insert((0, max_fee, U256::zero())); + + *proof_count += 1; + *total_fees_in_queue += max_fee; + if max_fee < *max_fee_limit { + *max_fee_limit = max_fee; + } + } + + updated_user_states + } + /// Checks if the entry is valid /// An entry is valid if there is no entry with the same sender, lower nonce and a lower fee pub(crate) fn replacement_entry_is_valid( @@ -60,6 +221,37 @@ impl BatchState { }) } + /// Updates or removes a user's state when their latest proof entry is removed from the batch queue. + /// + /// If the user has no other proofs remaining in the queue, their state is removed entirely. + /// Otherwise, the user's state is updated to reflect the next most recent entry in the queue. 
     /// Checks if the entry is valid
     /// An entry is valid if there is no entry with the same sender, lower nonce and a lower fee
     pub(crate) fn replacement_entry_is_valid(
@@ -60,6 +221,37 @@ impl BatchState {
         })
     }
 
+    /// Updates or removes a user's state when their latest proof entry is removed from the batch queue.
+    ///
+    /// If the user has no other proofs remaining in the queue, their state is removed entirely.
+    /// Otherwise, the user's state is updated to reflect the next most recent entry in the queue.
+    ///
+    /// Note: The given `removed_entry` must be the most recent (latest or highest nonce) entry for the user in the queue.
+    pub(crate) fn update_user_state_on_entry_removal(&mut self, removed_entry: &BatchQueueEntry) {
+        let addr = removed_entry.sender;
+
+        let new_last_max_fee_limit = match self
+            .batch_queue
+            .iter()
+            .filter(|(e, _)| e.sender == addr)
+            .next_back()
+        {
+            Some((last_entry, _)) => last_entry.nonced_verification_data.max_fee,
+            None => {
+                self.user_states.remove(&addr);
+                return;
+            }
+        };
+
+        if let Entry::Occupied(mut user_state) = self.user_states.entry(addr) {
+            user_state.get_mut().proofs_in_batch -= 1;
+            user_state.get_mut().nonce -= U256::one();
+            user_state.get_mut().total_fees_in_queue -=
+                removed_entry.nonced_verification_data.max_fee;
+            user_state.get_mut().last_max_fee_limit = new_last_max_fee_limit;
+        }
+    }
+
     pub(crate) fn is_queue_full(&self) -> bool {
         self.batch_queue.len() >= self.max_size
     }
diff --git a/crates/cli/src/main.rs b/crates/cli/src/main.rs
index a9778cbda2..ebada4d68e 100644
--- a/crates/cli/src/main.rs
+++ b/crates/cli/src/main.rs
@@ -567,9 +567,6 @@ async fn main() -> Result<(), AlignedError> {
                     aligned_sdk::common::errors::GetNonceError::UnexpectedResponse(e) => {
                         SubmitError::UnexpectedBatcherResponse(e)
                     }
-                    aligned_sdk::common::errors::GetNonceError::GenericError(e) => {
-                        SubmitError::GenericError(e)
-                    }
                 })?,
         };
 
diff --git a/crates/sdk/Cargo.toml b/crates/sdk/Cargo.toml
index 964be4a4e9..767e2ed266 100644
--- a/crates/sdk/Cargo.toml
+++ b/crates/sdk/Cargo.toml
@@ -19,6 +19,7 @@ tokio = { version = "1.37.0", features = [
 ] }
 lambdaworks-crypto = { git = "https://github.com/lambdaclass/lambdaworks.git", rev = "5f8f2cfcc8a1a22f77e8dff2d581f1166eefb80b", features = ["serde"]}
 serde = { version = "1.0.201", features = ["derive"] }
+serde_bytes = "0.11"
 sha3 = { version = "0.10.8" }
 url = "2.5.0"
 hex = "0.4.3"
diff --git a/crates/sdk/src/common/errors.rs b/crates/sdk/src/common/errors.rs
index 4be1dfa451..30d1147242 100644
--- a/crates/sdk/src/common/errors.rs
+++ b/crates/sdk/src/common/errors.rs
@@ -251,7 +251,6 @@ pub enum GetNonceError {
     UnexpectedResponse(String),
     InvalidRequest(String),
     ProtocolMismatch { current: u16, expected: u16 },
-    GenericError(String),
 }
 
 #[derive(Debug)]
diff --git a/crates/sdk/src/common/types.rs b/crates/sdk/src/common/types.rs
index 2129ac38f1..5f88eca40a 100644
--- a/crates/sdk/src/common/types.rs
+++ b/crates/sdk/src/common/types.rs
@@ -65,9 +65,28 @@ impl Display for ProvingSystemId {
 #[derive(Debug, Serialize, Deserialize, Clone)]
 pub struct VerificationData {
     pub proving_system: ProvingSystemId,
+    #[serde(with = "serde_bytes")]
     pub proof: Vec<u8>,
+    #[serde(
+        default,
+        skip_serializing_if = "Option::is_none",
+        deserialize_with = "deserialize_option_bytes",
+        serialize_with = "serialize_option_bytes"
+    )]
     pub pub_input: Option<Vec<u8>>,
+    #[serde(
+        default,
+        skip_serializing_if = "Option::is_none",
+        deserialize_with = "deserialize_option_bytes",
+        serialize_with = "serialize_option_bytes"
+    )]
     pub verification_key: Option<Vec<u8>>,
+    #[serde(
+        default,
+        skip_serializing_if = "Option::is_none",
+        deserialize_with = "deserialize_option_bytes",
+        serialize_with = "serialize_option_bytes"
+    )]
     pub vm_program_code: Option<Vec<u8>>,
     pub proof_generator_addr: Address,
 }
@@ -429,7 +448,6 @@ pub enum SubmitProofResponseMessage {
     EthRpcError,
     InvalidPaymentServiceAddress(Address, Address),
     UnderpricedProof,
-    ServerBusy,
 }
 
 #[derive(Debug, Clone, Serialize, Deserialize)]
@@ -437,7 +455,6 @@ pub enum GetNonceResponseMessage {
     Nonce(U256),
     EthRpcError(String),
     InvalidRequest(String),
-    ServerBusy,
 }
 
 #[derive(Debug, Clone)]
@@ -504,6 +521,25 @@ impl Network {
         }
     }
 }
 
+// Helper functions for serializing Option<Vec<u8>> with serde_bytes
+fn serialize_option_bytes<S>(value: &Option<Vec<u8>>, serializer: S) -> Result<S::Ok, S::Error>
+where
+    S: serde::Serializer,
+{
+    match value {
+        Some(bytes) => serde_bytes::serialize(bytes, serializer),
+        None => serializer.serialize_none(),
+    }
+}
+
+fn deserialize_option_bytes<'de, D>(deserializer: D) -> Result<Option<Vec<u8>>, D::Error>
+where
+    D: serde::Deserializer<'de>,
+{
+    let opt: Option<serde_bytes::ByteBuf> = Option::deserialize(deserializer)?;
+    Ok(opt.map(|buf| buf.into_vec()))
+}
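+
+// Hypothetical sketch (`data` assumed to be a populated VerificationData): with these
+// attributes, `cbor_serialize(&data)` emits `pub_input: Some(vec![0xde, 0xad])` as the
+// CBOR byte string h'DEAD' rather than the integer array [0xde, 0xad], while a `None`
+// field is omitted from the encoding entirely (`skip_serializing_if`) and restored as
+// `None` on deserialization (`default`).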
 
 #[cfg(test)]
 mod tests {
     use ethers::signers::LocalWallet;
diff --git a/crates/sdk/src/communication/messaging.rs b/crates/sdk/src/communication/messaging.rs
index 19d593c938..2a1c100e7d 100644
--- a/crates/sdk/src/communication/messaging.rs
+++ b/crates/sdk/src/communication/messaging.rs
@@ -269,12 +269,6 @@ async fn handle_batcher_response(msg: Message) -> Result<BatchInclusionData, SubmitError> {
-        Ok(SubmitProofResponseMessage::ServerBusy) => {
-            error!("Server is busy processing requests, please retry. Funds have not been spent.");
-            Err(SubmitError::GenericError(
-                "Server is busy processing requests, please retry".to_string(),
-            ))
-        }
         Err(e) => {
             error!(
                 "Error while deserializing batch inclusion data: {}. Funds have not been spent.",
diff --git a/crates/sdk/src/verification_layer/mod.rs b/crates/sdk/src/verification_layer/mod.rs
index 9ddadba5b8..82d4604da7 100644
--- a/crates/sdk/src/verification_layer/mod.rs
+++ b/crates/sdk/src/verification_layer/mod.rs
@@ -584,9 +584,6 @@
         Ok(GetNonceResponseMessage::Nonce(nonce)) => Ok(nonce),
         Ok(GetNonceResponseMessage::EthRpcError(e)) => Err(GetNonceError::EthRpcError(e)),
         Ok(GetNonceResponseMessage::InvalidRequest(e)) => Err(GetNonceError::InvalidRequest(e)),
-        Ok(GetNonceResponseMessage::ServerBusy) => Err(GetNonceError::GenericError(
-            "Server is busy processing requests, please retry".to_string(),
-        )),
         Err(_) => Err(GetNonceError::SerializationError(
            "Failed to deserialize batcher message".to_string(),
        )),