-
Notifications
You must be signed in to change notification settings - Fork 301
feat(fortuna): better retry mechanism #2780
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,11 +4,15 @@ use { | |
backoff::ExponentialBackoff, | ||
ethabi::ethereum_types::U64, | ||
ethers::{ | ||
contract::ContractCall, | ||
contract::{ContractCall, ContractError}, | ||
middleware::Middleware, | ||
types::{TransactionReceipt, U256}, | ||
providers::ProviderError, | ||
types::{transaction::eip2718::TypedTransaction, TransactionReceipt, U256}, | ||
}, | ||
std::{ | ||
fmt::Display, | ||
sync::{atomic::AtomicU64, Arc}, | ||
}, | ||
std::sync::{atomic::AtomicU64, Arc}, | ||
tokio::time::{timeout, Duration}, | ||
tracing, | ||
}; | ||
|
@@ -156,7 +160,9 @@ pub async fn submit_tx_with_backoff<T: Middleware + NonceManaged + 'static>( | |
call: ContractCall<T, ()>, | ||
gas_limit: U256, | ||
escalation_policy: EscalationPolicy, | ||
) -> Result<SubmitTxResult> { | ||
error_mapper: impl Fn(u64, backoff::Error<SubmitTxError<T>>) -> backoff::Error<SubmitTxError<T>> | ||
+ Copy, | ||
) -> Result<SubmitTxResult, SubmitTxError<T>> { | ||
let start_time = std::time::Instant::now(); | ||
|
||
tracing::info!("Started processing event"); | ||
|
@@ -176,14 +182,15 @@ pub async fn submit_tx_with_backoff<T: Middleware + NonceManaged + 'static>( | |
|
||
let gas_multiplier_pct = escalation_policy.get_gas_multiplier_pct(num_retries); | ||
let fee_multiplier_pct = escalation_policy.get_fee_multiplier_pct(num_retries); | ||
submit_tx( | ||
let result = submit_tx( | ||
middleware.clone(), | ||
&call, | ||
padded_gas_limit, | ||
gas_multiplier_pct, | ||
fee_multiplier_pct, | ||
) | ||
.await | ||
.await; | ||
result.map_err(|e| error_mapper(num_retries, e)) | ||
|
||
}, | ||
|e, dur| { | ||
let retry_number = num_retries.load(std::sync::atomic::Ordering::Relaxed); | ||
|
@@ -210,6 +217,51 @@ pub async fn submit_tx_with_backoff<T: Middleware + NonceManaged + 'static>( | |
}) | ||
} | ||
|
||
pub enum SubmitTxError<T: Middleware + NonceManaged + 'static> { | ||
GasUsageEstimateError(ContractError<T>), | ||
GasLimitExceeded { estimate: U256, limit: U256 }, | ||
GasPriceEstimateError(<T as Middleware>::Error), | ||
SubmissionError(TypedTransaction, <T as Middleware>::Error), | ||
ConfirmationTimeout(TypedTransaction), | ||
ConfirmationError(TypedTransaction, ProviderError), | ||
ReceiptError(TypedTransaction, TransactionReceipt), | ||
} | ||
|
||
impl<T> Display for SubmitTxError<T> | ||
where | ||
T: Middleware + NonceManaged + 'static, | ||
{ | ||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { | ||
match self { | ||
SubmitTxError::GasUsageEstimateError(e) => { | ||
write!(f, "Error estimating gas for reveal: {:?}", e) | ||
} | ||
SubmitTxError::GasLimitExceeded { estimate, limit } => write!( | ||
f, | ||
"Gas estimate for reveal with callback is higher than the gas limit {} > {}", | ||
estimate, limit | ||
), | ||
SubmitTxError::GasPriceEstimateError(e) => write!(f, "Gas price estimate error: {}", e), | ||
SubmitTxError::SubmissionError(tx, e) => write!( | ||
f, | ||
"Error submitting the reveal transaction. Tx:{:?}, Error:{:?}", | ||
tx, e | ||
), | ||
SubmitTxError::ConfirmationTimeout(tx) => { | ||
write!(f, "Tx stuck in mempool. Resetting nonce. Tx:{:?}", tx) | ||
} | ||
SubmitTxError::ConfirmationError(tx, e) => write!( | ||
f, | ||
"Error waiting for transaction receipt. Tx:{:?} Error:{:?}", | ||
tx, e | ||
), | ||
SubmitTxError::ReceiptError(tx, _) => { | ||
write!(f, "Reveal transaction reverted on-chain. Tx:{:?}", tx,) | ||
} | ||
} | ||
} | ||
} | ||
|
||
/// Submit a transaction to the blockchain. It estimates the gas for the transaction, | ||
/// pads both the gas and fee estimates using the provided multipliers, and submits the transaction. | ||
/// It will return a permanent or transient error depending on the error type and whether | ||
|
@@ -221,24 +273,23 @@ pub async fn submit_tx<T: Middleware + NonceManaged + 'static>( | |
// A value of 100 submits the tx with the same gas/fee as the estimate. | ||
gas_estimate_multiplier_pct: u64, | ||
fee_estimate_multiplier_pct: u64, | ||
) -> Result<TransactionReceipt, backoff::Error<anyhow::Error>> { | ||
) -> Result<TransactionReceipt, backoff::Error<SubmitTxError<T>>> { | ||
let gas_estimate_res = call.estimate_gas().await; | ||
|
||
let gas_estimate = gas_estimate_res.map_err(|e| { | ||
// we consider the error transient even if it is a contract revert since | ||
// it can be because of routing to a lagging RPC node. Retrying such errors will | ||
// incur a few additional RPC calls, but it is fine. | ||
backoff::Error::transient(anyhow!("Error estimating gas for reveal: {:?}", e)) | ||
backoff::Error::transient(SubmitTxError::GasUsageEstimateError(e)) | ||
})?; | ||
|
||
// The gas limit on the simulated transaction is the maximum expected tx gas estimate, | ||
// but we are willing to pad the gas a bit to ensure reliable submission. | ||
if gas_estimate > gas_limit { | ||
return Err(backoff::Error::permanent(anyhow!( | ||
"Gas estimate for reveal with callback is higher than the gas limit {} > {}", | ||
gas_estimate, | ||
gas_limit | ||
))); | ||
return Err(backoff::Error::permanent(SubmitTxError::GasLimitExceeded { | ||
estimate: gas_estimate, | ||
limit: gas_limit, | ||
})); | ||
} | ||
|
||
// Pad the gas estimate after checking it against the simulation gas limit. | ||
|
@@ -247,13 +298,11 @@ pub async fn submit_tx<T: Middleware + NonceManaged + 'static>( | |
let call = call.clone().gas(gas_estimate); | ||
let mut transaction = call.tx.clone(); | ||
|
||
// manually fill the tx with the gas info, so we can log the details in case of error | ||
// manually fill the tx with the gas price info, so we can log the details in case of error | ||
client | ||
.fill_transaction(&mut transaction, None) | ||
.await | ||
.map_err(|e| { | ||
backoff::Error::transient(anyhow!("Error filling the reveal transaction: {:?}", e)) | ||
})?; | ||
.map_err(|e| backoff::Error::transient(SubmitTxError::GasPriceEstimateError(e)))?; | ||
|
||
// Apply the fee escalation policy. Note: the unwrap_or_default should never default as we have a gas oracle | ||
// in the client that sets the gas price. | ||
|
@@ -271,11 +320,7 @@ pub async fn submit_tx<T: Middleware + NonceManaged + 'static>( | |
.send_transaction(transaction.clone(), None) | ||
.await | ||
.map_err(|e| { | ||
backoff::Error::transient(anyhow!( | ||
"Error submitting the reveal transaction. Tx:{:?}, Error:{:?}", | ||
transaction, | ||
e | ||
)) | ||
backoff::Error::transient(SubmitTxError::SubmissionError(transaction.clone(), e)) | ||
})?; | ||
|
||
let reset_nonce = || { | ||
|
@@ -292,34 +337,24 @@ pub async fn submit_tx<T: Middleware + NonceManaged + 'static>( | |
// in this case ethers internal polling will not reduce the number of retries | ||
// and keep retrying indefinitely. So we set a manual timeout here and reset the nonce. | ||
reset_nonce(); | ||
backoff::Error::transient(anyhow!( | ||
"Tx stuck in mempool. Resetting nonce. Tx:{:?}", | ||
transaction | ||
)) | ||
backoff::Error::transient(SubmitTxError::ConfirmationTimeout(transaction.clone())) | ||
})?; | ||
|
||
let receipt = pending_receipt | ||
.map_err(|e| { | ||
backoff::Error::transient(anyhow!( | ||
"Error waiting for transaction receipt. Tx:{:?} Error:{:?}", | ||
transaction, | ||
e | ||
)) | ||
backoff::Error::transient(SubmitTxError::ConfirmationError(transaction.clone(), e)) | ||
})? | ||
.ok_or_else(|| { | ||
// RPC may not return an error on tx submission if the nonce is too high. | ||
// But we will never get a receipt. So we reset the nonce manager to get the correct nonce. | ||
reset_nonce(); | ||
backoff::Error::transient(anyhow!( | ||
"Can't verify the reveal, probably dropped from mempool. Resetting nonce. Tx:{:?}", | ||
transaction | ||
)) | ||
backoff::Error::transient(SubmitTxError::ConfirmationTimeout(transaction.clone())) | ||
})?; | ||
|
||
if receipt.status == Some(U64::from(0)) { | ||
return Err(backoff::Error::transient(anyhow!( | ||
"Reveal transaction reverted on-chain. Tx:{:?}", | ||
transaction | ||
return Err(backoff::Error::transient(SubmitTxError::ReceiptError( | ||
transaction.clone(), | ||
receipt.clone(), | ||
))); | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,12 +1,14 @@ | ||
use { | ||
super::keeper_metrics::AccountLabel, | ||
crate::{ | ||
chain::reader::RequestedWithCallbackEvent, | ||
eth_utils::utils::submit_tx_with_backoff, | ||
chain::{ethereum::PythRandomErrorsErrors, reader::RequestedWithCallbackEvent}, | ||
eth_utils::utils::{submit_tx_with_backoff, SubmitTxError}, | ||
history::{RequestEntryState, RequestStatus}, | ||
keeper::block::ProcessParams, | ||
}, | ||
anyhow::{anyhow, Result}, | ||
ethers::{abi::AbiDecode, contract::ContractError}, | ||
std::time::Duration, | ||
tracing, | ||
}; | ||
|
||
|
@@ -74,12 +76,40 @@ pub async fn process_event_with_backoff( | |
event.user_random_number, | ||
provider_revelation, | ||
); | ||
let error_mapper = |num_retries, e| { | ||
if let backoff::Error::Transient { | ||
err: SubmitTxError::GasUsageEstimateError(ContractError::Revert(revert)), | ||
.. | ||
} = &e | ||
{ | ||
if let Ok(PythRandomErrorsErrors::NoSuchRequest(_)) = | ||
PythRandomErrorsErrors::decode(revert) | ||
{ | ||
let err = | ||
SubmitTxError::GasUsageEstimateError(ContractError::Revert(revert.clone())); | ||
// Slow down the retries if the request is not found. | ||
// This probably means that the request is already fulfilled via another process. | ||
// After 5 retries, we return the error permanently. | ||
if num_retries >= 5 { | ||
return backoff::Error::Permanent(err); | ||
} | ||
if 1 < num_retries && num_retries < 5 { | ||
return backoff::Error::Transient { | ||
err, | ||
retry_after: Some(Duration::from_secs(60)), | ||
|
||
}; | ||
} | ||
} | ||
} | ||
e | ||
}; | ||
|
||
let success = submit_tx_with_backoff( | ||
contract.client(), | ||
contract_call, | ||
gas_limit, | ||
escalation_policy, | ||
error_mapper, | ||
) | ||
.await; | ||
|
||
|
@@ -160,16 +190,45 @@ pub async fn process_event_with_backoff( | |
.get_request(event.provider_address, event.sequence_number) | ||
.await; | ||
|
||
tracing::error!("Failed to process event: {:?}. Request: {:?}", e, req); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I moved this inside because it was creating a bunch of false alarms. |
||
|
||
// We only count failures for cases where we are completely certain that the callback failed. | ||
if req.is_ok_and(|x| x.is_some()) { | ||
if req.as_ref().is_ok_and(|x| x.is_some()) { | ||
tracing::error!("Failed to process event: {}. Request: {:?}", e, req); | ||
metrics | ||
.requests_processed_failure | ||
.get_or_create(&account_label) | ||
.inc(); | ||
// Do not display the internal error, it might include RPC details. | ||
let reason = match e { | ||
SubmitTxError::GasUsageEstimateError(ContractError::Revert(revert)) => { | ||
format!("Reverted: {}", revert) | ||
} | ||
SubmitTxError::GasLimitExceeded { limit, estimate } => format!( | ||
"Gas limit exceeded: limit = {}, estimate = {}", | ||
limit, estimate | ||
), | ||
SubmitTxError::GasUsageEstimateError(_) => { | ||
"Unable to estimate gas usage".to_string() | ||
} | ||
SubmitTxError::GasPriceEstimateError(_) => { | ||
"Unable to estimate gas price".to_string() | ||
} | ||
SubmitTxError::SubmissionError(_, _) => { | ||
"Error submitting the transaction on-chain".to_string() | ||
} | ||
SubmitTxError::ConfirmationTimeout(tx) => format!( | ||
"Transaction was submitted, but never confirmed. Hash: {}", | ||
tx.sighash() | ||
), | ||
SubmitTxError::ConfirmationError(tx, _) => format!( | ||
"Transaction was submitted, but never confirmed. Hash: {}", | ||
tx.sighash() | ||
), | ||
SubmitTxError::ReceiptError(tx, _) => { | ||
format!("Reveal transaction failed on-chain. Hash: {}", tx.sighash()) | ||
} | ||
}; | ||
status.state = RequestEntryState::Failed { | ||
reason: format!("Error revealing: {:?}", e), | ||
reason, | ||
provider_random_number: Some(provider_revelation), | ||
}; | ||
history.add(&status); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you leave a comment that this lets you customize the backoff behavior based on the error type? It's not obvious what you get from this at the moment