Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 17 additions & 10 deletions batcher/aligned-batcher/src/eth/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ use crate::{
},
};
use aligned_sdk::core::constants::{
DEFAULT_BACKOFF_FACTOR, DEFAULT_MAX_RETRIES, DEFAULT_MIN_RETRY_DELAY,
GAS_PRICE_INCREMENT_PERCENTAGE_PER_ITERATION, OVERRIDE_GAS_PRICE_PERCENTAGE_MULTIPLIER,
PERCENTAGE_DIVIDER,
ETHEREUM_CALL_BACKOFF_FACTOR, ETHEREUM_CALL_MAX_RETRIES, ETHEREUM_CALL_MAX_RETRY_DELAY,
ETHEREUM_CALL_MIN_RETRY_DELAY, GAS_PRICE_INCREMENT_PERCENTAGE_PER_ITERATION,
OVERRIDE_GAS_PRICE_PERCENTAGE_MULTIPLIER, PERCENTAGE_DIVIDER,
};
use ethers::prelude::*;
use ethers::providers::{Http, Provider};
Expand Down Expand Up @@ -61,16 +61,20 @@ pub fn calculate_bumped_gas_price(
bumped_current_gas_price.max(bumped_previous_gas_price)
}

/// Gets the current nonce from Ethereum.
/// Retries on recoverable errors using exponential backoff up to `ETHEREUM_CALL_MAX_RETRIES` times:
/// (0,5 secs - 1 secs - 2 secs - 4 secs - 8 secs).
pub async fn get_current_nonce(
eth_http_provider: &Provider<Http>,
eth_http_provider_fallback: &Provider<Http>,
addr: H160,
) -> Result<U256, ProviderError> {
retry_function(
|| get_current_nonce_retryable(eth_http_provider, eth_http_provider_fallback, addr),
DEFAULT_MIN_RETRY_DELAY,
DEFAULT_BACKOFF_FACTOR,
DEFAULT_MAX_RETRIES,
ETHEREUM_CALL_MIN_RETRY_DELAY,
ETHEREUM_CALL_BACKOFF_FACTOR,
ETHEREUM_CALL_MAX_RETRIES,
ETHEREUM_CALL_MAX_RETRY_DELAY,
)
.await
.map_err(|e| {
Expand All @@ -79,16 +83,19 @@ pub async fn get_current_nonce(
})
}

/// Gets the current gas price from Ethereum using exponential backoff.
/// Gets the current gas price from Ethereum.
/// Retries on recoverable errors using exponential backoff up to `ETHEREUM_CALL_MAX_RETRIES` times:
/// (0,5 secs - 1 secs - 2 secs - 4 secs - 8 secs).
pub async fn get_gas_price(
eth_http_provider: &Provider<Http>,
eth_http_provider_fallback: &Provider<Http>,
) -> Result<U256, ProviderError> {
retry_function(
|| get_gas_price_retryable(eth_http_provider, eth_http_provider_fallback),
DEFAULT_MIN_RETRY_DELAY,
DEFAULT_BACKOFF_FACTOR,
DEFAULT_MAX_RETRIES,
ETHEREUM_CALL_MIN_RETRY_DELAY,
ETHEREUM_CALL_BACKOFF_FACTOR,
ETHEREUM_CALL_MAX_RETRIES,
ETHEREUM_CALL_MAX_RETRY_DELAY,
)
.await
.map_err(|e| {
Expand Down
90 changes: 56 additions & 34 deletions batcher/aligned-batcher/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@ use std::net::SocketAddr;
use std::sync::Arc;

use aligned_sdk::core::constants::{
ADDITIONAL_SUBMISSION_GAS_COST_PER_PROOF, AGGREGATOR_GAS_COST, CANCEL_TRANSACTION_MAX_RETRIES,
CONSTANT_GAS_COST, DEFAULT_AGGREGATOR_FEE_PERCENTAGE_MULTIPLIER, DEFAULT_BACKOFF_FACTOR,
DEFAULT_MAX_FEE_PER_PROOF, DEFAULT_MAX_RETRIES, DEFAULT_MIN_RETRY_DELAY,
GAS_PRICE_PERCENTAGE_MULTIPLIER, MIN_FEE_PER_PROOF, PERCENTAGE_DIVIDER,
RESPOND_TO_TASK_FEE_LIMIT_PERCENTAGE_MULTIPLIER,
ADDITIONAL_SUBMISSION_GAS_COST_PER_PROOF, AGGREGATOR_GAS_COST, BUMP_BACKOFF_FACTOR,
BUMP_MAX_RETRIES, BUMP_MAX_RETRY_DELAY, BUMP_MIN_RETRY_DELAY, CONSTANT_GAS_COST,
DEFAULT_AGGREGATOR_FEE_PERCENTAGE_MULTIPLIER, DEFAULT_MAX_FEE_PER_PROOF,
ETHEREUM_CALL_BACKOFF_FACTOR, ETHEREUM_CALL_MAX_RETRIES, ETHEREUM_CALL_MAX_RETRY_DELAY,
ETHEREUM_CALL_MIN_RETRY_DELAY, GAS_PRICE_PERCENTAGE_MULTIPLIER, MIN_FEE_PER_PROOF,
PERCENTAGE_DIVIDER, RESPOND_TO_TASK_FEE_LIMIT_PERCENTAGE_MULTIPLIER,
};
use aligned_sdk::core::types::{
ClientMessage, GetNonceResponseMessage, NoncedVerificationData, ProofInvalidReason,
Expand Down Expand Up @@ -271,15 +272,19 @@ impl Batcher {
Ok(())
}

/// Listen for Ethereum new blocks.
/// Retries on recoverable errors using exponential backoff
/// with the maximum number of retries and a `MAX_DELAY` of 1 hour.
pub async fn listen_new_blocks(self: Arc<Self>) -> Result<(), BatcherError> {
retry_function(
|| {
let app = self.clone();
async move { app.listen_new_blocks_retryable().await }
},
DEFAULT_MIN_RETRY_DELAY,
DEFAULT_BACKOFF_FACTOR,
ETHEREUM_CALL_MIN_RETRY_DELAY,
ETHEREUM_CALL_BACKOFF_FACTOR,
LISTEN_NEW_BLOCKS_MAX_TIMES,
ETHEREUM_CALL_MAX_RETRY_DELAY,
)
.await
.map_err(|e| e.inner())
Expand Down Expand Up @@ -903,7 +908,9 @@ impl Batcher {
}
}

/// Gets the user nonce from Ethereum using exponential backoff.
/// Gets the user nonce from Ethereum.
/// Retries on recoverable errors using exponential backoff up to `ETHEREUM_CALL_MAX_RETRIES` times:
/// (0,5 secs - 1 secs - 2 secs - 4 secs - 8 secs).
async fn get_user_nonce_from_ethereum(
&self,
addr: Address,
Expand All @@ -916,9 +923,10 @@ impl Batcher {
addr,
)
},
DEFAULT_MIN_RETRY_DELAY,
DEFAULT_BACKOFF_FACTOR,
DEFAULT_MAX_RETRIES,
ETHEREUM_CALL_MIN_RETRY_DELAY,
ETHEREUM_CALL_BACKOFF_FACTOR,
ETHEREUM_CALL_MAX_RETRIES,
ETHEREUM_CALL_MAX_RETRY_DELAY,
)
.await
}
Expand Down Expand Up @@ -1358,6 +1366,10 @@ impl Batcher {
}
}

/// Sends a `create_new_task` transaction to Ethereum and waits for a maximum of 3 blocks for the receipt.
/// Retries up to `ETHEREUM_CALL_MAX_RETRIES` times using exponential backoff on recoverable errors while trying to send the transaction:
/// (0,5 secs - 1 secs - 2 secs - 4 secs - 8 secs).
/// `ReceiptNotFoundError` is treated as non-recoverable, and the transaction will be canceled using `cancel_create_new_task_tx` in that case.
async fn create_new_task(
&self,
batch_merkle_root: [u8; 32],
Expand All @@ -1377,9 +1389,10 @@ impl Batcher {
&self.payment_service_fallback,
)
},
DEFAULT_MIN_RETRY_DELAY,
DEFAULT_BACKOFF_FACTOR,
DEFAULT_MAX_RETRIES,
ETHEREUM_CALL_MIN_RETRY_DELAY,
ETHEREUM_CALL_BACKOFF_FACTOR,
ETHEREUM_CALL_MAX_RETRIES,
ETHEREUM_CALL_MAX_RETRY_DELAY,
)
.await;
match result {
Expand All @@ -1403,11 +1416,10 @@ impl Batcher {
}

/// Sends a transaction to Ethereum with the same nonce as the previous one to override it.
/// In case of a recoverable error, it will retry with an exponential backoff up to CANCEL_TRANSACTION_MAX_RETRIES times.
/// A tx not included in 3 blocks will be considered an error, and will trigger a bump of the fee, with the rules on ```calculate_bumped_gas_price```
/// This will do 5 bumps every 3 blocks, and then the exponential backoff will dominate, doing bumps at 8,13,24,45,89 and so on.
/// Errors on ```get_gas_price``` calls inside this function are considered transient,
/// so they won't stop the retries.
/// Retries on recoverable errors with exponential backoff.
/// Bumps the fee if not included in 3 blocks, using `calculate_bumped_gas_price`.
/// In the first 5 attemps, bumps the fee every 3 blocks. Then exponential backoff takes over.
/// After 2 hours (attempt 13), retries occur hourly for 1 day (33 retries).
pub async fn cancel_create_new_task_tx(&self, old_tx_gas_price: U256) {
info!("Cancelling createNewTask transaction...");
let iteration = Arc::new(Mutex::new(0));
Expand Down Expand Up @@ -1442,9 +1454,10 @@ impl Batcher {
)
.await
},
DEFAULT_MIN_RETRY_DELAY,
DEFAULT_BACKOFF_FACTOR,
CANCEL_TRANSACTION_MAX_RETRIES,
BUMP_MIN_RETRY_DELAY,
BUMP_BACKOFF_FACTOR,
BUMP_MAX_RETRIES,
BUMP_MAX_RETRY_DELAY,
)
.await
{
Expand Down Expand Up @@ -1537,7 +1550,9 @@ impl Batcher {
Ok(())
}

/// Gets the balance of user with address `addr` from Ethereum using exponential backoff.
/// Gets the balance of user with address `addr` from Ethereum.
/// Retries on recoverable errors using exponential backoff up to `ETHEREUM_CALL_MAX_RETRIES` times:
/// (0,5 secs - 1 secs - 2 secs - 4 secs - 8 secs)
/// Returns `None` if the balance couldn't be returned
/// FIXME: This should return a `Result` instead.
async fn get_user_balance(&self, addr: &Address) -> Option<U256> {
Expand All @@ -1549,15 +1564,18 @@ impl Batcher {
addr,
)
},
DEFAULT_MIN_RETRY_DELAY,
DEFAULT_BACKOFF_FACTOR,
DEFAULT_MAX_RETRIES,
ETHEREUM_CALL_MIN_RETRY_DELAY,
ETHEREUM_CALL_BACKOFF_FACTOR,
ETHEREUM_CALL_MAX_RETRIES,
ETHEREUM_CALL_MAX_RETRY_DELAY,
)
.await
.ok()
}

