diff --git a/batcher/aligned-batcher/src/lib.rs b/batcher/aligned-batcher/src/lib.rs index 11117b055e..52f62ce115 100644 --- a/batcher/aligned-batcher/src/lib.rs +++ b/batcher/aligned-batcher/src/lib.rs @@ -24,8 +24,8 @@ use aligned_sdk::core::constants::{ BUMP_MAX_RETRIES, BUMP_MAX_RETRY_DELAY, BUMP_MIN_RETRY_DELAY, CONSTANT_GAS_COST, DEFAULT_AGGREGATOR_FEE_PERCENTAGE_MULTIPLIER, DEFAULT_MAX_FEE_PER_PROOF, ETHEREUM_CALL_BACKOFF_FACTOR, ETHEREUM_CALL_MAX_RETRIES, ETHEREUM_CALL_MAX_RETRY_DELAY, - ETHEREUM_CALL_MIN_RETRY_DELAY, GAS_PRICE_PERCENTAGE_MULTIPLIER, MIN_FEE_PER_PROOF, - PERCENTAGE_DIVIDER, RESPOND_TO_TASK_FEE_LIMIT_PERCENTAGE_MULTIPLIER, + ETHEREUM_CALL_MIN_RETRY_DELAY, GAS_PRICE_PERCENTAGE_MULTIPLIER, PERCENTAGE_DIVIDER, + RESPOND_TO_TASK_FEE_LIMIT_PERCENTAGE_MULTIPLIER, }; use aligned_sdk::core::types::{ ClientMessage, GetNonceResponseMessage, NoncedVerificationData, ProofInvalidReason, @@ -519,7 +519,7 @@ impl Batcher { ) .await; self.metrics - .user_error(&["invalid_paument_service_address", ""]); + .user_error(&["invalid_payment_service_address", ""]); return Ok(()); } @@ -630,10 +630,10 @@ impl Batcher { ); send_message( ws_conn_sink.clone(), - SubmitProofResponseMessage::InvalidNonce, + SubmitProofResponseMessage::EthRpcError, ) .await; - self.metrics.user_error(&["invalid_nonce", ""]); + self.metrics.user_error(&["eth_rpc_error", ""]); return Ok(()); } }; @@ -665,19 +665,34 @@ impl Batcher { // finally add the proof to the batch queue. let batch_state_lock = self.batch_state.lock().await; - let Some(proofs_in_batch) = batch_state_lock.get_user_proof_count(&addr).await else { - error!("Failed to get user proof count: User not found in user states, but it should have been already inserted"); + + let msg_max_fee = nonced_verification_data.max_fee; + let Some(user_last_max_fee_limit) = + batch_state_lock.get_user_last_max_fee_limit(&addr).await + else { std::mem::drop(batch_state_lock); send_message( ws_conn_sink.clone(), - SubmitProofResponseMessage::InvalidNonce, + SubmitProofResponseMessage::AddToBatchError, ) .await; - self.metrics.user_error(&["invalid_nonce", ""]); + self.metrics.user_error(&["batcher_state_error", ""]); + return Ok(()); + }; + + let Some(user_accumulated_fee) = batch_state_lock.get_user_total_fees_in_queue(&addr).await + else { + std::mem::drop(batch_state_lock); + send_message( + ws_conn_sink.clone(), + SubmitProofResponseMessage::AddToBatchError, + ) + .await; + self.metrics.user_error(&["batcher_state_error", ""]); return Ok(()); }; - if !self.check_min_balance(proofs_in_batch + 1, user_balance) { + if !self.verify_user_has_enough_balance(user_balance, user_accumulated_fee, msg_max_fee) { std::mem::drop(batch_state_lock); send_message( ws_conn_sink.clone(), @@ -694,10 +709,10 @@ impl Batcher { std::mem::drop(batch_state_lock); send_message( ws_conn_sink.clone(), - SubmitProofResponseMessage::InvalidNonce, + SubmitProofResponseMessage::AddToBatchError, ) .await; - self.metrics.user_error(&["invalid_nonce", ""]); + self.metrics.user_error(&["batcher_state_error", ""]); return Ok(()); }; @@ -729,21 +744,9 @@ impl Batcher { return Ok(()); } - let msg_max_fee = nonced_verification_data.max_fee; - let Some(user_min_fee) = batch_state_lock.get_user_min_fee(&addr).await else { - std::mem::drop(batch_state_lock); - send_message( - ws_conn_sink.clone(), - SubmitProofResponseMessage::InvalidNonce, - ) - .await; - self.metrics.user_error(&["invalid_nonce", ""]); - return Ok(()); - }; - - if msg_max_fee > user_min_fee { + if msg_max_fee > user_last_max_fee_limit { std::mem::drop(batch_state_lock); - warn!("Invalid max fee for address {addr}, had fee {user_min_fee:?} < {msg_max_fee:?}"); + warn!("Invalid max fee for address {addr}, had fee limit of {user_last_max_fee_limit:?}, sent {msg_max_fee:?}"); send_message( ws_conn_sink.clone(), SubmitProofResponseMessage::InvalidMaxFee, @@ -782,10 +785,15 @@ impl Batcher { zk_utils::is_verifier_disabled(*disabled_verifiers, verifier) } - // Checks user has sufficient balance for paying all its the proofs in the current batch. - fn check_min_balance(&self, user_proofs_in_batch: usize, user_balance: U256) -> bool { - let min_balance = U256::from(user_proofs_in_batch) * U256::from(MIN_FEE_PER_PROOF); - user_balance >= min_balance + // Verifies user has enough balance for paying all his proofs in the current batch. + fn verify_user_has_enough_balance( + &self, + user_balance: U256, + user_accumulated_fee: U256, + new_msg_max_fee: U256, + ) -> bool { + let required_balance: U256 = user_accumulated_fee + new_msg_max_fee; + user_balance >= required_balance } /// Handles a replacement message @@ -820,7 +828,7 @@ impl Batcher { let original_max_fee = entry.nonced_verification_data.max_fee; if original_max_fee > replacement_max_fee { std::mem::drop(batch_state_lock); - warn!("Invalid replacement message for address {addr}, had fee {original_max_fee:?} < {replacement_max_fee:?}"); + warn!("Invalid replacement message for address {addr}, had max fee: {original_max_fee:?}, received fee: {replacement_max_fee:?}"); send_message( ws_conn_sink.clone(), SubmitProofResponseMessage::InvalidReplacementMessage, @@ -886,13 +894,38 @@ impl Batcher { BatchQueueEntryPriority::new(replacement_max_fee, nonce), ); - let updated_min_fee_in_batch = batch_state_lock.get_user_min_fee_in_batch(&addr); + // update max_fee_limit + let updated_max_fee_limit_in_batch = batch_state_lock.get_user_min_fee_in_batch(&addr); if batch_state_lock - .update_user_min_fee(&addr, updated_min_fee_in_batch) + .update_user_max_fee_limit(&addr, updated_max_fee_limit_in_batch) .is_none() { std::mem::drop(batch_state_lock); warn!("User state for address {addr:?} was not present in batcher user states, but it should be"); + send_message( + ws_conn_sink.clone(), + SubmitProofResponseMessage::AddToBatchError, + ) + .await; + return; + }; + + // update total_fees_in_queue + if batch_state_lock + .update_user_total_fees_in_queue_of_replacement_message( + &addr, + original_max_fee, + replacement_max_fee, + ) + .is_none() + { + std::mem::drop(batch_state_lock); + warn!("User state for address {addr:?} was not present in batcher user states, but it should be"); + send_message( + ws_conn_sink.clone(), + SubmitProofResponseMessage::AddToBatchError, + ) + .await; }; } @@ -984,6 +1017,17 @@ impl Batcher { )); }; + let Some(current_total_fees_in_queue) = batch_state_lock + .get_user_total_fees_in_queue(&proof_submitter_addr) + .await + else { + error!("User state of address {proof_submitter_addr} was not found when trying to update user state. This user state should have been present"); + std::mem::drop(batch_state_lock); + return Err(BatcherError::AddressNotFoundInUserStates( + proof_submitter_addr, + )); + }; + // User state is updated if batch_state_lock .update_user_state( @@ -991,6 +1035,7 @@ impl Batcher { nonce + U256::one(), max_fee, user_proof_count + 1, + current_total_fees_in_queue + max_fee, ) .is_none() { @@ -1029,7 +1074,7 @@ impl Batcher { if current_batch_len < 2 { info!( - "Current batch has {} proof. Waiting for more proofs...", + "Current batch has {} proofs. Waiting for more proofs...", current_batch_len ); return None; @@ -1072,14 +1117,14 @@ impl Batcher { .ok()?; batch_state_lock.batch_queue = resulting_batch_queue; - let updated_user_proof_count_and_min_fee = - batch_state_lock.get_user_proofs_in_batch_and_min_fee(); + let new_user_states = // proofs, max_fee_limit, total_fees_in_queue + batch_state_lock.calculate_new_user_states_data(); let user_addresses: Vec
= batch_state_lock.user_states.keys().cloned().collect(); + let default_value = (0, U256::MAX, U256::zero()); for addr in user_addresses.iter() { - let (proof_count, min_fee) = updated_user_proof_count_and_min_fee - .get(addr) - .unwrap_or(&(0, U256::MAX)); + let (proof_count, max_fee_limit, total_fees_in_queue) = + new_user_states.get(addr).unwrap_or(&default_value); // FIXME: The case where a the update functions return `None` can only happen when the user was not found // in the `user_states` map should not really happen here, but doing this check so that we don't unwrap. @@ -1088,7 +1133,8 @@ impl Batcher { // Now we update the user states related to the batch (proof count in batch and min fee in batch) batch_state_lock.update_user_proof_count(addr, *proof_count)?; - batch_state_lock.update_user_min_fee(addr, *min_fee)?; + batch_state_lock.update_user_max_fee_limit(addr, *max_fee_limit)?; + batch_state_lock.update_user_total_fees_in_queue(addr, *total_fees_in_queue)?; } Some(finalized_batch) diff --git a/batcher/aligned-batcher/src/types/batch_state.rs b/batcher/aligned-batcher/src/types/batch_state.rs index 3e09377ac9..2df6a0cdc6 100644 --- a/batcher/aligned-batcher/src/types/batch_state.rs +++ b/batcher/aligned-batcher/src/types/batch_state.rs @@ -13,6 +13,8 @@ pub(crate) struct BatchState { } impl BatchState { + // CONSTRUCTORS: + pub(crate) fn new() -> Self { Self { batch_queue: BatchQueue::new(), @@ -27,6 +29,8 @@ impl BatchState { } } + // GETTERS: + pub(crate) fn get_entry(&self, sender: Address, nonce: U256) -> Option<&BatchQueueEntry> { self.batch_queue .iter() @@ -43,17 +47,14 @@ impl BatchState { Some(user_state.nonce) } - pub(crate) async fn get_user_min_fee(&self, addr: &Address) -> Option