Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 58 additions & 61 deletions src/block_range_scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@
//!
//! ```rust,no_run
//! use alloy::{eips::BlockNumberOrTag, network::Ethereum, primitives::BlockNumber};
//! use event_scanner::block_range_scanner::BlockScannerError;
//! use std::ops::Range;
//! use tokio_stream::{StreamExt, wrappers::ReceiverStream};
//!
//! use alloy::transports::http::reqwest::Url;
//! use event_scanner::block_range_scanner::{BlockRangeScanner, BlockScannerClient};
//! use event_scanner::block_range_scanner::{
//! BlockRangeScanner, BlockRangeScannerClient, Error as BlockRangeScannerError,
//! };
//! use tokio::time::Duration;
//! use tracing::{error, info};
//!
Expand All @@ -26,10 +27,10 @@
//! .await?;
//!
//! // Create client to send subscribe command to block scanner
//! let subscription_client: BlockScannerClient = block_range_scanner.run()?;
//! let client: BlockRangeScannerClient = block_range_scanner.run()?;
//!
//! let mut receiver: ReceiverStream<Result<Range<BlockNumber>, BlockScannerError>> =
//! subscription_client
//! let mut receiver: ReceiverStream<Result<Range<BlockNumber>, BlockRangeScannerError>> =
//! client
//! .subscribe(
//! BlockNumberOrTag::Latest,
//! None, // just subscribe to new blocks
Expand All @@ -46,8 +47,8 @@
//!
//! // Decide whether to continue or break based on error type
//! match e {
//! BlockScannerError::ServiceShutdown => break,
//! BlockScannerError::WebSocketConnectionFailed(_) => {
//! BlockRangeScannerError::ServiceShutdown => break,
//! BlockRangeScannerError::WebSocketConnectionFailed(_) => {
//! // Maybe implement backoff and retry logic here
//! error!(
//! "WebSocket connection failed, continuing to listen for reconnection"
Expand Down Expand Up @@ -106,7 +107,7 @@ const DEFAULT_REORG_REWIND_DEPTH: u64 = 0;
// const STATE_SYNC_MAX_RETRIES: u64 = 12;

