Skip to content

Commit 3e21508

Browse files
committed
feat(fortuna): better retry mechanism
1 parent 3471777 commit 3e21508

File tree

3 files changed

+141
-45
lines changed

3 files changed

+141
-45
lines changed

apps/fortuna/src/chain/ethereum.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,9 @@ use {
3333
// contract in the same repo.
3434
abigen!(
3535
PythRandom,
36-
"../../target_chains/ethereum/entropy_sdk/solidity/abis/IEntropy.json"
36+
"../../target_chains/ethereum/entropy_sdk/solidity/abis/IEntropy.json";
37+
PythRandomErrors,
38+
"../../target_chains/ethereum/entropy_sdk/solidity/abis/EntropyErrors.json"
3739
);
3840

3941
pub type MiddlewaresWrapper<T> = LegacyTxMiddleware<

apps/fortuna/src/eth_utils/utils.rs

Lines changed: 73 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,15 @@ use {
44
backoff::ExponentialBackoff,
55
ethabi::ethereum_types::U64,
66
ethers::{
7-
contract::ContractCall,
7+
contract::{ContractCall, ContractError},
88
middleware::Middleware,
9-
types::{TransactionReceipt, U256},
9+
providers::ProviderError,
10+
types::{transaction::eip2718::TypedTransaction, TransactionReceipt, U256},
11+
},
12+
std::{
13+
fmt::Display,
14+
sync::{atomic::AtomicU64, Arc},
1015
},
11-
std::sync::{atomic::AtomicU64, Arc},
1216
tokio::time::{timeout, Duration},
1317
tracing,
1418
};
@@ -156,7 +160,9 @@ pub async fn submit_tx_with_backoff<T: Middleware + NonceManaged + 'static>(
156160
call: ContractCall<T, ()>,
157161
gas_limit: U256,
158162
escalation_policy: EscalationPolicy,
159-
) -> Result<SubmitTxResult> {
163+
error_mapper: impl Fn(u64, backoff::Error<SubmitTxError<T>>) -> backoff::Error<SubmitTxError<T>>
164+
+ Copy,
165+
) -> Result<SubmitTxResult, SubmitTxError<T>> {
160166
let start_time = std::time::Instant::now();
161167

162168
tracing::info!("Started processing event");
@@ -176,14 +182,15 @@ pub async fn submit_tx_with_backoff<T: Middleware + NonceManaged + 'static>(
176182

177183
let gas_multiplier_pct = escalation_policy.get_gas_multiplier_pct(num_retries);
178184
let fee_multiplier_pct = escalation_policy.get_fee_multiplier_pct(num_retries);
179-
submit_tx(
185+
let result = submit_tx(
180186
middleware.clone(),
181187
&call,
182188
padded_gas_limit,
183189
gas_multiplier_pct,
184190
fee_multiplier_pct,
185191
)
186-
.await
192+
.await;
193+
result.map_err(|e| error_mapper(num_retries, e))
187194
},
188195
|e, dur| {
189196
let retry_number = num_retries.load(std::sync::atomic::Ordering::Relaxed);
@@ -210,6 +217,51 @@ pub async fn submit_tx_with_backoff<T: Middleware + NonceManaged + 'static>(
210217
})
211218
}
212219

