Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 86 additions & 40 deletions batcher/aligned-batcher/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ use aligned_sdk::core::constants::{
BUMP_MAX_RETRIES, BUMP_MAX_RETRY_DELAY, BUMP_MIN_RETRY_DELAY, CONSTANT_GAS_COST,
DEFAULT_AGGREGATOR_FEE_PERCENTAGE_MULTIPLIER, DEFAULT_MAX_FEE_PER_PROOF,
ETHEREUM_CALL_BACKOFF_FACTOR, ETHEREUM_CALL_MAX_RETRIES, ETHEREUM_CALL_MAX_RETRY_DELAY,
ETHEREUM_CALL_MIN_RETRY_DELAY, GAS_PRICE_PERCENTAGE_MULTIPLIER, MIN_FEE_PER_PROOF,
PERCENTAGE_DIVIDER, RESPOND_TO_TASK_FEE_LIMIT_PERCENTAGE_MULTIPLIER,
ETHEREUM_CALL_MIN_RETRY_DELAY, GAS_PRICE_PERCENTAGE_MULTIPLIER, PERCENTAGE_DIVIDER,
RESPOND_TO_TASK_FEE_LIMIT_PERCENTAGE_MULTIPLIER,
};
use aligned_sdk::core::types::{
ClientMessage, GetNonceResponseMessage, NoncedVerificationData, ProofInvalidReason,
Expand Down Expand Up @@ -519,7 +519,7 @@ impl Batcher {
)
.await;
self.metrics
.user_error(&["invalid_paument_service_address", ""]);
.user_error(&["invalid_payment_service_address", ""]);
return Ok(());
}

