Skip to content

Commit 736fe6e

Browse files
authored
Refactor robust subscription (#190)
1 parent befed9c commit 736fe6e

File tree

6 files changed

+342
-204
lines changed

6 files changed

+342
-204
lines changed

Cargo.lock

Lines changed: 3 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ alloy-node-bindings.workspace = true
6969
tokio-stream.workspace = true
7070
tracing.workspace = true
7171
backon.workspace = true
72+
tokio-util = "0.7.17"
7273

7374
[dev-dependencies]
7475
tracing-subscriber.workspace = true

README.md

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,44 @@ All examples spin up a local `anvil` instance, deploy a demo counter contract, a
266266

267267
---
268268

269+
## Robust Provider
270+
271+
`event-scanner` ships with a `robust_provider` module that wraps Alloy providers with:
272+
273+
- bounded per-call timeouts and exponential backoff retries
274+
- automatic failover from a primary provider to one or more fallbacks
275+
- resilient WebSocket block subscriptions with timeout handling and reconnection.
276+
277+
The main entry point is `robust_provider::RobustProviderBuilder`, which accepts a wide
278+
range of provider types (URLs, `RootProvider`, layered providers, etc.) through the
279+
`IntoProvider` and `IntoRobustProvider` traits.
280+
281+
A typical setup looks like:
282+
283+
```rust
284+
use alloy::providers::ProviderBuilder;
285+
use event_scanner::robust_provider::RobustProviderBuilder;
286+
use std::time::Duration;
287+
288+
async fn example() -> anyhow::Result<()> {
289+
let ws = ProviderBuilder::new().connect("ws://localhost:8545").await?;
290+
let http = ProviderBuilder::new().connect_http("http://localhost:8545".parse()?);
291+
292+
let provider = RobustProviderBuilder::new(ws)
293+
.fallback(http)
294+
.call_timeout(Duration::from_secs(30))
295+
.subscription_timeout(Duration::from_secs(120))
296+
.build()
297+
.await?;
298+
Ok(())
299+
}
300+
```
301+
302+
You can then pass this `robust` provider into `EventScannerBuilder::connect` just like
303+
any other provider.
304+
305+
---
306+
269307
## Testing
270308

271309
(We recommend using [nextest](https://crates.io/crates/cargo-nextest) to run the tests)

src/robust_provider/mod.rs

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,64 @@
1+
//! Robust, retrying wrapper around Alloy providers.
2+
//!
3+
//! This module exposes [`RobustProvider`], a small wrapper around Alloy's
4+
//! `RootProvider` that adds:
5+
//! * bounded per-call timeouts
6+
//! * exponential backoff retries
7+
//! * transparent failover between a primary and one or more fallback providers
8+
//! * more robust WebSocket block subscriptions with automatic reconnection
9+
//!
10+
//! Use [`RobustProviderBuilder`] to construct a provider with sensible defaults
11+
//! and optional fallbacks, or implement the [`IntoRobustProvider`] and [`IntoProvider`]
12+
//! traits to support custom providers.
13+
//!
14+
//! # How it works
15+
//!
16+
//! All RPC calls performed through [`RobustProvider`] are wrapped in a total
17+
//! timeout and retried with exponential backoff up to `max_retries`. If the
18+
//! primary provider keeps failing, the call is retried against the configured
19+
//! fallback providers in the order they were added. For subscriptions,
20+
//! [`RobustSubscription`] also tracks lag, switches to fallbacks on repeated
21+
//! failure, and periodically attempts to reconnect to the primary provider.
22+
//!
23+
//! # Examples
24+
//!
25+
//! Creating a robust WebSocket provider with a fallback:
26+
//!
27+
//! ```rust,no_run
28+
//! use alloy::providers::{Provider, ProviderBuilder};
29+
//! use event_scanner::robust_provider::RobustProviderBuilder;
30+
//! use std::time::Duration;
31+
//! use tokio_stream::StreamExt;
32+
//!
33+
//! # async fn example() -> anyhow::Result<()> {
34+
//! let ws = ProviderBuilder::new().connect("ws://localhost:8545").await?;
35+
//! let ws_fallback = ProviderBuilder::new().connect("ws://localhost:8456").await?;
36+
//!
37+
//! let robust = RobustProviderBuilder::new(ws)
38+
//! .fallback(ws_fallback)
39+
//! .call_timeout(Duration::from_secs(30))
40+
//! .subscription_timeout(Duration::from_secs(120))
41+
//! .build()
42+
//! .await?;
43+
//!
44+
//! // Make RPC calls with automatic retries and fallback
45+
//! let block_number = robust.get_block_number().await?;
46+
//! println!("Current block: {}", block_number);
47+
//!
48+
//! // Create subscriptions that automatically reconnect on failure
49+
//! let sub = robust.subscribe_blocks().await?;
50+
//! let mut stream = sub.into_stream();
51+
//! while let Some(response) = stream.next().await {
52+
//! match response {
53+
//! Ok(block) => println!("New block: {:?}", block),
54+
//! Err(e) => println!("Got error: {:?}", e),
55+
//! }
56+
//! }
57+
//! # Ok(()) }
58+
//! ```
59+
//!
60+
//! You can also convert existing providers using [`IntoRobustProvider`]
61+
162
pub mod builder;
263
pub mod error;
364
pub mod provider;

0 commit comments

Comments
 (0)