220+
/// Typed failure reasons for a single transaction submission attempt.
///
/// `submit_tx` wraps these in `backoff::Error` so that the retry loop (and the
/// caller-supplied `error_mapper`) can decide whether to retry, while callers
/// can still match on the concrete cause afterwards.
pub enum SubmitTxError<T: Middleware + NonceManaged + 'static> {
221+
/// `estimate_gas` on the contract call failed (this includes contract reverts).
/// `submit_tx` reports it as transient.
GasUsageEstimateError(ContractError<T>),
222+
/// The gas estimate was higher than the configured gas limit.
/// `submit_tx` reports it as permanent.
GasLimitExceeded { estimate: U256, limit: U256 },
223+
/// `fill_transaction` returned an error while populating the transaction
/// before submission.
GasPriceEstimateError(<T as Middleware>::Error),
224+
/// `send_transaction` returned an error; carries the transaction that was being sent.
SubmissionError(TypedTransaction, <T as Middleware>::Error),
225+
/// No receipt was obtained for the submitted transaction: either the wait for
/// the pending transaction timed out or the provider returned no receipt.
/// `submit_tx` resets the nonce before returning this variant.
ConfirmationTimeout(TypedTransaction),
226+
/// Waiting for the transaction receipt failed with a provider error.
ConfirmationError(TypedTransaction, ProviderError),
227+
/// A receipt was returned but its status is 0, i.e. the transaction reverted on-chain.
ReceiptError(TypedTransaction, TransactionReceipt),
228+
}
229+
230+
impl<T> Display for SubmitTxError<T>
231+
where
232+
T: Middleware + NonceManaged + 'static,
233+
{
234+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
235+
match self {
236+
SubmitTxError::GasUsageEstimateError(e) => {
237+
write!(f, "Error estimating gas for reveal: {:?}", e)
238+
}
239+
SubmitTxError::GasLimitExceeded { estimate, limit } => write!(
240+
f,
241+
"Gas estimate for reveal with callback is higher than the gas limit {} > {}",
242+
estimate, limit
243+
),
244+
SubmitTxError::GasPriceEstimateError(e) => write!(f, "Gas price estimate error: {}", e),
245+
SubmitTxError::SubmissionError(tx, e) => write!(
246+
f,
247+
"Error submitting the reveal transaction. Tx:{:?}, Error:{:?}",
248+
tx, e
249+
),
250+
SubmitTxError::ConfirmationTimeout(tx) => {
251+
write!(f, "Tx stuck in mempool. Resetting nonce. Tx:{:?}", tx)
252+
}
253+
SubmitTxError::ConfirmationError(tx, e) => write!(
254+
f,
255+
"Error waiting for transaction receipt. Tx:{:?} Error:{:?}",
256+
tx, e
257+
),
258+
SubmitTxError::ReceiptError(tx, _) => {
259+
write!(f, "Reveal transaction reverted on-chain. Tx:{:?}", tx,)
260+
}
261+
}
262+
}
263+
}
264+
213265
/// Submit a transaction to the blockchain. It estimates the gas for the transaction,
214266
/// pads both the gas and fee estimates using the provided multipliers, and submits the transaction.
215267
/// It will return a permanent or transient error depending on the error type and whether
@@ -221,24 +273,23 @@ pub async fn submit_tx<T: Middleware + NonceManaged + 'static>(
221273
// A value of 100 submits the tx with the same gas/fee as the estimate.
222274
gas_estimate_multiplier_pct: u64,
223275
fee_estimate_multiplier_pct: u64,
224-
) -> Result<TransactionReceipt, backoff::Error<anyhow::Error>> {
276+
) -> Result<TransactionReceipt, backoff::Error<SubmitTxError<T>>> {
225277
let gas_estimate_res = call.estimate_gas().await;
226278

227279
let gas_estimate = gas_estimate_res.map_err(|e| {
228280
// we consider the error transient even if it is a contract revert since
229281
// it can be because of routing to a lagging RPC node. Retrying such errors will
230282
// incur a few additional RPC calls, but it is fine.
231-
backoff::Error::transient(anyhow!("Error estimating gas for reveal: {:?}", e))
283+
backoff::Error::transient(SubmitTxError::GasUsageEstimateError(e))
232284
})?;
233285

234286
// The gas limit on the simulated transaction is the maximum expected tx gas estimate,
235287
// but we are willing to pad the gas a bit to ensure reliable submission.
236288
if gas_estimate > gas_limit {
237-
return Err(backoff::Error::permanent(anyhow!(
238-
"Gas estimate for reveal with callback is higher than the gas limit {} > {}",
239-
gas_estimate,
240-
gas_limit
241-
)));
289+
return Err(backoff::Error::permanent(SubmitTxError::GasLimitExceeded {
290+
estimate: gas_estimate,
291+
limit: gas_limit,
292+
}));
242293
}
243294

244295
// Pad the gas estimate after checking it against the simulation gas limit.
@@ -247,13 +298,11 @@ pub async fn submit_tx<T: Middleware + NonceManaged + 'static>(
247298
let call = call.clone().gas(gas_estimate);
248299
let mut transaction = call.tx.clone();
249300

250-
// manually fill the tx with the gas info, so we can log the details in case of error
301+
// manually fill the tx with the gas price info, so we can log the details in case of error
251302
client
252303
.fill_transaction(&mut transaction, None)
253304
.await
254-
.map_err(|e| {
255-
backoff::Error::transient(anyhow!("Error filling the reveal transaction: {:?}", e))
256-
})?;
305+
.map_err(|e| backoff::Error::transient(SubmitTxError::GasPriceEstimateError(e)))?;
257306

258307
// Apply the fee escalation policy. Note: the unwrap_or_default should never default as we have a gas oracle
259308
// in the client that sets the gas price.
@@ -271,11 +320,7 @@ pub async fn submit_tx<T: Middleware + NonceManaged + 'static>(
271320
.send_transaction(transaction.clone(), None)
272321
.await
273322
.map_err(|e| {
274-
backoff::Error::transient(anyhow!(
275-
"Error submitting the reveal transaction. Tx:{:?}, Error:{:?}",
276-
transaction,
277-
e
278-
))
323+
backoff::Error::transient(SubmitTxError::SubmissionError(transaction.clone(), e))
279324
})?;
280325

281326
let reset_nonce = || {
@@ -292,34 +337,24 @@ pub async fn submit_tx<T: Middleware + NonceManaged + 'static>(
292337
// in this case ethers internal polling will not reduce the number of retries
293338
// and keep retrying indefinitely. So we set a manual timeout here and reset the nonce.
294339
reset_nonce();
295-
backoff::Error::transient(anyhow!(
296-
"Tx stuck in mempool. Resetting nonce. Tx:{:?}",
297-
transaction
298-
))
340+
backoff::Error::transient(SubmitTxError::ConfirmationTimeout(transaction.clone()))
299341
})?;
300342