/// Checks if the user's balance is unlocked for a given address using exponential backoff.
/// Checks if the user's balance is unlocked for a given address.
/// Retries on recoverable errors using exponential backoff up to `ETHEREUM_CALL_MAX_RETRIES` times:
/// (0,5 secs - 1 secs - 2 secs - 4 secs - 8 secs).
/// Returns `false` if an error occurs during the retries.
async fn user_balance_is_unlocked(&self, addr: &Address) -> bool {
let Ok(unlocked) = retry_function(
Expand All @@ -1568,9 +1586,10 @@ impl Batcher {
addr,
)
},
DEFAULT_MIN_RETRY_DELAY,
DEFAULT_BACKOFF_FACTOR,
DEFAULT_MAX_RETRIES,
ETHEREUM_CALL_MIN_RETRY_DELAY,
ETHEREUM_CALL_BACKOFF_FACTOR,
ETHEREUM_CALL_MAX_RETRIES,
ETHEREUM_CALL_MAX_RETRY_DELAY,
)
.await
else {
Expand All @@ -1580,7 +1599,9 @@ impl Batcher {
unlocked
}

/// Uploads the batch to s3 using exponential backoff.
/// Uploads the batch to s3.
/// Retries on recoverable errors using exponential backoff up to `ETHEREUM_CALL_MAX_RETRIES` times:
/// (0,5 secs - 1 secs - 2 secs - 4 secs - 8 secs).
async fn upload_batch_to_s3(
&self,
batch_bytes: &[u8],
Expand All @@ -1595,9 +1616,10 @@ impl Batcher {
&self.s3_bucket_name,
)
},
DEFAULT_MIN_RETRY_DELAY,
DEFAULT_BACKOFF_FACTOR,
DEFAULT_MAX_RETRIES,
ETHEREUM_CALL_MIN_RETRY_DELAY,
ETHEREUM_CALL_BACKOFF_FACTOR,
ETHEREUM_CALL_MAX_RETRIES,
ETHEREUM_CALL_MAX_RETRY_DELAY,
)
.await
.map_err(|e| BatcherError::BatchUploadError(e.to_string()))
Expand Down
2 changes: 1 addition & 1 deletion batcher/aligned-batcher/src/retry/batcher_retryables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ pub async fn create_new_task_retryable(
})?
.map_err(|e| {
warn!("Error while waiting for batch inclusion: {e}");
RetryError::Transient(BatcherError::TransactionSendError(e.to_string()))
RetryError::Permanent(BatcherError::ReceiptNotFoundError)
})?
.ok_or(RetryError::Permanent(BatcherError::ReceiptNotFoundError))
}
Expand Down
7 changes: 5 additions & 2 deletions batcher/aligned-batcher/src/retry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,14 @@ impl<E> RetryError<E> {

impl<E: std::fmt::Display> std::error::Error for RetryError<E> where E: std::fmt::Debug {}

// Supports retries only on async functions. See: https://docs.rs/backon/latest/backon/#retry-an-async-function
/// Supports retries only on async functions. See: https://docs.rs/backon/latest/backon/#retry-an-async-function
/// Runs with `jitter: false`.
pub async fn retry_function<FutureFn, Fut, T, E>(
function: FutureFn,
min_delay: u64,
factor: f32,
max_times: usize,
max_delay: u64,
) -> Result<T, RetryError<E>>
where
Fut: Future<Output = Result<T, RetryError<E>>>,
Expand All @@ -44,7 +46,8 @@ where
let backoff = ExponentialBuilder::default()
.with_min_delay(Duration::from_millis(min_delay))
.with_max_times(max_times)
.with_factor(factor);
.with_factor(factor)
.with_max_delay(Duration::from_secs(max_delay));

function
.retry(backoff)
Expand Down
16 changes: 11 additions & 5 deletions batcher/aligned-sdk/src/core/constants.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
/// Batcher ///
pub const GAS_PRICE_INCREMENT_PERCENTAGE_PER_ITERATION: usize = 5;
pub const CANCEL_TRANSACTION_MAX_RETRIES: usize = 20;
pub const AGGREGATOR_GAS_COST: u128 = 400_000;
pub const BATCHER_SUBMISSION_BASE_GAS_COST: u128 = 125_000;
pub const ADDITIONAL_SUBMISSION_GAS_COST_PER_PROOF: u128 = 13_000;
Expand All @@ -27,7 +26,14 @@ pub const MAX_FEE_BATCH_PROOF_NUMBER: usize = 32;
/// This corresponds to the number of proofs to compute for a default max_fee.
pub const MAX_FEE_DEFAULT_PROOF_NUMBER: usize = 10;

/// retry
pub const DEFAULT_MIN_RETRY_DELAY: u64 = 500; // milliseconds
pub const DEFAULT_MAX_RETRIES: usize = 5;
pub const DEFAULT_BACKOFF_FACTOR: f32 = 2.0;
/// Ethereum calls retry constants
pub const ETHEREUM_CALL_MIN_RETRY_DELAY: u64 = 500; // milliseconds
pub const ETHEREUM_CALL_MAX_RETRIES: usize = 5;
pub const ETHEREUM_CALL_BACKOFF_FACTOR: f32 = 2.0;
pub const ETHEREUM_CALL_MAX_RETRY_DELAY: u64 = 3600; // seconds

/// Ethereum transaction retry constants
pub const BUMP_MIN_RETRY_DELAY: u64 = 500; // milliseconds
pub const BUMP_MAX_RETRIES: usize = 33; // ~ 1 day
pub const BUMP_BACKOFF_FACTOR: f32 = 2.0;
pub const BUMP_MAX_RETRY_DELAY: u64 = 3600; // seconds