Skip to content
Merged
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
234 changes: 71 additions & 163 deletions batcher/aligned-batcher/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -496,28 +496,9 @@ impl Batcher {

async fn handle_get_nonce_for_address_msg(
self: Arc<Self>,
mut address: Address,
address: Address,
ws_conn_sink: WsMessageSink,
) -> Result<(), Error> {
if self.is_nonpaying(&address) {
info!("Handling nonpaying message");
let Some(non_paying_config) = self.non_paying_config.as_ref() else {
warn!(
"There isn't a non-paying configuration loaded. This message will be ignored"
);
send_message(
ws_conn_sink.clone(),
GetNonceResponseMessage::InvalidRequest(
"There isn't a non-paying configuration loaded.".to_string(),
),
)
.await;
return Ok(());
};
let replacement_addr = non_paying_config.replacement.address();
address = replacement_addr;
}

let cached_user_nonce = {
let batch_state_lock = self.batch_state.lock().await;
batch_state_lock.get_user_nonce(&address).await
Expand Down Expand Up @@ -585,14 +566,34 @@ impl Batcher {
return Ok(());
}

let Some(addr) = self
let Some(addr_in_msg) = self
.msg_signature_is_valid(&client_msg, &ws_conn_sink)
.await
else {
return Ok(());
};

let nonced_verification_data = client_msg.verification_data.clone();
let addr;
let signature = client_msg.signature;
let nonced_verification_data;

if self.has_to_pay(&addr_in_msg) {
addr = addr_in_msg;
nonced_verification_data = client_msg.verification_data.clone();
} else {
info!("Generating non-paying data");
// If the user is not required to pay, substitute their address with a pre-funded Aligned address
addr = self
.non_paying_config
.as_ref()
.unwrap()
.replacement
.address();
// Substitute the max_fee to a high enough value to cover the gas cost of the proof
let mut aux_verification_data = client_msg.verification_data.clone();
aux_verification_data.max_fee = (DEFAULT_MAX_FEE_PER_PROOF * 100).into(); // 2_000 gas per proof * 100 gwei gas price (upper bound) * 100 to make sure it is enough
nonced_verification_data = aux_verification_data
}

// When pre-verification is enabled, batcher will verify proofs for faster feedback with clients
if self.pre_verification_is_enabled {
Expand Down Expand Up @@ -634,14 +635,7 @@ impl Batcher {
}
}

if self.is_nonpaying(&addr) {
// TODO: Non paying msg and paying should share some logic
return self
.handle_nonpaying_msg(ws_conn_sink.clone(), &client_msg)
.await;
}

info!("Handling paying message");
info!("Handling message");

// We don't need a batch state lock here, since if the user locks its funds
// after the check, some blocks should pass until he can withdraw.
Expand Down Expand Up @@ -742,45 +736,48 @@ impl Batcher {
return Ok(());
}

let cached_user_nonce = batch_state_lock.get_user_nonce(&addr).await;
let Some(expected_nonce) = cached_user_nonce else {
error!("Failed to get cached user nonce: User not found in user states, but it should have been already inserted");
std::mem::drop(batch_state_lock);
send_message(
ws_conn_sink.clone(),
SubmitProofResponseMessage::AddToBatchError,
)
.await;
self.metrics.user_error(&["batcher_state_error", ""]);
return Ok(());
};
if self.has_to_pay(&addr_in_msg) {
let cached_user_nonce = batch_state_lock.get_user_nonce(&addr).await;

if expected_nonce < msg_nonce {
std::mem::drop(batch_state_lock);
warn!("Invalid nonce for address {addr}, expected nonce: {expected_nonce:?}, received nonce: {msg_nonce:?}");
send_message(
ws_conn_sink.clone(),
SubmitProofResponseMessage::InvalidNonce,
)
.await;
self.metrics.user_error(&["invalid_nonce", ""]);
return Ok(());
}
let Some(expected_nonce) = cached_user_nonce else {
error!("Failed to get cached user nonce: User not found in user states, but it should have been already inserted");
std::mem::drop(batch_state_lock);
send_message(
ws_conn_sink.clone(),
SubmitProofResponseMessage::AddToBatchError,
)
.await;
self.metrics.user_error(&["batcher_state_error", ""]);
return Ok(());
};

// In this case, the message might be a replacement one. If it is valid,
// we replace the old entry with the new from the replacement message.
if expected_nonce > msg_nonce {
info!("Possible replacement message received: Expected nonce {expected_nonce:?} - message nonce: {msg_nonce:?}");
self.handle_replacement_message(
batch_state_lock,
nonced_verification_data,
ws_conn_sink.clone(),
client_msg.signature,
addr,
)
.await;
if expected_nonce < msg_nonce {
std::mem::drop(batch_state_lock);
warn!("Invalid nonce for address {addr}, expected nonce: {expected_nonce:?}, received nonce: {msg_nonce:?}");
send_message(
ws_conn_sink.clone(),
SubmitProofResponseMessage::InvalidNonce,
)
.await;
self.metrics.user_error(&["invalid_nonce", ""]);
return Ok(());
}

return Ok(());
// In this case, the message might be a replacement one. If it is valid,
// we replace the old entry with the new from the replacement message.
if expected_nonce > msg_nonce {
info!("Possible replacement message received: Expected nonce {expected_nonce:?} - message nonce: {msg_nonce:?}");
self.handle_replacement_message(
batch_state_lock,
nonced_verification_data,
ws_conn_sink.clone(),
client_msg.signature,
addr,
)
.await;

return Ok(());
}
}

// We check this after replacement logic because if user wants to replace a proof, their
Expand Down Expand Up @@ -867,7 +864,7 @@ impl Batcher {
batch_state_lock,
nonced_verification_data,
ws_conn_sink.clone(),
client_msg.signature,
signature,
addr,
)
.await
Expand Down Expand Up @@ -1100,17 +1097,6 @@ impl Batcher {

info!("Current batch queue length: {}", queue_len);

let mut proof_submitter_addr = proof_submitter_addr;

// If the proof submitter is the nonpaying one, we should update the state
// of the replacement address.
proof_submitter_addr = if self.is_nonpaying(&proof_submitter_addr) {
self.get_nonpaying_replacement_addr()
.unwrap_or(proof_submitter_addr)
} else {
proof_submitter_addr
};

let Some(user_proof_count) = batch_state_lock
.get_user_proof_count(&proof_submitter_addr)
.await
Expand Down Expand Up @@ -1743,98 +1729,20 @@ impl Batcher {
0.0
}

/// Only relevant for testing and for users to easily use Aligned
fn is_nonpaying(&self, addr: &Address) -> bool {
self.non_paying_config
.as_ref()
.is_some_and(|non_paying_config| non_paying_config.address == *addr)
/// An address has to pay if it's on mainnet or is not the special designated address on testnet
fn has_to_pay(&self, addr: &Address) -> bool {
self.non_paying_config.is_none()
|| self
.non_paying_config
.as_ref()
.is_some_and(|non_paying_config| non_paying_config.address != *addr)
}

fn get_nonpaying_replacement_addr(&self) -> Option<Address> {
let non_paying_conf = self.non_paying_config.as_ref()?;
Some(non_paying_conf.replacement.address())
}

/// Only relevant for testing and for users to easily use Aligned in testnet.
async fn handle_nonpaying_msg(
&self,
ws_sink: WsMessageSink,
client_msg: &SubmitProofMessage,
) -> Result<(), Error> {
info!("Handling nonpaying message");
let Some(non_paying_config) = self.non_paying_config.as_ref() else {
warn!("There isn't a non-paying configuration loaded. This message will be ignored");
send_message(ws_sink.clone(), SubmitProofResponseMessage::InvalidNonce).await;
return Ok(());
};

let replacement_addr = non_paying_config.replacement.address();
let Some(replacement_user_balance) = self.get_user_balance(&replacement_addr).await else {
error!("Could not get balance for non-paying address {replacement_addr:?}");
send_message(
ws_sink.clone(),
SubmitProofResponseMessage::InsufficientBalance(replacement_addr),
)
.await;
return Ok(());
};

if replacement_user_balance == U256::from(0) {
error!("Insufficient funds for non-paying address {replacement_addr:?}");
send_message(
ws_sink.clone(),
SubmitProofResponseMessage::InsufficientBalance(replacement_addr),
)
.await;
return Ok(());
}

let batch_state_lock = self.batch_state.lock().await;

if batch_state_lock.is_queue_full() {
error!("Can't add new entry, the batcher queue is full");
send_message(
ws_sink.clone(),
SubmitProofResponseMessage::UnderpricedProof,
)
.await;
return Ok(());
}

let nonced_verification_data = NoncedVerificationData::new(
client_msg.verification_data.verification_data.clone(),
client_msg.verification_data.nonce,
DEFAULT_MAX_FEE_PER_PROOF.into(), // 2_000 gas per proof * 100 gwei gas price (upper bound)
self.chain_id,
self.payment_service.address(),
);

let client_msg = SubmitProofMessage::new(
nonced_verification_data.clone(),
non_paying_config.replacement.clone(),
)
.await;

let signature = client_msg.signature;
if let Err(e) = self
.add_to_batch(
batch_state_lock,
nonced_verification_data,
ws_sink.clone(),
signature,
replacement_addr,
)
.await
{
info!("Error while adding nonpaying address entry to batch: {e:?}");
send_message(ws_sink, SubmitProofResponseMessage::AddToBatchError).await;
return Ok(());
};

info!("Non-paying verification data message handled");
Ok(())
}

/// Gets the balance of user with address `addr` from Ethereum.
/// Retries on recoverable errors using exponential backoff up to `ETHEREUM_CALL_MAX_RETRIES` times:
/// (0,5 secs - 1 secs - 2 secs - 4 secs - 8 secs)
Expand Down
Loading