Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion apps/fortuna/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion apps/fortuna/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "fortuna"
version = "7.6.3"
version = "7.6.4"
edition = "2021"

[lib]
Expand Down
4 changes: 3 additions & 1 deletion apps/fortuna/src/chain/ethereum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ use {
// contract in the same repo.
abigen!(
PythRandom,
"../../target_chains/ethereum/entropy_sdk/solidity/abis/IEntropy.json"
"../../target_chains/ethereum/entropy_sdk/solidity/abis/IEntropy.json";
PythRandomErrors,
"../../target_chains/ethereum/entropy_sdk/solidity/abis/EntropyErrors.json"
);

pub type MiddlewaresWrapper<T> = LegacyTxMiddleware<
Expand Down
118 changes: 80 additions & 38 deletions apps/fortuna/src/eth_utils/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,15 @@ use {
backoff::ExponentialBackoff,
ethabi::ethereum_types::U64,
ethers::{
contract::ContractCall,
contract::{ContractCall, ContractError},
middleware::Middleware,
types::{TransactionReceipt, U256},
providers::ProviderError,
types::{transaction::eip2718::TypedTransaction, TransactionReceipt, U256},
},
std::{
fmt::Display,
sync::{atomic::AtomicU64, Arc},
},
std::sync::{atomic::AtomicU64, Arc},
tokio::time::{timeout, Duration},
tracing,
};
Expand Down Expand Up @@ -151,12 +155,17 @@ pub async fn estimate_tx_cost<T: Middleware + 'static>(
/// the transaction exceeds this limit, the transaction is not submitted.
/// Note however that any gas_escalation policy is applied to the estimate, so the actual gas used may exceed the limit.
/// The transaction is retried until it is confirmed on chain or the maximum number of retries is reached.
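/// You can pass an `error_mapper` function that is called on every failed attempt with the current retry count and the error.
/// The error it returns replaces the original one, so it can customize the backoff behavior based on the error type
/// (for example by marking the error as permanent or by setting a custom retry delay).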
pub async fn submit_tx_with_backoff<T: Middleware + NonceManaged + 'static>(
middleware: Arc<T>,
call: ContractCall<T, ()>,
gas_limit: U256,
escalation_policy: EscalationPolicy,
) -> Result<SubmitTxResult> {
error_mapper: Option<
impl Fn(u64, backoff::Error<SubmitTxError<T>>) -> backoff::Error<SubmitTxError<T>>,
>,
) -> Result<SubmitTxResult, SubmitTxError<T>> {
let start_time = std::time::Instant::now();

tracing::info!("Started processing event");
Expand All @@ -176,14 +185,19 @@ pub async fn submit_tx_with_backoff<T: Middleware + NonceManaged + 'static>(

let gas_multiplier_pct = escalation_policy.get_gas_multiplier_pct(num_retries);
let fee_multiplier_pct = escalation_policy.get_fee_multiplier_pct(num_retries);
submit_tx(
let result = submit_tx(
middleware.clone(),
&call,
padded_gas_limit,
gas_multiplier_pct,
fee_multiplier_pct,
)
.await
.await;
if let Some(ref mapper) = error_mapper {
result.map_err(|e| mapper(num_retries, e))
} else {
result
}
},
|e, dur| {
let retry_number = num_retries.load(std::sync::atomic::Ordering::Relaxed);
Expand All @@ -210,6 +224,51 @@ pub async fn submit_tx_with_backoff<T: Middleware + NonceManaged + 'static>(
})
}

/// Failure modes of submitting a transaction and waiting for it to be confirmed.
///
/// Variants that concern an already-built transaction carry that
/// `TypedTransaction` so callers can log or inspect it.
pub enum SubmitTxError<T>
where
    T: Middleware + NonceManaged + 'static,
{
    /// Estimating the gas usage of the contract call failed.
    GasUsageEstimateError(ContractError<T>),
    /// The gas estimate is higher than the allowed gas limit.
    GasLimitExceeded { estimate: U256, limit: U256 },
    /// Filling the transaction with gas price information failed.
    GasPriceEstimateError(<T as Middleware>::Error),
    /// Sending the transaction failed with a middleware error.
    SubmissionError(TypedTransaction, <T as Middleware>::Error),
    /// The transaction was sent but no receipt was obtained: either the wait
    /// timed out or the provider returned no receipt.
    ConfirmationTimeout(TypedTransaction),
    /// Waiting for the transaction receipt failed with a provider error.
    ConfirmationError(TypedTransaction, ProviderError),
    /// A receipt was obtained, but it reports a failed (status 0) transaction.
    ReceiptError(TypedTransaction, TransactionReceipt),
}

/// Renders a log-oriented description of the failure.
///
/// Most variants embed the `Debug` form of the transaction and/or the
/// underlying error, so the output is intended for logs rather than for
/// display to end users.
impl<T: Middleware + NonceManaged + 'static> Display for SubmitTxError<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::GasUsageEstimateError(err) => {
                write!(f, "Error estimating gas for reveal: {:?}", err)
            }
            Self::GasLimitExceeded { estimate, limit } => {
                write!(
                    f,
                    "Gas estimate for reveal with callback is higher than the gas limit {} > {}",
                    estimate, limit
                )
            }
            // This is the only variant that formats its inner error with `Display`.
            Self::GasPriceEstimateError(err) => {
                write!(f, "Gas price estimate error: {}", err)
            }
            Self::SubmissionError(transaction, err) => {
                write!(
                    f,
                    "Error submitting the reveal transaction. Tx:{:?}, Error:{:?}",
                    transaction, err
                )
            }
            Self::ConfirmationTimeout(transaction) => {
                write!(
                    f,
                    "Tx stuck in mempool. Resetting nonce. Tx:{:?}",
                    transaction
                )
            }
            Self::ConfirmationError(transaction, err) => {
                write!(
                    f,
                    "Error waiting for transaction receipt. Tx:{:?} Error:{:?}",
                    transaction, err
                )
            }
            // The receipt itself is not included in the message.
            Self::ReceiptError(transaction, _) => {
                write!(
                    f,
                    "Reveal transaction reverted on-chain. Tx:{:?}",
                    transaction
                )
            }
        }
    }
}

/// Submit a transaction to the blockchain. It estimates the gas for the transaction,
/// pads both the gas and fee estimates using the provided multipliers, and submits the transaction.
/// It will return a permanent or transient error depending on the error type and whether
Expand All @@ -221,24 +280,23 @@ pub async fn submit_tx<T: Middleware + NonceManaged + 'static>(
// A value of 100 submits the tx with the same gas/fee as the estimate.
gas_estimate_multiplier_pct: u64,
fee_estimate_multiplier_pct: u64,
) -> Result<TransactionReceipt, backoff::Error<anyhow::Error>> {
) -> Result<TransactionReceipt, backoff::Error<SubmitTxError<T>>> {
let gas_estimate_res = call.estimate_gas().await;

let gas_estimate = gas_estimate_res.map_err(|e| {
// we consider the error transient even if it is a contract revert since
// it can be because of routing to a lagging RPC node. Retrying such errors will
// incur a few additional RPC calls, but it is fine.
backoff::Error::transient(anyhow!("Error estimating gas for reveal: {:?}", e))
backoff::Error::transient(SubmitTxError::GasUsageEstimateError(e))
})?;

// The gas limit on the simulated transaction is the maximum expected tx gas estimate,
// but we are willing to pad the gas a bit to ensure reliable submission.
if gas_estimate > gas_limit {
return Err(backoff::Error::permanent(anyhow!(
"Gas estimate for reveal with callback is higher than the gas limit {} > {}",
gas_estimate,
gas_limit
)));
return Err(backoff::Error::permanent(SubmitTxError::GasLimitExceeded {
estimate: gas_estimate,
limit: gas_limit,
}));
}

// Pad the gas estimate after checking it against the simulation gas limit.
Expand All @@ -247,13 +305,11 @@ pub async fn submit_tx<T: Middleware + NonceManaged + 'static>(
let call = call.clone().gas(gas_estimate);
let mut transaction = call.tx.clone();

// manually fill the tx with the gas info, so we can log the details in case of error
// manually fill the tx with the gas price info, so we can log the details in case of error
client
.fill_transaction(&mut transaction, None)
.await
.map_err(|e| {
backoff::Error::transient(anyhow!("Error filling the reveal transaction: {:?}", e))
})?;
.map_err(|e| backoff::Error::transient(SubmitTxError::GasPriceEstimateError(e)))?;

// Apply the fee escalation policy. Note: the unwrap_or_default should never default as we have a gas oracle
// in the client that sets the gas price.
Expand All @@ -271,11 +327,7 @@ pub async fn submit_tx<T: Middleware + NonceManaged + 'static>(
.send_transaction(transaction.clone(), None)
.await
.map_err(|e| {
backoff::Error::transient(anyhow!(
"Error submitting the reveal transaction. Tx:{:?}, Error:{:?}",
transaction,
e
))
backoff::Error::transient(SubmitTxError::SubmissionError(transaction.clone(), e))
})?;

let reset_nonce = || {
Expand All @@ -292,34 +344,24 @@ pub async fn submit_tx<T: Middleware + NonceManaged + 'static>(
// in this case ethers internal polling will not reduce the number of retries
// and keep retrying indefinitely. So we set a manual timeout here and reset the nonce.
reset_nonce();
backoff::Error::transient(anyhow!(
"Tx stuck in mempool. Resetting nonce. Tx:{:?}",
transaction
))
backoff::Error::transient(SubmitTxError::ConfirmationTimeout(transaction.clone()))
})?;

let receipt = pending_receipt
.map_err(|e| {
backoff::Error::transient(anyhow!(
"Error waiting for transaction receipt. Tx:{:?} Error:{:?}",
transaction,
e
))
backoff::Error::transient(SubmitTxError::ConfirmationError(transaction.clone(), e))
})?
.ok_or_else(|| {
// RPC may not return an error on tx submission if the nonce is too high.
// But we will never get a receipt. So we reset the nonce manager to get the correct nonce.
reset_nonce();
backoff::Error::transient(anyhow!(
"Can't verify the reveal, probably dropped from mempool. Resetting nonce. Tx:{:?}",
transaction
))
backoff::Error::transient(SubmitTxError::ConfirmationTimeout(transaction.clone()))
})?;

if receipt.status == Some(U64::from(0)) {
return Err(backoff::Error::transient(anyhow!(
"Reveal transaction reverted on-chain. Tx:{:?}",
transaction
return Err(backoff::Error::transient(SubmitTxError::ReceiptError(
transaction.clone(),
receipt.clone(),
)));
}

Expand Down
74 changes: 68 additions & 6 deletions apps/fortuna/src/keeper/process_event.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
use {
super::keeper_metrics::AccountLabel,
crate::{
chain::reader::RequestedWithCallbackEvent,
eth_utils::utils::submit_tx_with_backoff,
chain::{ethereum::PythRandomErrorsErrors, reader::RequestedWithCallbackEvent},
eth_utils::utils::{submit_tx_with_backoff, SubmitTxError},
history::{RequestEntryState, RequestStatus},
keeper::block::ProcessParams,
},
anyhow::{anyhow, Result},
ethers::{abi::AbiDecode, contract::ContractError},
std::time::Duration,
tracing,
};

Expand Down Expand Up @@ -74,12 +76,43 @@ pub async fn process_event_with_backoff(
event.user_random_number,
provider_revelation,
);
let error_mapper = |num_retries, e| {
if let backoff::Error::Transient {
err: SubmitTxError::GasUsageEstimateError(ContractError::Revert(revert)),
..
} = &e
{
if let Ok(PythRandomErrorsErrors::NoSuchRequest(_)) =
PythRandomErrorsErrors::decode(revert)
{
let err =
SubmitTxError::GasUsageEstimateError(ContractError::Revert(revert.clone()));
// Slow down the retries if the request is not found.
// This probably means that the request is already fulfilled via another process.
// After 5 retries, we return the error permanently.
if num_retries >= 5 {
return backoff::Error::Permanent(err);
}
let retry_after_seconds = match num_retries {
0 => 5,
1 => 10,
_ => 60,
};
return backoff::Error::Transient {
err,
retry_after: Some(Duration::from_secs(retry_after_seconds)),
};
}
}
e
};

let success = submit_tx_with_backoff(
contract.client(),
contract_call,
gas_limit,
escalation_policy,
Some(error_mapper),
)
.await;

Expand Down Expand Up @@ -160,16 +193,45 @@ pub async fn process_event_with_backoff(
.get_request(event.provider_address, event.sequence_number)
.await;

tracing::error!("Failed to process event: {:?}. Request: {:?}", e, req);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I moved this inside because it was creating a bunch of false alarms.


// We only count failures for cases where we are completely certain that the callback failed.
if req.is_ok_and(|x| x.is_some()) {
if req.as_ref().is_ok_and(|x| x.is_some()) {
tracing::error!("Failed to process event: {}. Request: {:?}", e, req);
metrics
.requests_processed_failure
.get_or_create(&account_label)
.inc();
// Do not display the internal error, it might include RPC details.
let reason = match e {
SubmitTxError::GasUsageEstimateError(ContractError::Revert(revert)) => {
format!("Reverted: {}", revert)
}
SubmitTxError::GasLimitExceeded { limit, estimate } => format!(
"Gas limit exceeded: limit = {}, estimate = {}",
limit, estimate
),
SubmitTxError::GasUsageEstimateError(_) => {
"Unable to estimate gas usage".to_string()
}
SubmitTxError::GasPriceEstimateError(_) => {
"Unable to estimate gas price".to_string()
}
SubmitTxError::SubmissionError(_, _) => {
"Error submitting the transaction on-chain".to_string()
}
SubmitTxError::ConfirmationTimeout(tx) => format!(
"Transaction was submitted, but never confirmed. Hash: {}",
tx.sighash()
),
SubmitTxError::ConfirmationError(tx, _) => format!(
"Transaction was submitted, but never confirmed. Hash: {}",
tx.sighash()
),
SubmitTxError::ReceiptError(tx, _) => {
format!("Reveal transaction failed on-chain. Hash: {}", tx.sighash())
}
};
status.state = RequestEntryState::Failed {
reason: format!("Error revealing: {:?}", e),
reason,
provider_random_number: Some(provider_revelation),
};
history.add(&status);
Expand Down
Loading