diff --git a/apps/fortuna/src/chain.rs b/apps/fortuna/src/chain.rs index a3c6d12df8..21680a6a0b 100644 --- a/apps/fortuna/src/chain.rs +++ b/apps/fortuna/src/chain.rs @@ -1,5 +1,2 @@ -pub(crate) mod eth_gas_oracle; pub(crate) mod ethereum; -mod nonce_manager; pub(crate) mod reader; -pub(crate) mod traced_client; diff --git a/apps/fortuna/src/chain/ethereum.rs b/apps/fortuna/src/chain/ethereum.rs index be626523f0..b20f3df1ff 100644 --- a/apps/fortuna/src/chain/ethereum.rs +++ b/apps/fortuna/src/chain/ethereum.rs @@ -1,29 +1,31 @@ use { crate::{ api::ChainId, - chain::{ + chain::reader::{ + self, BlockNumber, BlockStatus, EntropyReader, RequestedWithCallbackEvent, + }, + config::EthereumConfig, + eth_utils::{ eth_gas_oracle::EthProviderOracle, + legacy_tx_middleware::LegacyTxMiddleware, nonce_manager::NonceManagerMiddleware, - reader::{self, BlockNumber, BlockStatus, EntropyReader, RequestedWithCallbackEvent}, traced_client::{RpcMetrics, TracedClient}, }, - config::EthereumConfig, }, anyhow::{anyhow, Error, Result}, axum::async_trait, ethers::{ abi::RawLog, - contract::{abigen, ContractCall, EthLogDecode}, + contract::{abigen, EthLogDecode}, core::types::Address, - middleware::{gas_oracle::GasOracleMiddleware, MiddlewareError, SignerMiddleware}, - prelude::{BlockId, JsonRpcClient, PendingTransaction, TransactionRequest}, + middleware::{gas_oracle::GasOracleMiddleware, SignerMiddleware}, + prelude::JsonRpcClient, providers::{Http, Middleware, Provider}, signers::{LocalWallet, Signer}, - types::{transaction::eip2718::TypedTransaction, BlockNumber as EthersBlockNumber, U256}, + types::{BlockNumber as EthersBlockNumber, U256}, }, sha3::{Digest, Keccak256}, std::sync::Arc, - thiserror::Error, }; // TODO: Programmatically generate this so we don't have to keep committed ABI in sync with the @@ -43,90 +45,9 @@ pub type SignablePythContractInner = PythRandom>; pub type SignablePythContract = SignablePythContractInner; pub type InstrumentedSignablePythContract = SignablePythContractInner; -pub type PythContractCall = ContractCall, ()>; - pub type PythContract = PythRandom>; pub type InstrumentedPythContract = PythRandom>; -/// Middleware that converts a transaction into a legacy transaction if use_legacy_tx is true. -/// We can not use TransformerMiddleware because keeper calls fill_transaction first which bypasses -/// the transformer. 
-#[derive(Clone, Debug)] -pub struct LegacyTxMiddleware { - use_legacy_tx: bool, - inner: M, -} - -impl LegacyTxMiddleware { - pub fn new(use_legacy_tx: bool, inner: M) -> Self { - Self { - use_legacy_tx, - inner, - } - } -} - -#[derive(Error, Debug)] -pub enum LegacyTxMiddlewareError { - #[error("{0}")] - MiddlewareError(M::Error), -} - -impl MiddlewareError for LegacyTxMiddlewareError { - type Inner = M::Error; - - fn from_err(src: M::Error) -> Self { - LegacyTxMiddlewareError::MiddlewareError(src) - } - - fn as_inner(&self) -> Option<&Self::Inner> { - match self { - LegacyTxMiddlewareError::MiddlewareError(e) => Some(e), - } - } -} - -#[async_trait] -impl Middleware for LegacyTxMiddleware { - type Error = LegacyTxMiddlewareError; - type Provider = M::Provider; - type Inner = M; - fn inner(&self) -> &M { - &self.inner - } - - async fn send_transaction + Send + Sync>( - &self, - tx: T, - block: Option, - ) -> std::result::Result, Self::Error> { - let mut tx = tx.into(); - if self.use_legacy_tx { - let legacy_request: TransactionRequest = tx.into(); - tx = legacy_request.into(); - } - self.inner() - .send_transaction(tx, block) - .await - .map_err(MiddlewareError::from_err) - } - - async fn fill_transaction( - &self, - tx: &mut TypedTransaction, - block: Option, - ) -> std::result::Result<(), Self::Error> { - if self.use_legacy_tx { - let legacy_request: TransactionRequest = (*tx).clone().into(); - *tx = legacy_request.into(); - } - self.inner() - .fill_transaction(tx, block) - .await - .map_err(MiddlewareError::from_err) - } -} - impl SignablePythContractInner { /// Get the wallet that signs transactions sent to this contract. pub fn wallet(&self) -> LocalWallet { diff --git a/apps/fortuna/src/command/run.rs b/apps/fortuna/src/command/run.rs index a01324da96..6efec816d2 100644 --- a/apps/fortuna/src/command/run.rs +++ b/apps/fortuna/src/command/run.rs @@ -1,13 +1,11 @@ use { crate::{ api::{self, BlockchainState, ChainId}, - chain::{ - ethereum::InstrumentedPythContract, - traced_client::{RpcMetrics, TracedClient}, - }, + chain::ethereum::InstrumentedPythContract, command::register_provider::CommitmentMetadata, config::{Commitment, Config, EthereumConfig, RunOptions}, - keeper::{self, KeeperMetrics}, + eth_utils::traced_client::{RpcMetrics, TracedClient}, + keeper::{self, keeper_metrics::KeeperMetrics}, state::{HashChainState, PebbleHashChain}, }, anyhow::{anyhow, Error, Result}, diff --git a/apps/fortuna/src/eth_utils.rs b/apps/fortuna/src/eth_utils.rs new file mode 100644 index 0000000000..86f3bcb3e9 --- /dev/null +++ b/apps/fortuna/src/eth_utils.rs @@ -0,0 +1,5 @@ +pub mod eth_gas_oracle; +pub mod legacy_tx_middleware; +pub mod nonce_manager; +pub mod traced_client; +pub mod utils; diff --git a/apps/fortuna/src/chain/eth_gas_oracle.rs b/apps/fortuna/src/eth_utils/eth_gas_oracle.rs similarity index 100% rename from apps/fortuna/src/chain/eth_gas_oracle.rs rename to apps/fortuna/src/eth_utils/eth_gas_oracle.rs diff --git a/apps/fortuna/src/eth_utils/legacy_tx_middleware.rs b/apps/fortuna/src/eth_utils/legacy_tx_middleware.rs new file mode 100644 index 0000000000..911413b552 --- /dev/null +++ b/apps/fortuna/src/eth_utils/legacy_tx_middleware.rs @@ -0,0 +1,88 @@ +use { + axum::async_trait, + ethers::{ + middleware::{Middleware, MiddlewareError}, + prelude::{BlockId, PendingTransaction, TransactionRequest}, + types::transaction::eip2718::TypedTransaction, + }, + thiserror::Error, +}; + +/// Middleware that converts a transaction into a legacy transaction if use_legacy_tx is true. 
+/// We can not use TransformerMiddleware because keeper calls fill_transaction first which bypasses +/// the transformer. +#[derive(Clone, Debug)] +pub struct LegacyTxMiddleware { + use_legacy_tx: bool, + inner: M, +} + +impl LegacyTxMiddleware { + pub fn new(use_legacy_tx: bool, inner: M) -> Self { + Self { + use_legacy_tx, + inner, + } + } +} + +#[derive(Error, Debug)] +pub enum LegacyTxMiddlewareError { + #[error("{0}")] + MiddlewareError(M::Error), +} + +impl MiddlewareError for LegacyTxMiddlewareError { + type Inner = M::Error; + + fn from_err(src: M::Error) -> Self { + LegacyTxMiddlewareError::MiddlewareError(src) + } + + fn as_inner(&self) -> Option<&Self::Inner> { + match self { + LegacyTxMiddlewareError::MiddlewareError(e) => Some(e), + } + } +} + +#[async_trait] +impl Middleware for LegacyTxMiddleware { + type Error = LegacyTxMiddlewareError; + type Provider = M::Provider; + type Inner = M; + fn inner(&self) -> &M { + &self.inner + } + + async fn send_transaction + Send + Sync>( + &self, + tx: T, + block: Option, + ) -> std::result::Result, Self::Error> { + let mut tx = tx.into(); + if self.use_legacy_tx { + let legacy_request: TransactionRequest = tx.into(); + tx = legacy_request.into(); + } + self.inner() + .send_transaction(tx, block) + .await + .map_err(MiddlewareError::from_err) + } + + async fn fill_transaction( + &self, + tx: &mut TypedTransaction, + block: Option, + ) -> std::result::Result<(), Self::Error> { + if self.use_legacy_tx { + let legacy_request: TransactionRequest = (*tx).clone().into(); + *tx = legacy_request.into(); + } + self.inner() + .fill_transaction(tx, block) + .await + .map_err(MiddlewareError::from_err) + } +} diff --git a/apps/fortuna/src/chain/nonce_manager.rs b/apps/fortuna/src/eth_utils/nonce_manager.rs similarity index 100% rename from apps/fortuna/src/chain/nonce_manager.rs rename to apps/fortuna/src/eth_utils/nonce_manager.rs diff --git a/apps/fortuna/src/chain/traced_client.rs b/apps/fortuna/src/eth_utils/traced_client.rs similarity index 100% rename from apps/fortuna/src/chain/traced_client.rs rename to apps/fortuna/src/eth_utils/traced_client.rs diff --git a/apps/fortuna/src/eth_utils/utils.rs b/apps/fortuna/src/eth_utils/utils.rs new file mode 100644 index 0000000000..d0530da655 --- /dev/null +++ b/apps/fortuna/src/eth_utils/utils.rs @@ -0,0 +1,66 @@ +use { + anyhow::{anyhow, Result}, + ethers::{contract::ContractCall, middleware::Middleware}, + std::sync::Arc, + tracing, +}; + +pub async fn send_and_confirm(contract_call: ContractCall) -> Result<()> { + let call_name = contract_call.function.name.as_str(); + let pending_tx = contract_call + .send() + .await + .map_err(|e| anyhow!("Error submitting transaction({}) {:?}", call_name, e))?; + + let tx_result = pending_tx + .await + .map_err(|e| { + anyhow!( + "Error waiting for transaction({}) receipt: {:?}", + call_name, + e + ) + })? + .ok_or_else(|| { + anyhow!( + "Can't verify the transaction({}), probably dropped from mempool", + call_name + ) + })?; + + tracing::info!( + transaction_hash = &tx_result.transaction_hash.to_string(), + "Confirmed transaction({}). Receipt: {:?}", + call_name, + tx_result, + ); + Ok(()) +} + +/// Estimate the cost (in wei) of a transaction consuming gas_used gas. +pub async fn estimate_tx_cost( + middleware: Arc, + use_legacy_tx: bool, + gas_used: u128, +) -> Result { + let gas_price: u128 = if use_legacy_tx { + middleware + .get_gas_price() + .await + .map_err(|e| anyhow!("Failed to fetch gas price. error: {:?}", e))? 
+ .try_into() + .map_err(|e| anyhow!("gas price doesn't fit into 128 bits. error: {:?}", e))? + } else { + // This is not obvious but the implementation of estimate_eip1559_fees in ethers.rs + // for a middleware that has a GasOracleMiddleware inside is to ignore the passed-in callback + // and use whatever the gas oracle returns. + let (max_fee_per_gas, max_priority_fee_per_gas) = + middleware.estimate_eip1559_fees(None).await?; + + (max_fee_per_gas + max_priority_fee_per_gas) + .try_into() + .map_err(|e| anyhow!("gas price doesn't fit into 128 bits. error: {:?}", e))? + }; + + Ok(gas_price * gas_used) +} diff --git a/apps/fortuna/src/keeper.rs b/apps/fortuna/src/keeper.rs index bce30c0dad..1271c42518 100644 --- a/apps/fortuna/src/keeper.rs +++ b/apps/fortuna/src/keeper.rs @@ -1,317 +1,45 @@ use { crate::{ - api::{self, BlockchainState, ChainId}, - chain::{ - ethereum::{ - InstrumentedPythContract, InstrumentedSignablePythContract, PythContractCall, - }, - reader::{BlockNumber, RequestedWithCallbackEvent}, - traced_client::{RpcMetrics, TracedClient}, - }, - config::EscalationPolicyConfig, + api::{BlockchainState, ChainId}, + chain::ethereum::{InstrumentedPythContract, InstrumentedSignablePythContract}, config::EthereumConfig, + eth_utils::traced_client::RpcMetrics, + keeper::block::{ + get_latest_safe_block, process_backlog, process_new_blocks, watch_blocks_wrapper, + BlockRange, + }, + keeper::commitment::update_commitments_loop, + keeper::fee::adjust_fee_wrapper, + keeper::fee::withdraw_fees_wrapper, + keeper::track::track_balance, + keeper::track::track_provider, }, - anyhow::{anyhow, Result}, - backoff::ExponentialBackoff, - ethers::{ - providers::{Middleware, Provider, Ws}, - signers::Signer, - types::{Address, U256}, - }, - futures::StreamExt, - prometheus_client::{ - encoding::EncodeLabelSet, - metrics::{counter::Counter, family::Family, gauge::Gauge, histogram::Histogram}, - registry::Registry, - }, - std::{ - collections::HashSet, - sync::{atomic::AtomicU64, Arc}, - }, + ethers::{signers::Signer, types::U256}, + keeper_metrics::{AccountLabel, KeeperMetrics}, + std::{collections::HashSet, sync::Arc}, tokio::{ spawn, sync::{mpsc, RwLock}, - time::{self, timeout, Duration}, + time::{self, Duration}, }, tracing::{self, Instrument}, }; -/// How much to wait before retrying in case of an RPC error -const RETRY_INTERVAL: Duration = Duration::from_secs(5); +pub(crate) mod block; +pub(crate) mod commitment; +pub(crate) mod fee; +pub(crate) mod keeper_metrics; +pub(crate) mod process_event; +pub(crate) mod track; + /// How many blocks to look back for events that might be missed when starting the keeper const BACKLOG_RANGE: u64 = 1000; -/// How many blocks to fetch events for in a single rpc call -const BLOCK_BATCH_SIZE: u64 = 100; -/// How much to wait before polling the next latest block -const POLL_INTERVAL: Duration = Duration::from_secs(2); /// Track metrics in this interval const TRACK_INTERVAL: Duration = Duration::from_secs(10); /// Check whether we need to conduct a withdrawal at this interval. const WITHDRAW_INTERVAL: Duration = Duration::from_secs(300); /// Check whether we need to adjust the fee at this interval. const ADJUST_FEE_INTERVAL: Duration = Duration::from_secs(30); -/// Check whether we need to manually update the commitments to reduce numHashes for future -/// requests and reduce the gas cost of the reveal. 
-const UPDATE_COMMITMENTS_INTERVAL: Duration = Duration::from_secs(30); -const UPDATE_COMMITMENTS_THRESHOLD_FACTOR: f64 = 0.95; -/// Rety last N blocks -const RETRY_PREVIOUS_BLOCKS: u64 = 100; - -#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)] -pub struct AccountLabel { - pub chain_id: String, - pub address: String, -} - -pub struct KeeperMetrics { - pub current_sequence_number: Family, - pub end_sequence_number: Family, - pub balance: Family>, - pub collected_fee: Family>, - pub current_fee: Family>, - pub target_provider_fee: Family>, - pub total_gas_spent: Family>, - pub total_gas_fee_spent: Family>, - pub requests: Family, - pub requests_processed: Family, - pub requests_processed_success: Family, - pub requests_processed_failure: Family, - pub requests_reprocessed: Family, - pub reveals: Family, - pub request_duration_ms: Family, - pub retry_count: Family, - pub final_gas_multiplier: Family, - pub final_fee_multiplier: Family, - pub gas_price_estimate: Family>, -} - -impl Default for KeeperMetrics { - fn default() -> Self { - Self { - current_sequence_number: Family::default(), - end_sequence_number: Family::default(), - balance: Family::default(), - collected_fee: Family::default(), - current_fee: Family::default(), - target_provider_fee: Family::default(), - total_gas_spent: Family::default(), - total_gas_fee_spent: Family::default(), - requests: Family::default(), - requests_processed: Family::default(), - requests_processed_success: Family::default(), - requests_processed_failure: Family::default(), - requests_reprocessed: Family::default(), - reveals: Family::default(), - request_duration_ms: Family::new_with_constructor(|| { - Histogram::new( - vec![ - 1000.0, 2500.0, 5000.0, 7500.0, 10000.0, 20000.0, 30000.0, 40000.0, - 50000.0, 60000.0, 120000.0, 180000.0, 240000.0, 300000.0, 600000.0, - ] - .into_iter(), - ) - }), - retry_count: Family::new_with_constructor(|| { - Histogram::new(vec![0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 10.0, 15.0, 20.0].into_iter()) - }), - final_gas_multiplier: Family::new_with_constructor(|| { - Histogram::new( - vec![100.0, 125.0, 150.0, 200.0, 300.0, 400.0, 500.0, 600.0].into_iter(), - ) - }), - final_fee_multiplier: Family::new_with_constructor(|| { - Histogram::new(vec![100.0, 110.0, 120.0, 140.0, 160.0, 180.0, 200.0].into_iter()) - }), - gas_price_estimate: Family::default(), - } - } -} - -impl KeeperMetrics { - pub async fn new( - registry: Arc>, - chain_labels: Vec<(String, Address)>, - ) -> Self { - let mut writable_registry = registry.write().await; - let keeper_metrics = KeeperMetrics::default(); - - writable_registry.register( - "current_sequence_number", - "The sequence number for a new request", - keeper_metrics.current_sequence_number.clone(), - ); - - writable_registry.register( - "end_sequence_number", - "The sequence number for the end request", - keeper_metrics.end_sequence_number.clone(), - ); - - writable_registry.register( - "requests", - "Number of requests received through events", - keeper_metrics.requests.clone(), - ); - - writable_registry.register( - "requests_processed", - "Number of requests processed", - keeper_metrics.requests_processed.clone(), - ); - - writable_registry.register( - "requests_processed_success", - "Number of requests processed successfully", - keeper_metrics.requests_processed_success.clone(), - ); - - writable_registry.register( - "requests_processed_failure", - "Number of requests processed with failure", - keeper_metrics.requests_processed_failure.clone(), - ); - - writable_registry.register( - 
"reveal", - "Number of reveals", - keeper_metrics.reveals.clone(), - ); - - writable_registry.register( - "balance", - "Balance of the keeper", - keeper_metrics.balance.clone(), - ); - - writable_registry.register( - "collected_fee", - "Collected fee on the contract", - keeper_metrics.collected_fee.clone(), - ); - - writable_registry.register( - "current_fee", - "Current fee charged by the provider", - keeper_metrics.current_fee.clone(), - ); - - writable_registry.register( - "target_provider_fee", - "Target fee in ETH -- differs from current_fee in that this is the goal, and current_fee is the on-chain value.", - keeper_metrics.target_provider_fee.clone(), - ); - - writable_registry.register( - "total_gas_spent", - "Total gas spent revealing requests", - keeper_metrics.total_gas_spent.clone(), - ); - - writable_registry.register( - "total_gas_fee_spent", - "Total amount of ETH spent on gas for revealing requests", - keeper_metrics.total_gas_fee_spent.clone(), - ); - - writable_registry.register( - "requests_reprocessed", - "Number of requests reprocessed", - keeper_metrics.requests_reprocessed.clone(), - ); - - writable_registry.register( - "request_duration_ms", - "Time taken to process each successful callback request in milliseconds", - keeper_metrics.request_duration_ms.clone(), - ); - - writable_registry.register( - "retry_count", - "Number of retries for successful transactions", - keeper_metrics.retry_count.clone(), - ); - - writable_registry.register( - "final_gas_multiplier", - "Final gas multiplier percentage for successful transactions", - keeper_metrics.final_gas_multiplier.clone(), - ); - - writable_registry.register( - "final_fee_multiplier", - "Final fee multiplier percentage for successful transactions", - keeper_metrics.final_fee_multiplier.clone(), - ); - - writable_registry.register( - "gas_price_estimate", - "Gas price estimate for the blockchain (in gwei)", - keeper_metrics.gas_price_estimate.clone(), - ); - - // *Important*: When adding a new metric: - // 1. Register it above using `writable_registry.register(...)` - // 2. 
Add a get_or_create call in the loop below to initialize it for each chain/provider pair - for (chain_id, provider_address) in chain_labels { - let account_label = AccountLabel { - chain_id, - address: provider_address.to_string(), - }; - - let _ = keeper_metrics - .current_sequence_number - .get_or_create(&account_label); - let _ = keeper_metrics - .end_sequence_number - .get_or_create(&account_label); - let _ = keeper_metrics.balance.get_or_create(&account_label); - let _ = keeper_metrics.collected_fee.get_or_create(&account_label); - let _ = keeper_metrics.current_fee.get_or_create(&account_label); - let _ = keeper_metrics - .target_provider_fee - .get_or_create(&account_label); - let _ = keeper_metrics.total_gas_spent.get_or_create(&account_label); - let _ = keeper_metrics - .total_gas_fee_spent - .get_or_create(&account_label); - let _ = keeper_metrics.requests.get_or_create(&account_label); - let _ = keeper_metrics - .requests_processed - .get_or_create(&account_label); - let _ = keeper_metrics - .requests_processed_success - .get_or_create(&account_label); - let _ = keeper_metrics - .requests_processed_failure - .get_or_create(&account_label); - let _ = keeper_metrics - .requests_reprocessed - .get_or_create(&account_label); - let _ = keeper_metrics.reveals.get_or_create(&account_label); - let _ = keeper_metrics - .request_duration_ms - .get_or_create(&account_label); - let _ = keeper_metrics.retry_count.get_or_create(&account_label); - let _ = keeper_metrics - .final_gas_multiplier - .get_or_create(&account_label); - let _ = keeper_metrics - .final_fee_multiplier - .get_or_create(&account_label); - let _ = keeper_metrics - .gas_price_estimate - .get_or_create(&account_label); - } - - keeper_metrics - } -} - -#[derive(Debug, Clone)] -pub struct BlockRange { - pub from: BlockNumber, - pub to: BlockNumber, -} #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum RequestState { @@ -323,29 +51,6 @@ pub enum RequestState { Processed, } -/// Get the latest safe block number for the chain. Retry internally if there is an error. -async fn get_latest_safe_block(chain_state: &BlockchainState) -> BlockNumber { - loop { - match chain_state - .contract - .get_block_number(chain_state.confirmed_block_status) - .await - { - Ok(latest_confirmed_block) => { - tracing::info!( - "Fetched latest safe block {}", - latest_confirmed_block - chain_state.reveal_delay_blocks - ); - return latest_confirmed_block - chain_state.reveal_delay_blocks; - } - Err(e) => { - tracing::error!("Error while getting block number. error: {:?}", e); - time::sleep(RETRY_INTERVAL).await; - } - } - } -} - /// Run threads to handle events for the last `BACKLOG_RANGE` blocks, watch for new blocks and /// handle any events for the new blocks. #[tracing::instrument(name = "keeper", skip_all, fields(chain_id = chain_state.id))] @@ -509,1032 +214,3 @@ pub async fn run_keeper_threads( .in_current_span(), ); } - -/// Process an event with backoff. It will retry the reveal on failure for 5 minutes. 
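[Editor's note] The removed process_event_with_backoff below moves into the new keeper/process_event.rs module (declared above as `pub(crate) mod process_event;`). The retry pattern it relies on is worth spelling out on its own: a minimal sketch, assuming the `backoff` crate with async support plus `tokio` and `anyhow`; `flaky_reveal` is a hypothetical stand-in for the real reveal call.

use {
    anyhow::anyhow,
    backoff::ExponentialBackoff,
    std::sync::{
        atomic::{AtomicU64, Ordering},
        Arc,
    },
    tokio::time::Duration,
};

// Hypothetical stand-in for process_event: fails transiently twice, then succeeds.
async fn flaky_reveal(attempt: u64) -> Result<(), backoff::Error<anyhow::Error>> {
    if attempt < 2 {
        // Transient errors are retried until max_elapsed_time is exhausted;
        // permanent errors (backoff::Error::permanent) abort immediately.
        Err(backoff::Error::transient(anyhow!("rpc hiccup")))
    } else {
        Ok(())
    }
}

#[tokio::main]
async fn main() {
    let backoff = ExponentialBackoff {
        max_elapsed_time: Some(Duration::from_secs(300)), // retry for 5 minutes, as above
        initial_interval: Duration::from_millis(10),      // shortened so the example runs fast
        ..Default::default()
    };
    let num_retries = Arc::new(AtomicU64::new(0));
    let result = backoff::future::retry_notify(
        backoff,
        || async { flaky_reveal(num_retries.load(Ordering::Relaxed)).await },
        |e, dur| {
            // Runs on every transient failure; the escalation policy reads this
            // counter to raise the gas and fee multipliers on each retry.
            num_retries.fetch_add(1, Ordering::Relaxed);
            eprintln!("retrying after {:?}: {}", dur, e);
        },
    )
    .await;
    assert!(result.is_ok());
}

The counter lives outside the retried closure so the escalation policy can scale gas and fee per attempt, exactly as the full function does with get_gas_multiplier_pct and get_fee_multiplier_pct.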
-#[tracing::instrument(name = "process_event_with_backoff", skip_all, fields( - sequence_number = event.sequence_number -))] -pub async fn process_event_with_backoff( - event: RequestedWithCallbackEvent, - chain_state: BlockchainState, - contract: Arc, - gas_limit: U256, - escalation_policy: EscalationPolicyConfig, - metrics: Arc, -) { - let start_time = std::time::Instant::now(); - let account_label = AccountLabel { - chain_id: chain_state.id.clone(), - address: chain_state.provider_address.to_string(), - }; - - metrics.requests.get_or_create(&account_label).inc(); - tracing::info!("Started processing event"); - let backoff = ExponentialBackoff { - max_elapsed_time: Some(Duration::from_secs(300)), // retry for 5 minutes - ..Default::default() - }; - - let num_retries = Arc::new(AtomicU64::new(0)); - - let success = backoff::future::retry_notify( - backoff, - || async { - let num_retries = num_retries.load(std::sync::atomic::Ordering::Relaxed); - - let gas_multiplier_pct = escalation_policy.get_gas_multiplier_pct(num_retries); - let fee_multiplier_pct = escalation_policy.get_fee_multiplier_pct(num_retries); - process_event( - &event, - &chain_state, - &contract, - gas_limit.saturating_mul(escalation_policy.gas_limit_tolerance_pct.into()) / 100, - gas_multiplier_pct, - fee_multiplier_pct, - metrics.clone(), - ) - .await - }, - |e, dur| { - let retry_number = num_retries.load(std::sync::atomic::Ordering::Relaxed); - tracing::error!( - "Error on retry {} at duration {:?}: {}", - retry_number, - dur, - e - ); - num_retries.store(retry_number + 1, std::sync::atomic::Ordering::Relaxed); - }, - ) - .await; - - let duration = start_time.elapsed(); - - metrics - .requests_processed - .get_or_create(&account_label) - .inc(); - - match success { - Ok(()) => { - tracing::info!("Processed event successfully in {:?}", duration); - - metrics - .requests_processed_success - .get_or_create(&account_label) - .inc(); - - metrics - .request_duration_ms - .get_or_create(&account_label) - .observe(duration.as_millis() as f64); - - // Track retry count, gas multiplier, and fee multiplier for successful transactions - let num_retries = num_retries.load(std::sync::atomic::Ordering::Relaxed); - metrics - .retry_count - .get_or_create(&account_label) - .observe(num_retries as f64); - - let gas_multiplier = escalation_policy.get_gas_multiplier_pct(num_retries); - metrics - .final_gas_multiplier - .get_or_create(&account_label) - .observe(gas_multiplier as f64); - - let fee_multiplier = escalation_policy.get_fee_multiplier_pct(num_retries); - metrics - .final_fee_multiplier - .get_or_create(&account_label) - .observe(fee_multiplier as f64); - } - Err(e) => { - // In case the callback did not succeed, we double-check that the request is still on-chain. - // If the request is no longer on-chain, one of the transactions we sent likely succeeded, but - // the RPC gave us an error anyway. - let req = chain_state - .contract - .get_request(event.provider_address, event.sequence_number) - .await; - - tracing::error!("Failed to process event: {:?}. Request: {:?}", e, req); - - // We only count failures for cases where we are completely certain that the callback failed. - if req.is_ok_and(|x| x.is_some()) { - metrics - .requests_processed_failure - .get_or_create(&account_label) - .inc(); - } - } - } -} - -const TX_CONFIRMATION_TIMEOUT_SECS: u64 = 30; - -/// Process a callback on a chain. It estimates the gas for the reveal with callback and -/// submits the transaction if the gas estimate is below the gas limit. 
-/// It will return a permanent or transient error depending on the error type and whether -/// retry is possible or not. -pub async fn process_event( - event: &RequestedWithCallbackEvent, - chain_config: &BlockchainState, - contract: &InstrumentedSignablePythContract, - gas_limit: U256, - // A value of 100 submits the tx with the same gas/fee as the estimate. - gas_estimate_multiplier_pct: u64, - fee_estimate_multiplier_pct: u64, - metrics: Arc, -) -> Result<(), backoff::Error> { - // ignore requests that are not for the configured provider - if chain_config.provider_address != event.provider_address { - return Ok(()); - } - let provider_revelation = chain_config - .state - .reveal(event.sequence_number) - .map_err(|e| backoff::Error::permanent(anyhow!("Error revealing: {:?}", e)))?; - - let gas_estimate_res = chain_config - .contract - .estimate_reveal_with_callback_gas( - contract.wallet().address(), - event.provider_address, - event.sequence_number, - event.user_random_number, - provider_revelation, - ) - .in_current_span() - .await; - - let gas_estimate = gas_estimate_res.map_err(|e| { - // we consider the error transient even if it is a contract revert since - // it can be because of routing to a lagging RPC node. Retrying such errors will - // incur a few additional RPC calls, but it is fine. - backoff::Error::transient(anyhow!("Error estimating gas for reveal: {:?}", e)) - })?; - - // The gas limit on the simulated transaction is the configured gas limit on the chain, - // but we are willing to pad the gas a bit to ensure reliable submission. - if gas_estimate > gas_limit { - return Err(backoff::Error::permanent(anyhow!( - "Gas estimate for reveal with callback is higher than the gas limit {} > {}", - gas_estimate, - gas_limit - ))); - } - - // Pad the gas estimate after checking it against the simulation gas limit, ensuring that - // the padded gas estimate doesn't exceed the maximum amount of gas we are willing to use. - let gas_estimate = gas_estimate.saturating_mul(gas_estimate_multiplier_pct.into()) / 100; - - let contract_call = contract - .reveal_with_callback( - event.provider_address, - event.sequence_number, - event.user_random_number, - provider_revelation, - ) - .gas(gas_estimate); - - let client = contract.client(); - let mut transaction = contract_call.tx.clone(); - - // manually fill the tx with the gas info, so we can log the details in case of error - client - .fill_transaction(&mut transaction, None) - .await - .map_err(|e| { - backoff::Error::transient(anyhow!("Error filling the reveal transaction: {:?}", e)) - })?; - - // Apply the fee escalation policy. Note: the unwrap_or_default should never default as we have a gas oracle - // in the client that sets the gas price. - transaction.set_gas_price( - transaction - .gas_price() - .unwrap_or_default() - .saturating_mul(fee_estimate_multiplier_pct.into()) - / 100, - ); - - let pending_tx = client - .send_transaction(transaction.clone(), None) - .await - .map_err(|e| { - backoff::Error::transient(anyhow!( - "Error submitting the reveal transaction. 
Tx:{:?}, Error:{:?}", - transaction, - e - )) - })?; - - let reset_nonce = || { - let nonce_manager = contract.client_ref().inner().inner(); - nonce_manager.reset(); - }; - - let pending_receipt = timeout( - Duration::from_secs(TX_CONFIRMATION_TIMEOUT_SECS), - pending_tx, - ) - .await - .map_err(|_| { - // Tx can get stuck in mempool without any progress if the nonce is too high - // in this case ethers internal polling will not reduce the number of retries - // and keep retrying indefinitely. So we set a manual timeout here and reset the nonce. - reset_nonce(); - backoff::Error::transient(anyhow!( - "Tx stuck in mempool. Resetting nonce. Tx:{:?}", - transaction - )) - })?; - - let receipt = pending_receipt - .map_err(|e| { - backoff::Error::transient(anyhow!( - "Error waiting for transaction receipt. Tx:{:?} Error:{:?}", - transaction, - e - )) - })? - .ok_or_else(|| { - // RPC may not return an error on tx submission if the nonce is too high. - // But we will never get a receipt. So we reset the nonce manager to get the correct nonce. - reset_nonce(); - backoff::Error::transient(anyhow!( - "Can't verify the reveal, probably dropped from mempool. Resetting nonce. Tx:{:?}", - transaction - )) - })?; - - tracing::info!( - sequence_number = &event.sequence_number, - transaction_hash = &receipt.transaction_hash.to_string(), - gas_used = ?receipt.gas_used, - "Revealed with res: {:?}", - receipt - ); - - let account_label = AccountLabel { - chain_id: chain_config.id.clone(), - address: chain_config.provider_address.to_string(), - }; - - if let Some(gas_used) = receipt.gas_used { - let gas_used_float = gas_used.as_u128() as f64 / 1e18; - metrics - .total_gas_spent - .get_or_create(&account_label) - .inc_by(gas_used_float); - - if let Some(gas_price) = receipt.effective_gas_price { - let gas_fee = (gas_used * gas_price).as_u128() as f64 / 1e18; - metrics - .total_gas_fee_spent - .get_or_create(&account_label) - .inc_by(gas_fee); - } - } - - metrics.reveals.get_or_create(&account_label).inc(); - - Ok(()) -} - -/// Process a range of blocks in batches. It calls the `process_single_block_batch` method for each batch. -#[tracing::instrument(skip_all, fields( - range_from_block = block_range.from, range_to_block = block_range.to -))] -pub async fn process_block_range( - block_range: BlockRange, - contract: Arc, - gas_limit: U256, - escalation_policy: EscalationPolicyConfig, - chain_state: api::BlockchainState, - metrics: Arc, - fulfilled_requests_cache: Arc>>, -) { - let BlockRange { - from: first_block, - to: last_block, - } = block_range; - let mut current_block = first_block; - while current_block <= last_block { - let mut to_block = current_block + BLOCK_BATCH_SIZE; - if to_block > last_block { - to_block = last_block; - } - - // TODO: this is handling all blocks sequentially we might want to handle them in parallel in future. - process_single_block_batch( - BlockRange { - from: current_block, - to: to_block, - }, - contract.clone(), - gas_limit, - escalation_policy.clone(), - chain_state.clone(), - metrics.clone(), - fulfilled_requests_cache.clone(), - ) - .in_current_span() - .await; - - current_block = to_block + 1; - } -} - -/// Process a batch of blocks for a chain. It will fetch events for all the blocks in a single call for the provided batch -/// and then try to process them one by one. It checks the `fulfilled_request_cache`. If the request was already fulfilled. -/// It won't reprocess it. If the request was already processed, it will reprocess it. 
-/// If the process fails, it will retry indefinitely. -#[tracing::instrument(name = "batch", skip_all, fields( - batch_from_block = block_range.from, batch_to_block = block_range.to -))] -pub async fn process_single_block_batch( - block_range: BlockRange, - contract: Arc, - gas_limit: U256, - escalation_policy: EscalationPolicyConfig, - chain_state: api::BlockchainState, - metrics: Arc, - fulfilled_requests_cache: Arc>>, -) { - loop { - let events_res = chain_state - .contract - .get_request_with_callback_events(block_range.from, block_range.to) - .await; - - match events_res { - Ok(events) => { - tracing::info!(num_of_events = &events.len(), "Processing",); - for event in &events { - // the write lock guarantees we spawn only one task per sequence number - let newly_inserted = fulfilled_requests_cache - .write() - .await - .insert(event.sequence_number); - if newly_inserted { - spawn( - process_event_with_backoff( - event.clone(), - chain_state.clone(), - contract.clone(), - gas_limit, - escalation_policy.clone(), - metrics.clone(), - ) - .in_current_span(), - ); - } - } - tracing::info!(num_of_events = &events.len(), "Processed",); - break; - } - Err(e) => { - tracing::error!( - "Error while getting events. Waiting for {} seconds before retry. error: {:?}", - RETRY_INTERVAL.as_secs(), - e - ); - time::sleep(RETRY_INTERVAL).await; - } - } - } -} - -/// Wrapper for the `watch_blocks` method. If there was an error while watching, it will retry after a delay. -/// It retries indefinitely. -#[tracing::instrument(name = "watch_blocks", skip_all, fields( - initial_safe_block = latest_safe_block -))] -pub async fn watch_blocks_wrapper( - chain_state: BlockchainState, - latest_safe_block: BlockNumber, - tx: mpsc::Sender, - geth_rpc_wss: Option, -) { - let mut last_safe_block_processed = latest_safe_block; - loop { - if let Err(e) = watch_blocks( - chain_state.clone(), - &mut last_safe_block_processed, - tx.clone(), - geth_rpc_wss.clone(), - ) - .in_current_span() - .await - { - tracing::error!("watching blocks. error: {:?}", e); - time::sleep(RETRY_INTERVAL).await; - } - } -} - -/// Watch for new blocks and send the range of blocks for which events have not been handled to the `tx` channel. -/// We are subscribing to new blocks instead of events. If we miss some blocks, it will be fine as we are sending -/// block ranges to the `tx` channel. If we have subscribed to events, we could have missed those and won't even -/// know about it. -pub async fn watch_blocks( - chain_state: BlockchainState, - last_safe_block_processed: &mut BlockNumber, - tx: mpsc::Sender, - geth_rpc_wss: Option, -) -> Result<()> { - tracing::info!("Watching blocks to handle new events"); - - let provider_option = match geth_rpc_wss { - Some(wss) => Some(match Provider::::connect(wss.clone()).await { - Ok(provider) => provider, - Err(e) => { - tracing::error!("Error while connecting to wss: {}. error: {:?}", wss, e); - return Err(e.into()); - } - }), - None => { - tracing::info!("No wss provided"); - None - } - }; - - let mut stream_option = match provider_option { - Some(ref provider) => Some(match provider.subscribe_blocks().await { - Ok(client) => client, - Err(e) => { - tracing::error!("Error while subscribing to blocks. 
error {:?}", e); - return Err(e.into()); - } - }), - None => None, - }; - - loop { - match stream_option { - Some(ref mut stream) => { - if stream.next().await.is_none() { - tracing::error!("Error blocks subscription stream ended"); - return Err(anyhow!("Error blocks subscription stream ended")); - } - } - None => { - time::sleep(POLL_INTERVAL).await; - } - } - - let latest_safe_block = get_latest_safe_block(&chain_state).in_current_span().await; - if latest_safe_block > *last_safe_block_processed { - let mut from = latest_safe_block.saturating_sub(RETRY_PREVIOUS_BLOCKS); - - // In normal situation, the difference between latest and last safe block should not be more than 2-3 (for arbitrum it can be 10) - // TODO: add a metric for this in separate PR. We need alerts - // But in extreme situation, where we were unable to send the block range multiple times, the difference between latest_safe_block and - // last_safe_block_processed can grow. It is fine to not have the retry mechanisms for those earliest blocks as we expect the rpc - // to be in consistency after this much time. - if from > *last_safe_block_processed { - from = *last_safe_block_processed; - } - match tx - .send(BlockRange { - from, - to: latest_safe_block, - }) - .await - { - Ok(_) => { - tracing::info!( - from_block = from, - to_block = &latest_safe_block, - "Block range sent to handle events", - ); - *last_safe_block_processed = latest_safe_block; - } - Err(e) => { - tracing::error!( - from_block = from, - to_block = &latest_safe_block, - "Error while sending block range to handle events. These will be handled in next call. error: {:?}", - e - ); - } - }; - } - } -} - -/// It waits on rx channel to receive block ranges and then calls process_block_range to process them -/// for each configured block delay. -#[tracing::instrument(skip_all)] -#[allow(clippy::too_many_arguments)] -pub async fn process_new_blocks( - chain_state: BlockchainState, - mut rx: mpsc::Receiver, - contract: Arc, - gas_limit: U256, - escalation_policy: EscalationPolicyConfig, - metrics: Arc, - fulfilled_requests_cache: Arc>>, - block_delays: Vec, -) { - tracing::info!("Waiting for new block ranges to process"); - loop { - if let Some(block_range) = rx.recv().await { - // Process blocks immediately first - process_block_range( - block_range.clone(), - Arc::clone(&contract), - gas_limit, - escalation_policy.clone(), - chain_state.clone(), - metrics.clone(), - fulfilled_requests_cache.clone(), - ) - .in_current_span() - .await; - - // Then process with each configured delay - for delay in &block_delays { - let adjusted_range = BlockRange { - from: block_range.from.saturating_sub(*delay), - to: block_range.to.saturating_sub(*delay), - }; - process_block_range( - adjusted_range, - Arc::clone(&contract), - gas_limit, - escalation_policy.clone(), - chain_state.clone(), - metrics.clone(), - fulfilled_requests_cache.clone(), - ) - .in_current_span() - .await; - } - } - } -} - -/// Processes the backlog_range for a chain. 
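[Editor's note] process_backlog and the watch/process loops deleted here reappear function for function in the new keeper/block.rs shown later in this diff, and the track_* functions move to keeper/track.rs (per the new imports). The channel plumbing between the watcher and the processor is the load-bearing design, so here is a minimal sketch of it, assuming only `tokio`; the two loops are hypothetical stand-ins for watch_blocks and process_new_blocks, and OVERLAP plays the role of RETRY_PREVIOUS_BLOCKS.

use tokio::sync::mpsc;

#[derive(Debug, Clone)]
struct BlockRange {
    from: u64,
    to: u64,
}

#[tokio::main]
async fn main() {
    // The run loop creates one channel per chain; 1000 is an arbitrary depth here.
    let (tx, mut rx) = mpsc::channel::<BlockRange>(1000);

    // Watcher side: publish ranges that deliberately overlap the previous send by
    // up to OVERLAP blocks, so events seen late by a lagging RPC node get re-fetched.
    const OVERLAP: u64 = 10;
    tokio::spawn(async move {
        let mut last_processed = 100u64;
        for latest_safe in [105u64, 112, 120] {
            // Never start past the last processed block, so no range is skipped.
            let from = latest_safe.saturating_sub(OVERLAP).min(last_processed);
            if tx.send(BlockRange { from, to: latest_safe }).await.is_ok() {
                last_processed = latest_safe;
            }
        }
    });

    // Processor side: drain ranges; duplicates are harmless because the
    // fulfilled-requests cache deduplicates per sequence number.
    while let Some(range) = rx.recv().await {
        println!("handling events in blocks {}..={}", range.from, range.to);
    }
}

Subscribing to blocks rather than events is what makes the overlap trick possible: a missed block only widens the next range, whereas a missed event subscription message would be silently lost.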
-#[tracing::instrument(skip_all)] -pub async fn process_backlog( - backlog_range: BlockRange, - contract: Arc, - gas_limit: U256, - escalation_policy: EscalationPolicyConfig, - chain_state: BlockchainState, - metrics: Arc, - fulfilled_requests_cache: Arc>>, -) { - tracing::info!("Processing backlog"); - process_block_range( - backlog_range, - contract, - gas_limit, - escalation_policy, - chain_state, - metrics, - fulfilled_requests_cache, - ) - .in_current_span() - .await; - tracing::info!("Backlog processed"); -} - -/// tracks the balance of the given address on the given chain -/// if there was an error, the function will just return -#[tracing::instrument(skip_all)] -pub async fn track_balance( - chain_id: String, - provider: Arc>, - address: Address, - metrics: Arc, -) { - let balance = match provider.get_balance(address, None).await { - // This conversion to u128 is fine as the total balance will never cross the limits - // of u128 practically. - Ok(r) => r.as_u128(), - Err(e) => { - tracing::error!("Error while getting balance. error: {:?}", e); - return; - } - }; - // The f64 conversion is made to be able to serve metrics within the constraints of Prometheus. - // The balance is in wei, so we need to divide by 1e18 to convert it to eth. - let balance = balance as f64 / 1e18; - - metrics - .balance - .get_or_create(&AccountLabel { - chain_id: chain_id.clone(), - address: address.to_string(), - }) - .set(balance); -} - -/// tracks the collected fees and the hashchain data of the given provider address on the given chain -/// if there is a error the function will just return -#[tracing::instrument(skip_all)] -pub async fn track_provider( - chain_id: ChainId, - contract: InstrumentedPythContract, - provider_address: Address, - metrics: Arc, -) { - let provider_info = match contract.get_provider_info(provider_address).call().await { - Ok(info) => info, - Err(e) => { - tracing::error!("Error while getting provider info. error: {:?}", e); - return; - } - }; - - // The f64 conversion is made to be able to serve metrics with the constraints of Prometheus. - // The fee is in wei, so we divide by 1e18 to convert it to eth. - let collected_fee = provider_info.accrued_fees_in_wei as f64 / 1e18; - let current_fee: f64 = provider_info.fee_in_wei as f64 / 1e18; - - let current_sequence_number = provider_info.sequence_number; - let end_sequence_number = provider_info.end_sequence_number; - - metrics - .collected_fee - .get_or_create(&AccountLabel { - chain_id: chain_id.clone(), - address: provider_address.to_string(), - }) - .set(collected_fee); - - metrics - .current_fee - .get_or_create(&AccountLabel { - chain_id: chain_id.clone(), - address: provider_address.to_string(), - }) - .set(current_fee); - - metrics - .current_sequence_number - .get_or_create(&AccountLabel { - chain_id: chain_id.clone(), - address: provider_address.to_string(), - }) - // sequence_number type on chain is u64 but practically it will take - // a long time for it to cross the limits of i64. 
- // currently prometheus only supports i64 for Gauge types - .set(current_sequence_number as i64); - metrics - .end_sequence_number - .get_or_create(&AccountLabel { - chain_id: chain_id.clone(), - address: provider_address.to_string(), - }) - .set(end_sequence_number as i64); -} - -#[tracing::instrument(name = "withdraw_fees", skip_all, fields())] -pub async fn withdraw_fees_wrapper( - contract: Arc, - provider_address: Address, - poll_interval: Duration, - min_balance: U256, -) { - loop { - if let Err(e) = withdraw_fees_if_necessary(contract.clone(), provider_address, min_balance) - .in_current_span() - .await - { - tracing::error!("Withdrawing fees. error: {:?}", e); - } - time::sleep(poll_interval).await; - } -} - -/// Withdraws accumulated fees in the contract as needed to maintain the balance of the keeper wallet. -pub async fn withdraw_fees_if_necessary( - contract: Arc, - provider_address: Address, - min_balance: U256, -) -> Result<()> { - let provider = contract.provider(); - let wallet = contract.wallet(); - - let keeper_balance = provider - .get_balance(wallet.address(), None) - .await - .map_err(|e| anyhow!("Error while getting balance. error: {:?}", e))?; - - let provider_info = contract - .get_provider_info(provider_address) - .call() - .await - .map_err(|e| anyhow!("Error while getting provider info. error: {:?}", e))?; - - if provider_info.fee_manager != wallet.address() { - return Err(anyhow!("Fee manager for provider {:?} is not the keeper wallet. Fee manager: {:?} Keeper: {:?}", provider, provider_info.fee_manager, wallet.address())); - } - - let fees = provider_info.accrued_fees_in_wei; - - if keeper_balance < min_balance && U256::from(fees) > min_balance { - tracing::info!("Claiming accrued fees..."); - let contract_call = contract.withdraw_as_fee_manager(provider_address, fees); - send_and_confirm(contract_call).await?; - } else if keeper_balance < min_balance { - tracing::warn!("Keeper balance {:?} is too low (< {:?}) but provider fees are not sufficient to top-up.", keeper_balance, min_balance) - } - - Ok(()) -} - -pub async fn send_and_confirm(contract_call: PythContractCall) -> Result<()> { - let call_name = contract_call.function.name.as_str(); - let pending_tx = contract_call - .send() - .await - .map_err(|e| anyhow!("Error submitting transaction({}) {:?}", call_name, e))?; - - let tx_result = pending_tx - .await - .map_err(|e| { - anyhow!( - "Error waiting for transaction({}) receipt: {:?}", - call_name, - e - ) - })? - .ok_or_else(|| { - anyhow!( - "Can't verify the transaction({}), probably dropped from mempool", - call_name - ) - })?; - - tracing::info!( - transaction_hash = &tx_result.transaction_hash.to_string(), - "Confirmed transaction({}). Receipt: {:?}", - call_name, - tx_result, - ); - Ok(()) -} - -#[tracing::instrument(name = "adjust_fee", skip_all)] -#[allow(clippy::too_many_arguments)] -pub async fn adjust_fee_wrapper( - contract: Arc, - chain_state: BlockchainState, - provider_address: Address, - poll_interval: Duration, - legacy_tx: bool, - gas_limit: u64, - min_profit_pct: u64, - target_profit_pct: u64, - max_profit_pct: u64, - min_fee_wei: u128, - metrics: Arc, -) { - // The maximum balance of accrued fees + provider wallet balance. None if we haven't observed a value yet. - let mut high_water_pnl: Option = None; - // The sequence number where the keeper last updated the on-chain fee. None if we haven't observed it yet. 
- let mut sequence_number_of_last_fee_update: Option = None; - loop { - if let Err(e) = adjust_fee_if_necessary( - contract.clone(), - chain_state.id.clone(), - provider_address, - legacy_tx, - gas_limit, - min_profit_pct, - target_profit_pct, - max_profit_pct, - min_fee_wei, - &mut high_water_pnl, - &mut sequence_number_of_last_fee_update, - metrics.clone(), - ) - .in_current_span() - .await - { - tracing::error!("Withdrawing fees. error: {:?}", e); - } - time::sleep(poll_interval).await; - } -} - -#[tracing::instrument(name = "update_commitments", skip_all)] -pub async fn update_commitments_loop( - contract: Arc, - chain_state: BlockchainState, -) { - loop { - if let Err(e) = update_commitments_if_necessary(contract.clone(), &chain_state) - .in_current_span() - .await - { - tracing::error!("Update commitments. error: {:?}", e); - } - time::sleep(UPDATE_COMMITMENTS_INTERVAL).await; - } -} - -pub async fn update_commitments_if_necessary( - contract: Arc, - chain_state: &BlockchainState, -) -> Result<()> { - //TODO: we can reuse the result from the last call from the watch_blocks thread to reduce RPCs - let latest_safe_block = get_latest_safe_block(chain_state).in_current_span().await; - let provider_address = chain_state.provider_address; - let provider_info = contract - .get_provider_info(provider_address) - .block(latest_safe_block) // To ensure we are not revealing sooner than we should - .call() - .await - .map_err(|e| anyhow!("Error while getting provider info. error: {:?}", e))?; - if provider_info.max_num_hashes == 0 { - return Ok(()); - } - let threshold = - ((provider_info.max_num_hashes as f64) * UPDATE_COMMITMENTS_THRESHOLD_FACTOR) as u64; - if provider_info.sequence_number - provider_info.current_commitment_sequence_number > threshold - { - let seq_number = provider_info.sequence_number - 1; - let provider_revelation = chain_state - .state - .reveal(seq_number) - .map_err(|e| anyhow!("Error revealing: {:?}", e))?; - let contract_call = - contract.advance_provider_commitment(provider_address, seq_number, provider_revelation); - send_and_confirm(contract_call).await?; - } - Ok(()) -} - -/// Adjust the fee charged by the provider to ensure that it is profitable at the prevailing gas price. -/// This method targets a fee as a function of the maximum cost of the callback, -/// c = (gas_limit) * (current gas price), with min_fee_wei as a lower bound on the fee. -/// -/// The method then updates the on-chain fee if all of the following are satisfied: -/// - the on-chain fee does not fall into an interval [c*min_profit, c*max_profit]. The tolerance -/// factor prevents the on-chain fee from changing with every single gas price fluctuation. -/// Profit scalars are specified in percentage units, min_profit = (min_profit_pct + 100) / 100 -/// - either the fee is increasing or the keeper is earning a profit -- i.e., fees only decrease when the keeper is profitable -/// - at least one random number has been requested since the last fee update -/// -/// These conditions are intended to make sure that the keeper is profitable while also minimizing the number of fee -/// update transactions. 
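[Editor's note] adjust_fee_if_necessary below moves into the new keeper/fee.rs module (imported above as keeper::fee::adjust_fee_wrapper). A worked sketch of the fee band it maintains, with illustrative numbers: the percentages here are applied directly as multipliers on the callback cost c, matching the arithmetic in the function body, and the activity and P&L guards are reduced to comments.

fn main() {
    // c = gas_limit * current gas price, in wei (illustrative value).
    let max_callback_cost: u128 = 2_000_000_000_000_000;
    // Profit multipliers in percent: 110 => 1.10x the callback cost, etc.
    let (min_profit_pct, target_profit_pct, max_profit_pct) = (110u128, 120u128, 150u128);
    let min_fee_wei: u128 = 1_000_000_000_000; // absolute floor on the fee

    let target_fee_min = ((max_callback_cost * min_profit_pct) / 100).max(min_fee_wei);
    let target_fee = ((max_callback_cost * target_profit_pct) / 100).max(min_fee_wei);
    let target_fee_max = ((max_callback_cost * max_profit_pct) / 100).max(min_fee_wei);

    // The on-chain fee is rewritten to target_fee only when it leaves the band;
    // the band's width is what stops the keeper from chasing every gas-price tick.
    // (The real check also requires a request since the last update, and only
    // lowers the fee when balance + accrued fees is at its high-water mark.)
    let on_chain_fee: u128 = 1_800_000_000_000_000;
    if on_chain_fee < target_fee_min || on_chain_fee > target_fee_max {
        println!("update fee {} -> {}", on_chain_fee, target_fee);
    } else {
        println!(
            "fee {} inside [{}, {}]; skip",
            on_chain_fee, target_fee_min, target_fee_max
        );
    }
}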
-#[allow(clippy::too_many_arguments)] -pub async fn adjust_fee_if_necessary( - contract: Arc, - chain_id: ChainId, - provider_address: Address, - legacy_tx: bool, - gas_limit: u64, - min_profit_pct: u64, - target_profit_pct: u64, - max_profit_pct: u64, - min_fee_wei: u128, - high_water_pnl: &mut Option, - sequence_number_of_last_fee_update: &mut Option, - metrics: Arc, -) -> Result<()> { - let provider_info = contract - .get_provider_info(provider_address) - .call() - .await - .map_err(|e| anyhow!("Error while getting provider info. error: {:?}", e))?; - - if provider_info.fee_manager != contract.wallet().address() { - return Err(anyhow!("Fee manager for provider {:?} is not the keeper wallet. Fee manager: {:?} Keeper: {:?}", contract.provider(), provider_info.fee_manager, contract.wallet().address())); - } - - // Calculate target window for the on-chain fee. - let max_callback_cost: u128 = estimate_tx_cost(contract.clone(), legacy_tx, gas_limit.into()) - .await - .map_err(|e| anyhow!("Could not estimate transaction cost. error {:?}", e))?; - - let account_label = AccountLabel { - chain_id: chain_id.clone(), - address: provider_address.to_string(), - }; - - metrics - .gas_price_estimate - .get_or_create(&account_label) - .set((max_callback_cost / u128::from(gas_limit)) as f64 / 1e9); - - let target_fee_min = std::cmp::max( - (max_callback_cost * u128::from(min_profit_pct)) / 100, - min_fee_wei, - ); - let target_fee = std::cmp::max( - (max_callback_cost * u128::from(target_profit_pct)) / 100, - min_fee_wei, - ); - metrics - .target_provider_fee - .get_or_create(&account_label) - .set(((max_callback_cost * u128::from(target_profit_pct)) / 100) as f64 / 1e18); - - let target_fee_max = std::cmp::max( - (max_callback_cost * u128::from(max_profit_pct)) / 100, - min_fee_wei, - ); - - // Calculate current P&L to determine if we can reduce fees. - let current_keeper_balance = contract - .provider() - .get_balance(contract.wallet().address(), None) - .await - .map_err(|e| anyhow!("Error while getting balance. error: {:?}", e))?; - let current_keeper_fees = U256::from(provider_info.accrued_fees_in_wei); - let current_pnl = current_keeper_balance + current_keeper_fees; - - let can_reduce_fees = match high_water_pnl { - Some(x) => current_pnl >= *x, - None => false, - }; - - // Determine if the chain has seen activity since the last fee update. - let is_chain_active: bool = match sequence_number_of_last_fee_update { - Some(n) => provider_info.sequence_number > *n, - None => { - // We don't want to adjust the fees on server start for unused chains, hence false here. - false - } - }; - - let provider_fee: u128 = provider_info.fee_in_wei; - if is_chain_active - && ((provider_fee > target_fee_max && can_reduce_fees) || provider_fee < target_fee_min) - { - tracing::info!( - "Adjusting fees. Current: {:?} Target: {:?}", - provider_fee, - target_fee - ); - let contract_call = contract.set_provider_fee_as_fee_manager(provider_address, target_fee); - send_and_confirm(contract_call).await?; - - *sequence_number_of_last_fee_update = Some(provider_info.sequence_number); - } else { - tracing::info!( - "Skipping fee adjustment. 
Current: {:?} Target: {:?} [{:?}, {:?}] Current Sequence Number: {:?} Last updated sequence number {:?} Current pnl: {:?} High water pnl: {:?}", - provider_fee, - target_fee, - target_fee_min, - target_fee_max, - provider_info.sequence_number, - sequence_number_of_last_fee_update, - current_pnl, - high_water_pnl - ) - } - - // Update high water pnl - *high_water_pnl = Some(std::cmp::max( - current_pnl, - high_water_pnl.unwrap_or(U256::from(0)), - )); - - // Update sequence number on server start. - match sequence_number_of_last_fee_update { - Some(_) => (), - None => { - *sequence_number_of_last_fee_update = Some(provider_info.sequence_number); - } - }; - - Ok(()) -} - -/// Estimate the cost (in wei) of a transaction consuming gas_used gas. -pub async fn estimate_tx_cost( - contract: Arc, - use_legacy_tx: bool, - gas_used: u128, -) -> Result { - let middleware = contract.client(); - - let gas_price: u128 = if use_legacy_tx { - middleware - .get_gas_price() - .await - .map_err(|e| anyhow!("Failed to fetch gas price. error: {:?}", e))? - .try_into() - .map_err(|e| anyhow!("gas price doesn't fit into 128 bits. error: {:?}", e))? - } else { - // This is not obvious but the implementation of estimate_eip1559_fees in ethers.rs - // for a middleware that has a GasOracleMiddleware inside is to ignore the passed-in callback - // and use whatever the gas oracle returns. - let (max_fee_per_gas, max_priority_fee_per_gas) = - middleware.estimate_eip1559_fees(None).await?; - - (max_fee_per_gas + max_priority_fee_per_gas) - .try_into() - .map_err(|e| anyhow!("gas price doesn't fit into 128 bits. error: {:?}", e))? - }; - - Ok(gas_price * gas_used) -} diff --git a/apps/fortuna/src/keeper/block.rs b/apps/fortuna/src/keeper/block.rs new file mode 100644 index 0000000000..21f7d26188 --- /dev/null +++ b/apps/fortuna/src/keeper/block.rs @@ -0,0 +1,360 @@ +use { + crate::{ + api::{self, BlockchainState}, + chain::{ethereum::InstrumentedSignablePythContract, reader::BlockNumber}, + config::EscalationPolicyConfig, + keeper::keeper_metrics::KeeperMetrics, + keeper::process_event::process_event_with_backoff, + }, + anyhow::{anyhow, Result}, + ethers::{ + providers::{Middleware, Provider, Ws}, + types::U256, + }, + futures::StreamExt, + std::{collections::HashSet, sync::Arc}, + tokio::{ + spawn, + sync::{mpsc, RwLock}, + time::{self, Duration}, + }, + tracing::{self, Instrument}, +}; + +/// How much to wait before retrying in case of an RPC error +const RETRY_INTERVAL: Duration = Duration::from_secs(5); +/// How many blocks to fetch events for in a single rpc call +const BLOCK_BATCH_SIZE: u64 = 100; +/// How much to wait before polling the next latest block +const POLL_INTERVAL: Duration = Duration::from_secs(2); +/// Retry last N blocks +const RETRY_PREVIOUS_BLOCKS: u64 = 100; + +#[derive(Debug, Clone)] +pub struct BlockRange { + pub from: BlockNumber, + pub to: BlockNumber, +} + +/// Get the latest safe block number for the chain. Retry internally if there is an error. +pub async fn get_latest_safe_block(chain_state: &BlockchainState) -> BlockNumber { + loop { + match chain_state + .contract + .get_block_number(chain_state.confirmed_block_status) + .await + { + Ok(latest_confirmed_block) => { + tracing::info!( + "Fetched latest safe block {}", + latest_confirmed_block - chain_state.reveal_delay_blocks + ); + return latest_confirmed_block - chain_state.reveal_delay_blocks; + } + Err(e) => { + tracing::error!("Error while getting block number. 
error: {:?}", e); + time::sleep(RETRY_INTERVAL).await; + } + } + } +} + +/// Process a range of blocks in batches. It calls the `process_single_block_batch` method for each batch. +#[tracing::instrument(skip_all, fields( + range_from_block = block_range.from, range_to_block = block_range.to +))] +pub async fn process_block_range( + block_range: BlockRange, + contract: Arc, + gas_limit: U256, + escalation_policy: EscalationPolicyConfig, + chain_state: api::BlockchainState, + metrics: Arc, + fulfilled_requests_cache: Arc>>, +) { + let BlockRange { + from: first_block, + to: last_block, + } = block_range; + let mut current_block = first_block; + while current_block <= last_block { + let mut to_block = current_block + BLOCK_BATCH_SIZE; + if to_block > last_block { + to_block = last_block; + } + + // TODO: this is handling all blocks sequentially we might want to handle them in parallel in future. + process_single_block_batch( + BlockRange { + from: current_block, + to: to_block, + }, + contract.clone(), + gas_limit, + escalation_policy.clone(), + chain_state.clone(), + metrics.clone(), + fulfilled_requests_cache.clone(), + ) + .in_current_span() + .await; + + current_block = to_block + 1; + } +} + +/// Process a batch of blocks for a chain. It will fetch events for all the blocks in a single call for the provided batch +/// and then try to process them one by one. It checks the `fulfilled_request_cache`. If the request was already fulfilled. +/// It won't reprocess it. If the request was already processed, it will reprocess it. +/// If the process fails, it will retry indefinitely. +#[tracing::instrument(name = "batch", skip_all, fields( + batch_from_block = block_range.from, batch_to_block = block_range.to +))] +pub async fn process_single_block_batch( + block_range: BlockRange, + contract: Arc, + gas_limit: U256, + escalation_policy: EscalationPolicyConfig, + chain_state: api::BlockchainState, + metrics: Arc, + fulfilled_requests_cache: Arc>>, +) { + loop { + let events_res = chain_state + .contract + .get_request_with_callback_events(block_range.from, block_range.to) + .await; + + match events_res { + Ok(events) => { + tracing::info!(num_of_events = &events.len(), "Processing",); + for event in &events { + // the write lock guarantees we spawn only one task per sequence number + let newly_inserted = fulfilled_requests_cache + .write() + .await + .insert(event.sequence_number); + if newly_inserted { + spawn( + process_event_with_backoff( + event.clone(), + chain_state.clone(), + contract.clone(), + gas_limit, + escalation_policy.clone(), + metrics.clone(), + ) + .in_current_span(), + ); + } + } + tracing::info!(num_of_events = &events.len(), "Processed",); + break; + } + Err(e) => { + tracing::error!( + "Error while getting events. Waiting for {} seconds before retry. error: {:?}", + RETRY_INTERVAL.as_secs(), + e + ); + time::sleep(RETRY_INTERVAL).await; + } + } + } +} + +/// Wrapper for the `watch_blocks` method. If there was an error while watching, it will retry after a delay. +/// It retries indefinitely. 
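[Editor's note] A minimal sketch of this retry-forever wrapper shape, assuming `tokio` and `anyhow`; watch_once is a hypothetical stand-in for watch_blocks. The point of the shape is that the mutable cursor lives in the wrapper, so a failed call resumes from the last processed block instead of starting over.

use {
    anyhow::Result,
    tokio::time::{sleep, timeout, Duration},
};

const RETRY_INTERVAL: Duration = Duration::from_secs(5);

// Hypothetical stand-in for watch_blocks: makes some progress, then fails.
async fn watch_once(cursor: &mut u64) -> Result<()> {
    *cursor += 1;
    anyhow::bail!("ws stream ended")
}

async fn watch_wrapper(mut cursor: u64) {
    loop {
        // The cursor survives each failure, so the restart picks up where the
        // previous attempt stopped rather than re-scanning everything.
        if let Err(e) = watch_once(&mut cursor).await {
            eprintln!("watching blocks. error: {:?}", e);
            sleep(RETRY_INTERVAL).await;
        }
    }
}

#[tokio::main]
async fn main() {
    // The real task runs forever; bound it here so the example terminates.
    let _ = timeout(Duration::from_millis(50), watch_wrapper(100)).await;
}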
+#[tracing::instrument(name = "watch_blocks", skip_all, fields( + initial_safe_block = latest_safe_block +))] +pub async fn watch_blocks_wrapper( + chain_state: BlockchainState, + latest_safe_block: BlockNumber, + tx: mpsc::Sender, + geth_rpc_wss: Option, +) { + let mut last_safe_block_processed = latest_safe_block; + loop { + if let Err(e) = watch_blocks( + chain_state.clone(), + &mut last_safe_block_processed, + tx.clone(), + geth_rpc_wss.clone(), + ) + .in_current_span() + .await + { + tracing::error!("watching blocks. error: {:?}", e); + time::sleep(RETRY_INTERVAL).await; + } + } +} + +/// Watch for new blocks and send the range of blocks for which events have not been handled to the `tx` channel. +/// We are subscribing to new blocks instead of events. If we miss some blocks, it will be fine as we are sending +/// block ranges to the `tx` channel. If we have subscribed to events, we could have missed those and won't even +/// know about it. +pub async fn watch_blocks( + chain_state: BlockchainState, + last_safe_block_processed: &mut BlockNumber, + tx: mpsc::Sender, + geth_rpc_wss: Option, +) -> Result<()> { + tracing::info!("Watching blocks to handle new events"); + + let provider_option = match geth_rpc_wss { + Some(wss) => Some(match Provider::::connect(wss.clone()).await { + Ok(provider) => provider, + Err(e) => { + tracing::error!("Error while connecting to wss: {}. error: {:?}", wss, e); + return Err(e.into()); + } + }), + None => { + tracing::info!("No wss provided"); + None + } + }; + + let mut stream_option = match provider_option { + Some(ref provider) => Some(match provider.subscribe_blocks().await { + Ok(client) => client, + Err(e) => { + tracing::error!("Error while subscribing to blocks. error {:?}", e); + return Err(e.into()); + } + }), + None => None, + }; + + loop { + match stream_option { + Some(ref mut stream) => { + if stream.next().await.is_none() { + tracing::error!("Error blocks subscription stream ended"); + return Err(anyhow!("Error blocks subscription stream ended")); + } + } + None => { + time::sleep(POLL_INTERVAL).await; + } + } + + let latest_safe_block = get_latest_safe_block(&chain_state).in_current_span().await; + if latest_safe_block > *last_safe_block_processed { + let mut from = latest_safe_block.saturating_sub(RETRY_PREVIOUS_BLOCKS); + + // In normal situation, the difference between latest and last safe block should not be more than 2-3 (for arbitrum it can be 10) + // TODO: add a metric for this in separate PR. We need alerts + // But in extreme situation, where we were unable to send the block range multiple times, the difference between latest_safe_block and + // last_safe_block_processed can grow. It is fine to not have the retry mechanisms for those earliest blocks as we expect the rpc + // to be in consistency after this much time. + if from > *last_safe_block_processed { + from = *last_safe_block_processed; + } + match tx + .send(BlockRange { + from, + to: latest_safe_block, + }) + .await + { + Ok(_) => { + tracing::info!( + from_block = from, + to_block = &latest_safe_block, + "Block range sent to handle events", + ); + *last_safe_block_processed = latest_safe_block; + } + Err(e) => { + tracing::error!( + from_block = from, + to_block = &latest_safe_block, + "Error while sending block range to handle events. These will be handled in next call. error: {:?}", + e + ); + } + }; + } + } +} + +/// It waits on rx channel to receive block ranges and then calls process_block_range to process them +/// for each configured block delay. 
+#[tracing::instrument(skip_all)] +#[allow(clippy::too_many_arguments)] +pub async fn process_new_blocks( + chain_state: BlockchainState, + mut rx: mpsc::Receiver, + contract: Arc, + gas_limit: U256, + escalation_policy: EscalationPolicyConfig, + metrics: Arc, + fulfilled_requests_cache: Arc>>, + block_delays: Vec, +) { + tracing::info!("Waiting for new block ranges to process"); + loop { + if let Some(block_range) = rx.recv().await { + // Process blocks immediately first + process_block_range( + block_range.clone(), + Arc::clone(&contract), + gas_limit, + escalation_policy.clone(), + chain_state.clone(), + metrics.clone(), + fulfilled_requests_cache.clone(), + ) + .in_current_span() + .await; + + // Then process with each configured delay + for delay in &block_delays { + let adjusted_range = BlockRange { + from: block_range.from.saturating_sub(*delay), + to: block_range.to.saturating_sub(*delay), + }; + process_block_range( + adjusted_range, + Arc::clone(&contract), + gas_limit, + escalation_policy.clone(), + chain_state.clone(), + metrics.clone(), + fulfilled_requests_cache.clone(), + ) + .in_current_span() + .await; + } + } + } +} + +/// Processes the backlog_range for a chain. +#[tracing::instrument(skip_all)] +pub async fn process_backlog( + backlog_range: BlockRange, + contract: Arc, + gas_limit: U256, + escalation_policy: EscalationPolicyConfig, + chain_state: BlockchainState, + metrics: Arc, + fulfilled_requests_cache: Arc>>, +) { + tracing::info!("Processing backlog"); + process_block_range( + backlog_range, + contract, + gas_limit, + escalation_policy, + chain_state, + metrics, + fulfilled_requests_cache, + ) + .in_current_span() + .await; + tracing::info!("Backlog processed"); +} diff --git a/apps/fortuna/src/keeper/commitment.rs b/apps/fortuna/src/keeper/commitment.rs new file mode 100644 index 0000000000..c4c2344693 --- /dev/null +++ b/apps/fortuna/src/keeper/commitment.rs @@ -0,0 +1,63 @@ +use { + crate::{ + api::BlockchainState, chain::ethereum::InstrumentedSignablePythContract, + eth_utils::utils::send_and_confirm, keeper::block::get_latest_safe_block, + }, + anyhow::{anyhow, Result}, + std::sync::Arc, + tokio::time::{self, Duration}, + tracing::{self, Instrument}, +}; + +/// Check whether we need to manually update the commitments to reduce numHashes for future +/// requests and reduce the gas cost of the reveal. +const UPDATE_COMMITMENTS_INTERVAL: Duration = Duration::from_secs(30); +const UPDATE_COMMITMENTS_THRESHOLD_FACTOR: f64 = 0.95; + +#[tracing::instrument(name = "update_commitments", skip_all)] +pub async fn update_commitments_loop( + contract: Arc, + chain_state: BlockchainState, +) { + loop { + if let Err(e) = update_commitments_if_necessary(contract.clone(), &chain_state) + .in_current_span() + .await + { + tracing::error!("Update commitments. error: {:?}", e); + } + time::sleep(UPDATE_COMMITMENTS_INTERVAL).await; + } +} + +pub async fn update_commitments_if_necessary( + contract: Arc, + chain_state: &BlockchainState, +) -> Result<()> { + //TODO: we can reuse the result from the last call from the watch_blocks thread to reduce RPCs + let latest_safe_block = get_latest_safe_block(chain_state).in_current_span().await; + let provider_address = chain_state.provider_address; + let provider_info = contract + .get_provider_info(provider_address) + .block(latest_safe_block) // To ensure we are not revealing sooner than we should + .call() + .await + .map_err(|e| anyhow!("Error while getting provider info. 
error: {:?}", e))?; + if provider_info.max_num_hashes == 0 { + return Ok(()); + } + let threshold = + ((provider_info.max_num_hashes as f64) * UPDATE_COMMITMENTS_THRESHOLD_FACTOR) as u64; + if provider_info.sequence_number - provider_info.current_commitment_sequence_number > threshold + { + let seq_number = provider_info.sequence_number - 1; + let provider_revelation = chain_state + .state + .reveal(seq_number) + .map_err(|e| anyhow!("Error revealing: {:?}", e))?; + let contract_call = + contract.advance_provider_commitment(provider_address, seq_number, provider_revelation); + send_and_confirm(contract_call).await?; + } + Ok(()) +} diff --git a/apps/fortuna/src/keeper/fee.rs b/apps/fortuna/src/keeper/fee.rs new file mode 100644 index 0000000000..66ff53ba81 --- /dev/null +++ b/apps/fortuna/src/keeper/fee.rs @@ -0,0 +1,253 @@ +use { + crate::{ + api::BlockchainState, chain::ethereum::InstrumentedSignablePythContract, + eth_utils::utils::estimate_tx_cost, eth_utils::utils::send_and_confirm, + keeper::AccountLabel, keeper::ChainId, keeper::KeeperMetrics, + }, + anyhow::{anyhow, Result}, + ethers::{ + middleware::Middleware, + signers::Signer, + types::{Address, U256}, + }, + std::sync::Arc, + tokio::time::{self, Duration}, + tracing::{self, Instrument}, +}; + +#[tracing::instrument(name = "withdraw_fees", skip_all, fields())] +pub async fn withdraw_fees_wrapper( + contract: Arc, + provider_address: Address, + poll_interval: Duration, + min_balance: U256, +) { + loop { + if let Err(e) = withdraw_fees_if_necessary(contract.clone(), provider_address, min_balance) + .in_current_span() + .await + { + tracing::error!("Withdrawing fees. error: {:?}", e); + } + time::sleep(poll_interval).await; + } +} + +/// Withdraws accumulated fees in the contract as needed to maintain the balance of the keeper wallet. +pub async fn withdraw_fees_if_necessary( + contract: Arc, + provider_address: Address, + min_balance: U256, +) -> Result<()> { + let provider = contract.provider(); + let wallet = contract.wallet(); + + let keeper_balance = provider + .get_balance(wallet.address(), None) + .await + .map_err(|e| anyhow!("Error while getting balance. error: {:?}", e))?; + + let provider_info = contract + .get_provider_info(provider_address) + .call() + .await + .map_err(|e| anyhow!("Error while getting provider info. error: {:?}", e))?; + + if provider_info.fee_manager != wallet.address() { + return Err(anyhow!("Fee manager for provider {:?} is not the keeper wallet. Fee manager: {:?} Keeper: {:?}", provider, provider_info.fee_manager, wallet.address())); + } + + let fees = provider_info.accrued_fees_in_wei; + + if keeper_balance < min_balance && U256::from(fees) > min_balance { + tracing::info!("Claiming accrued fees..."); + let contract_call = contract.withdraw_as_fee_manager(provider_address, fees); + send_and_confirm(contract_call).await?; + } else if keeper_balance < min_balance { + tracing::warn!("Keeper balance {:?} is too low (< {:?}) but provider fees are not sufficient to top-up.", keeper_balance, min_balance) + } + + Ok(()) +} + +#[tracing::instrument(name = "adjust_fee", skip_all)] +#[allow(clippy::too_many_arguments)] +pub async fn adjust_fee_wrapper( + contract: Arc, + chain_state: BlockchainState, + provider_address: Address, + poll_interval: Duration, + legacy_tx: bool, + gas_limit: u64, + min_profit_pct: u64, + target_profit_pct: u64, + max_profit_pct: u64, + min_fee_wei: u128, + metrics: Arc, +) { + // The maximum balance of accrued fees + provider wallet balance. 
None if we haven't observed a value yet. + let mut high_water_pnl: Option = None; + // The sequence number where the keeper last updated the on-chain fee. None if we haven't observed it yet. + let mut sequence_number_of_last_fee_update: Option = None; + loop { + if let Err(e) = adjust_fee_if_necessary( + contract.clone(), + chain_state.id.clone(), + provider_address, + legacy_tx, + gas_limit, + min_profit_pct, + target_profit_pct, + max_profit_pct, + min_fee_wei, + &mut high_water_pnl, + &mut sequence_number_of_last_fee_update, + metrics.clone(), + ) + .in_current_span() + .await + { + tracing::error!("Withdrawing fees. error: {:?}", e); + } + time::sleep(poll_interval).await; + } +} + +/// Adjust the fee charged by the provider to ensure that it is profitable at the prevailing gas price. +/// This method targets a fee as a function of the maximum cost of the callback, +/// c = (gas_limit) * (current gas price), with min_fee_wei as a lower bound on the fee. +/// +/// The method then updates the on-chain fee if all of the following are satisfied: +/// - the on-chain fee does not fall into an interval [c*min_profit, c*max_profit]. The tolerance +/// factor prevents the on-chain fee from changing with every single gas price fluctuation. +/// Profit scalars are specified in percentage units, min_profit = (min_profit_pct + 100) / 100 +/// - either the fee is increasing or the keeper is earning a profit -- i.e., fees only decrease when the keeper is profitable +/// - at least one random number has been requested since the last fee update +/// +/// These conditions are intended to make sure that the keeper is profitable while also minimizing the number of fee +/// update transactions. +#[allow(clippy::too_many_arguments)] +pub async fn adjust_fee_if_necessary( + contract: Arc, + chain_id: ChainId, + provider_address: Address, + legacy_tx: bool, + gas_limit: u64, + min_profit_pct: u64, + target_profit_pct: u64, + max_profit_pct: u64, + min_fee_wei: u128, + high_water_pnl: &mut Option, + sequence_number_of_last_fee_update: &mut Option, + metrics: Arc, +) -> Result<()> { + let provider_info = contract + .get_provider_info(provider_address) + .call() + .await + .map_err(|e| anyhow!("Error while getting provider info. error: {:?}", e))?; + + if provider_info.fee_manager != contract.wallet().address() { + return Err(anyhow!("Fee manager for provider {:?} is not the keeper wallet. Fee manager: {:?} Keeper: {:?}", contract.provider(), provider_info.fee_manager, contract.wallet().address())); + } + + // Calculate target window for the on-chain fee. + let middleware = contract.client(); + let max_callback_cost: u128 = estimate_tx_cost(middleware, legacy_tx, gas_limit.into()) + .await + .map_err(|e| anyhow!("Could not estimate transaction cost. 
error {:?}", e))?; + + let account_label = AccountLabel { + chain_id: chain_id.clone(), + address: provider_address.to_string(), + }; + + metrics + .gas_price_estimate + .get_or_create(&account_label) + .set((max_callback_cost / u128::from(gas_limit)) as f64 / 1e9); + + let target_fee_min = std::cmp::max( + (max_callback_cost * u128::from(min_profit_pct)) / 100, + min_fee_wei, + ); + let target_fee = std::cmp::max( + (max_callback_cost * u128::from(target_profit_pct)) / 100, + min_fee_wei, + ); + metrics + .target_provider_fee + .get_or_create(&account_label) + .set(((max_callback_cost * u128::from(target_profit_pct)) / 100) as f64 / 1e18); + + let target_fee_max = std::cmp::max( + (max_callback_cost * u128::from(max_profit_pct)) / 100, + min_fee_wei, + ); + + // Calculate current P&L to determine if we can reduce fees. + let current_keeper_balance = contract + .provider() + .get_balance(contract.wallet().address(), None) + .await + .map_err(|e| anyhow!("Error while getting balance. error: {:?}", e))?; + let current_keeper_fees = U256::from(provider_info.accrued_fees_in_wei); + let current_pnl = current_keeper_balance + current_keeper_fees; + + let can_reduce_fees = match high_water_pnl { + Some(x) => current_pnl >= *x, + None => false, + }; + + // Determine if the chain has seen activity since the last fee update. + let is_chain_active: bool = match sequence_number_of_last_fee_update { + Some(n) => provider_info.sequence_number > *n, + None => { + // We don't want to adjust the fees on server start for unused chains, hence false here. + false + } + }; + + let provider_fee: u128 = provider_info.fee_in_wei; + if is_chain_active + && ((provider_fee > target_fee_max && can_reduce_fees) || provider_fee < target_fee_min) + { + tracing::info!( + "Adjusting fees. Current: {:?} Target: {:?}", + provider_fee, + target_fee + ); + let contract_call = contract.set_provider_fee_as_fee_manager(provider_address, target_fee); + send_and_confirm(contract_call).await?; + + *sequence_number_of_last_fee_update = Some(provider_info.sequence_number); + } else { + tracing::info!( + "Skipping fee adjustment. Current: {:?} Target: {:?} [{:?}, {:?}] Current Sequence Number: {:?} Last updated sequence number {:?} Current pnl: {:?} High water pnl: {:?}", + provider_fee, + target_fee, + target_fee_min, + target_fee_max, + provider_info.sequence_number, + sequence_number_of_last_fee_update, + current_pnl, + high_water_pnl + ) + } + + // Update high water pnl + *high_water_pnl = Some(std::cmp::max( + current_pnl, + high_water_pnl.unwrap_or(U256::from(0)), + )); + + // Update sequence number on server start. 
+ match sequence_number_of_last_fee_update { + Some(_) => (), + None => { + *sequence_number_of_last_fee_update = Some(provider_info.sequence_number); + } + }; + + Ok(()) +} diff --git a/apps/fortuna/src/keeper/keeper_metrics.rs b/apps/fortuna/src/keeper/keeper_metrics.rs new file mode 100644 index 0000000000..53eb927445 --- /dev/null +++ b/apps/fortuna/src/keeper/keeper_metrics.rs @@ -0,0 +1,261 @@ +use { + ethers::types::Address, + prometheus_client::{ + encoding::EncodeLabelSet, + metrics::{counter::Counter, family::Family, gauge::Gauge, histogram::Histogram}, + registry::Registry, + }, + std::sync::atomic::AtomicU64, + std::sync::Arc, + tokio::sync::RwLock, +}; + +#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)] +pub struct AccountLabel { + pub chain_id: String, + pub address: String, +} + +pub struct KeeperMetrics { + pub current_sequence_number: Family, + pub end_sequence_number: Family, + pub balance: Family>, + pub collected_fee: Family>, + pub current_fee: Family>, + pub target_provider_fee: Family>, + pub total_gas_spent: Family>, + pub total_gas_fee_spent: Family>, + pub requests: Family, + pub requests_processed: Family, + pub requests_processed_success: Family, + pub requests_processed_failure: Family, + pub requests_reprocessed: Family, + pub reveals: Family, + pub request_duration_ms: Family, + pub retry_count: Family, + pub final_gas_multiplier: Family, + pub final_fee_multiplier: Family, + pub gas_price_estimate: Family>, +} + +impl Default for KeeperMetrics { + fn default() -> Self { + Self { + current_sequence_number: Family::default(), + end_sequence_number: Family::default(), + balance: Family::default(), + collected_fee: Family::default(), + current_fee: Family::default(), + target_provider_fee: Family::default(), + total_gas_spent: Family::default(), + total_gas_fee_spent: Family::default(), + requests: Family::default(), + requests_processed: Family::default(), + requests_processed_success: Family::default(), + requests_processed_failure: Family::default(), + requests_reprocessed: Family::default(), + reveals: Family::default(), + request_duration_ms: Family::new_with_constructor(|| { + Histogram::new( + vec![ + 1000.0, 2500.0, 5000.0, 7500.0, 10000.0, 20000.0, 30000.0, 40000.0, + 50000.0, 60000.0, 120000.0, 180000.0, 240000.0, 300000.0, 600000.0, + ] + .into_iter(), + ) + }), + retry_count: Family::new_with_constructor(|| { + Histogram::new(vec![0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 10.0, 15.0, 20.0].into_iter()) + }), + final_gas_multiplier: Family::new_with_constructor(|| { + Histogram::new( + vec![100.0, 125.0, 150.0, 200.0, 300.0, 400.0, 500.0, 600.0].into_iter(), + ) + }), + final_fee_multiplier: Family::new_with_constructor(|| { + Histogram::new(vec![100.0, 110.0, 120.0, 140.0, 160.0, 180.0, 200.0].into_iter()) + }), + gas_price_estimate: Family::default(), + } + } +} + +impl KeeperMetrics { + pub async fn new( + registry: Arc>, + chain_labels: Vec<(String, Address)>, + ) -> Self { + let mut writable_registry = registry.write().await; + let keeper_metrics = KeeperMetrics::default(); + + writable_registry.register( + "current_sequence_number", + "The sequence number for a new request", + keeper_metrics.current_sequence_number.clone(), + ); + + writable_registry.register( + "end_sequence_number", + "The sequence number for the end request", + keeper_metrics.end_sequence_number.clone(), + ); + + writable_registry.register( + "requests", + "Number of requests received through events", + keeper_metrics.requests.clone(), + ); + + writable_registry.register( + 
"requests_processed", + "Number of requests processed", + keeper_metrics.requests_processed.clone(), + ); + + writable_registry.register( + "requests_processed_success", + "Number of requests processed successfully", + keeper_metrics.requests_processed_success.clone(), + ); + + writable_registry.register( + "requests_processed_failure", + "Number of requests processed with failure", + keeper_metrics.requests_processed_failure.clone(), + ); + + writable_registry.register( + "reveal", + "Number of reveals", + keeper_metrics.reveals.clone(), + ); + + writable_registry.register( + "balance", + "Balance of the keeper", + keeper_metrics.balance.clone(), + ); + + writable_registry.register( + "collected_fee", + "Collected fee on the contract", + keeper_metrics.collected_fee.clone(), + ); + + writable_registry.register( + "current_fee", + "Current fee charged by the provider", + keeper_metrics.current_fee.clone(), + ); + + writable_registry.register( + "target_provider_fee", + "Target fee in ETH -- differs from current_fee in that this is the goal, and current_fee is the on-chain value.", + keeper_metrics.target_provider_fee.clone(), + ); + + writable_registry.register( + "total_gas_spent", + "Total gas spent revealing requests", + keeper_metrics.total_gas_spent.clone(), + ); + + writable_registry.register( + "total_gas_fee_spent", + "Total amount of ETH spent on gas for revealing requests", + keeper_metrics.total_gas_fee_spent.clone(), + ); + + writable_registry.register( + "requests_reprocessed", + "Number of requests reprocessed", + keeper_metrics.requests_reprocessed.clone(), + ); + + writable_registry.register( + "request_duration_ms", + "Time taken to process each successful callback request in milliseconds", + keeper_metrics.request_duration_ms.clone(), + ); + + writable_registry.register( + "retry_count", + "Number of retries for successful transactions", + keeper_metrics.retry_count.clone(), + ); + + writable_registry.register( + "final_gas_multiplier", + "Final gas multiplier percentage for successful transactions", + keeper_metrics.final_gas_multiplier.clone(), + ); + + writable_registry.register( + "final_fee_multiplier", + "Final fee multiplier percentage for successful transactions", + keeper_metrics.final_fee_multiplier.clone(), + ); + + writable_registry.register( + "gas_price_estimate", + "Gas price estimate for the blockchain (in gwei)", + keeper_metrics.gas_price_estimate.clone(), + ); + + // *Important*: When adding a new metric: + // 1. Register it above using `writable_registry.register(...)` + // 2. 
Add a get_or_create call in the loop below to initialize it for each chain/provider pair + for (chain_id, provider_address) in chain_labels { + let account_label = AccountLabel { + chain_id, + address: provider_address.to_string(), + }; + + let _ = keeper_metrics + .current_sequence_number + .get_or_create(&account_label); + let _ = keeper_metrics + .end_sequence_number + .get_or_create(&account_label); + let _ = keeper_metrics.balance.get_or_create(&account_label); + let _ = keeper_metrics.collected_fee.get_or_create(&account_label); + let _ = keeper_metrics.current_fee.get_or_create(&account_label); + let _ = keeper_metrics + .target_provider_fee + .get_or_create(&account_label); + let _ = keeper_metrics.total_gas_spent.get_or_create(&account_label); + let _ = keeper_metrics + .total_gas_fee_spent + .get_or_create(&account_label); + let _ = keeper_metrics.requests.get_or_create(&account_label); + let _ = keeper_metrics + .requests_processed + .get_or_create(&account_label); + let _ = keeper_metrics + .requests_processed_success + .get_or_create(&account_label); + let _ = keeper_metrics + .requests_processed_failure + .get_or_create(&account_label); + let _ = keeper_metrics + .requests_reprocessed + .get_or_create(&account_label); + let _ = keeper_metrics.reveals.get_or_create(&account_label); + let _ = keeper_metrics + .request_duration_ms + .get_or_create(&account_label); + let _ = keeper_metrics.retry_count.get_or_create(&account_label); + let _ = keeper_metrics + .final_gas_multiplier + .get_or_create(&account_label); + let _ = keeper_metrics + .final_fee_multiplier + .get_or_create(&account_label); + let _ = keeper_metrics + .gas_price_estimate + .get_or_create(&account_label); + } + + keeper_metrics + } +} diff --git a/apps/fortuna/src/keeper/process_event.rs b/apps/fortuna/src/keeper/process_event.rs new file mode 100644 index 0000000000..425f5e0ae1 --- /dev/null +++ b/apps/fortuna/src/keeper/process_event.rs @@ -0,0 +1,308 @@ +use { + super::keeper_metrics::{AccountLabel, KeeperMetrics}, + crate::{ + api::BlockchainState, + chain::{ethereum::InstrumentedSignablePythContract, reader::RequestedWithCallbackEvent}, + config::EscalationPolicyConfig, + }, + anyhow::{anyhow, Result}, + backoff::ExponentialBackoff, + ethers::middleware::Middleware, + ethers::signers::Signer, + ethers::types::U256, + std::sync::{atomic::AtomicU64, Arc}, + tokio::time::{timeout, Duration}, + tracing::{self, Instrument}, +}; + +/// Process an event with backoff. It will retry the reveal on failure for 5 minutes. 
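The retry behavior described in this doc comment comes from the `backoff` crate, the same API the function below uses; here is a reduced, self-contained sketch of the pattern (the closure body and log messages are illustrative):

// Sketch (not in the patch): transient errors are retried with exponential
// delays until max_elapsed_time elapses; permanent errors abort immediately.
use std::time::Duration;

async fn retry_demo() -> Result<(), anyhow::Error> {
    let policy = backoff::ExponentialBackoff {
        max_elapsed_time: Some(Duration::from_secs(300)), // give up after 5 minutes
        ..Default::default()
    };
    backoff::future::retry_notify(
        policy,
        || async {
            // The real code reads a retry counter here to escalate gas and fee
            // multipliers on each attempt; this demo just fails transiently.
            Err::<(), _>(backoff::Error::transient(anyhow::anyhow!("rpc hiccup")))
        },
        |err, delay| tracing::warn!("retrying after {:?}: {}", delay, err),
    )
    .await
}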
+#[tracing::instrument(name = "process_event_with_backoff", skip_all, fields( + sequence_number = event.sequence_number +))] +pub async fn process_event_with_backoff( + event: RequestedWithCallbackEvent, + chain_state: BlockchainState, + contract: Arc, + gas_limit: U256, + escalation_policy: EscalationPolicyConfig, + metrics: Arc, +) { + let start_time = std::time::Instant::now(); + let account_label = AccountLabel { + chain_id: chain_state.id.clone(), + address: chain_state.provider_address.to_string(), + }; + + metrics.requests.get_or_create(&account_label).inc(); + tracing::info!("Started processing event"); + let backoff = ExponentialBackoff { + max_elapsed_time: Some(Duration::from_secs(300)), // retry for 5 minutes + ..Default::default() + }; + + let num_retries = Arc::new(AtomicU64::new(0)); + + let success = backoff::future::retry_notify( + backoff, + || async { + let num_retries = num_retries.load(std::sync::atomic::Ordering::Relaxed); + + let gas_multiplier_pct = escalation_policy.get_gas_multiplier_pct(num_retries); + let fee_multiplier_pct = escalation_policy.get_fee_multiplier_pct(num_retries); + process_event( + &event, + &chain_state, + &contract, + gas_limit.saturating_mul(escalation_policy.gas_limit_tolerance_pct.into()) / 100, + gas_multiplier_pct, + fee_multiplier_pct, + metrics.clone(), + ) + .await + }, + |e, dur| { + let retry_number = num_retries.load(std::sync::atomic::Ordering::Relaxed); + tracing::error!( + "Error on retry {} at duration {:?}: {}", + retry_number, + dur, + e + ); + num_retries.store(retry_number + 1, std::sync::atomic::Ordering::Relaxed); + }, + ) + .await; + + let duration = start_time.elapsed(); + + metrics + .requests_processed + .get_or_create(&account_label) + .inc(); + + match success { + Ok(()) => { + tracing::info!("Processed event successfully in {:?}", duration); + + metrics + .requests_processed_success + .get_or_create(&account_label) + .inc(); + + metrics + .request_duration_ms + .get_or_create(&account_label) + .observe(duration.as_millis() as f64); + + // Track retry count, gas multiplier, and fee multiplier for successful transactions + let num_retries = num_retries.load(std::sync::atomic::Ordering::Relaxed); + metrics + .retry_count + .get_or_create(&account_label) + .observe(num_retries as f64); + + let gas_multiplier = escalation_policy.get_gas_multiplier_pct(num_retries); + metrics + .final_gas_multiplier + .get_or_create(&account_label) + .observe(gas_multiplier as f64); + + let fee_multiplier = escalation_policy.get_fee_multiplier_pct(num_retries); + metrics + .final_fee_multiplier + .get_or_create(&account_label) + .observe(fee_multiplier as f64); + } + Err(e) => { + // In case the callback did not succeed, we double-check that the request is still on-chain. + // If the request is no longer on-chain, one of the transactions we sent likely succeeded, but + // the RPC gave us an error anyway. + let req = chain_state + .contract + .get_request(event.provider_address, event.sequence_number) + .await; + + tracing::error!("Failed to process event: {:?}. Request: {:?}", e, req); + + // We only count failures for cases where we are completely certain that the callback failed. + if req.is_ok_and(|x| x.is_some()) { + metrics + .requests_processed_failure + .get_or_create(&account_label) + .inc(); + } + } + } +} + +const TX_CONFIRMATION_TIMEOUT_SECS: u64 = 30; + +/// Process a callback on a chain. It estimates the gas for the reveal with callback and +/// submits the transaction if the gas estimate is below the gas limit. 
+/// It will return a permanent or transient error depending on the error type and whether +/// retry is possible or not. +pub async fn process_event( + event: &RequestedWithCallbackEvent, + chain_config: &BlockchainState, + contract: &InstrumentedSignablePythContract, + gas_limit: U256, + // A value of 100 submits the tx with the same gas/fee as the estimate. + gas_estimate_multiplier_pct: u64, + fee_estimate_multiplier_pct: u64, + metrics: Arc, +) -> Result<(), backoff::Error> { + // ignore requests that are not for the configured provider + if chain_config.provider_address != event.provider_address { + return Ok(()); + } + let provider_revelation = chain_config + .state + .reveal(event.sequence_number) + .map_err(|e| backoff::Error::permanent(anyhow!("Error revealing: {:?}", e)))?; + + let gas_estimate_res = chain_config + .contract + .estimate_reveal_with_callback_gas( + contract.wallet().address(), + event.provider_address, + event.sequence_number, + event.user_random_number, + provider_revelation, + ) + .in_current_span() + .await; + + let gas_estimate = gas_estimate_res.map_err(|e| { + // we consider the error transient even if it is a contract revert since + // it can be because of routing to a lagging RPC node. Retrying such errors will + // incur a few additional RPC calls, but it is fine. + backoff::Error::transient(anyhow!("Error estimating gas for reveal: {:?}", e)) + })?; + + // The gas limit on the simulated transaction is the configured gas limit on the chain, + // but we are willing to pad the gas a bit to ensure reliable submission. + if gas_estimate > gas_limit { + return Err(backoff::Error::permanent(anyhow!( + "Gas estimate for reveal with callback is higher than the gas limit {} > {}", + gas_estimate, + gas_limit + ))); + } + + // Pad the gas estimate after checking it against the simulation gas limit, ensuring that + // the padded gas estimate doesn't exceed the maximum amount of gas we are willing to use. + let gas_estimate = gas_estimate.saturating_mul(gas_estimate_multiplier_pct.into()) / 100; + + let contract_call = contract + .reveal_with_callback( + event.provider_address, + event.sequence_number, + event.user_random_number, + provider_revelation, + ) + .gas(gas_estimate); + + let client = contract.client(); + let mut transaction = contract_call.tx.clone(); + + // manually fill the tx with the gas info, so we can log the details in case of error + client + .fill_transaction(&mut transaction, None) + .await + .map_err(|e| { + backoff::Error::transient(anyhow!("Error filling the reveal transaction: {:?}", e)) + })?; + + // Apply the fee escalation policy. Note: the unwrap_or_default should never default as we have a gas oracle + // in the client that sets the gas price. + transaction.set_gas_price( + transaction + .gas_price() + .unwrap_or_default() + .saturating_mul(fee_estimate_multiplier_pct.into()) + / 100, + ); + + let pending_tx = client + .send_transaction(transaction.clone(), None) + .await + .map_err(|e| { + backoff::Error::transient(anyhow!( + "Error submitting the reveal transaction. 
Tx:{:?}, Error:{:?}", + transaction, + e + )) + })?; + + let reset_nonce = || { + let nonce_manager = contract.client_ref().inner().inner(); + nonce_manager.reset(); + }; + + let pending_receipt = timeout( + Duration::from_secs(TX_CONFIRMATION_TIMEOUT_SECS), + pending_tx, + ) + .await + .map_err(|_| { + // Tx can get stuck in mempool without any progress if the nonce is too high + // in this case ethers internal polling will not reduce the number of retries + // and keep retrying indefinitely. So we set a manual timeout here and reset the nonce. + reset_nonce(); + backoff::Error::transient(anyhow!( + "Tx stuck in mempool. Resetting nonce. Tx:{:?}", + transaction + )) + })?; + + let receipt = pending_receipt + .map_err(|e| { + backoff::Error::transient(anyhow!( + "Error waiting for transaction receipt. Tx:{:?} Error:{:?}", + transaction, + e + )) + })? + .ok_or_else(|| { + // RPC may not return an error on tx submission if the nonce is too high. + // But we will never get a receipt. So we reset the nonce manager to get the correct nonce. + reset_nonce(); + backoff::Error::transient(anyhow!( + "Can't verify the reveal, probably dropped from mempool. Resetting nonce. Tx:{:?}", + transaction + )) + })?; + + tracing::info!( + sequence_number = &event.sequence_number, + transaction_hash = &receipt.transaction_hash.to_string(), + gas_used = ?receipt.gas_used, + "Revealed with res: {:?}", + receipt + ); + + let account_label = AccountLabel { + chain_id: chain_config.id.clone(), + address: chain_config.provider_address.to_string(), + }; + + if let Some(gas_used) = receipt.gas_used { + let gas_used_float = gas_used.as_u128() as f64 / 1e18; + metrics + .total_gas_spent + .get_or_create(&account_label) + .inc_by(gas_used_float); + + if let Some(gas_price) = receipt.effective_gas_price { + let gas_fee = (gas_used * gas_price).as_u128() as f64 / 1e18; + metrics + .total_gas_fee_spent + .get_or_create(&account_label) + .inc_by(gas_fee); + } + } + + metrics.reveals.get_or_create(&account_label).inc(); + + Ok(()) +} diff --git a/apps/fortuna/src/keeper/track.rs b/apps/fortuna/src/keeper/track.rs new file mode 100644 index 0000000000..5f9fd1ad8e --- /dev/null +++ b/apps/fortuna/src/keeper/track.rs @@ -0,0 +1,102 @@ +use { + super::keeper_metrics::{AccountLabel, KeeperMetrics}, + crate::{ + api::ChainId, chain::ethereum::InstrumentedPythContract, + eth_utils::traced_client::TracedClient, + }, + ethers::middleware::Middleware, + ethers::{providers::Provider, types::Address}, + std::sync::Arc, + tracing, +}; + +/// tracks the balance of the given address on the given chain +/// if there was an error, the function will just return +#[tracing::instrument(skip_all)] +pub async fn track_balance( + chain_id: String, + provider: Arc>, + address: Address, + metrics: Arc, +) { + let balance = match provider.get_balance(address, None).await { + // This conversion to u128 is fine as the total balance will never cross the limits + // of u128 practically. + Ok(r) => r.as_u128(), + Err(e) => { + tracing::error!("Error while getting balance. error: {:?}", e); + return; + } + }; + // The f64 conversion is made to be able to serve metrics within the constraints of Prometheus. + // The balance is in wei, so we need to divide by 1e18 to convert it to eth. 
+ let balance = balance as f64 / 1e18; + + metrics + .balance + .get_or_create(&AccountLabel { + chain_id: chain_id.clone(), + address: address.to_string(), + }) + .set(balance); +} + +/// tracks the collected fees and the hashchain data of the given provider address on the given chain +/// if there is a error the function will just return +#[tracing::instrument(skip_all)] +pub async fn track_provider( + chain_id: ChainId, + contract: InstrumentedPythContract, + provider_address: Address, + metrics: Arc, +) { + let provider_info = match contract.get_provider_info(provider_address).call().await { + Ok(info) => info, + Err(e) => { + tracing::error!("Error while getting provider info. error: {:?}", e); + return; + } + }; + + // The f64 conversion is made to be able to serve metrics with the constraints of Prometheus. + // The fee is in wei, so we divide by 1e18 to convert it to eth. + let collected_fee = provider_info.accrued_fees_in_wei as f64 / 1e18; + let current_fee: f64 = provider_info.fee_in_wei as f64 / 1e18; + + let current_sequence_number = provider_info.sequence_number; + let end_sequence_number = provider_info.end_sequence_number; + + metrics + .collected_fee + .get_or_create(&AccountLabel { + chain_id: chain_id.clone(), + address: provider_address.to_string(), + }) + .set(collected_fee); + + metrics + .current_fee + .get_or_create(&AccountLabel { + chain_id: chain_id.clone(), + address: provider_address.to_string(), + }) + .set(current_fee); + + metrics + .current_sequence_number + .get_or_create(&AccountLabel { + chain_id: chain_id.clone(), + address: provider_address.to_string(), + }) + // sequence_number type on chain is u64 but practically it will take + // a long time for it to cross the limits of i64. + // currently prometheus only supports i64 for Gauge types + .set(current_sequence_number as i64); + metrics + .end_sequence_number + .get_or_create(&AccountLabel { + chain_id: chain_id.clone(), + address: provider_address.to_string(), + }) + .set(end_sequence_number as i64); +} diff --git a/apps/fortuna/src/main.rs b/apps/fortuna/src/main.rs index c09ef12e9c..de72834a1c 100644 --- a/apps/fortuna/src/main.rs +++ b/apps/fortuna/src/main.rs @@ -6,6 +6,7 @@ pub mod api; pub mod chain; pub mod command; pub mod config; +pub mod eth_utils; pub mod keeper; pub mod state;
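With `pub mod eth_utils;` registered in main.rs, the relocated helpers resolve under the new paths used throughout this diff. A sketch of the resulting imports, assuming the crate is addressable as `fortuna` (inside the crate these are the `crate::...` paths seen in the hunks above):

// Sketch (not in the patch): post-refactor import paths.
use fortuna::{
    chain::ethereum::InstrumentedSignablePythContract,
    eth_utils::{
        traced_client::{RpcMetrics, TracedClient},
        utils::{estimate_tx_cost, send_and_confirm},
    },
    keeper::keeper_metrics::KeeperMetrics,
};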