Skip to content

Commit f622b21

Browse files
committed
feat: move block not found to provider
1 parent f2b6d47 commit f622b21

File tree

3 files changed

+22
-48
lines changed

3 files changed

+22
-48
lines changed

src/block_range_scanner.rs

Lines changed: 12 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,8 @@
6565
6666
use std::{cmp::Ordering, ops::RangeInclusive, time::Duration};
6767
use tokio::{
68-
join,
6968
sync::{mpsc, oneshot},
69+
try_join,
7070
};
7171
use tokio_stream::{StreamExt, wrappers::ReceiverStream};
7272

@@ -398,10 +398,8 @@ impl<N: Network> Service<N> {
398398
self.provider.get_block_by_number(end_height)
399399
)?;
400400

401-
let start_block_num =
402-
start_block.ok_or_else(|| ScannerError::BlockNotFound(start_height))?.header().number();
403-
let end_block_num =
404-
end_block.ok_or_else(|| ScannerError::BlockNotFound(end_height))?.header().number();
401+
let start_block_num = start_block.header().number();
402+
let end_block_num = end_block.header().number();
405403

406404
let (start_block_num, end_block_num) = match start_block_num.cmp(&end_block_num) {
407405
Ordering::Greater => (end_block_num, start_block_num),
@@ -435,12 +433,8 @@ impl<N: Network> Service<N> {
435433
self.provider.get_block_by_number(BlockNumberOrTag::Latest)
436434
)?;
437435

438-
let start_block_num =
439-
start_block.ok_or_else(|| ScannerError::BlockNotFound(start_height))?.header().number();
440-
let latest_block = latest_block
441-
.ok_or_else(|| ScannerError::BlockNotFound(BlockNumberOrTag::Latest))?
442-
.header()
443-
.number();
436+
let start_block_num = start_block.header().number();
437+
let latest_block = latest_block.header().number();
444438

445439
let confirmed_tip_num = latest_block.saturating_sub(block_confirmations);
446440

@@ -536,13 +530,10 @@ impl<N: Network> Service<N> {
536530
start_height: BlockNumberOrTag,
537531
end_height: BlockNumberOrTag,
538532
) -> Result<(), ScannerError> {
539-
let (start_block, end_block) = join!(
533+
let (start_block, end_block) = try_join!(
540534
self.provider.get_block_by_number(start_height),
541535
self.provider.get_block_by_number(end_height),
542-
);
543-
544-
let start_block = start_block?.ok_or(ScannerError::BlockNotFound(start_height))?;
545-
let end_block = end_block?.ok_or(ScannerError::BlockNotFound(end_height))?;
536+
)?;
546537

547538
// normalize block range
548539
let (from, to) = match start_block.header().number().cmp(&end_block.header().number()) {
@@ -606,13 +597,7 @@ impl<N: Network> Service<N> {
606597
// restart rewind
607598
batch_from = from;
608599
// store the updated end block hash
609-
tip_hash = self
610-
.provider
611-
.get_block_by_number(from.into())
612-
.await?
613-
.expect("Chain should have the same height post-reorg")
614-
.header()
615-
.hash();
600+
tip_hash = self.provider.get_block_by_number(from.into()).await?.header().hash();
616601
} else {
617602
// SAFETY: `batch_to` is always greater than `to`, so `batch_to - 1` is always
618603
// a valid unsigned integer
@@ -785,11 +770,7 @@ impl<N: Network> Service<N> {
785770
async fn get_block_subscription(
786771
provider: &RobustProvider<N>,
787772
) -> Result<Subscription<N::HeaderResponse>, ScannerError> {
788-
let ws_stream = provider
789-
.subscribe_blocks()
790-
.await
791-
.map_err(|_| ScannerError::WebSocketConnectionFailed(1))?;
792-
773+
let ws_stream = provider.subscribe_blocks().await?;
793774
Ok(ws_stream)
794775
}
795776

@@ -1656,13 +1637,11 @@ mod tests {
16561637
let (tx, mut rx) = mpsc::channel(1);
16571638
service.subscriber = Some(tx);
16581639

1659-
service
1660-
.send_to_subscriber(Message::Error(ScannerError::WebSocketConnectionFailed(4)))
1661-
.await;
1640+
service.send_to_subscriber(Message::Error(ScannerError::BlockNotFound(4.into()))).await;
16621641

16631642
match rx.recv().await.expect("subscriber should stay open") {
1664-
Message::Error(ScannerError::WebSocketConnectionFailed(attempts)) => {
1665-
assert_eq!(attempts, 4);
1643+
Message::Error(ScannerError::BlockNotFound(attempts)) => {
1644+
assert_eq!(attempts, 4.into());
16661645
}
16671646
other => panic!("unexpected message: {other:?}"),
16681647
}

src/error.rs

Lines changed: 2 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,14 @@ use std::{ops::RangeInclusive, sync::Arc};
33
use alloy::{
44
eips::BlockNumberOrTag,
55
primitives::BlockNumber,
6-
transports::{RpcError, TransportErrorKind, http::reqwest},
6+
transports::{RpcError, TransportErrorKind},
77
};
88
use thiserror::Error;
99

1010
use crate::{block_range_scanner::Message, robust_provider::RobustProviderError};
1111

1212
#[derive(Error, Debug, Clone)]
1313
pub enum ScannerError {
14-
#[error("HTTP request failed: {0}")]
15-
HttpError(Arc<reqwest::Error>),
16-
1714
// #[error("WebSocket error: {0}")]
1815
// WebSocketError(#[from] tokio_tungstenite::tungstenite::Error),
1916
#[error("Serialization error: {0}")]
@@ -37,9 +34,6 @@ pub enum ScannerError {
3734
#[error("Historical sync failed: {0}")]
3835
HistoricalSyncError(String),
3936

40-
#[error("WebSocket connection failed after {0} attempts")]
41-
WebSocketConnectionFailed(usize),
42-
4337
#[error("Block not found, block number: {0}")]
4438
BlockNotFound(BlockNumberOrTag),
4539

@@ -56,6 +50,7 @@ impl From<RobustProviderError> for ScannerError {
5650
RobustProviderError::RpcError(err) => ScannerError::RpcError(err),
5751
RobustProviderError::Timeout => ScannerError::Timeout,
5852
RobustProviderError::RetryFail(num) => ScannerError::RetryFail(num),
53+
RobustProviderError::BlockNotFound(block) => ScannerError::BlockNotFound(block),
5954
}
6055
}
6156
}
@@ -69,12 +64,6 @@ impl From<Result<RangeInclusive<BlockNumber>, ScannerError>> for Message {
6964
}
7065
}
7166

72-
impl From<reqwest::Error> for ScannerError {
73-
fn from(error: reqwest::Error) -> Self {
74-
ScannerError::HttpError(Arc::new(error))
75-
}
76-
}
77-
7867
impl From<serde_json::Error> for ScannerError {
7968
fn from(error: serde_json::Error) -> Self {
8069
ScannerError::SerializationError(Arc::new(error))

src/robust_provider.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ pub enum RobustProviderError {
2020
Timeout,
2121
#[error("Retry failed after {0} tries")]
2222
RetryFail(usize),
23+
#[error("Block not found, block number: {0}")]
24+
BlockNotFound(BlockNumberOrTag),
2325
}
2426

2527
impl From<RpcError<TransportErrorKind>> for RobustProviderError {
@@ -87,7 +89,7 @@ impl<N: Network> RobustProvider<N> {
8789
pub async fn get_block_by_number(
8890
&self,
8991
number: BlockNumberOrTag,
90-
) -> Result<Option<N::BlockResponse>, RobustProviderError> {
92+
) -> Result<N::BlockResponse, RobustProviderError> {
9193
info!("eth_getBlockByNumber called");
9294
let operation = async || {
9395
self.provider.get_block_by_number(number).await.map_err(RobustProviderError::from)
@@ -96,7 +98,11 @@ impl<N: Network> RobustProvider<N> {
9698
if let Err(e) = &result {
9799
error!(error = %e, "eth_getByBlockNumber failed");
98100
}
99-
result
101+
102+
match result? {
103+
Some(block) => Ok(block),
104+
None => Err(RobustProviderError::BlockNotFound(number)),
105+
}
100106
}
101107

102108
/// Fetch the latest block number with retry and timeout.

0 commit comments

Comments
 (0)