Expand Down Expand Up @@ -630,10 +630,10 @@ impl Batcher {
);
send_message(
ws_conn_sink.clone(),
SubmitProofResponseMessage::InvalidNonce,
SubmitProofResponseMessage::EthRpcError,
)
.await;
self.metrics.user_error(&["invalid_nonce", ""]);
self.metrics.user_error(&["eth_rpc_error", ""]);
return Ok(());
}
};
Expand Down Expand Up @@ -665,19 +665,34 @@ impl Batcher {
// finally add the proof to the batch queue.

let batch_state_lock = self.batch_state.lock().await;
let Some(proofs_in_batch) = batch_state_lock.get_user_proof_count(&addr).await else {
error!("Failed to get user proof count: User not found in user states, but it should have been already inserted");

let msg_max_fee = nonced_verification_data.max_fee;
let Some(user_last_max_fee_limit) =
batch_state_lock.get_user_last_max_fee_limit(&addr).await
else {
std::mem::drop(batch_state_lock);
send_message(
ws_conn_sink.clone(),
SubmitProofResponseMessage::InvalidNonce,
SubmitProofResponseMessage::AddToBatchError,
)
.await;
self.metrics.user_error(&["invalid_nonce", ""]);
self.metrics.user_error(&["batcher_state_error", ""]);
return Ok(());
};

let Some(user_accumulated_fee) = batch_state_lock.get_user_total_fees_in_queue(&addr).await
else {
std::mem::drop(batch_state_lock);
send_message(
ws_conn_sink.clone(),
SubmitProofResponseMessage::AddToBatchError,
)
.await;
self.metrics.user_error(&["batcher_state_error", ""]);
return Ok(());
};

if !self.check_min_balance(proofs_in_batch + 1, user_balance) {
if !self.verify_user_has_enough_balance(user_balance, user_accumulated_fee, msg_max_fee) {
std::mem::drop(batch_state_lock);
send_message(
ws_conn_sink.clone(),
Expand All @@ -694,10 +709,10 @@ impl Batcher {
std::mem::drop(batch_state_lock);
send_message(
ws_conn_sink.clone(),
SubmitProofResponseMessage::InvalidNonce,
SubmitProofResponseMessage::AddToBatchError,
)
.await;
self.metrics.user_error(&["invalid_nonce", ""]);
self.metrics.user_error(&["batcher_state_error", ""]);
return Ok(());
};

Expand Down Expand Up @@ -729,21 +744,9 @@ impl Batcher {
return Ok(());
}

let msg_max_fee = nonced_verification_data.max_fee;
let Some(user_min_fee) = batch_state_lock.get_user_min_fee(&addr).await else {
std::mem::drop(batch_state_lock);
send_message(
ws_conn_sink.clone(),
SubmitProofResponseMessage::InvalidNonce,
)
.await;
self.metrics.user_error(&["invalid_nonce", ""]);
return Ok(());
};

if msg_max_fee > user_min_fee {
if msg_max_fee > user_last_max_fee_limit {
std::mem::drop(batch_state_lock);
warn!("Invalid max fee for address {addr}, had fee {user_min_fee:?} < {msg_max_fee:?}");
warn!("Invalid max fee for address {addr}, had fee limit of {user_last_max_fee_limit:?}, sent {msg_max_fee:?}");
send_message(
ws_conn_sink.clone(),
SubmitProofResponseMessage::InvalidMaxFee,
Expand Down Expand Up @@ -782,10 +785,15 @@ impl Batcher {
zk_utils::is_verifier_disabled(*disabled_verifiers, verifier)
}

// Checks user has sufficient balance for paying all its the proofs in the current batch.
fn check_min_balance(&self, user_proofs_in_batch: usize, user_balance: U256) -> bool {
let min_balance = U256::from(user_proofs_in_batch) * U256::from(MIN_FEE_PER_PROOF);
user_balance >= min_balance
// Verifies user has enough balance for paying all his proofs in the current batch.
fn verify_user_has_enough_balance(
&self,
user_balance: U256,
user_accumulated_fee: U256,
new_msg_max_fee: U256,
) -> bool {
let required_balance: U256 = user_accumulated_fee + new_msg_max_fee;
user_balance >= required_balance
}

/// Handles a replacement message
Expand Down Expand Up @@ -820,7 +828,7 @@ impl Batcher {
let original_max_fee = entry.nonced_verification_data.max_fee;
if original_max_fee > replacement_max_fee {
std::mem::drop(batch_state_lock);
warn!("Invalid replacement message for address {addr}, had fee {original_max_fee:?} < {replacement_max_fee:?}");
warn!("Invalid replacement message for address {addr}, had max fee: {original_max_fee:?}, received fee: {replacement_max_fee:?}");
send_message(
ws_conn_sink.clone(),
SubmitProofResponseMessage::InvalidReplacementMessage,
Expand Down Expand Up @@ -886,13 +894,38 @@ impl Batcher {
BatchQueueEntryPriority::new(replacement_max_fee, nonce),
);

let updated_min_fee_in_batch = batch_state_lock.get_user_min_fee_in_batch(&addr);
// update max_fee_limit
let updated_max_fee_limit_in_batch = batch_state_lock.get_user_min_fee_in_batch(&addr);
if batch_state_lock
.update_user_min_fee(&addr, updated_min_fee_in_batch)
.update_user_max_fee_limit(&addr, updated_max_fee_limit_in_batch)
.is_none()
{
std::mem::drop(batch_state_lock);
warn!("User state for address {addr:?} was not present in batcher user states, but it should be");
send_message(
ws_conn_sink.clone(),
SubmitProofResponseMessage::AddToBatchError,
)
.await;
return;
};

// update total_fees_in_queue
if batch_state_lock
.update_user_total_fees_in_queue_of_replacement_message(
&addr,
original_max_fee,
replacement_max_fee,
)
.is_none()
{
std::mem::drop(batch_state_lock);
warn!("User state for address {addr:?} was not present in batcher user states, but it should be");
send_message(
ws_conn_sink.clone(),
SubmitProofResponseMessage::AddToBatchError,
)
.await;
};
}

Expand Down Expand Up @@ -984,13 +1017,25 @@ impl Batcher {
));
};

let Some(current_total_fees_in_queue) = batch_state_lock
.get_user_total_fees_in_queue(&proof_submitter_addr)
.await
else {
error!("User state of address {proof_submitter_addr} was not found when trying to update user state. This user state should have been present");
std::mem::drop(batch_state_lock);
return Err(BatcherError::AddressNotFoundInUserStates(
proof_submitter_addr,
));
};

// User state is updated
if batch_state_lock
.update_user_state(
&proof_submitter_addr,
nonce + U256::one(),
max_fee,
user_proof_count + 1,
current_total_fees_in_queue + max_fee,
)
.is_none()
{
Expand Down Expand Up @@ -1029,7 +1074,7 @@ impl Batcher {

if current_batch_len < 2 {
info!(
"Current batch has {} proof. Waiting for more proofs...",
"Current batch has {} proofs. Waiting for more proofs...",
current_batch_len
);
return None;
Expand Down Expand Up @@ -1072,14 +1117,14 @@ impl Batcher {
.ok()?;

batch_state_lock.batch_queue = resulting_batch_queue;
let updated_user_proof_count_and_min_fee =
batch_state_lock.get_user_proofs_in_batch_and_min_fee();
let new_user_states = // proofs, max_fee_limit, total_fees_in_queue
batch_state_lock.calculate_new_user_states_data();

let user_addresses: Vec<Address> = batch_state_lock.user_states.keys().cloned().collect();
let default_value = (0, U256::MAX, U256::zero());
for addr in user_addresses.iter() {
let (proof_count, min_fee) = updated_user_proof_count_and_min_fee
.get(addr)
.unwrap_or(&(0, U256::MAX));
let (proof_count, max_fee_limit, total_fees_in_queue) =
new_user_states.get(addr).unwrap_or(&default_value);

// FIXME: The case where a the update functions return `None` can only happen when the user was not found
// in the `user_states` map should not really happen here, but doing this check so that we don't unwrap.
Expand All @@ -1088,7 +1133,8 @@ impl Batcher {

// Now we update the user states related to the batch (proof count in batch and min fee in batch)
batch_state_lock.update_user_proof_count(addr, *proof_count)?;
batch_state_lock.update_user_min_fee(addr, *min_fee)?;
batch_state_lock.update_user_max_fee_limit(addr, *max_fee_limit)?;
batch_state_lock.update_user_total_fees_in_queue(addr, *total_fees_in_queue)?;
}

Some(finalized_batch)
Expand Down
Loading
Loading