diff --git a/apps/argus/Cargo.lock b/apps/argus/Cargo.lock index 1d30bf76ce..7569c01028 100644 --- a/apps/argus/Cargo.lock +++ b/apps/argus/Cargo.lock @@ -1594,7 +1594,7 @@ dependencies = [ [[package]] name = "fortuna" -version = "7.4.7" +version = "7.4.8" dependencies = [ "anyhow", "axum", diff --git a/apps/argus/src/api.rs b/apps/argus/src/api.rs index 9aacd2f10c..a6663aefc8 100644 --- a/apps/argus/src/api.rs +++ b/apps/argus/src/api.rs @@ -1,6 +1,6 @@ use { crate::{ - chain::reader::{BlockNumber, BlockStatus, EntropyReader}, + chain::reader::{BlockNumber, BlockStatus}, }, anyhow::Result, axum::{ @@ -16,7 +16,7 @@ use { metrics::{counter::Counter, family::Family}, registry::Registry, }, - std::{collections::HashMap, sync::Arc}, + std::sync::Arc, tokio::sync::RwLock, url::Url, }; @@ -40,8 +40,6 @@ pub struct ApiMetrics { #[derive(Clone)] pub struct ApiState { - pub chains: Arc>, - pub metrics_registry: Arc>, /// Prometheus metrics @@ -50,7 +48,6 @@ pub struct ApiState { impl ApiState { pub async fn new( - chains: HashMap, metrics_registry: Arc>, ) -> ApiState { let metrics = ApiMetrics { @@ -65,7 +62,6 @@ impl ApiState { ); ApiState { - chains: Arc::new(chains), metrics: Arc::new(metrics), metrics_registry, } @@ -77,8 +73,6 @@ impl ApiState { pub struct BlockchainState { /// The chain id for this blockchain, useful for logging pub id: ChainId, - /// The contract that the server is fulfilling requests for. - pub contract: Arc, /// The address of the provider that this server is operating for. pub provider_address: Address, /// The server will wait for this many block confirmations of a request before revealing diff --git a/apps/argus/src/chain/ethereum.rs b/apps/argus/src/chain/ethereum.rs index 67b7e75807..020ff77a21 100644 --- a/apps/argus/src/chain/ethereum.rs +++ b/apps/argus/src/chain/ethereum.rs @@ -2,7 +2,7 @@ use { crate::{ api::ChainId, chain::reader::{ - self, BlockNumber, BlockStatus, EntropyReader, RequestedWithCallbackEvent, + self, BlockNumber, BlockStatus, RequestedWithCallbackEvent, }, config::EthereumConfig, }, @@ -13,7 +13,6 @@ use { traced_client::{RpcMetrics, TracedClient}, }, anyhow::{anyhow, Error, Result}, - axum::async_trait, ethers::{ abi::RawLog, contract::{abigen, EthLogDecode}, @@ -199,9 +198,9 @@ impl InstrumentedPythContract { } } -#[async_trait] -impl EntropyReader for PythRandom> { - async fn get_request( +impl PythRandom { + + pub async fn get_request_wrapper( &self, provider_address: Address, sequence_number: u64, @@ -226,7 +225,7 @@ impl EntropyReader for PythRandom> { } } - async fn get_block_number(&self, confirmed_block_status: BlockStatus) -> Result { + pub async fn get_block_number(&self, confirmed_block_status: BlockStatus) -> Result { let block_number: EthersBlockNumber = confirmed_block_status.into(); let block = self .client() @@ -240,7 +239,7 @@ impl EntropyReader for PythRandom> { .as_u64()) } - async fn get_request_with_callback_events( + pub async fn get_request_with_callback_events( &self, from_block: BlockNumber, to_block: BlockNumber, @@ -260,7 +259,7 @@ impl EntropyReader for PythRandom> { .collect()) } - async fn estimate_reveal_with_callback_gas( + pub async fn estimate_reveal_with_callback_gas( &self, sender: Address, provider: Address, diff --git a/apps/argus/src/chain/reader.rs b/apps/argus/src/chain/reader.rs index fde5fd48bc..853ea7b2ba 100644 --- a/apps/argus/src/chain/reader.rs +++ b/apps/argus/src/chain/reader.rs @@ -1,7 +1,5 @@ use { - anyhow::Result, - axum::async_trait, - ethers::types::{Address, BlockNumber as EthersBlockNumber, U256}, + ethers::types::{Address, BlockNumber as EthersBlockNumber}, }; pub type BlockNumber = u64; @@ -36,34 +34,6 @@ pub struct RequestedWithCallbackEvent { pub provider_address: Address, } -/// EntropyReader is the read-only interface of the Entropy contract. -#[async_trait] -pub trait EntropyReader: Send + Sync { - /// Get an in-flight request (if it exists) - /// Note that if we support additional blockchains in the future, the type of `provider` may - /// need to become more generic. - async fn get_request(&self, provider: Address, sequence_number: u64) - -> Result>; - - async fn get_block_number(&self, confirmed_block_status: BlockStatus) -> Result; - - async fn get_request_with_callback_events( - &self, - from_block: BlockNumber, - to_block: BlockNumber, - ) -> Result>; - - /// Estimate the gas required to reveal a random number with a callback. - async fn estimate_reveal_with_callback_gas( - &self, - sender: Address, - provider: Address, - sequence_number: u64, - user_random_number: [u8; 32], - provider_revelation: [u8; 32], - ) -> Result; -} - /// An in-flight request stored in the contract. /// (This struct is missing many fields that are defined in the contract, as they /// aren't used in fortuna anywhere. Feel free to add any missing fields as necessary.) @@ -75,110 +45,3 @@ pub struct Request { pub block_number: BlockNumber, pub use_blockhash: bool, } - -#[cfg(test)] -pub mod mock { - use { - crate::chain::reader::{BlockNumber, BlockStatus, EntropyReader, Request}, - anyhow::Result, - axum::async_trait, - ethers::types::{Address, U256}, - std::sync::RwLock, - }; - - /// Mock version of the entropy contract intended for testing. - /// This class is internally locked to allow tests to modify the in-flight requests while - /// the API is also holding a pointer to the same data structure. - pub struct MockEntropyReader { - block_number: RwLock, - /// The set of requests that are currently in-flight. - requests: RwLock>, - } - - impl MockEntropyReader { - pub fn with_requests( - block_number: BlockNumber, - requests: &[(Address, u64, BlockNumber, bool)], - ) -> MockEntropyReader { - MockEntropyReader { - block_number: RwLock::new(block_number), - requests: RwLock::new( - requests - .iter() - .map(|&(a, s, b, u)| Request { - provider: a, - sequence_number: s, - block_number: b, - use_blockhash: u, - }) - .collect(), - ), - } - } - - /// Insert a new request into the set of in-flight requests. - pub fn insert( - &self, - provider: Address, - sequence: u64, - block_number: BlockNumber, - use_blockhash: bool, - ) -> &Self { - self.requests.write().unwrap().push(Request { - provider, - sequence_number: sequence, - block_number, - use_blockhash, - }); - self - } - - pub fn set_block_number(&self, block_number: BlockNumber) -> &Self { - *(self.block_number.write().unwrap()) = block_number; - self - } - } - - #[async_trait] - impl EntropyReader for MockEntropyReader { - async fn get_request( - &self, - provider: Address, - sequence_number: u64, - ) -> Result> { - Ok(self - .requests - .read() - .unwrap() - .iter() - .find(|&r| r.sequence_number == sequence_number && r.provider == provider) - .map(|r| (*r).clone())) - } - - async fn get_block_number( - &self, - _confirmed_block_status: BlockStatus, - ) -> Result { - Ok(*self.block_number.read().unwrap()) - } - - async fn get_request_with_callback_events( - &self, - _from_block: BlockNumber, - _to_block: BlockNumber, - ) -> Result> { - Ok(vec![]) - } - - async fn estimate_reveal_with_callback_gas( - &self, - _sender: Address, - _provider: Address, - _sequence_number: u64, - _user_random_number: [u8; 32], - _provider_revelation: [u8; 32], - ) -> Result { - Ok(U256::from(5)) - } - } -} diff --git a/apps/argus/src/command/run.rs b/apps/argus/src/command/run.rs index 02540718d3..942eecee95 100644 --- a/apps/argus/src/command/run.rs +++ b/apps/argus/src/command/run.rs @@ -1,7 +1,6 @@ use { crate::{ api::{self, BlockchainState, ChainId}, - chain::ethereum::InstrumentedPythContract, config::{Config, EthereumConfig, RunOptions}, keeper::{self, keeper_metrics::KeeperMetrics}, }, @@ -37,11 +36,10 @@ const TRACK_INTERVAL: Duration = Duration::from_secs(10); pub async fn run_api( socket_addr: SocketAddr, - chains: HashMap, metrics_registry: Arc>, mut rx_exit: watch::Receiver, ) -> Result<()> { - let api_state = api::ApiState::new(chains, metrics_registry).await; + let api_state = api::ApiState::new(metrics_registry).await; // Initialize Axum Router. Note the type here is a `Router` due to the use of the // `with_state` method which replaces `Body` with `State` in the type signature. @@ -111,13 +109,11 @@ pub async fn run(opts: &RunOptions) -> Result<()> { let mut tasks = Vec::new(); for (chain_id, chain_config) in config.chains.clone() { - let rpc_metrics = rpc_metrics.clone(); tasks.push(spawn(async move { let state = setup_chain_state( &config.provider.address, &chain_id, &chain_config, - rpc_metrics, ) .await; @@ -174,7 +170,7 @@ pub async fn run(opts: &RunOptions) -> Result<()> { rpc_metrics.clone(), )); - run_api(opts.addr, chains, metrics_registry, rx_exit).await?; + run_api(opts.addr, metrics_registry, rx_exit).await?; Ok(()) } @@ -183,17 +179,9 @@ async fn setup_chain_state( provider: &Address, chain_id: &ChainId, chain_config: &EthereumConfig, - rpc_metrics: Arc, ) -> Result { - let contract = Arc::new(InstrumentedPythContract::from_config( - chain_config, - chain_id.clone(), - rpc_metrics, - )?); - let state = BlockchainState { id: chain_id.clone(), - contract, provider_address: *provider, reveal_delay_blocks: chain_config.reveal_delay_blocks, confirmed_block_status: chain_config.confirmed_block_status, diff --git a/apps/argus/src/keeper.rs b/apps/argus/src/keeper.rs index d9ee55513f..58ecfcca9e 100644 --- a/apps/argus/src/keeper.rs +++ b/apps/argus/src/keeper.rs @@ -61,8 +61,6 @@ pub async fn run_keeper_threads( rpc_metrics: Arc, ) { tracing::info!("starting keeper"); - let latest_safe_block = get_latest_safe_block(&chain_state).in_current_span().await; - tracing::info!("latest safe block: {}", &latest_safe_block); let contract = Arc::new( InstrumentedSignablePythContract::from_config( @@ -78,6 +76,9 @@ pub async fn run_keeper_threads( let fulfilled_requests_cache = Arc::new(RwLock::new(HashSet::::new())); + let latest_safe_block = get_latest_safe_block(contract.clone(), &chain_state).in_current_span().await; + tracing::info!("latest safe block: {}", &latest_safe_block); + // Spawn a thread to handle the events from last BACKLOG_RANGE blocks. let gas_limit: U256 = chain_eth_config.gas_limit.into(); spawn( @@ -101,6 +102,7 @@ pub async fn run_keeper_threads( // Spawn a thread to watch for new blocks and send the range of blocks for which events has not been handled to the `tx` channel. spawn( watch_blocks_wrapper( + contract.clone(), chain_state.clone(), latest_safe_block, tx, diff --git a/apps/argus/src/keeper/block.rs b/apps/argus/src/keeper/block.rs index 834c53e80a..4941220797 100644 --- a/apps/argus/src/keeper/block.rs +++ b/apps/argus/src/keeper/block.rs @@ -37,10 +37,9 @@ pub struct BlockRange { } /// Get the latest safe block number for the chain. Retry internally if there is an error. -pub async fn get_latest_safe_block(chain_state: &BlockchainState) -> BlockNumber { +pub async fn get_latest_safe_block(contract: Arc, chain_state: &BlockchainState) -> BlockNumber { loop { - match chain_state - .contract + match contract .get_block_number(chain_state.confirmed_block_status) .await { @@ -120,8 +119,7 @@ pub async fn process_single_block_batch( fulfilled_requests_cache: Arc>>, ) { loop { - let events_res = chain_state - .contract + let events_res = contract .get_request_with_callback_events(block_range.from, block_range.to) .await; @@ -169,6 +167,7 @@ pub async fn process_single_block_batch( initial_safe_block = latest_safe_block ))] pub async fn watch_blocks_wrapper( + contract: Arc, chain_state: BlockchainState, latest_safe_block: BlockNumber, tx: mpsc::Sender, @@ -177,6 +176,7 @@ pub async fn watch_blocks_wrapper( let mut last_safe_block_processed = latest_safe_block; loop { if let Err(e) = watch_blocks( + contract.clone(), chain_state.clone(), &mut last_safe_block_processed, tx.clone(), @@ -196,6 +196,7 @@ pub async fn watch_blocks_wrapper( /// block ranges to the `tx` channel. If we have subscribed to events, we could have missed those and won't even /// know about it. pub async fn watch_blocks( + contract: Arc, chain_state: BlockchainState, last_safe_block_processed: &mut BlockNumber, tx: mpsc::Sender, @@ -241,7 +242,7 @@ pub async fn watch_blocks( } } - let latest_safe_block = get_latest_safe_block(&chain_state).in_current_span().await; + let latest_safe_block = get_latest_safe_block(contract.clone(), &chain_state).in_current_span().await; if latest_safe_block > *last_safe_block_processed { let mut from = latest_safe_block.saturating_sub(RETRY_PREVIOUS_BLOCKS); diff --git a/apps/argus/src/keeper/process_event.rs b/apps/argus/src/keeper/process_event.rs index 39b7716a18..b654194e5a 100644 --- a/apps/argus/src/keeper/process_event.rs +++ b/apps/argus/src/keeper/process_event.rs @@ -111,9 +111,8 @@ pub async fn process_event_with_backoff( // In case the callback did not succeed, we double-check that the request is still on-chain. // If the request is no longer on-chain, one of the transactions we sent likely succeeded, but // the RPC gave us an error anyway. - let req = chain_state - .contract - .get_request(event.provider_address, event.sequence_number) + let req = contract + .get_request_wrapper(event.provider_address, event.sequence_number) .await; tracing::error!("Failed to process event: {:?}. Request: {:?}", e, req);