Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
466 changes: 125 additions & 341 deletions src/block_range_scanner.rs

Large diffs are not rendered by default.

4 changes: 0 additions & 4 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,6 @@ pub enum ScannerError {
#[error("RPC error: {0}")]
RpcError(Arc<RpcError<TransportErrorKind>>),

/// The internal service has shut down and can no longer process commands.
#[error("Service is shutting down")]
ServiceShutdown,

/// A requested block (by number, hash or tag) could not be retrieved.
#[error("Block not found, Block Id: {0}")]
BlockNotFound(BlockId),
Expand Down
9 changes: 5 additions & 4 deletions src/event_scanner/scanner/historic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,13 +130,14 @@ impl<N: Network> EventScanner<Historic, N> {
///
/// # Errors
///
/// * [`ScannerError::ServiceShutdown`] - if the internal block-range service cannot be started.
/// * [`ScannerError::Timeout`] - if an RPC call required for startup times out.
/// * [`ScannerError::RpcError`] - if an RPC call required for startup fails.
/// * [`ScannerError::BlockNotFound`] - if `from_block` or `to_block` cannot be resolved.
pub async fn start(self) -> Result<(), ScannerError> {
let client = self.block_range_scanner.run()?;
let stream = client.stream_historical(self.config.from_block, self.config.to_block).await?;
pub async fn start(mut self) -> Result<(), ScannerError> {
let stream = self
.block_range_scanner
.stream_historical(self.config.from_block, self.config.to_block)
.await?;

let max_concurrent_fetches = self.config.max_concurrent_fetches;
let provider = self.block_range_scanner.provider().clone();
Expand Down
9 changes: 5 additions & 4 deletions src/event_scanner/scanner/latest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,13 +141,14 @@ impl<N: Network> EventScanner<LatestEvents, N> {
///
/// # Errors
///
/// * [`ScannerError::ServiceShutdown`] - if the internal block-range service cannot be started.
/// * [`ScannerError::Timeout`] - if an RPC call required for startup times out.
/// * [`ScannerError::RpcError`] - if an RPC call required for startup fails.
/// * [`ScannerError::BlockNotFound`] - if `from_block` or `to_block` cannot be resolved.
pub async fn start(self) -> Result<(), ScannerError> {
let client = self.block_range_scanner.run()?;
let stream = client.rewind(self.config.from_block, self.config.to_block).await?;
pub async fn start(mut self) -> Result<(), ScannerError> {
let stream = self
.block_range_scanner
.stream_rewind(self.config.from_block, self.config.to_block)
.await?;

let max_concurrent_fetches = self.config.max_concurrent_fetches;
let provider = self.block_range_scanner.provider().clone();
Expand Down
7 changes: 2 additions & 5 deletions src/event_scanner/scanner/live.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,10 @@ impl<N: Network> EventScanner<Live, N> {
///
/// # Errors
///
/// * [`ScannerError::ServiceShutdown`] - if the internal block-range service cannot be started.
/// * [`ScannerError::Timeout`] - if an RPC call required for startup times out.
/// * [`ScannerError::RpcError`] - if an RPC call required for startup fails.
pub async fn start(self) -> Result<(), ScannerError> {
let client = self.block_range_scanner.run()?;
let stream = client.stream_live(self.config.block_confirmations).await?;

pub async fn start(mut self) -> Result<(), ScannerError> {
let stream = self.block_range_scanner.stream_live(self.config.block_confirmations).await?;
let max_concurrent_fetches = self.config.max_concurrent_fetches;
let provider = self.block_range_scanner.provider().clone();
let listeners = self.listeners.clone();
Expand Down
8 changes: 4 additions & 4 deletions src/event_scanner/scanner/sync/from_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,14 +72,14 @@ impl<N: Network> EventScanner<SyncFromBlock, N> {
///
/// # Errors
///
/// * [`ScannerError::ServiceShutdown`] - if the internal block-range service cannot be started.
/// * [`ScannerError::Timeout`] - if an RPC call required for startup times out.
/// * [`ScannerError::RpcError`] - if an RPC call required for startup fails.
/// * [`ScannerError::BlockNotFound`] - if `from_block` cannot be resolved.
pub async fn start(self) -> Result<(), ScannerError> {
let client = self.block_range_scanner.run()?;
let stream =
client.stream_from(self.config.from_block, self.config.block_confirmations).await?;
let stream = self
.block_range_scanner
.stream_from(self.config.from_block, self.config.block_confirmations)
.await?;

let max_concurrent_fetches = self.config.max_concurrent_fetches;
let provider = self.block_range_scanner.provider().clone();
Expand Down
33 changes: 18 additions & 15 deletions src/event_scanner/scanner/sync/from_latest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,10 @@ impl<N: Network> EventScanner<SyncFromLatestEvents, N> {
///
/// # Errors
///
/// * [`ScannerError::ServiceShutdown`] - if the internal block-range service cannot be started.
/// * [`ScannerError::Timeout`] - if an RPC call required for startup times out.
/// * [`ScannerError::RpcError`] - if an RPC call required for startup fails.
#[allow(clippy::missing_panics_doc)]
pub async fn start(self) -> Result<(), ScannerError> {
pub async fn start(mut self) -> Result<(), ScannerError> {
let count = self.config.count;
let provider = self.block_range_scanner.provider().clone();
let listeners = self.listeners.clone();
Expand All @@ -84,16 +83,17 @@ impl<N: Network> EventScanner<SyncFromLatestEvents, N> {

info!(count = count, "Starting scanner, mode: fetch latest events and switch to live");

let client = self.block_range_scanner.run()?;

// Fetch the latest block number.
// This is used to determine the starting point for the rewind stream and the live
// stream. We do this before starting the streams to avoid a race condition
// where the latest block changes while we're setting up the streams.
let latest_block = provider.get_block_number().await?;

// Setup rewind and live streams to run in parallel.
let rewind_stream = client.rewind(latest_block, BlockNumberOrTag::Earliest).await?;
let rewind_stream = self
.block_range_scanner
.stream_rewind(latest_block, BlockNumberOrTag::Earliest)
.await?;

// Start streaming...
tokio::spawn(async move {
Expand All @@ -114,17 +114,20 @@ impl<N: Network> EventScanner<SyncFromLatestEvents, N> {
// We actually rely on the sync mode for the live stream, as more blocks could have been
// minted while the scanner was collecting the latest `count` events.
// Note: Sync mode will notify the client when it switches to live streaming.
let sync_stream =
match client.stream_from(latest_block + 1, self.config.block_confirmations).await {
Ok(stream) => stream,
Err(e) => {
error!(error = %e, "Error during sync mode setup");
for listener in listeners {
_ = listener.sender.try_stream(e.clone()).await;
}
return;
let sync_stream = match self
.block_range_scanner
.stream_from(latest_block + 1, self.config.block_confirmations)
.await
{
Ok(stream) => stream,
Err(e) => {
error!(error = %e, "Error during sync mode setup");
for listener in listeners {
_ = listener.sender.try_stream(e.clone()).await;
}
};
return;
}
};

// Start the live (sync) stream.
handle_stream(
Expand Down
Loading