301343
let receipt = pending_receipt
302344
.map_err(|e| {
303-
backoff::Error::transient(anyhow!(
304-
"Error waiting for transaction receipt. Tx:{:?} Error:{:?}",
305-
transaction,
306-
e
307-
))
345+
backoff::Error::transient(SubmitTxError::ConfirmationError(transaction.clone(), e))
308346
})?
309347
.ok_or_else(|| {
310348
// RPC may not return an error on tx submission if the nonce is too high.
311349
// But we will never get a receipt. So we reset the nonce manager to get the correct nonce.
312350
reset_nonce();
313-
backoff::Error::transient(anyhow!(
314-
"Can't verify the reveal, probably dropped from mempool. Resetting nonce. Tx:{:?}",
315-
transaction
316-
))
351+
backoff::Error::transient(SubmitTxError::ConfirmationTimeout(transaction.clone()))
317352
})?;
318353

319354
if receipt.status == Some(U64::from(0)) {
320-
return Err(backoff::Error::transient(anyhow!(
321-
"Reveal transaction reverted on-chain. Tx:{:?}",
322-
transaction
355+
return Err(backoff::Error::transient(SubmitTxError::ReceiptError(
356+
transaction.clone(),
357+
receipt.clone(),
323358
)));
324359
}
325360

apps/fortuna/src/keeper/process_event.rs

Lines changed: 65 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
use {
22
super::keeper_metrics::AccountLabel,
33
crate::{
4-
chain::reader::RequestedWithCallbackEvent,
5-
eth_utils::utils::submit_tx_with_backoff,
4+
chain::{ethereum::PythRandomErrorsErrors, reader::RequestedWithCallbackEvent},
5+
eth_utils::utils::{submit_tx_with_backoff, SubmitTxError},
66
history::{RequestEntryState, RequestStatus},
77
keeper::block::ProcessParams,
88
},
99
anyhow::{anyhow, Result},
10+
ethers::{abi::AbiDecode, contract::ContractError},
11+
std::time::Duration,
1012
tracing,
1113
};
1214

@@ -74,12 +76,40 @@ pub async fn process_event_with_backoff(
7476
event.user_random_number,
7577
provider_revelation,
7678
);
79+
let error_mapper = |num_retries, e| {
80+
if let backoff::Error::Transient {
81+
err: SubmitTxError::GasUsageEstimateError(ContractError::Revert(revert)),
82+
..
83+
} = &e
84+
{
85+
if let Ok(PythRandomErrorsErrors::NoSuchRequest(_)) =
86+
PythRandomErrorsErrors::decode(revert)
87+
{
88+
let err =
89+
SubmitTxError::GasUsageEstimateError(ContractError::Revert(revert.clone()));
90+
// Slow down the retries if the request is not found.
91+
// This probably means that the request is already fulfilled via another process.
92+
// After 5 retries, we return the error permanently.
93+
if num_retries >= 5 {
94+
return backoff::Error::Permanent(err);
95+
}
96+
if 1 < num_retries && num_retries < 5 {
97+
return backoff::Error::Transient {
98+
err,
99+
retry_after: Some(Duration::from_secs(60)),
100+
};
101+
}
102+
}
103+
}
104+
e
105+
};
77106

78107
let success = submit_tx_with_backoff(
79108
contract.client(),
80109
contract_call,
81110
gas_limit,
82111
escalation_policy,
112+
error_mapper,
83113
)
84114
.await;
85115

@@ -160,16 +190,45 @@ pub async fn process_event_with_backoff(
160190
.get_request(event.provider_address, event.sequence_number)
161191
.await;
162192

163-
tracing::error!("Failed to process event: {:?}. Request: {:?}", e, req);
164-
165193
// We only count failures for cases where we are completely certain that the callback failed.
166-
if req.is_ok_and(|x| x.is_some()) {
194+
if req.as_ref().is_ok_and(|x| x.is_some()) {
195+
tracing::error!("Failed to process event: {}. Request: {:?}", e, req);
167196
metrics
168197
.requests_processed_failure
169198
.get_or_create(&account_label)
170199
.inc();
200+
// Do not display the internal error, it might include RPC details.
201+
let reason = match e {
202+
SubmitTxError::GasUsageEstimateError(ContractError::Revert(revert)) => {
203+
format!("Reverted: {}", revert)
204+
}
205+
SubmitTxError::GasLimitExceeded { limit, estimate } => format!(
206+
"Gas limit exceeded: limit = {}, estimate = {}",
207+
limit, estimate
208+
),
209+
SubmitTxError::GasUsageEstimateError(_) => {
210+
"Unable to estimate gas usage".to_string()
211+
}
212+
SubmitTxError::GasPriceEstimateError(_) => {
213+
"Unable to estimate gas price".to_string()
214+
}
215+
SubmitTxError::SubmissionError(_, _) => {
216+
"Error submitting the transaction on-chain".to_string()
217+
}
218+
SubmitTxError::ConfirmationTimeout(tx) => format!(
219+
"Transaction was submitted, but never confirmed. Hash: {}",
220+
tx.sighash()
221+
),
222+
SubmitTxError::ConfirmationError(tx, _) => format!(
223+
"Transaction was submitted, but never confirmed. Hash: {}",
224+
tx.sighash()
225+
),
226+
SubmitTxError::ReceiptError(tx, _) => {
227+
format!("Reveal transaction failed on-chain. Hash: {}", tx.sighash())
228+
}
229+
};
171230
status.state = RequestEntryState::Failed {
172-
reason: format!("Error revealing: {:?}", e),
231+
reason,
173232
provider_random_number: Some(provider_revelation),
174233
};
175234
history.add(&status);

0 commit comments

Comments
 (0)