Skip to content

Commit 3ded19b

Browse files
LeoPatOZ0xNeshi
andauthored
Add robust subscription (#172)
Co-authored-by: Nenad <[email protected]>
1 parent f8241c5 commit 3ded19b

File tree

17 files changed

+1498
-243
lines changed

17 files changed

+1498
-243
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ alloy-node-bindings.workspace = true
6060
tokio-stream.workspace = true
6161
tracing.workspace = true
6262
backon.workspace = true
63+
tokio-util = "0.7.17"
6364

6465
[dev-dependencies]
6566
tracing-subscriber.workspace = true

README.md

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -270,6 +270,44 @@ All examples spin up a local `anvil` instance, deploy a demo counter contract, a
270270

271271
---
272272

273+
## Robust Provider
274+
275+
`event-scanner` ships with a `robust_provider` module that wraps Alloy providers with:
276+
277+
- bounded per-call timeouts and exponential backoff retries
278+
- automatic failover from a primary provider to one or more fallbacks
279+
- resilient WebSocket block subscriptions with timeout handling and reconnection.
280+
281+
The main entry point is `robust_provider::RobustProviderBuilder`, which accepts a wide
282+
range of provider types (URLs, `RootProvider`, layered providers, etc.) through the
283+
`IntoRobustProvider` and `IntoRobustProvider` traits.
284+
285+
A typical setup looks like:
286+
287+
```rust
288+
use alloy::providers::ProviderBuilder;
289+
use event_scanner::robust_provider::RobustProviderBuilder;
290+
use std::time::Duration;
291+
292+
async fn example() -> anyhow::Result<()> {
293+
let ws = ProviderBuilder::new().connect("ws://localhost:8545").await?;
294+
let http = ProviderBuilder::new().connect_http("http://localhost:8545".parse()?);
295+
296+
let provider = RobustProviderBuilder::new(ws)
297+
.fallback(http)
298+
.call_timeout(Duration::from_secs(30))
299+
.subscription_timeout(Duration::from_secs(120))
300+
.build()
301+
.await?;
302+
Ok(())
303+
}
304+
```
305+
306+
You can then pass this `robust` provider into `EventScannerBuilder::connect` just like
307+
any other provider.
308+
309+
---
310+
273311
## Testing
274312

275313
(We recommend using [nextest](https://crates.io/crates/cargo-nextest) to run the tests)

examples/historical_scanning/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ async fn main() -> anyhow::Result<()> {
5555
let _ = counter_contract.increase().send().await?.get_receipt().await?;
5656

5757
let robust_provider = RobustProviderBuilder::new(provider)
58-
.max_timeout(std::time::Duration::from_secs(30))
58+
.call_timeout(std::time::Duration::from_secs(30))
5959
.max_retries(5)
6060
.min_delay(std::time::Duration::from_millis(500))
6161
.build()

examples/latest_events_scanning/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ async fn main() -> anyhow::Result<()> {
5252
.event(Counter::CountIncreased::SIGNATURE);
5353

5454
let robust_provider = RobustProviderBuilder::new(provider)
55-
.max_timeout(std::time::Duration::from_secs(30))
55+
.call_timeout(std::time::Duration::from_secs(30))
5656
.max_retries(5)
5757
.min_delay(std::time::Duration::from_millis(500))
5858
.build()

examples/live_scanning/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ async fn main() -> anyhow::Result<()> {
5353
.event(Counter::CountIncreased::SIGNATURE);
5454

5555
let robust_provider = RobustProviderBuilder::new(provider)
56-
.max_timeout(std::time::Duration::from_secs(30))
56+
.call_timeout(std::time::Duration::from_secs(30))
5757
.max_retries(5)
5858
.min_delay(std::time::Duration::from_millis(500))
5959
.build()

examples/sync_from_block_scanning/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ async fn main() -> anyhow::Result<()> {
6161
}
6262

6363
let robust_provider = RobustProviderBuilder::new(provider)
64-
.max_timeout(Duration::from_secs(30))
64+
.call_timeout(Duration::from_secs(30))
6565
.max_retries(5)
6666
.min_delay(Duration::from_millis(500))
6767
.build()

examples/sync_from_latest_scanning/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ async fn main() -> anyhow::Result<()> {
5353
.event(Counter::CountIncreased::SIGNATURE);
5454

5555
let robust_provider = RobustProviderBuilder::new(provider)
56-
.max_timeout(std::time::Duration::from_secs(30))
56+
.call_timeout(std::time::Duration::from_secs(30))
5757
.max_retries(5)
5858
.min_delay(std::time::Duration::from_millis(500))
5959
.build()

src/block_range_scanner.rs

Lines changed: 34 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
//! }
6060
//! ```
6161
62+
use crate::robust_provider::subscription::{self, RobustSubscription};
6263
use std::{cmp::Ordering, ops::RangeInclusive};
6364
use tokio::{
6465
sync::{mpsc, oneshot},
@@ -68,7 +69,7 @@ use tokio_stream::{StreamExt, wrappers::ReceiverStream};
6869

6970
use crate::{
7071
ScannerError, ScannerMessage,
71-
robust_provider::{Error as RobustProviderError, IntoRobustProvider, RobustProvider},
72+
robust_provider::{IntoRobustProvider, RobustProvider, provider::Error as RobustProviderError},
7273
types::{IntoScannerResult, Notification, ScannerResult, TryStream},
7374
};
7475

@@ -77,7 +78,6 @@ use alloy::{
7778
eips::BlockId,
7879
network::{BlockResponse, Network, primitives::HeaderResponse},
7980
primitives::{B256, BlockNumber},
80-
pubsub::Subscription,
8181
};
8282
use tracing::{debug, error, info, warn};
8383

@@ -583,16 +583,45 @@ impl<N: Network> Service<N> {
583583

584584
async fn stream_live_blocks(
585585
mut range_start: BlockNumber,
586-
subscription: Subscription<N::HeaderResponse>,
586+
subscription: RobustSubscription<N>,
587587
sender: mpsc::Sender<BlockScannerResult>,
588588
block_confirmations: u64,
589589
max_block_range: u64,
590590
) {
591591
// ensure we start streaming only after the expected_next_block cutoff
592592
let cutoff = range_start;
593-
let mut stream = subscription.into_stream().skip_while(|header| header.number() < cutoff);
593+
let mut stream = subscription.into_stream().skip_while(|result| match result {
594+
Ok(header) => header.number() < cutoff,
595+
Err(_) => false,
596+
});
597+
598+
while let Some(result) = stream.next().await {
599+
let incoming_block = match result {
600+
Ok(block) => block,
601+
Err(e) => {
602+
error!(error = %e, "Error receiving block from stream");
603+
match e {
604+
subscription::Error::Lagged(_) => {
605+
// scanner already accounts for skipped block numbers
606+
// next block will be the actual incoming block
607+
continue;
608+
}
609+
subscription::Error::Timeout => {
610+
_ = sender.try_stream(ScannerError::Timeout).await;
611+
return;
612+
}
613+
subscription::Error::RpcError(rpc_err) => {
614+
_ = sender.try_stream(ScannerError::RpcError(rpc_err)).await;
615+
return;
616+
}
617+
subscription::Error::Closed => {
618+
_ = sender.try_stream(ScannerError::SubscriptionClosed).await;
619+
return;
620+
}
621+
}
622+
}
623+
};
594624

595-
while let Some(incoming_block) = stream.next().await {
596625
let incoming_block_num = incoming_block.number();
597626
info!(block_number = incoming_block_num, "Received block header");
598627

src/error.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use alloy::{
66
};
77
use thiserror::Error;
88

9-
use crate::{robust_provider::Error as RobustProviderError, types::ScannerResult};
9+
use crate::{robust_provider::provider::Error as RobustProviderError, types::ScannerResult};
1010

1111
#[derive(Error, Debug, Clone)]
1212
pub enum ScannerError {
@@ -30,6 +30,9 @@ pub enum ScannerError {
3030

3131
#[error("Max block range must be greater than 0")]
3232
InvalidMaxBlockRange,
33+
34+
#[error("Subscription closed")]
35+
SubscriptionClosed,
3336
}
3437

3538
impl From<RobustProviderError> for ScannerError {
@@ -47,6 +50,7 @@ impl From<RpcError<TransportErrorKind>> for ScannerError {
4750
ScannerError::RpcError(Arc::new(error))
4851
}
4952
}
53+
5054
impl<T: Clone> PartialEq<ScannerError> for ScannerResult<T> {
5155
fn eq(&self, other: &ScannerError) -> bool {
5256
match self {

0 commit comments

Comments
 (0)