Skip to content

Commit a5affb3

Browse files
LeoPatOZ0xNeshi
andauthored
feat: Retry logic (#135)
Co-authored-by: Nenad <[email protected]>
1 parent c2b09a5 commit a5affb3

File tree

9 files changed

+435
-84
lines changed

9 files changed

+435
-84
lines changed

Cargo.lock

Lines changed: 24 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ tracing = "0.1"
3535
tracing-subscriber = { version = "0.3", features = ["fmt", "env-filter"] }
3636
test-log = { version = "0.2.18", features = ["trace"] }
3737
hex = "0.4"
38+
backon = "1.5.2"
3839

3940
[package]
4041
name = "event-scanner"
@@ -67,6 +68,7 @@ chrono.workspace = true
6768
alloy-node-bindings.workspace = true
6869
tokio-stream.workspace = true
6970
tracing.workspace = true
71+
backon.workspace = true
7072

7173
[dev-dependencies]
7274
tracing-subscriber.workspace = true

src/block_range_scanner.rs

Lines changed: 86 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -41,11 +41,6 @@
4141
//! error!("Received error from subscription: {e}");
4242
//! match e {
4343
//! ScannerError::ServiceShutdown => break,
44-
//! ScannerError::WebSocketConnectionFailed(_) => {
45-
//! error!(
46-
//! "WebSocket connection failed, continuing to listen for reconnection"
47-
//! );
48-
//! }
4944
//! _ => {
5045
//! error!("Non-fatal error, continuing: {e}");
5146
//! }
@@ -63,24 +58,28 @@
6358
//! }
6459
//! ```
6560
66-
use std::{cmp::Ordering, ops::RangeInclusive};
61+
use std::{cmp::Ordering, ops::RangeInclusive, time::Duration};
6762
use tokio::{
68-
join,
6963
sync::{mpsc, oneshot},
64+
try_join,
7065
};
7166
use tokio_stream::{StreamExt, wrappers::ReceiverStream};
7267

7368
use crate::{
7469
ScannerMessage,
7570
error::ScannerError,
71+
robust_provider::{
72+
DEFAULT_MAX_RETRIES, DEFAULT_MAX_TIMEOUT, DEFAULT_RETRY_INTERVAL,
73+
Error as RobustProviderError, RobustProvider,
74+
},
7675
types::{ScannerStatus, TryStream},
7776
};
7877
use alloy::{
7978
consensus::BlockHeader,
8079
eips::BlockNumberOrTag,
8180
network::{BlockResponse, Network, primitives::HeaderResponse},
8281
primitives::{B256, BlockNumber},
83-
providers::{Provider, RootProvider},
82+
providers::RootProvider,
8483
pubsub::Subscription,
8584
rpc::client::ClientBuilder,
8685
transports::{
@@ -113,6 +112,12 @@ impl PartialEq<RangeInclusive<BlockNumber>> for Message {
113112
}
114113
}
115114

115+
impl From<RobustProviderError> for Message {
116+
fn from(error: RobustProviderError) -> Self {
117+
Message::Error(error.into())
118+
}
119+
}
120+
116121
impl From<RpcError<TransportErrorKind>> for Message {
117122
fn from(error: RpcError<TransportErrorKind>) -> Self {
118123
Message::Error(error.into())
@@ -128,6 +133,9 @@ impl From<ScannerError> for Message {
128133
#[derive(Clone, Copy)]
129134
pub struct BlockRangeScanner {
130135
pub max_block_range: u64,
136+
pub max_timeout: Duration,
137+
pub max_retries: usize,
138+
pub retry_interval: Duration,
131139
}
132140

133141
impl Default for BlockRangeScanner {
@@ -139,7 +147,12 @@ impl Default for BlockRangeScanner {
139147
impl BlockRangeScanner {
140148
#[must_use]
141149
pub fn new() -> Self {
142-
Self { max_block_range: DEFAULT_MAX_BLOCK_RANGE }
150+
Self {
151+
max_block_range: DEFAULT_MAX_BLOCK_RANGE,
152+
max_timeout: DEFAULT_MAX_TIMEOUT,
153+
max_retries: DEFAULT_MAX_RETRIES,
154+
retry_interval: DEFAULT_RETRY_INTERVAL,
155+
}
143156
}
144157

145158
#[must_use]
@@ -148,6 +161,24 @@ impl BlockRangeScanner {
148161
self
149162
}
150163

164+
#[must_use]
165+
pub fn with_max_timeout(mut self, rpc_timeout: Duration) -> Self {
166+
self.max_timeout = rpc_timeout;
167+
self
168+
}
169+
170+
#[must_use]
171+
pub fn with_max_retries(mut self, rpc_max_retries: usize) -> Self {
172+
self.max_retries = rpc_max_retries;
173+
self
174+
}
175+
176+
#[must_use]
177+
pub fn with_retry_interval(mut self, rpc_retry_interval: Duration) -> Self {
178+
self.retry_interval = rpc_retry_interval;
179+
self
180+
}
181+
151182
/// Connects to the provider via WebSocket
152183
///
153184
/// # Errors
@@ -182,19 +213,26 @@ impl BlockRangeScanner {
182213
/// Returns an error if the connection fails
183214
#[must_use]
184215
pub fn connect<N: Network>(self, provider: RootProvider<N>) -> ConnectedBlockRangeScanner<N> {
185-
ConnectedBlockRangeScanner { provider, max_block_range: self.max_block_range }
216+
let robust_provider = RobustProvider::new(provider)
217+
.max_timeout(self.max_timeout)
218+
.max_retries(self.max_retries)
219+
.retry_interval(self.retry_interval);
220+
ConnectedBlockRangeScanner {
221+
provider: robust_provider,
222+
max_block_range: self.max_block_range,
223+
}
186224
}
187225
}
188226

189227
pub struct ConnectedBlockRangeScanner<N: Network> {
190-
provider: RootProvider<N>,
228+
provider: RobustProvider<N>,
191229
max_block_range: u64,
192230
}
193231

194232
impl<N: Network> ConnectedBlockRangeScanner<N> {
195-
/// Returns the underlying Provider.
233+
/// Returns the `RobustProvider`
196234
#[must_use]
197-
pub fn provider(&self) -> &RootProvider<N> {
235+
pub fn provider(&self) -> &RobustProvider<N> {
198236
&self.provider
199237
}
200238

@@ -240,15 +278,15 @@ pub enum Command {
240278
}
241279

242280
struct Service<N: Network> {
243-
provider: RootProvider<N>,
281+
provider: RobustProvider<N>,
244282
max_block_range: u64,
245283
error_count: u64,
246284
command_receiver: mpsc::Receiver<Command>,
247285
shutdown: bool,
248286
}
249287

250288
impl<N: Network> Service<N> {
251-
pub fn new(provider: RootProvider<N>, max_block_range: u64) -> (Self, mpsc::Sender<Command>) {
289+
pub fn new(provider: RobustProvider<N>, max_block_range: u64) -> (Self, mpsc::Sender<Command>) {
252290
let (cmd_tx, cmd_rx) = mpsc::channel(100);
253291

254292
let service = Self {
@@ -351,10 +389,8 @@ impl<N: Network> Service<N> {
351389
self.provider.get_block_by_number(end_height)
352390
)?;
353391

354-
let start_block_num =
355-
start_block.ok_or_else(|| ScannerError::BlockNotFound(start_height))?.header().number();
356-
let end_block_num =
357-
end_block.ok_or_else(|| ScannerError::BlockNotFound(end_height))?.header().number();
392+
let start_block_num = start_block.header().number();
393+
let end_block_num = end_block.header().number();
358394

359395
let (start_block_num, end_block_num) = match start_block_num.cmp(&end_block_num) {
360396
Ordering::Greater => (end_block_num, start_block_num),
@@ -388,23 +424,14 @@ impl<N: Network> Service<N> {
388424
let get_start_block = async || -> Result<BlockNumber, ScannerError> {
389425
let block = match start_height {
390426
BlockNumberOrTag::Number(num) => num,
391-
block_tag => provider
392-
.get_block_by_number(block_tag)
393-
.await?
394-
.ok_or_else(|| ScannerError::BlockNotFound(block_tag))?
395-
.header()
396-
.number(),
427+
block_tag => provider.get_block_by_number(block_tag).await?.header().number(),
397428
};
398429
Ok(block)
399430
};
400431

401432
let get_latest_block = async || -> Result<BlockNumber, ScannerError> {
402-
let block = provider
403-
.get_block_by_number(BlockNumberOrTag::Latest)
404-
.await?
405-
.ok_or_else(|| ScannerError::BlockNotFound(BlockNumberOrTag::Latest))?
406-
.header()
407-
.number();
433+
let block =
434+
provider.get_block_by_number(BlockNumberOrTag::Latest).await?.header().number();
408435
Ok(block)
409436
};
410437

@@ -496,13 +523,10 @@ impl<N: Network> Service<N> {
496523
let max_block_range = self.max_block_range;
497524
let provider = self.provider.clone();
498525

499-
let (start_block, end_block) = join!(
526+
let (start_block, end_block) = try_join!(
500527
self.provider.get_block_by_number(start_height),
501528
self.provider.get_block_by_number(end_height),
502-
);
503-
504-
let start_block = start_block?.ok_or(ScannerError::BlockNotFound(start_height))?;
505-
let end_block = end_block?.ok_or(ScannerError::BlockNotFound(end_height))?;
529+
)?;
506530

507531
// normalize block range
508532
let (from, to) = match start_block.header().number().cmp(&end_block.header().number()) {
@@ -529,7 +553,7 @@ impl<N: Network> Service<N> {
529553
to: N::BlockResponse,
530554
max_block_range: u64,
531555
sender: &mpsc::Sender<Message>,
532-
provider: &RootProvider<N>,
556+
provider: &RobustProvider<N>,
533557
) {
534558
let mut batch_count = 0;
535559

@@ -584,12 +608,10 @@ impl<N: Network> Service<N> {
584608
batch_from = from;
585609
// store the updated end block hash
586610
tip_hash = match provider.get_block_by_number(from.into()).await {
587-
Ok(block) => block
588-
.unwrap_or_else(|| {
589-
panic!("Block with number '{from}' should exist post-reorg")
590-
})
591-
.header()
592-
.hash(),
611+
Ok(block) => block.header().hash(),
612+
Err(RobustProviderError::BlockNotFound(_)) => {
613+
panic!("Block with number '{from}' should exist post-reorg");
614+
}
593615
Err(e) => {
594616
error!(error = %e, "Terminal RPC call error, shutting down");
595617
_ = sender.try_stream(e);
@@ -644,9 +666,9 @@ impl<N: Network> Service<N> {
644666
info!(batch_count = batch_count, "Historical sync completed");
645667
}
646668

647-
async fn stream_live_blocks<P: Provider<N>>(
669+
async fn stream_live_blocks(
648670
mut range_start: BlockNumber,
649-
provider: P,
671+
provider: RobustProvider<N>,
650672
sender: mpsc::Sender<Message>,
651673
block_confirmations: u64,
652674
max_block_range: u64,
@@ -749,22 +771,22 @@ impl<N: Network> Service<N> {
749771
}
750772

751773
async fn get_block_subscription(
752-
provider: &impl Provider<N>,
774+
provider: &RobustProvider<N>,
753775
) -> Result<Subscription<N::HeaderResponse>, ScannerError> {
754-
let ws_stream = provider
755-
.subscribe_blocks()
756-
.await
757-
.map_err(|_| ScannerError::WebSocketConnectionFailed(1))?;
758-
776+
let ws_stream = provider.subscribe_blocks().await?;
759777
Ok(ws_stream)
760778
}
761779
}
762780

763781
async fn reorg_detected<N: Network>(
764-
provider: &RootProvider<N>,
782+
provider: &RobustProvider<N>,
765783
hash_to_check: B256,
766-
) -> Result<bool, RpcError<TransportErrorKind>> {
767-
Ok(provider.get_block_by_hash(hash_to_check).await?.is_none())
784+
) -> Result<bool, ScannerError> {
785+
match provider.get_block_by_hash(hash_to_check).await {
786+
Ok(_) => Ok(false),
787+
Err(RobustProviderError::BlockNotFound(_)) => Ok(true),
788+
Err(e) => Err(e.into()),
789+
}
768790
}
769791

770792
pub struct BlockRangeScannerClient {
@@ -913,6 +935,7 @@ mod tests {
913935
use super::*;
914936
use crate::{assert_closed, assert_empty, assert_next};
915937
use alloy::{
938+
eips::BlockId,
916939
network::Ethereum,
917940
providers::{ProviderBuilder, ext::AnvilApi},
918941
rpc::types::anvil::ReorgOptions,
@@ -1365,13 +1388,15 @@ mod tests {
13651388

13661389
#[tokio::test]
13671390
async fn try_send_forwards_errors_to_subscribers() {
1368-
let (tx, mut rx) = mpsc::channel(1);
1391+
let (tx, mut rx) = mpsc::channel::<Message>(1);
13691392

1370-
_ = tx.try_stream(ScannerError::WebSocketConnectionFailed(4)).await;
1393+
_ = tx.try_stream(ScannerError::BlockNotFound(4.into())).await;
13711394

13721395
assert!(matches!(
13731396
rx.recv().await,
1374-
Some(Message::Error(ScannerError::WebSocketConnectionFailed(4)))
1397+
Some(ScannerMessage::Error(ScannerError::BlockNotFound(BlockId::Number(
1398+
BlockNumberOrTag::Number(4)
1399+
))))
13751400
));
13761401
}
13771402

@@ -1566,7 +1591,10 @@ mod tests {
15661591

15671592
let stream = client.rewind(0, 999).await;
15681593

1569-
assert!(matches!(stream, Err(ScannerError::BlockNotFound(BlockNumberOrTag::Number(999)))));
1594+
assert!(matches!(
1595+
stream,
1596+
Err(ScannerError::BlockNotFound(BlockId::Number(BlockNumberOrTag::Number(999))))
1597+
));
15701598

15711599
Ok(())
15721600
}

0 commit comments

Comments
 (0)