Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions apps/fortuna/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,6 @@ pub struct EthereumConfig {
/// TODO: Change type from String to Url
pub geth_rpc_addr: String,

/// URL of a Geth RPC wss endpoint to use for subscribing to blockchain events.
pub geth_rpc_wss: Option<String>,

/// Address of a Pyth Randomness contract to interact with.
pub contract_addr: Address,

Expand Down
10 changes: 1 addition & 9 deletions apps/fortuna/src/keeper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,15 +99,7 @@ pub async fn run_keeper_threads(

let (tx, rx) = mpsc::channel::<BlockRange>(1000);
// Spawn a thread to watch for new blocks and send the range of blocks for which events has not been handled to the `tx` channel.
spawn(
watch_blocks_wrapper(
chain_state.clone(),
latest_safe_block,
tx,
chain_eth_config.geth_rpc_wss.clone(),
)
.in_current_span(),
);
spawn(watch_blocks_wrapper(chain_state.clone(), latest_safe_block, tx).in_current_span());

// Spawn a thread for block processing with configured delays
spawn(
Expand Down
48 changes: 3 additions & 45 deletions apps/fortuna/src/keeper/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,8 @@ use {
keeper::keeper_metrics::KeeperMetrics,
keeper::process_event::process_event_with_backoff,
},
anyhow::{anyhow, Result},
ethers::{
providers::{Middleware, Provider, Ws},
types::U256,
},
futures::StreamExt,
anyhow::Result,
ethers::types::U256,
std::{collections::HashSet, sync::Arc},
tokio::{
spawn,
Expand Down Expand Up @@ -176,15 +172,13 @@ pub async fn watch_blocks_wrapper(
chain_state: BlockchainState,
latest_safe_block: BlockNumber,
tx: mpsc::Sender<BlockRange>,
geth_rpc_wss: Option<String>,
) {
let mut last_safe_block_processed = latest_safe_block;
loop {
if let Err(e) = watch_blocks(
chain_state.clone(),
&mut last_safe_block_processed,
tx.clone(),
geth_rpc_wss.clone(),
)
.in_current_span()
.await
Expand All @@ -203,47 +197,11 @@ pub async fn watch_blocks(
chain_state: BlockchainState,
last_safe_block_processed: &mut BlockNumber,
tx: mpsc::Sender<BlockRange>,
geth_rpc_wss: Option<String>,
) -> Result<()> {
tracing::info!("Watching blocks to handle new events");

let provider_option = match geth_rpc_wss {
Some(wss) => Some(match Provider::<Ws>::connect(wss.clone()).await {
Ok(provider) => provider,
Err(e) => {
tracing::error!("Error while connecting to wss: {}. error: {:?}", wss, e);
return Err(e.into());
}
}),
None => {
tracing::info!("No wss provided");
None
}
};

let mut stream_option = match provider_option {
Some(ref provider) => Some(match provider.subscribe_blocks().await {
Ok(client) => client,
Err(e) => {
tracing::error!("Error while subscribing to blocks. error {:?}", e);
return Err(e.into());
}
}),
None => None,
};

loop {
match stream_option {
Some(ref mut stream) => {
if stream.next().await.is_none() {
tracing::error!("Error blocks subscription stream ended");
return Err(anyhow!("Error blocks subscription stream ended"));
}
}
None => {
time::sleep(POLL_INTERVAL).await;
}
}
time::sleep(POLL_INTERVAL).await;

let latest_safe_block = get_latest_safe_block(&chain_state).in_current_span().await;
if latest_safe_block > *last_safe_block_processed {
Expand Down
Loading