diff --git a/apps/fortuna/Cargo.lock b/apps/fortuna/Cargo.lock index de20434304..d383ffebda 100644 --- a/apps/fortuna/Cargo.lock +++ b/apps/fortuna/Cargo.lock @@ -393,29 +393,63 @@ dependencies = [ "generic-array", ] +[[package]] +name = "borsh" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "15bf3650200d8bffa99015595e10f1fbd17de07abbc25bb067da79e769939bfa" +dependencies = [ + "borsh-derive 0.9.3", + "hashbrown 0.11.2", +] + [[package]] name = "borsh" version = "0.10.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4114279215a005bc675e386011e594e1d9b800918cea18fcadadcce864a2046b" dependencies = [ - "borsh-derive", + "borsh-derive 0.10.3", "hashbrown 0.12.3", ] +[[package]] +name = "borsh-derive" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6441c552f230375d18e3cc377677914d2ca2b0d36e52129fe15450a2dce46775" +dependencies = [ + "borsh-derive-internal 0.9.3", + "borsh-schema-derive-internal 0.9.3", + "proc-macro-crate 0.1.5", + "proc-macro2", + "syn 1.0.109", +] + [[package]] name = "borsh-derive" version = "0.10.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0754613691538d51f329cce9af41d7b7ca150bc973056f1156611489475f54f7" dependencies = [ - "borsh-derive-internal", - "borsh-schema-derive-internal", + "borsh-derive-internal 0.10.3", + "borsh-schema-derive-internal 0.10.3", "proc-macro-crate 0.1.5", "proc-macro2", "syn 1.0.109", ] +[[package]] +name = "borsh-derive-internal" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5449c28a7b352f2d1e592a8a28bf139bc71afb0764a14f3c02500935d8c44065" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "borsh-derive-internal" version = "0.10.3" @@ -427,6 +461,17 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "borsh-schema-derive-internal" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cdbd5696d8bfa21d53d9fe39a714a18538bad11492a42d066dbbc395fb1951c0" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "borsh-schema-derive-internal" version = "0.10.3" @@ -987,6 +1032,12 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "56ce8c6da7551ec6c462cbaf3bfbc75131ebbfa1c944aeaa9dab51ca1c5f0c3b" +[[package]] +name = "dyn-clone" +version = "1.0.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "feeef44e73baff3a26d371801df019877a9866a8c493d315ab00177843314f35" + [[package]] name = "ecdsa" version = "0.16.8" @@ -1503,7 +1554,7 @@ dependencies = [ [[package]] name = "fortuna" -version = "7.4.1" +version = "7.4.2" dependencies = [ "anyhow", "axum", @@ -1753,6 +1804,15 @@ dependencies = [ "tracing", ] +[[package]] +name = "hashbrown" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ab5ef0d4909ef3724cc8cce6ccc8572c5c817592e9285f5464f8e86f8bd3726e" +dependencies = [ + "ahash", +] + [[package]] name = "hashbrown" version = "0.12.3" @@ -2852,16 +2912,30 @@ dependencies = [ "syn 2.0.66", ] +[[package]] +name = "pyth-sdk" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f5c805ba3dfb5b7ed6a8ffa62ec38391f485a79c7cf6b3b11d3bd44fb0325824" +dependencies = [ + "borsh 0.9.3", + "borsh-derive 0.9.3", + "hex", + "schemars", + "serde", +] + [[package]] name = "pythnet-sdk" version = "2.3.1" dependencies = [ "bincode", - "borsh", + "borsh 0.10.3", "bytemuck", "byteorder", "fast-math", "hex", + "pyth-sdk", "rustc_version", "serde", "sha3", @@ -3280,6 +3354,30 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "schemars" +version = "0.8.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09c024468a378b7e36765cd36702b7a90cc3cba11654f6685c8f233408e89e92" +dependencies = [ + "dyn-clone", + "schemars_derive", + "serde", + "serde_json", +] + +[[package]] +name = "schemars_derive" +version = "0.8.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1eee588578aff73f856ab961cd2f79e36bc45d7ded33a7562adba4667aecc0e" +dependencies = [ + "proc-macro2", + "quote", + "serde_derive_internals", + "syn 2.0.66", +] + [[package]] name = "scopeguard" version = "1.2.0" @@ -3386,6 +3484,17 @@ dependencies = [ "syn 2.0.66", ] +[[package]] +name = "serde_derive_internals" +version = "0.29.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "18d26a20a969b9e3fdf2fc2d9f21eda6c40e2de84c9408bb5d3b05d499aae711" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.66", +] + [[package]] name = "serde_json" version = "1.0.107" diff --git a/apps/fortuna/Cargo.toml b/apps/fortuna/Cargo.toml index 2b00121ba8..dccfad3e52 100644 --- a/apps/fortuna/Cargo.toml +++ b/apps/fortuna/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "fortuna" -version = "7.4.1" +version = "7.4.2" edition = "2021" [lib] diff --git a/apps/fortuna/src/chain/ethereum.rs b/apps/fortuna/src/chain/ethereum.rs index b20f3df1ff..939f8f7cc8 100644 --- a/apps/fortuna/src/chain/ethereum.rs +++ b/apps/fortuna/src/chain/ethereum.rs @@ -41,6 +41,7 @@ pub type MiddlewaresWrapper = LegacyTxMiddleware< EthProviderOracle>, >, >; + pub type SignablePythContractInner = PythRandom>; pub type SignablePythContract = SignablePythContractInner; pub type InstrumentedSignablePythContract = SignablePythContractInner; diff --git a/apps/fortuna/src/config.rs b/apps/fortuna/src/config.rs index b3f9fb0887..47ae68a8f5 100644 --- a/apps/fortuna/src/config.rs +++ b/apps/fortuna/src/config.rs @@ -2,6 +2,7 @@ use { crate::{ api::ChainId, chain::reader::{BlockNumber, BlockStatus}, + eth_utils::utils::EscalationPolicy, }, anyhow::{anyhow, Result}, clap::{crate_authors, crate_description, crate_name, crate_version, Args, Parser}, @@ -259,39 +260,15 @@ impl Default for EscalationPolicyConfig { } impl EscalationPolicyConfig { - pub fn get_gas_multiplier_pct(&self, num_retries: u64) -> u64 { - self.apply_escalation_policy( - num_retries, - self.initial_gas_multiplier_pct, - self.gas_multiplier_pct, - self.gas_multiplier_cap_pct, - ) - } - - pub fn get_fee_multiplier_pct(&self, num_retries: u64) -> u64 { - self.apply_escalation_policy( - num_retries, - 100, - self.fee_multiplier_pct, - self.fee_multiplier_cap_pct, - ) - } - - fn apply_escalation_policy( - &self, - num_retries: u64, - initial: u64, - multiplier: u64, - cap: u64, - ) -> u64 { - let mut current = initial; - let mut i = 0; - while i < num_retries && current < cap { - current = current.saturating_mul(multiplier) / 100; - i += 1; + pub fn to_policy(&self) -> EscalationPolicy { + EscalationPolicy { + gas_limit_tolerance_pct: self.gas_limit_tolerance_pct, + initial_gas_multiplier_pct: self.initial_gas_multiplier_pct, + gas_multiplier_pct: self.gas_multiplier_pct, + gas_multiplier_cap_pct: self.gas_multiplier_cap_pct, + fee_multiplier_pct: self.fee_multiplier_pct, + fee_multiplier_cap_pct: self.fee_multiplier_cap_pct, } - - current.min(cap) } } diff --git a/apps/fortuna/src/eth_utils/nonce_manager.rs b/apps/fortuna/src/eth_utils/nonce_manager.rs index a414f1da77..9af0012542 100644 --- a/apps/fortuna/src/eth_utils/nonce_manager.rs +++ b/apps/fortuna/src/eth_utils/nonce_manager.rs @@ -2,8 +2,11 @@ // Copied from: https://github.com/gakonst/ethers-rs/blob/34ed9e372e66235aed7074bc3f5c14922b139242/ethers-middleware/src/nonce_manager.rs use { + super::legacy_tx_middleware::LegacyTxMiddleware, axum::async_trait, + ethers::prelude::GasOracle, ethers::{ + middleware::gas_oracle::GasOracleMiddleware, providers::{Middleware, MiddlewareError, PendingTransaction}, types::{transaction::eip2718::TypedTransaction, *}, }, @@ -72,15 +75,6 @@ where Ok(nonce) } // guard dropped here - /// Resets the initialized flag so the next usage of the manager will reinitialize the nonce - /// based on the chain state. - /// This is useful when the RPC does not return an error if the transaction is submitted with - /// an incorrect nonce. - /// This is the only new method compared to the original NonceManagerMiddleware. - pub fn reset(&self) { - self.initialized.store(false, Ordering::SeqCst); - } - async fn get_transaction_count_with_manager( &self, block: Option, @@ -100,6 +94,33 @@ where } } +pub trait NonceManaged { + fn reset(&self); +} + +impl NonceManaged for NonceManagerMiddleware { + /// Resets the initialized flag so the next usage of the manager will reinitialize the nonce + /// based on the chain state. + /// This is useful when the RPC does not return an error if the transaction is submitted with + /// an incorrect nonce. + /// This is the only new method compared to the original NonceManagerMiddleware. + fn reset(&self) { + self.initialized.store(false, Ordering::SeqCst); + } +} + +impl NonceManaged for GasOracleMiddleware { + fn reset(&self) { + self.inner().reset(); + } +} + +impl NonceManaged for LegacyTxMiddleware { + fn reset(&self) { + self.inner().reset(); + } +} + #[derive(Error, Debug)] /// Thrown when an error happens at the Nonce Manager pub enum NonceManagerError { diff --git a/apps/fortuna/src/eth_utils/utils.rs b/apps/fortuna/src/eth_utils/utils.rs index d0530da655..5934e64239 100644 --- a/apps/fortuna/src/eth_utils/utils.rs +++ b/apps/fortuna/src/eth_utils/utils.rs @@ -1,10 +1,87 @@ use { + crate::eth_utils::nonce_manager::NonceManaged, anyhow::{anyhow, Result}, + backoff::ExponentialBackoff, + ethers::types::TransactionReceipt, + ethers::types::U256, ethers::{contract::ContractCall, middleware::Middleware}, + std::sync::atomic::AtomicU64, std::sync::Arc, + tokio::time::{timeout, Duration}, tracing, }; +const TX_CONFIRMATION_TIMEOUT_SECS: u64 = 30; + +#[derive(Debug)] +pub struct SubmitTxResult { + pub num_retries: u64, + pub gas_multiplier: u64, + pub fee_multiplier: u64, + pub duration: Duration, + pub receipt: Result, +} + +#[derive(Clone, Debug)] +pub struct EscalationPolicy { + // The keeper will perform the callback as long as the tx is within this percentage of the configured gas limit. + // Default value is 110, meaning a 10% tolerance over the configured value. + pub gas_limit_tolerance_pct: u64, + + /// The initial gas multiplier to apply to the tx gas estimate + pub initial_gas_multiplier_pct: u64, + + /// The gas multiplier to apply to the tx gas estimate during backoff retries. + /// The gas on each successive retry is multiplied by this value, with the maximum multiplier capped at `gas_multiplier_cap_pct`. + pub gas_multiplier_pct: u64, + /// The maximum gas multiplier to apply to the tx gas estimate during backoff retries. + pub gas_multiplier_cap_pct: u64, + + /// The fee multiplier to apply to the fee during backoff retries. + /// The initial fee is 100% of the estimate (which itself may be padded based on our chain configuration) + /// The fee on each successive retry is multiplied by this value, with the maximum multiplier capped at `fee_multiplier_cap_pct`. + pub fee_multiplier_pct: u64, + pub fee_multiplier_cap_pct: u64, +} + +impl EscalationPolicy { + pub fn get_gas_multiplier_pct(&self, num_retries: u64) -> u64 { + self.apply_escalation_policy( + num_retries, + self.initial_gas_multiplier_pct, + self.gas_multiplier_pct, + self.gas_multiplier_cap_pct, + ) + } + + pub fn get_fee_multiplier_pct(&self, num_retries: u64) -> u64 { + self.apply_escalation_policy( + num_retries, + 100, + self.fee_multiplier_pct, + self.fee_multiplier_cap_pct, + ) + } + + fn apply_escalation_policy( + &self, + num_retries: u64, + initial: u64, + multiplier: u64, + cap: u64, + ) -> u64 { + let mut current = initial; + let mut i = 0; + while i < num_retries && current < cap { + current = current.saturating_mul(multiplier) / 100; + i += 1; + } + + current.min(cap) + } +} + +/// Send a transaction and wait for the receipt to ensure that it was confirmed on chain. pub async fn send_and_confirm(contract_call: ContractCall) -> Result<()> { let call_name = contract_call.function.name.as_str(); let pending_tx = contract_call @@ -64,3 +141,174 @@ pub async fn estimate_tx_cost( Ok(gas_price * gas_used) } + +/// Submit a transaction, retrying on failure according to a configurable backoff policy. +/// The transaction is retried with exponentially increasing delay between retries, and +/// similarly escalating gas and fee multipliers. +/// The gas_limit parameter is the maximum gas that we expect the transaction to use -- if the gas estimate for +/// the transaction exceeds this limit, the transaction is not submitted. +/// Note however that any gas_escalation policy is applied to the estimate, so the actual gas used may exceed the limit. +/// The transaction is retried until it is confirmed on chain or the maximum number of retries is reached. +pub async fn submit_tx_with_backoff( + middleware: Arc, + call: ContractCall, + gas_limit: U256, + escalation_policy: EscalationPolicy, +) -> Result { + let start_time = std::time::Instant::now(); + + tracing::info!("Started processing event"); + let backoff = ExponentialBackoff { + max_elapsed_time: Some(Duration::from_secs(300)), // retry for 5 minutes + ..Default::default() + }; + + let num_retries = Arc::new(AtomicU64::new(0)); + + let success = backoff::future::retry_notify( + backoff, + || async { + let num_retries = num_retries.load(std::sync::atomic::Ordering::Relaxed); + + let gas_multiplier_pct = escalation_policy.get_gas_multiplier_pct(num_retries); + let fee_multiplier_pct = escalation_policy.get_fee_multiplier_pct(num_retries); + submit_tx( + middleware.clone(), + &call, + gas_limit, + gas_multiplier_pct, + fee_multiplier_pct, + ) + .await + }, + |e, dur| { + let retry_number = num_retries.load(std::sync::atomic::Ordering::Relaxed); + tracing::error!( + "Error on retry {} at duration {:?}: {}", + retry_number, + dur, + e + ); + num_retries.store(retry_number + 1, std::sync::atomic::Ordering::Relaxed); + }, + ) + .await; + + let duration = start_time.elapsed(); + let num_retries = num_retries.load(std::sync::atomic::Ordering::Relaxed); + + Ok(SubmitTxResult { + num_retries, + gas_multiplier: escalation_policy.get_gas_multiplier_pct(num_retries), + fee_multiplier: escalation_policy.get_fee_multiplier_pct(num_retries), + duration, + receipt: success, + }) +} + +/// Submit a transaction to the blockchain. It estimates the gas for the transaction, +/// pads both the gas and fee estimates using the provided multipliers, and submits the transaction. +/// It will return a permanent or transient error depending on the error type and whether +/// retry is possible or not. +pub async fn submit_tx( + client: Arc, + call: &ContractCall, + gas_limit: U256, + // A value of 100 submits the tx with the same gas/fee as the estimate. + gas_estimate_multiplier_pct: u64, + fee_estimate_multiplier_pct: u64, +) -> Result> { + let gas_estimate_res = call.estimate_gas().await; + + let gas_estimate = gas_estimate_res.map_err(|e| { + // we consider the error transient even if it is a contract revert since + // it can be because of routing to a lagging RPC node. Retrying such errors will + // incur a few additional RPC calls, but it is fine. + backoff::Error::transient(anyhow!("Error estimating gas for reveal: {:?}", e)) + })?; + + // The gas limit on the simulated transaction is the maximum expected tx gas estimate, + // but we are willing to pad the gas a bit to ensure reliable submission. + if gas_estimate > gas_limit { + return Err(backoff::Error::permanent(anyhow!( + "Gas estimate for reveal with callback is higher than the gas limit {} > {}", + gas_estimate, + gas_limit + ))); + } + + // Pad the gas estimate after checking it against the simulation gas limit. + let gas_estimate = gas_estimate.saturating_mul(gas_estimate_multiplier_pct.into()) / 100; + + let call = call.clone().gas(gas_estimate); + let mut transaction = call.tx.clone(); + + // manually fill the tx with the gas info, so we can log the details in case of error + client + .fill_transaction(&mut transaction, None) + .await + .map_err(|e| { + backoff::Error::transient(anyhow!("Error filling the reveal transaction: {:?}", e)) + })?; + + // Apply the fee escalation policy. Note: the unwrap_or_default should never default as we have a gas oracle + // in the client that sets the gas price. + transaction.set_gas_price( + transaction + .gas_price() + .unwrap_or_default() + .saturating_mul(fee_estimate_multiplier_pct.into()) + / 100, + ); + + let pending_tx = client + .send_transaction(transaction.clone(), None) + .await + .map_err(|e| { + backoff::Error::transient(anyhow!( + "Error submitting the reveal transaction. Tx:{:?}, Error:{:?}", + transaction, + e + )) + })?; + + let reset_nonce = || { + client.reset(); + }; + + let pending_receipt = timeout( + Duration::from_secs(TX_CONFIRMATION_TIMEOUT_SECS), + pending_tx, + ) + .await + .map_err(|_| { + // Tx can get stuck in mempool without any progress if the nonce is too high + // in this case ethers internal polling will not reduce the number of retries + // and keep retrying indefinitely. So we set a manual timeout here and reset the nonce. + reset_nonce(); + backoff::Error::transient(anyhow!( + "Tx stuck in mempool. Resetting nonce. Tx:{:?}", + transaction + )) + })?; + + let receipt = pending_receipt + .map_err(|e| { + backoff::Error::transient(anyhow!( + "Error waiting for transaction receipt. Tx:{:?} Error:{:?}", + transaction, + e + )) + })? + .ok_or_else(|| { + // RPC may not return an error on tx submission if the nonce is too high. + // But we will never get a receipt. So we reset the nonce manager to get the correct nonce. + reset_nonce(); + backoff::Error::transient(anyhow!( + "Can't verify the reveal, probably dropped from mempool. Resetting nonce. Tx:{:?}", + transaction + )) + })?; + + Ok(receipt) +} diff --git a/apps/fortuna/src/keeper.rs b/apps/fortuna/src/keeper.rs index 1271c42518..47be7cdf4a 100644 --- a/apps/fortuna/src/keeper.rs +++ b/apps/fortuna/src/keeper.rs @@ -89,7 +89,7 @@ pub async fn run_keeper_threads( }, contract.clone(), gas_limit, - chain_eth_config.escalation_policy.clone(), + chain_eth_config.escalation_policy.to_policy(), chain_state.clone(), metrics.clone(), fulfilled_requests_cache.clone(), @@ -116,7 +116,7 @@ pub async fn run_keeper_threads( rx, Arc::clone(&contract), gas_limit, - chain_eth_config.escalation_policy.clone(), + chain_eth_config.escalation_policy.to_policy(), metrics.clone(), fulfilled_requests_cache.clone(), chain_eth_config.block_delays.clone(), diff --git a/apps/fortuna/src/keeper/block.rs b/apps/fortuna/src/keeper/block.rs index 21f7d26188..e2dc882b30 100644 --- a/apps/fortuna/src/keeper/block.rs +++ b/apps/fortuna/src/keeper/block.rs @@ -2,7 +2,7 @@ use { crate::{ api::{self, BlockchainState}, chain::{ethereum::InstrumentedSignablePythContract, reader::BlockNumber}, - config::EscalationPolicyConfig, + eth_utils::utils::EscalationPolicy, keeper::keeper_metrics::KeeperMetrics, keeper::process_event::process_event_with_backoff, }, @@ -67,7 +67,7 @@ pub async fn process_block_range( block_range: BlockRange, contract: Arc, gas_limit: U256, - escalation_policy: EscalationPolicyConfig, + escalation_policy: EscalationPolicy, chain_state: api::BlockchainState, metrics: Arc, fulfilled_requests_cache: Arc>>, @@ -114,7 +114,7 @@ pub async fn process_single_block_batch( block_range: BlockRange, contract: Arc, gas_limit: U256, - escalation_policy: EscalationPolicyConfig, + escalation_policy: EscalationPolicy, chain_state: api::BlockchainState, metrics: Arc, fulfilled_requests_cache: Arc>>, @@ -290,7 +290,7 @@ pub async fn process_new_blocks( mut rx: mpsc::Receiver, contract: Arc, gas_limit: U256, - escalation_policy: EscalationPolicyConfig, + escalation_policy: EscalationPolicy, metrics: Arc, fulfilled_requests_cache: Arc>>, block_delays: Vec, @@ -339,7 +339,7 @@ pub async fn process_backlog( backlog_range: BlockRange, contract: Arc, gas_limit: U256, - escalation_policy: EscalationPolicyConfig, + escalation_policy: EscalationPolicy, chain_state: BlockchainState, metrics: Arc, fulfilled_requests_cache: Arc>>, diff --git a/apps/fortuna/src/keeper/process_event.rs b/apps/fortuna/src/keeper/process_event.rs index 425f5e0ae1..79459365df 100644 --- a/apps/fortuna/src/keeper/process_event.rs +++ b/apps/fortuna/src/keeper/process_event.rs @@ -3,16 +3,12 @@ use { crate::{ api::BlockchainState, chain::{ethereum::InstrumentedSignablePythContract, reader::RequestedWithCallbackEvent}, - config::EscalationPolicyConfig, + eth_utils::utils::{submit_tx_with_backoff, EscalationPolicy}, }, anyhow::{anyhow, Result}, - backoff::ExponentialBackoff, - ethers::middleware::Middleware, - ethers::signers::Signer, ethers::types::U256, - std::sync::{atomic::AtomicU64, Arc}, - tokio::time::{timeout, Duration}, - tracing::{self, Instrument}, + std::sync::Arc, + tracing, }; /// Process an event with backoff. It will retry the reveal on failure for 5 minutes. @@ -24,10 +20,14 @@ pub async fn process_event_with_backoff( chain_state: BlockchainState, contract: Arc, gas_limit: U256, - escalation_policy: EscalationPolicyConfig, + escalation_policy: EscalationPolicy, metrics: Arc, -) { - let start_time = std::time::Instant::now(); +) -> Result<()> { + // ignore requests that are not for the configured provider + if chain_state.provider_address != event.provider_address { + return Ok(()); + } + let account_label = AccountLabel { chain_id: chain_state.id.clone(), address: chain_state.provider_address.to_string(), @@ -35,54 +35,35 @@ pub async fn process_event_with_backoff( metrics.requests.get_or_create(&account_label).inc(); tracing::info!("Started processing event"); - let backoff = ExponentialBackoff { - max_elapsed_time: Some(Duration::from_secs(300)), // retry for 5 minutes - ..Default::default() - }; - let num_retries = Arc::new(AtomicU64::new(0)); + let provider_revelation = chain_state + .state + .reveal(event.sequence_number) + .map_err(|e| anyhow!("Error revealing: {:?}", e))?; - let success = backoff::future::retry_notify( - backoff, - || async { - let num_retries = num_retries.load(std::sync::atomic::Ordering::Relaxed); + let contract_call = contract.reveal_with_callback( + event.provider_address, + event.sequence_number, + event.user_random_number, + provider_revelation, + ); - let gas_multiplier_pct = escalation_policy.get_gas_multiplier_pct(num_retries); - let fee_multiplier_pct = escalation_policy.get_fee_multiplier_pct(num_retries); - process_event( - &event, - &chain_state, - &contract, - gas_limit.saturating_mul(escalation_policy.gas_limit_tolerance_pct.into()) / 100, - gas_multiplier_pct, - fee_multiplier_pct, - metrics.clone(), - ) - .await - }, - |e, dur| { - let retry_number = num_retries.load(std::sync::atomic::Ordering::Relaxed); - tracing::error!( - "Error on retry {} at duration {:?}: {}", - retry_number, - dur, - e - ); - num_retries.store(retry_number + 1, std::sync::atomic::Ordering::Relaxed); - }, + let success = submit_tx_with_backoff( + contract.client(), + contract_call, + gas_limit, + escalation_policy, ) .await; - let duration = start_time.elapsed(); - metrics .requests_processed .get_or_create(&account_label) .inc(); match success { - Ok(()) => { - tracing::info!("Processed event successfully in {:?}", duration); + Ok(res) => { + tracing::info!("Processed event successfully in {:?}", res.duration); metrics .requests_processed_success @@ -92,26 +73,42 @@ pub async fn process_event_with_backoff( metrics .request_duration_ms .get_or_create(&account_label) - .observe(duration.as_millis() as f64); + .observe(res.duration.as_millis() as f64); // Track retry count, gas multiplier, and fee multiplier for successful transactions - let num_retries = num_retries.load(std::sync::atomic::Ordering::Relaxed); metrics .retry_count .get_or_create(&account_label) - .observe(num_retries as f64); + .observe(res.num_retries as f64); - let gas_multiplier = escalation_policy.get_gas_multiplier_pct(num_retries); metrics .final_gas_multiplier .get_or_create(&account_label) - .observe(gas_multiplier as f64); + .observe(res.gas_multiplier as f64); - let fee_multiplier = escalation_policy.get_fee_multiplier_pct(num_retries); metrics .final_fee_multiplier .get_or_create(&account_label) - .observe(fee_multiplier as f64); + .observe(res.fee_multiplier as f64); + + if let Ok(receipt) = res.receipt { + if let Some(gas_used) = receipt.gas_used { + let gas_used_float = gas_used.as_u128() as f64 / 1e18; + metrics + .total_gas_spent + .get_or_create(&account_label) + .inc_by(gas_used_float); + + if let Some(gas_price) = receipt.effective_gas_price { + let gas_fee = (gas_used * gas_price).as_u128() as f64 / 1e18; + metrics + .total_gas_fee_spent + .get_or_create(&account_label) + .inc_by(gas_fee); + } + } + } + metrics.reveals.get_or_create(&account_label).inc(); } Err(e) => { // In case the callback did not succeed, we double-check that the request is still on-chain. @@ -133,176 +130,6 @@ pub async fn process_event_with_backoff( } } } -} - -const TX_CONFIRMATION_TIMEOUT_SECS: u64 = 30; - -/// Process a callback on a chain. It estimates the gas for the reveal with callback and -/// submits the transaction if the gas estimate is below the gas limit. -/// It will return a permanent or transient error depending on the error type and whether -/// retry is possible or not. -pub async fn process_event( - event: &RequestedWithCallbackEvent, - chain_config: &BlockchainState, - contract: &InstrumentedSignablePythContract, - gas_limit: U256, - // A value of 100 submits the tx with the same gas/fee as the estimate. - gas_estimate_multiplier_pct: u64, - fee_estimate_multiplier_pct: u64, - metrics: Arc, -) -> Result<(), backoff::Error> { - // ignore requests that are not for the configured provider - if chain_config.provider_address != event.provider_address { - return Ok(()); - } - let provider_revelation = chain_config - .state - .reveal(event.sequence_number) - .map_err(|e| backoff::Error::permanent(anyhow!("Error revealing: {:?}", e)))?; - - let gas_estimate_res = chain_config - .contract - .estimate_reveal_with_callback_gas( - contract.wallet().address(), - event.provider_address, - event.sequence_number, - event.user_random_number, - provider_revelation, - ) - .in_current_span() - .await; - - let gas_estimate = gas_estimate_res.map_err(|e| { - // we consider the error transient even if it is a contract revert since - // it can be because of routing to a lagging RPC node. Retrying such errors will - // incur a few additional RPC calls, but it is fine. - backoff::Error::transient(anyhow!("Error estimating gas for reveal: {:?}", e)) - })?; - - // The gas limit on the simulated transaction is the configured gas limit on the chain, - // but we are willing to pad the gas a bit to ensure reliable submission. - if gas_estimate > gas_limit { - return Err(backoff::Error::permanent(anyhow!( - "Gas estimate for reveal with callback is higher than the gas limit {} > {}", - gas_estimate, - gas_limit - ))); - } - - // Pad the gas estimate after checking it against the simulation gas limit, ensuring that - // the padded gas estimate doesn't exceed the maximum amount of gas we are willing to use. - let gas_estimate = gas_estimate.saturating_mul(gas_estimate_multiplier_pct.into()) / 100; - - let contract_call = contract - .reveal_with_callback( - event.provider_address, - event.sequence_number, - event.user_random_number, - provider_revelation, - ) - .gas(gas_estimate); - - let client = contract.client(); - let mut transaction = contract_call.tx.clone(); - - // manually fill the tx with the gas info, so we can log the details in case of error - client - .fill_transaction(&mut transaction, None) - .await - .map_err(|e| { - backoff::Error::transient(anyhow!("Error filling the reveal transaction: {:?}", e)) - })?; - - // Apply the fee escalation policy. Note: the unwrap_or_default should never default as we have a gas oracle - // in the client that sets the gas price. - transaction.set_gas_price( - transaction - .gas_price() - .unwrap_or_default() - .saturating_mul(fee_estimate_multiplier_pct.into()) - / 100, - ); - - let pending_tx = client - .send_transaction(transaction.clone(), None) - .await - .map_err(|e| { - backoff::Error::transient(anyhow!( - "Error submitting the reveal transaction. Tx:{:?}, Error:{:?}", - transaction, - e - )) - })?; - - let reset_nonce = || { - let nonce_manager = contract.client_ref().inner().inner(); - nonce_manager.reset(); - }; - - let pending_receipt = timeout( - Duration::from_secs(TX_CONFIRMATION_TIMEOUT_SECS), - pending_tx, - ) - .await - .map_err(|_| { - // Tx can get stuck in mempool without any progress if the nonce is too high - // in this case ethers internal polling will not reduce the number of retries - // and keep retrying indefinitely. So we set a manual timeout here and reset the nonce. - reset_nonce(); - backoff::Error::transient(anyhow!( - "Tx stuck in mempool. Resetting nonce. Tx:{:?}", - transaction - )) - })?; - - let receipt = pending_receipt - .map_err(|e| { - backoff::Error::transient(anyhow!( - "Error waiting for transaction receipt. Tx:{:?} Error:{:?}", - transaction, - e - )) - })? - .ok_or_else(|| { - // RPC may not return an error on tx submission if the nonce is too high. - // But we will never get a receipt. So we reset the nonce manager to get the correct nonce. - reset_nonce(); - backoff::Error::transient(anyhow!( - "Can't verify the reveal, probably dropped from mempool. Resetting nonce. Tx:{:?}", - transaction - )) - })?; - - tracing::info!( - sequence_number = &event.sequence_number, - transaction_hash = &receipt.transaction_hash.to_string(), - gas_used = ?receipt.gas_used, - "Revealed with res: {:?}", - receipt - ); - - let account_label = AccountLabel { - chain_id: chain_config.id.clone(), - address: chain_config.provider_address.to_string(), - }; - - if let Some(gas_used) = receipt.gas_used { - let gas_used_float = gas_used.as_u128() as f64 / 1e18; - metrics - .total_gas_spent - .get_or_create(&account_label) - .inc_by(gas_used_float); - - if let Some(gas_price) = receipt.effective_gas_price { - let gas_fee = (gas_used * gas_price).as_u128() as f64 / 1e18; - metrics - .total_gas_fee_spent - .get_or_create(&account_label) - .inc_by(gas_fee); - } - } - - metrics.reveals.get_or_create(&account_label).inc(); Ok(()) }