diff --git a/Cargo.lock b/Cargo.lock index 17075d2611..ca85b2f628 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3046,7 +3046,7 @@ dependencies = [ [[package]] name = "fortuna" -version = "7.7.0" +version = "8.0.0" dependencies = [ "anyhow", "axum 0.6.20", @@ -3064,6 +3064,7 @@ dependencies = [ "futures-locks", "hex", "lazy_static", + "num-traits", "once_cell", "prometheus-client", "pythnet-sdk", diff --git a/apps/fortuna/.gitignore b/apps/fortuna/.gitignore index 9fb1d0163d..b978440e2e 100644 --- a/apps/fortuna/.gitignore +++ b/apps/fortuna/.gitignore @@ -3,4 +3,4 @@ *secret* *private-key* .envrc -fortuna.db +fortuna.db* diff --git a/apps/fortuna/Cargo.toml b/apps/fortuna/Cargo.toml index 1c83a6902a..94d1e7a302 100644 --- a/apps/fortuna/Cargo.toml +++ b/apps/fortuna/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "fortuna" -version = "7.7.0" +version = "8.0.0" edition = "2021" [lib] @@ -47,6 +47,7 @@ backoff = { version = "0.4.0", features = ["futures", "tokio"] } thiserror = "1.0.61" futures-locks = "0.7.1" sqlx = { version = "0.8", features = ["runtime-tokio", "sqlite", "chrono"] } +num-traits = "0.2.19" [dev-dependencies] axum-test = "13.1.1" diff --git a/apps/fortuna/src/api/revelation.rs b/apps/fortuna/src/api/revelation.rs index e7b979413c..26628cb74f 100644 --- a/apps/fortuna/src/api/revelation.rs +++ b/apps/fortuna/src/api/revelation.rs @@ -89,7 +89,9 @@ pub async fn revelation( .ok_or(RestError::NoPendingRequest)?; } None => { - let maybe_request_fut = state.contract.get_request(state.provider_address, sequence); + let maybe_request_fut = state + .contract + .get_request_v2(state.provider_address, sequence); let (maybe_request, current_block_number) = try_join!(maybe_request_fut, current_block_number_fut).map_err(|e| { tracing::error!(chain_id = chain_id, "RPC request failed {}", e); diff --git a/apps/fortuna/src/chain/ethereum.rs b/apps/fortuna/src/chain/ethereum.rs index 57e73fdde5..fe6464ce85 100644 --- a/apps/fortuna/src/chain/ethereum.rs +++ b/apps/fortuna/src/chain/ethereum.rs @@ -265,29 +265,22 @@ impl PythRandom> { #[async_trait] impl EntropyReader for PythRandom> { - async fn get_request( + async fn get_request_v2( &self, provider_address: Address, sequence_number: u64, ) -> Result> { - let r = self - .get_request(provider_address, sequence_number) - // TODO: This doesn't work for lighlink right now. Figure out how to do this in lightlink - // .block(ethers::core::types::BlockNumber::Finalized) + let request = self + .get_request_v2(provider_address, sequence_number) .call() .await?; - - // sequence_number == 0 means the request does not exist. - if r.sequence_number != 0 { - Ok(Some(reader::Request { - provider: r.provider, - sequence_number: r.sequence_number, - block_number: r.block_number, - use_blockhash: r.use_blockhash, - })) - } else { - Ok(None) - } + Ok(Some(reader::Request { + provider: request.provider, + sequence_number: request.sequence_number, + block_number: request.block_number, + use_blockhash: request.use_blockhash, + callback_status: reader::RequestCallbackStatus::try_from(request.callback_status)?, + })) } async fn get_block_number(&self, confirmed_block_status: BlockStatus) -> Result { diff --git a/apps/fortuna/src/chain/reader.rs b/apps/fortuna/src/chain/reader.rs index 8e1c3e6d45..44130f85c3 100644 --- a/apps/fortuna/src/chain/reader.rs +++ b/apps/fortuna/src/chain/reader.rs @@ -60,8 +60,11 @@ pub trait EntropyReader: Send + Sync { /// Get an in-flight request (if it exists) /// Note that if we support additional blockchains in the future, the type of `provider` may /// need to become more generic. - async fn get_request(&self, provider: Address, sequence_number: u64) - -> Result>; + async fn get_request_v2( + &self, + provider: Address, + sequence_number: u64, + ) -> Result>; async fn get_block_number(&self, confirmed_block_status: BlockStatus) -> Result; @@ -93,12 +96,48 @@ pub struct Request { // The block number where this request was created pub block_number: BlockNumber, pub use_blockhash: bool, + pub callback_status: RequestCallbackStatus, +} + +/// Status values for Request.callback_status +#[derive(Clone, Debug, PartialEq, Eq)] +pub enum RequestCallbackStatus { + /// Not a request with callback + CallbackNotNecessary = 0, + /// A request with callback where the callback hasn't been invoked yet + CallbackNotStarted = 1, + /// A request with callback where the callback is currently in flight (this state is a reentry guard) + CallbackInProgress = 2, + /// A request with callback where the callback has been invoked and failed + CallbackFailed = 3, +} + +impl TryFrom for RequestCallbackStatus { + type Error = anyhow::Error; + + fn try_from(value: u8) -> Result { + match value { + 0 => Ok(RequestCallbackStatus::CallbackNotNecessary), + 1 => Ok(RequestCallbackStatus::CallbackNotStarted), + 2 => Ok(RequestCallbackStatus::CallbackInProgress), + 3 => Ok(RequestCallbackStatus::CallbackFailed), + _ => Err(anyhow::anyhow!("Invalid callback status value: {}", value)), + } + } +} + +impl From for u8 { + fn from(status: RequestCallbackStatus) -> Self { + status as u8 + } } #[cfg(test)] pub mod mock { use { - crate::chain::reader::{BlockNumber, BlockStatus, EntropyReader, Request}, + crate::chain::reader::{ + BlockNumber, BlockStatus, EntropyReader, Request, RequestCallbackStatus, + }, anyhow::Result, axum::async_trait, ethers::types::{Address, U256}, @@ -129,6 +168,7 @@ pub mod mock { sequence_number: s, block_number: b, use_blockhash: u, + callback_status: RequestCallbackStatus::CallbackNotNecessary, }) .collect(), ), @@ -148,6 +188,7 @@ pub mod mock { sequence_number: sequence, block_number, use_blockhash, + callback_status: RequestCallbackStatus::CallbackNotNecessary, }); self } @@ -160,7 +201,7 @@ pub mod mock { #[async_trait] impl EntropyReader for MockEntropyReader { - async fn get_request( + async fn get_request_v2( &self, provider: Address, sequence_number: u64, diff --git a/apps/fortuna/src/command/get_request.rs b/apps/fortuna/src/command/get_request.rs index 1f6e49bd2b..300b6a6896 100644 --- a/apps/fortuna/src/command/get_request.rs +++ b/apps/fortuna/src/command/get_request.rs @@ -14,12 +14,12 @@ pub async fn get_request(opts: &GetRequestOptions) -> Result<()> { &Config::load(&opts.config.config)?.get_chain_config(&opts.chain_id)?, )?); - let p = contract.get_provider_info(opts.provider).call().await?; + let p = contract.get_provider_info_v2(opts.provider).call().await?; tracing::info!("Found provider: {:?}", p); let r = contract - .get_request(opts.provider, opts.sequence) + .get_request_v2(opts.provider, opts.sequence) .call() .await?; tracing::info!("Found request: {:?}", r); diff --git a/apps/fortuna/src/command/inspect.rs b/apps/fortuna/src/command/inspect.rs index 0840fac59f..98d54910d2 100644 --- a/apps/fortuna/src/command/inspect.rs +++ b/apps/fortuna/src/command/inspect.rs @@ -1,6 +1,6 @@ use { crate::{ - chain::ethereum::{EntropyStructsRequest, PythContract}, + chain::ethereum::{EntropyStructsV2Request, PythContract}, config::{Config, EthereumConfig, InspectOptions}, }, anyhow::Result, @@ -42,7 +42,10 @@ async fn inspect_chain( let contract = PythContract::from_config(chain_config)?; let entropy_provider = contract.get_default_provider().call().await?; - let provider_info = contract.get_provider_info(entropy_provider).call().await?; + let provider_info = contract + .get_provider_info_v2(entropy_provider) + .call() + .await?; let mut current_request_number = provider_info.sequence_number; println!("Initial request number: {current_request_number}"); let last_request_number = current_request_number.saturating_sub(num_requests); @@ -60,12 +63,12 @@ async fn inspect_chain( break; } multicall.add_call( - contract.get_request(entropy_provider, current_request_number), + contract.get_request_v2(entropy_provider, current_request_number), false, ); current_request_number -= 1; } - let return_data: Vec = multicall.call_array().await?; + let return_data: Vec = multicall.call_array().await?; for request in return_data { process_request(rpc_provider.clone(), request).await?; } @@ -75,7 +78,7 @@ async fn inspect_chain( println!("Multicall not deployed in this chain, fetching requests one by one"); while current_request_number > last_request_number { let request = contract - .get_request(entropy_provider, current_request_number) + .get_request_v2(entropy_provider, current_request_number) .call() .await?; process_request(rpc_provider.clone(), request).await?; @@ -90,9 +93,9 @@ async fn inspect_chain( async fn process_request( rpc_provider: Provider, - request: EntropyStructsRequest, + request: EntropyStructsV2Request, ) -> Result<()> { - if request.sequence_number != 0 && request.is_request_with_callback { + if request.sequence_number != 0 && request.callback_status != 0 { let block = rpc_provider .get_block(request.block_number) .await? diff --git a/apps/fortuna/src/command/run.rs b/apps/fortuna/src/command/run.rs index e8c8309ea2..3b4c8f9a9d 100644 --- a/apps/fortuna/src/command/run.rs +++ b/apps/fortuna/src/command/run.rs @@ -248,7 +248,7 @@ async fn setup_chain_state( }); let provider_info = contract - .get_provider_info(*provider) + .get_provider_info_v2(*provider) .call() .await .map_err(|e| anyhow!("Failed to get provider info: {}", e))?; diff --git a/apps/fortuna/src/command/setup_provider.rs b/apps/fortuna/src/command/setup_provider.rs index a0f6ff2737..b1bd37e4eb 100644 --- a/apps/fortuna/src/command/setup_provider.rs +++ b/apps/fortuna/src/command/setup_provider.rs @@ -1,7 +1,7 @@ use { crate::{ api::{get_register_uri, ChainId}, - chain::ethereum::{EntropyStructsProviderInfo, SignablePythContract}, + chain::ethereum::{EntropyStructsV2ProviderInfo, SignablePythContract}, command::register_provider::{register_provider_from_config, CommitmentMetadata}, config::{Config, EthereumConfig, SetupProviderOptions}, state::{HashChainState, PebbleHashChain}, @@ -76,7 +76,10 @@ async fn setup_chain_provider( let contract = Arc::new(SignablePythContract::from_config(chain_config, &private_key).await?); tracing::info!("Fetching provider info"); - let provider_info = contract.get_provider_info(provider_address).call().await?; + let provider_info = contract + .get_provider_info_v2(provider_address) + .call() + .await?; tracing::info!("Provider info: {:?}", provider_info); let mut register = false; @@ -147,7 +150,10 @@ async fn setup_chain_provider( tracing::info!("Registered"); } - let provider_info = contract.get_provider_info(provider_address).call().await?; + let provider_info = contract + .get_provider_info_v2(provider_address) + .call() + .await?; if register || !chain_config.sync_fee_only_on_register { sync_fee(&contract, &provider_info, chain_config.fee) @@ -176,12 +182,16 @@ async fn setup_chain_provider( .in_current_span() .await?; + sync_default_gas_limit(&contract, &provider_info, chain_config.gas_limit) + .in_current_span() + .await?; + Ok(()) } async fn sync_uri( contract: &Arc, - provider_info: &EntropyStructsProviderInfo, + provider_info: &EntropyStructsV2ProviderInfo, uri: String, ) -> Result<()> { let uri_as_bytes: Bytes = AbiBytes::from(uri.as_str()).into(); @@ -201,7 +211,7 @@ async fn sync_uri( async fn sync_fee( contract: &Arc, - provider_info: &EntropyStructsProviderInfo, + provider_info: &EntropyStructsV2ProviderInfo, provider_fee: u128, ) -> Result<()> { if provider_info.fee_in_wei != provider_fee { @@ -220,7 +230,7 @@ async fn sync_fee( async fn sync_fee_manager( contract: &Arc, - provider_info: &EntropyStructsProviderInfo, + provider_info: &EntropyStructsV2ProviderInfo, fee_manager: Address, ) -> Result<()> { if provider_info.fee_manager != fee_manager { @@ -234,7 +244,7 @@ async fn sync_fee_manager( async fn sync_max_num_hashes( contract: &Arc, - provider_info: &EntropyStructsProviderInfo, + provider_info: &EntropyStructsV2ProviderInfo, max_num_hashes: u32, ) -> Result<()> { if provider_info.max_num_hashes != max_num_hashes { @@ -250,3 +260,25 @@ async fn sync_max_num_hashes( } Ok(()) } + +async fn sync_default_gas_limit( + contract: &Arc, + provider_info: &EntropyStructsV2ProviderInfo, + default_gas_limit: u32, +) -> Result<()> { + if provider_info.default_gas_limit != default_gas_limit { + tracing::info!( + "Updating provider default gas limit to {:?}", + default_gas_limit + ); + if let Some(receipt) = contract + .set_default_gas_limit(default_gas_limit) + .send() + .await? + .await? + { + tracing::info!("Updated provider default gas limit to : {:?}", receipt); + } + } + Ok(()) +} diff --git a/apps/fortuna/src/command/withdraw_fees.rs b/apps/fortuna/src/command/withdraw_fees.rs index 8f701823a9..fd69f10af4 100644 --- a/apps/fortuna/src/command/withdraw_fees.rs +++ b/apps/fortuna/src/command/withdraw_fees.rs @@ -58,7 +58,10 @@ pub async fn withdraw_fees_for_chain( retained_balance: u128, ) -> Result<()> { tracing::info!("Fetching fees for provider: {:?}", provider_address); - let provider_info = contract.get_provider_info(provider_address).call().await?; + let provider_info = contract + .get_provider_info_v2(provider_address) + .call() + .await?; let fees = provider_info.accrued_fees_in_wei; tracing::info!("Accrued fees: {} wei", fees); diff --git a/apps/fortuna/src/config.rs b/apps/fortuna/src/config.rs index 0d0ae8b2d2..893981bff4 100644 --- a/apps/fortuna/src/config.rs +++ b/apps/fortuna/src/config.rs @@ -151,7 +151,7 @@ pub struct EthereumConfig { pub legacy_tx: bool, /// The gas limit to use for entropy callback transactions. - pub gas_limit: u64, + pub gas_limit: u32, /// The percentage multiplier to apply to priority fee estimates (100 = no change, e.g. 150 = 150% of base fee) #[serde(default = "default_priority_fee_multiplier_pct")] @@ -227,23 +227,6 @@ fn default_backlog_range() -> u64 { #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] pub struct EscalationPolicyConfig { - // The keeper will perform the callback as long as the tx is within this percentage of the configured gas limit. - // Default value is 110, meaning a 10% tolerance over the configured value. - #[serde(default = "default_gas_limit_tolerance_pct")] - pub gas_limit_tolerance_pct: u64, - - /// The initial gas multiplier to apply to the tx gas estimate - #[serde(default = "default_initial_gas_multiplier_pct")] - pub initial_gas_multiplier_pct: u64, - - /// The gas multiplier to apply to the tx gas estimate during backoff retries. - /// The gas on each successive retry is multiplied by this value, with the maximum multiplier capped at `gas_multiplier_cap_pct`. - #[serde(default = "default_gas_multiplier_pct")] - pub gas_multiplier_pct: u64, - /// The maximum gas multiplier to apply to the tx gas estimate during backoff retries. - #[serde(default = "default_gas_multiplier_cap_pct")] - pub gas_multiplier_cap_pct: u64, - /// The fee multiplier to apply to the fee during backoff retries. /// The initial fee is 100% of the estimate (which itself may be padded based on our chain configuration) /// The fee on each successive retry is multiplied by this value, with the maximum multiplier capped at `fee_multiplier_cap_pct`. @@ -253,22 +236,6 @@ pub struct EscalationPolicyConfig { pub fee_multiplier_cap_pct: u64, } -fn default_gas_limit_tolerance_pct() -> u64 { - 110 -} - -fn default_initial_gas_multiplier_pct() -> u64 { - 125 -} - -fn default_gas_multiplier_pct() -> u64 { - 110 -} - -fn default_gas_multiplier_cap_pct() -> u64 { - 600 -} - fn default_fee_multiplier_pct() -> u64 { 110 } @@ -280,10 +247,6 @@ fn default_fee_multiplier_cap_pct() -> u64 { impl Default for EscalationPolicyConfig { fn default() -> Self { Self { - gas_limit_tolerance_pct: default_gas_limit_tolerance_pct(), - initial_gas_multiplier_pct: default_initial_gas_multiplier_pct(), - gas_multiplier_pct: default_gas_multiplier_pct(), - gas_multiplier_cap_pct: default_gas_multiplier_cap_pct(), fee_multiplier_pct: default_fee_multiplier_pct(), fee_multiplier_cap_pct: default_fee_multiplier_cap_pct(), } @@ -293,10 +256,6 @@ impl Default for EscalationPolicyConfig { impl EscalationPolicyConfig { pub fn to_policy(&self) -> EscalationPolicy { EscalationPolicy { - gas_limit_tolerance_pct: self.gas_limit_tolerance_pct, - initial_gas_multiplier_pct: self.initial_gas_multiplier_pct, - gas_multiplier_pct: self.gas_multiplier_pct, - gas_multiplier_cap_pct: self.gas_multiplier_cap_pct, fee_multiplier_pct: self.fee_multiplier_pct, fee_multiplier_cap_pct: self.fee_multiplier_cap_pct, } diff --git a/apps/fortuna/src/eth_utils/utils.rs b/apps/fortuna/src/eth_utils/utils.rs index df674f2275..00c0c75883 100644 --- a/apps/fortuna/src/eth_utils/utils.rs +++ b/apps/fortuna/src/eth_utils/utils.rs @@ -22,7 +22,6 @@ const TX_CONFIRMATION_TIMEOUT_SECS: u64 = 30; #[derive(Debug)] pub struct SubmitTxResult { pub num_retries: u64, - pub gas_multiplier: u64, pub fee_multiplier: u64, pub duration: Duration, pub receipt: TransactionReceipt, @@ -30,19 +29,6 @@ pub struct SubmitTxResult { #[derive(Clone, Debug)] pub struct EscalationPolicy { - // The keeper will perform the callback as long as the tx is within this percentage of the configured gas limit. - // Default value is 110, meaning a 10% tolerance over the configured value. - pub gas_limit_tolerance_pct: u64, - - /// The initial gas multiplier to apply to the tx gas estimate - pub initial_gas_multiplier_pct: u64, - - /// The gas multiplier to apply to the tx gas estimate during backoff retries. - /// The gas on each successive retry is multiplied by this value, with the maximum multiplier capped at `gas_multiplier_cap_pct`. - pub gas_multiplier_pct: u64, - /// The maximum gas multiplier to apply to the tx gas estimate during backoff retries. - pub gas_multiplier_cap_pct: u64, - /// The fee multiplier to apply to the fee during backoff retries. /// The initial fee is 100% of the estimate (which itself may be padded based on our chain configuration) /// The fee on each successive retry is multiplied by this value, with the maximum multiplier capped at `fee_multiplier_cap_pct`. @@ -51,15 +37,6 @@ pub struct EscalationPolicy { } impl EscalationPolicy { - pub fn get_gas_multiplier_pct(&self, num_retries: u64) -> u64 { - self.apply_escalation_policy( - num_retries, - self.initial_gas_multiplier_pct, - self.gas_multiplier_pct, - self.gas_multiplier_cap_pct, - ) - } - pub fn get_fee_multiplier_pct(&self, num_retries: u64) -> u64 { self.apply_escalation_policy( num_retries, @@ -160,7 +137,6 @@ pub async fn estimate_tx_cost( pub async fn submit_tx_with_backoff( middleware: Arc, call: ContractCall, - gas_limit: U256, escalation_policy: EscalationPolicy, error_mapper: Option< impl Fn(u64, backoff::Error>) -> backoff::Error>, @@ -176,23 +152,13 @@ pub async fn submit_tx_with_backoff( let num_retries = Arc::new(AtomicU64::new(0)); - let padded_gas_limit = U256::from(escalation_policy.gas_limit_tolerance_pct) * gas_limit / 100; - let success = backoff::future::retry_notify( backoff, || async { let num_retries = num_retries.load(std::sync::atomic::Ordering::Relaxed); - let gas_multiplier_pct = escalation_policy.get_gas_multiplier_pct(num_retries); let fee_multiplier_pct = escalation_policy.get_fee_multiplier_pct(num_retries); - let result = submit_tx( - middleware.clone(), - &call, - padded_gas_limit, - gas_multiplier_pct, - fee_multiplier_pct, - ) - .await; + let result = submit_tx(middleware.clone(), &call, fee_multiplier_pct).await; if let Some(ref mapper) = error_mapper { result.map_err(|e| mapper(num_retries, e)) } else { @@ -217,7 +183,6 @@ pub async fn submit_tx_with_backoff( Ok(SubmitTxResult { num_retries, - gas_multiplier: escalation_policy.get_gas_multiplier_pct(num_retries), fee_multiplier: escalation_policy.get_fee_multiplier_pct(num_retries), duration, receipt: success, @@ -274,33 +239,9 @@ where pub async fn submit_tx( client: Arc, call: &ContractCall, - gas_limit: U256, - // A value of 100 submits the tx with the same gas/fee as the estimate. - gas_estimate_multiplier_pct: u64, + // A value of 100 submits the tx with the same fee as the estimate. fee_estimate_multiplier_pct: u64, ) -> Result>> { - let gas_estimate_res = call.estimate_gas().await; - - let gas_estimate = gas_estimate_res.map_err(|e| { - // we consider the error transient even if it is a contract revert since - // it can be because of routing to a lagging RPC node. Retrying such errors will - // incur a few additional RPC calls, but it is fine. - backoff::Error::transient(SubmitTxError::GasUsageEstimateError(e)) - })?; - - // The gas limit on the simulated transaction is the maximum expected tx gas estimate, - // but we are willing to pad the gas a bit to ensure reliable submission. - if gas_estimate > gas_limit { - return Err(backoff::Error::permanent(SubmitTxError::GasLimitExceeded { - estimate: gas_estimate, - limit: gas_limit, - })); - } - - // Pad the gas estimate after checking it against the simulation gas limit. - let gas_estimate = gas_estimate.saturating_mul(gas_estimate_multiplier_pct.into()) / 100; - - let call = call.clone().gas(gas_estimate); let mut transaction = call.tx.clone(); // manually fill the tx with the gas price info, so we can log the details in case of error diff --git a/apps/fortuna/src/history.rs b/apps/fortuna/src/history.rs index 03c5251fe8..245e5dea18 100644 --- a/apps/fortuna/src/history.rs +++ b/apps/fortuna/src/history.rs @@ -259,20 +259,19 @@ impl History { let block_number = new_status.request_block_number as i64; let sender: String = new_status.sender.encode_hex(); let user_random_number: String = new_status.user_random_number.encode_hex(); - sqlx::query!("INSERT INTO request(chain_id, network_id, provider, sequence, created_at, last_updated_at, state, request_block_number, request_tx_hash, user_random_number, sender, gas_limit) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", - chain_id, - network_id, - provider, - sequence, - new_status.created_at, - new_status.last_updated_at, - "Pending", - block_number, - request_tx_hash, - user_random_number, - sender, - gas_limit - ) + sqlx::query("INSERT INTO request(chain_id, network_id, provider, sequence, created_at, last_updated_at, state, request_block_number, request_tx_hash, user_random_number, sender, gas_limit) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)") + .bind(chain_id.clone()) + .bind(network_id) + .bind(provider.clone()) + .bind(sequence) + .bind(new_status.created_at) + .bind(new_status.last_updated_at) + .bind("Pending") + .bind(block_number) + .bind(request_tx_hash.clone()) + .bind(user_random_number) + .bind(sender) + .bind(gas_limit.clone()) .execute(pool) .await } @@ -287,17 +286,17 @@ impl History { let reveal_tx_hash: String = reveal_tx_hash.encode_hex(); let provider_random_number: String = provider_random_number.encode_hex(); let gas_used: String = gas_used.to_string(); - let result = sqlx::query!("UPDATE request SET state = ?, last_updated_at = ?, reveal_block_number = ?, reveal_tx_hash = ?, provider_random_number =?, gas_used = ? WHERE network_id = ? AND sequence = ? AND provider = ? AND request_tx_hash = ?", - "Completed", - new_status.last_updated_at, - reveal_block_number, - reveal_tx_hash, - provider_random_number, - gas_used, - network_id, - sequence, - provider, - request_tx_hash) + let result = sqlx::query("UPDATE request SET state = ?, last_updated_at = ?, reveal_block_number = ?, reveal_tx_hash = ?, provider_random_number =?, gas_used = ? WHERE network_id = ? AND sequence = ? AND provider = ? AND request_tx_hash = ?") + .bind("Completed") + .bind(new_status.last_updated_at) + .bind(reveal_block_number) + .bind(reveal_tx_hash) + .bind(provider_random_number) + .bind(gas_used) + .bind(network_id) + .bind(sequence) + .bind(provider.clone()) + .bind(request_tx_hash.clone()) .execute(pool) .await; if let Ok(query_result) = &result { @@ -313,15 +312,15 @@ impl History { } => { let provider_random_number: Option = provider_random_number .map(|provider_random_number| provider_random_number.encode_hex()); - sqlx::query!("UPDATE request SET state = ?, last_updated_at = ?, info = ?, provider_random_number = ? WHERE network_id = ? AND sequence = ? AND provider = ? AND request_tx_hash = ? AND state = 'Pending'", - "Failed", - new_status.last_updated_at, - reason, - provider_random_number, - network_id, - sequence, - provider, - request_tx_hash) + sqlx::query("UPDATE request SET state = ?, last_updated_at = ?, info = ?, provider_random_number = ? WHERE network_id = ? AND sequence = ? AND provider = ? AND request_tx_hash = ? AND state = 'Pending'") + .bind("Failed") + .bind(new_status.last_updated_at) + .bind(reason) + .bind(provider_random_number) + .bind(network_id) + .bind(sequence) + .bind(provider) + .bind(request_tx_hash) .execute(pool) .await } diff --git a/apps/fortuna/src/keeper.rs b/apps/fortuna/src/keeper.rs index 184c6821d4..3ceece09d6 100644 --- a/apps/fortuna/src/keeper.rs +++ b/apps/fortuna/src/keeper.rs @@ -82,11 +82,9 @@ pub async fn run_keeper_threads( let fulfilled_requests_cache = Arc::new(RwLock::new(HashSet::::new())); // Spawn a thread to handle the events from last backlog_range blocks. - let gas_limit: U256 = chain_eth_config.gas_limit.into(); let process_params = ProcessParams { chain_state: chain_state.clone(), contract: contract.clone(), - gas_limit, escalation_policy: chain_eth_config.escalation_policy.to_policy(), replica_config: keeper_replica_config, metrics: metrics.clone(), @@ -143,14 +141,6 @@ pub async fn run_keeper_threads( chain_state.provider_address, ADJUST_FEE_INTERVAL, chain_eth_config.legacy_tx, - // NOTE: we are adjusting the fees based on the maximum configured gas for user transactions. - // However, the keeper will pad the gas limit for transactions (per the escalation policy) to ensure reliable submission. - // Consequently, fees can be adjusted such that transactions are still unprofitable. - // While we could scale up this value based on the padding, that ends up overcharging users as most transactions cost nowhere - // near the maximum gas limit. - // In the unlikely event that the keeper fees aren't sufficient, the solution to this is to configure the target - // fee percentage to be higher on that specific chain. - chain_eth_config.gas_limit, // NOTE: unwrap() here so we panic early if someone configures these values below -100. u64::try_from(100 + chain_eth_config.min_profit_pct) .expect("min_profit_pct must be >= -100"), diff --git a/apps/fortuna/src/keeper/block.rs b/apps/fortuna/src/keeper/block.rs index 10505166d2..41b93fc6df 100644 --- a/apps/fortuna/src/keeper/block.rs +++ b/apps/fortuna/src/keeper/block.rs @@ -11,12 +11,8 @@ use { }, }, anyhow::Result, - ethers::types::U256, - std::{ - collections::HashSet, - sync::Arc, - time::{SystemTime, UNIX_EPOCH}, - }, + std::time::{SystemTime, UNIX_EPOCH}, + std::{collections::HashSet, sync::Arc}, tokio::{ spawn, sync::{mpsc, RwLock}, @@ -43,7 +39,6 @@ pub struct BlockRange { #[derive(Clone)] pub struct ProcessParams { pub contract: Arc, - pub gas_limit: U256, pub escalation_policy: EscalationPolicy, pub chain_state: BlockchainState, pub replica_config: Option, diff --git a/apps/fortuna/src/keeper/commitment.rs b/apps/fortuna/src/keeper/commitment.rs index 53512e21c4..2c5c69e84f 100644 --- a/apps/fortuna/src/keeper/commitment.rs +++ b/apps/fortuna/src/keeper/commitment.rs @@ -38,7 +38,7 @@ pub async fn update_commitments_if_necessary( let latest_safe_block = get_latest_safe_block(chain_state).in_current_span().await; let provider_address = chain_state.provider_address; let provider_info = contract - .get_provider_info(provider_address) + .get_provider_info_v2(provider_address) .block(latest_safe_block) // To ensure we are not revealing sooner than we should .call() .await diff --git a/apps/fortuna/src/keeper/fee.rs b/apps/fortuna/src/keeper/fee.rs index bd9aedeb0a..6d8db20f78 100644 --- a/apps/fortuna/src/keeper/fee.rs +++ b/apps/fortuna/src/keeper/fee.rs @@ -49,7 +49,7 @@ pub async fn withdraw_fees_if_necessary( .map_err(|e| anyhow!("Error while getting balance. error: {:?}", e))?; let provider_info = contract - .get_provider_info(provider_address) + .get_provider_info_v2(provider_address) .call() .await .map_err(|e| anyhow!("Error while getting provider info. error: {:?}", e))?; @@ -80,7 +80,6 @@ pub async fn adjust_fee_wrapper( provider_address: Address, poll_interval: Duration, legacy_tx: bool, - gas_limit: u64, min_profit_pct: u64, target_profit_pct: u64, max_profit_pct: u64, @@ -97,7 +96,6 @@ pub async fn adjust_fee_wrapper( chain_state.id.clone(), provider_address, legacy_tx, - gas_limit, min_profit_pct, target_profit_pct, max_profit_pct, @@ -134,7 +132,6 @@ pub async fn adjust_fee_if_necessary( chain_id: ChainId, provider_address: Address, legacy_tx: bool, - gas_limit: u64, min_profit_pct: u64, target_profit_pct: u64, max_profit_pct: u64, @@ -144,7 +141,7 @@ pub async fn adjust_fee_if_necessary( metrics: Arc, ) -> Result<()> { let provider_info = contract - .get_provider_info(provider_address) + .get_provider_info_v2(provider_address) .call() .await .map_err(|e| anyhow!("Error while getting provider info. error: {:?}", e))?; @@ -155,7 +152,8 @@ pub async fn adjust_fee_if_necessary( // Calculate target window for the on-chain fee. let middleware = contract.client(); - let max_callback_cost: u128 = estimate_tx_cost(middleware, legacy_tx, gas_limit.into()) + let gas_limit: u128 = u128::from(provider_info.default_gas_limit); + let max_callback_cost: u128 = estimate_tx_cost(middleware, legacy_tx, gas_limit) .await .map_err(|e| anyhow!("Could not estimate transaction cost. error {:?}", e))?; @@ -167,7 +165,7 @@ pub async fn adjust_fee_if_necessary( metrics .gas_price_estimate .get_or_create(&account_label) - .set((max_callback_cost / u128::from(gas_limit)) as f64 / 1e9); + .set((max_callback_cost / gas_limit) as f64 / 1e9); let target_fee_min = std::cmp::max( (max_callback_cost * u128::from(min_profit_pct)) / 100, diff --git a/apps/fortuna/src/keeper/keeper_metrics.rs b/apps/fortuna/src/keeper/keeper_metrics.rs index bc0f69c1e0..3fe4423976 100644 --- a/apps/fortuna/src/keeper/keeper_metrics.rs +++ b/apps/fortuna/src/keeper/keeper_metrics.rs @@ -38,7 +38,6 @@ pub struct KeeperMetrics { pub reveals: Family, pub request_duration_ms: Family, pub retry_count: Family, - pub final_gas_multiplier: Family, pub final_fee_multiplier: Family, pub gas_price_estimate: Family>, pub highest_revealed_sequence_number: Family, @@ -77,9 +76,6 @@ impl Default for KeeperMetrics { retry_count: Family::new_with_constructor(|| { Histogram::new(vec![0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 10.0, 15.0, 20.0]) }), - final_gas_multiplier: Family::new_with_constructor(|| { - Histogram::new(vec![100.0, 125.0, 150.0, 200.0, 300.0, 400.0, 500.0, 600.0]) - }), final_fee_multiplier: Family::new_with_constructor(|| { Histogram::new(vec![100.0, 110.0, 120.0, 140.0, 160.0, 180.0, 200.0]) }), @@ -202,12 +198,6 @@ impl KeeperMetrics { keeper_metrics.retry_count.clone(), ); - writable_registry.register( - "final_gas_multiplier", - "Final gas multiplier percentage for successful transactions", - keeper_metrics.final_gas_multiplier.clone(), - ); - writable_registry.register( "final_fee_multiplier", "Final fee multiplier percentage for successful transactions", @@ -310,7 +300,6 @@ impl KeeperMetrics { let _ = self.reveals.get_or_create(&account_label); let _ = self.request_duration_ms.get_or_create(&account_label); let _ = self.retry_count.get_or_create(&account_label); - let _ = self.final_gas_multiplier.get_or_create(&account_label); let _ = self.final_fee_multiplier.get_or_create(&account_label); let _ = self.gas_price_estimate.get_or_create(&account_label); } diff --git a/apps/fortuna/src/keeper/process_event.rs b/apps/fortuna/src/keeper/process_event.rs index 966ca82fc9..7a91253be7 100644 --- a/apps/fortuna/src/keeper/process_event.rs +++ b/apps/fortuna/src/keeper/process_event.rs @@ -1,13 +1,16 @@ use { super::keeper_metrics::AccountLabel, crate::{ - chain::{ethereum::PythRandomErrorsErrors, reader::RequestedWithCallbackEvent}, + chain::{ + ethereum::PythRandomErrorsErrors, + reader::{RequestCallbackStatus, RequestedWithCallbackEvent}, + }, eth_utils::utils::{submit_tx_with_backoff, SubmitTxError}, history::{RequestEntryState, RequestStatus}, keeper::block::ProcessParams, }, anyhow::{anyhow, Result}, - ethers::{abi::AbiDecode, contract::ContractError}, + ethers::{abi::AbiDecode, contract::ContractError, types::U256}, std::time::Duration, tracing, }; @@ -23,7 +26,6 @@ pub async fn process_event_with_backoff( let ProcessParams { chain_state, contract, - gas_limit, escalation_policy, metrics, history, @@ -59,10 +61,18 @@ pub async fn process_event_with_backoff( // If it is, we will process it as a backup replica. match chain_state .contract - .get_request(event.provider_address, event.sequence_number) + .get_request_v2(event.provider_address, event.sequence_number) .await { - Ok(Some(_)) => { + Ok(Some(req)) => { + // If the request is in the CallbackNotStarted state, it means that the primary replica + // has not yet called the callback. We should process it as a backup replica. + if req.callback_status != RequestCallbackStatus::CallbackNotStarted { + tracing::debug!( + "Request already handled by primary replica during delay, skipping" + ); + return Ok(()); + } tracing::info!( delay_seconds = replica_config.backup_delay_seconds, "Request still open after delay, processing as backup replica" @@ -70,7 +80,7 @@ pub async fn process_event_with_backoff( } Ok(None) => { tracing::debug!( - "Request already fulfilled by primary replica during delay, skipping" + "Request already handled by primary replica during delay, skipping" ); return Ok(()); } @@ -103,7 +113,7 @@ pub async fn process_event_with_backoff( sender: event.requestor, user_random_number: event.user_random_number, state: RequestEntryState::Pending, - gas_limit, + gas_limit: U256::from(0), // FIXME(Tejas): set this properly }; history.add(&status); @@ -159,7 +169,6 @@ pub async fn process_event_with_backoff( let success = submit_tx_with_backoff( contract.client(), contract_call, - gas_limit, escalation_policy, Some(error_mapper), ) @@ -206,11 +215,6 @@ pub async fn process_event_with_backoff( .get_or_create(&account_label) .observe(result.num_retries as f64); - metrics - .final_gas_multiplier - .get_or_create(&account_label) - .observe(result.gas_multiplier as f64); - metrics .final_fee_multiplier .get_or_create(&account_label) @@ -239,7 +243,7 @@ pub async fn process_event_with_backoff( // the RPC gave us an error anyway. let req = chain_state .contract - .get_request(event.provider_address, event.sequence_number) + .get_request_v2(event.provider_address, event.sequence_number) .await; // We only count failures for cases where we are completely certain that the callback failed. diff --git a/apps/fortuna/src/keeper/track.rs b/apps/fortuna/src/keeper/track.rs index 271d48b2ef..f8832c5726 100644 --- a/apps/fortuna/src/keeper/track.rs +++ b/apps/fortuna/src/keeper/track.rs @@ -6,6 +6,7 @@ use { }, anyhow::{anyhow, Result}, ethers::{middleware::Middleware, prelude::BlockNumber, providers::Provider, types::Address}, + num_traits::cast::ToPrimitive, std::{ sync::Arc, time::{SystemTime, UNIX_EPOCH}, @@ -86,7 +87,10 @@ pub async fn track_provider( provider_address: Address, metrics: Arc, ) -> Result<()> { - let provider_info = contract.get_provider_info(provider_address).call().await?; + let provider_info = contract + .get_provider_info_v2(provider_address) + .call() + .await?; // The f64 conversion is made to be able to serve metrics with the constraints of Prometheus. // The fee is in wei, so we divide by 1e18 to convert it to eth. @@ -155,7 +159,12 @@ pub async fn track_accrued_pyth_fees( // The f64 conversion is made to be able to serve metrics with the constraints of Prometheus. // The fee is in wei, so we divide by 1e18 to convert it to eth. - let accrued_pyth_fees = accrued_pyth_fees as f64 / 1e18; + let accrued_pyth_fees = accrued_pyth_fees.to_f64().ok_or_else(|| { + anyhow!( + "Failed to convert accrued_pyth_fees value {:?} to f64", + accrued_pyth_fees + ) + })? / 1e18; metrics .accrued_pyth_fees diff --git a/fortuna.db b/fortuna.db new file mode 100644 index 0000000000..e69de29bb2