diff --git a/apps/fortuna/src/config.rs b/apps/fortuna/src/config.rs index 5bf3582992..6ceeff35e4 100644 --- a/apps/fortuna/src/config.rs +++ b/apps/fortuna/src/config.rs @@ -110,9 +110,6 @@ pub struct EthereumConfig { /// TODO: Change type from String to Url pub geth_rpc_addr: String, - /// URL of a Geth RPC wss endpoint to use for subscribing to blockchain events. - pub geth_rpc_wss: Option, - /// Address of a Pyth Randomness contract to interact with. pub contract_addr: Address, diff --git a/apps/fortuna/src/keeper.rs b/apps/fortuna/src/keeper.rs index ab689c4267..c585ff3908 100644 --- a/apps/fortuna/src/keeper.rs +++ b/apps/fortuna/src/keeper.rs @@ -99,15 +99,7 @@ pub async fn run_keeper_threads( let (tx, rx) = mpsc::channel::(1000); // Spawn a thread to watch for new blocks and send the range of blocks for which events has not been handled to the `tx` channel. - spawn( - watch_blocks_wrapper( - chain_state.clone(), - latest_safe_block, - tx, - chain_eth_config.geth_rpc_wss.clone(), - ) - .in_current_span(), - ); + spawn(watch_blocks_wrapper(chain_state.clone(), latest_safe_block, tx).in_current_span()); // Spawn a thread for block processing with configured delays spawn( diff --git a/apps/fortuna/src/keeper/block.rs b/apps/fortuna/src/keeper/block.rs index 5382bd9577..c73719039b 100644 --- a/apps/fortuna/src/keeper/block.rs +++ b/apps/fortuna/src/keeper/block.rs @@ -6,12 +6,8 @@ use { keeper::keeper_metrics::KeeperMetrics, keeper::process_event::process_event_with_backoff, }, - anyhow::{anyhow, Result}, - ethers::{ - providers::{Middleware, Provider, Ws}, - types::U256, - }, - futures::StreamExt, + anyhow::Result, + ethers::types::U256, std::{collections::HashSet, sync::Arc}, tokio::{ spawn, @@ -176,7 +172,6 @@ pub async fn watch_blocks_wrapper( chain_state: BlockchainState, latest_safe_block: BlockNumber, tx: mpsc::Sender, - geth_rpc_wss: Option, ) { let mut last_safe_block_processed = latest_safe_block; loop { @@ -184,7 +179,6 @@ pub async fn watch_blocks_wrapper( chain_state.clone(), &mut last_safe_block_processed, tx.clone(), - geth_rpc_wss.clone(), ) .in_current_span() .await @@ -203,47 +197,11 @@ pub async fn watch_blocks( chain_state: BlockchainState, last_safe_block_processed: &mut BlockNumber, tx: mpsc::Sender, - geth_rpc_wss: Option, ) -> Result<()> { tracing::info!("Watching blocks to handle new events"); - let provider_option = match geth_rpc_wss { - Some(wss) => Some(match Provider::::connect(wss.clone()).await { - Ok(provider) => provider, - Err(e) => { - tracing::error!("Error while connecting to wss: {}. error: {:?}", wss, e); - return Err(e.into()); - } - }), - None => { - tracing::info!("No wss provided"); - None - } - }; - - let mut stream_option = match provider_option { - Some(ref provider) => Some(match provider.subscribe_blocks().await { - Ok(client) => client, - Err(e) => { - tracing::error!("Error while subscribing to blocks. error {:?}", e); - return Err(e.into()); - } - }), - None => None, - }; - loop { - match stream_option { - Some(ref mut stream) => { - if stream.next().await.is_none() { - tracing::error!("Error blocks subscription stream ended"); - return Err(anyhow!("Error blocks subscription stream ended")); - } - } - None => { - time::sleep(POLL_INTERVAL).await; - } - } + time::sleep(POLL_INTERVAL).await; let latest_safe_block = get_latest_safe_block(&chain_state).in_current_span().await; if latest_safe_block > *last_safe_block_processed {