#[derive(Error, Debug)]
pub enum BlockScannerError {
pub enum Error {
#[error("HTTP request failed: {0}")]
HttpError(#[from] reqwest::Error),

Expand Down Expand Up @@ -140,19 +141,19 @@ pub enum BlockScannerError {
#[derive(Debug)]
pub enum Command {
Subscribe {
sender: mpsc::Sender<Result<Range<BlockNumber>, BlockScannerError>>,
sender: mpsc::Sender<Result<Range<BlockNumber>, Error>>,
start_height: BlockNumberOrTag,
end_height: Option<BlockNumberOrTag>,
response: oneshot::Sender<Result<(), BlockScannerError>>,
response: oneshot::Sender<Result<(), Error>>,
},
Unsubscribe {
response: oneshot::Sender<Result<(), BlockScannerError>>,
response: oneshot::Sender<Result<(), Error>>,
},
GetStatus {
response: oneshot::Sender<ServiceStatus>,
},
Shutdown {
response: oneshot::Sender<Result<(), BlockScannerError>>,
response: oneshot::Sender<Result<(), Error>>,
},
}

Expand Down Expand Up @@ -243,10 +244,10 @@ impl BlockRangeScanner {
pub async fn connect_ws<N: Network>(
self,
ws_url: Url,
) -> TransportResult<ConnectedBlockScanner<N>> {
) -> TransportResult<ConnectedBlockRangeScanner<N>> {
let provider =
RootProvider::<N>::new(ClientBuilder::default().ws(WsConnect::new(ws_url)).await?);
Ok(ConnectedBlockScanner {
Ok(ConnectedBlockRangeScanner {
provider,
config: Config {
blocks_read_per_epoch: self.blocks_read_per_epoch,
Expand All @@ -265,9 +266,9 @@ impl BlockRangeScanner {
pub async fn connect_ipc<N: Network>(
self,
ipc_path: String,
) -> TransportResult<ConnectedBlockScanner<N>> {
) -> TransportResult<ConnectedBlockRangeScanner<N>> {
let provider = RootProvider::<N>::new(ClientBuilder::default().ipc(ipc_path.into()).await?);
Ok(ConnectedBlockScanner {
Ok(ConnectedBlockRangeScanner {
provider,
config: Config {
blocks_read_per_epoch: self.blocks_read_per_epoch,
Expand All @@ -279,12 +280,12 @@ impl BlockRangeScanner {
}
}

pub struct ConnectedBlockScanner<N: Network> {
pub struct ConnectedBlockRangeScanner<N: Network> {
provider: RootProvider<N>,
config: Config,
}

impl<N: Network> ConnectedBlockScanner<N> {
impl<N: Network> ConnectedBlockRangeScanner<N> {
/// Returns the underlying Provider.
#[must_use]
pub fn provider(&self) -> &impl Provider<N> {
Expand All @@ -297,20 +298,19 @@ impl<N: Network> ConnectedBlockScanner<N> {
/// # Errors
///
/// Returns an error if the subscription service fails to start.
pub fn run(&self) -> anyhow::Result<BlockScannerClient> {
let (service, cmd_tx) =
BlockScannerService::new(self.config.clone(), self.provider.clone());
pub fn run(&self) -> anyhow::Result<BlockRangeScannerClient> {
let (service, cmd_tx) = Service::new(self.config.clone(), self.provider.clone());
tokio::spawn(async move {
service.run().await;
});
Ok(BlockScannerClient::new(cmd_tx))
Ok(BlockRangeScannerClient::new(cmd_tx))
}
}

struct BlockScannerService<N: Network> {
struct Service<N: Network> {
config: Config,
provider: RootProvider<N>,
subscriber: Option<mpsc::Sender<Result<Range<BlockNumber>, BlockScannerError>>>,
subscriber: Option<mpsc::Sender<Result<Range<BlockNumber>, Error>>>,
current: Option<BlockHashAndNumber>,
websocket_connected: bool,
processed_count: u64,
Expand All @@ -319,7 +319,7 @@ struct BlockScannerService<N: Network> {
shutdown: bool,
}

impl<N: Network> BlockScannerService<N> {
impl<N: Network> Service<N> {
pub fn new(config: Config, provider: RootProvider<N>) -> (Self, mpsc::Sender<Command>) {
let (cmd_tx, cmd_rx) = mpsc::channel(100);

Expand Down Expand Up @@ -360,7 +360,7 @@ impl<N: Network> BlockScannerService<N> {
info!("Subscription service stopped");
}

async fn handle_command(&mut self, command: Command) -> Result<(), BlockScannerError> {
async fn handle_command(&mut self, command: Command) -> Result<(), Error> {
match command {
Command::Subscribe { sender, start_height, end_height, response } => {
let result = self.handle_subscribe(sender, start_height, end_height).await;
Expand All @@ -385,12 +385,12 @@ impl<N: Network> BlockScannerService<N> {

async fn handle_subscribe(
&mut self,
sender: mpsc::Sender<Result<Range<BlockNumber>, BlockScannerError>>,
sender: mpsc::Sender<Result<Range<BlockNumber>, Error>>,
start_height: BlockNumberOrTag,
end_height: Option<BlockNumberOrTag>,
) -> Result<(), BlockScannerError> {
) -> Result<(), Error> {
if self.subscriber.is_some() {
return Err(BlockScannerError::MultipleSubscribers);
return Err(Error::MultipleSubscribers);
}

// TODO: update local state relate to reorg and validate data
Expand All @@ -407,7 +407,7 @@ impl<N: Network> BlockScannerService<N> {
&mut self,
start_height: BlockNumberOrTag,
end_height: Option<BlockNumberOrTag>,
) -> Result<(), BlockScannerError> {
) -> Result<(), Error> {
// Step 1: Establish WebSocket connection
let (buffer_tx, buffer_rx) = mpsc::channel(MAX_BUFFERED_MESSAGES);

Expand Down Expand Up @@ -451,15 +451,15 @@ impl<N: Network> BlockScannerService<N> {
if let Err(e) = self.sync_historical_data(start_block, sync_end_block).await {
warn!("aborting ws_task");
ws_task.abort();
return Err(BlockScannerError::HistoricalSyncError(e.to_string()));
return Err(Error::HistoricalSyncError(e.to_string()));
}

// Step 3: Process buffered WebSocket messages
let sender = self.subscriber.clone().expect("subscriber should be set");
tokio::spawn(async move {
if end_height.is_none() {
Self::process_buffered_messages(buffer_rx, sender, cutoff).await;
} else if sender.send(Err(BlockScannerError::Eof)).await.is_err() {
} else if sender.send(Err(Error::Eof)).await.is_err() {
warn!("Subscriber channel closed, cleaning up");
}
});
Expand All @@ -477,7 +477,7 @@ impl<N: Network> BlockScannerService<N> {
&mut self,
start: N::BlockResponse,
end: N::BlockResponse,
) -> Result<(), BlockScannerError> {
) -> Result<(), Error> {
let mut batch_count = 0;

self.current = Some(BlockHashAndNumber::from_header::<N>(start.header()));
Expand Down Expand Up @@ -514,7 +514,7 @@ impl<N: Network> BlockScannerService<N> {
Ok(())
}

async fn ensure_current_not_reorged(&mut self) -> Result<(), BlockScannerError> {
async fn ensure_current_not_reorged(&mut self) -> Result<(), Error> {
let current_block =
self.provider.get_block_by_hash(self.current.as_ref().unwrap().hash).await?;
if current_block.is_some() {
Expand All @@ -524,7 +524,7 @@ impl<N: Network> BlockScannerService<N> {
self.rewind_on_reorg_detected().await
}

async fn rewind_on_reorg_detected(&mut self) -> Result<(), BlockScannerError> {
async fn rewind_on_reorg_detected(&mut self) -> Result<(), Error> {
let mut new_current_height =
if self.current.as_ref().unwrap().number <= self.config.reorg_rewind_depth {
0
Expand Down Expand Up @@ -596,7 +596,7 @@ impl<N: Network> BlockScannerService<N> {

async fn process_buffered_messages(
mut buffer_rx: mpsc::Receiver<Range<BlockNumber>>,
sender: mpsc::Sender<Result<Range<BlockNumber>, BlockScannerError>>,
sender: mpsc::Sender<Result<Range<BlockNumber>, Error>>,
cutoff: BlockNumber,
) {
let mut processed = 0;
Expand Down Expand Up @@ -631,16 +631,14 @@ impl<N: Network> BlockScannerService<N> {

async fn get_block_subscription(
provider: &impl Provider<N>,
) -> Result<Subscription<N::HeaderResponse>, BlockScannerError> {
let ws_stream = provider
.subscribe_blocks()
.await
.map_err(|_| BlockScannerError::WebSocketConnectionFailed(1))?;
) -> Result<Subscription<N::HeaderResponse>, Error> {
let ws_stream =
provider.subscribe_blocks().await.map_err(|_| Error::WebSocketConnectionFailed(1))?;

Ok(ws_stream)
}

async fn send_to_subscriber(&mut self, result: Result<Range<BlockNumber>, BlockScannerError>) {
async fn send_to_subscriber(&mut self, result: Result<Range<BlockNumber>, Error>) {
if let Some(ref sender) = self.subscriber {
if sender.send(result).await.is_err() {
self.subscriber = None;
Expand Down Expand Up @@ -669,11 +667,11 @@ impl<N: Network> BlockScannerService<N> {
}
}

pub struct BlockScannerClient {
pub struct BlockRangeScannerClient {
command_sender: mpsc::Sender<Command>,
}

impl BlockScannerClient {
impl BlockRangeScannerClient {
/// Creates a new subscription client.
///
/// # Arguments
Expand All @@ -693,13 +691,12 @@ impl BlockScannerClient {
///
/// # Errors
///
/// * `BlockScannerError::ServiceShutdown` - if the service is already shutting down.
/// * `Error::ServiceShutdown` - if the service is already shutting down.
pub async fn subscribe(
&self,
start_height: BlockNumberOrTag,
end_height: Option<BlockNumberOrTag>,
) -> Result<ReceiverStream<Result<Range<BlockNumber>, BlockScannerError>>, BlockScannerError>
{
) -> Result<ReceiverStream<Result<Range<BlockNumber>, Error>>, Error> {
let (blocks_sender, blocks_receiver) = mpsc::channel(MAX_BUFFERED_MESSAGES);
let (response_tx, response_rx) = oneshot::channel();

Expand All @@ -710,9 +707,9 @@ impl BlockScannerClient {
response: response_tx,
};

self.command_sender.send(command).await.map_err(|_| BlockScannerError::ServiceShutdown)?;
self.command_sender.send(command).await.map_err(|_| Error::ServiceShutdown)?;

response_rx.await.map_err(|_| BlockScannerError::ServiceShutdown)??;
response_rx.await.map_err(|_| Error::ServiceShutdown)??;

let stream = ReceiverStream::new(blocks_receiver);

Expand All @@ -723,45 +720,45 @@ impl BlockScannerClient {
///
/// # Errors
///
/// * `BlockScannerError::ServiceShutdown` - if the service is already shutting down.
pub async fn unsubscribe(&self) -> Result<(), BlockScannerError> {
/// * `Error::ServiceShutdown` - if the service is already shutting down.
pub async fn unsubscribe(&self) -> Result<(), Error> {
let (response_tx, response_rx) = oneshot::channel();

let command = Command::Unsubscribe { response: response_tx };

self.command_sender.send(command).await.map_err(|_| BlockScannerError::ServiceShutdown)?;
self.command_sender.send(command).await.map_err(|_| Error::ServiceShutdown)?;

response_rx.await.map_err(|_| BlockScannerError::ServiceShutdown)?
response_rx.await.map_err(|_| Error::ServiceShutdown)?
}

/// Returns the current status of the subscription service.
///
/// # Errors
///
/// * `BlockScannerError::ServiceShutdown` - if the service is already shutting down.
pub async fn get_status(&self) -> Result<ServiceStatus, BlockScannerError> {
/// * `Error::ServiceShutdown` - if the service is already shutting down.
pub async fn get_status(&self) -> Result<ServiceStatus, Error> {
let (response_tx, response_rx) = oneshot::channel();

let command = Command::GetStatus { response: response_tx };

self.command_sender.send(command).await.map_err(|_| BlockScannerError::ServiceShutdown)?;
self.command_sender.send(command).await.map_err(|_| Error::ServiceShutdown)?;

response_rx.await.map_err(|_| BlockScannerError::ServiceShutdown)
response_rx.await.map_err(|_| Error::ServiceShutdown)
}

/// Shuts down the subscription service and unsubscribes the current subscriber.
///
/// # Errors
///
/// * `BlockScannerError::ServiceShutdown` - if the service is already shutting down.
pub async fn shutdown(&self) -> Result<(), BlockScannerError> {
/// * `Error::ServiceShutdown` - if the service is already shutting down.
pub async fn shutdown(&self) -> Result<(), Error> {
let (response_tx, response_rx) = oneshot::channel();

let command = Command::Shutdown { response: response_tx };

self.command_sender.send(command).await.map_err(|_| BlockScannerError::ServiceShutdown)?;
self.command_sender.send(command).await.map_err(|_| Error::ServiceShutdown)?;

response_rx.await.map_err(|_| BlockScannerError::ServiceShutdown)?
response_rx.await.map_err(|_| Error::ServiceShutdown)?
}
}

Expand Down
8 changes: 4 additions & 4 deletions src/event_scanner.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::{collections::HashMap, sync::Arc, time::Duration};

use crate::{
block_range_scanner::{BlockRangeScanner, BlockScannerError, ConnectedBlockScanner},
block_range_scanner::{self, BlockRangeScanner, ConnectedBlockRangeScanner},
callback::strategy::{CallbackStrategy, StateSyncAwareStrategy},
types::EventFilter,
};
Expand Down Expand Up @@ -100,7 +100,7 @@ impl EventScannerBuilder {
pub async fn connect_ws<N: Network>(
self,
ws_url: Url,
) -> Result<EventScanner<N>, BlockScannerError> {
) -> Result<EventScanner<N>, block_range_scanner::Error> {
let block_range_scanner = self.block_range_scanner.connect_ws(ws_url).await?;
Ok(EventScanner {
block_range_scanner,
Expand All @@ -117,7 +117,7 @@ impl EventScannerBuilder {
pub async fn connect_ipc<N: Network>(
self,
ipc_path: impl Into<String>,
) -> Result<EventScanner<N>, BlockScannerError> {
) -> Result<EventScanner<N>, block_range_scanner::Error> {
let block_range_scanner = self.block_range_scanner.connect_ipc(ipc_path.into()).await?;
Ok(EventScanner {
block_range_scanner,
Expand All @@ -134,7 +134,7 @@ impl EventScannerBuilder {
}

pub struct EventScanner<N: Network> {
block_range_scanner: ConnectedBlockScanner<N>,
block_range_scanner: ConnectedBlockRangeScanner<N>,
tracked_events: Vec<EventFilter>,
callback_strategy: Arc<dyn CallbackStrategy>,
}
Expand Down