Skip to content

Commit b770b00

Browse files
LeoPatOZ0xNeshi
andauthored
feat: Add Multiple providers (#140)
Co-authored-by: Nenad <[email protected]>
1 parent c0a0d70 commit b770b00

File tree

23 files changed

+815
-471
lines changed

23 files changed

+815
-471
lines changed

README.md

Lines changed: 26 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -59,20 +59,24 @@ event-scanner = "0.4.0-alpha"
5959
Create an event stream for the given event filters registered with the `EventScanner`:
6060

6161
```rust
62-
use alloy::{network::Ethereum, sol_types::SolEvent};
63-
use event_scanner::{EventFilter, EventScannerBuilder, Message};
62+
use alloy::{network::Ethereum, providers::{Provider, ProviderBuilder}, sol_types::SolEvent};
63+
use event_scanner::{EventFilter, EventScannerBuilder, Message, robust_provider::RobustProviderBuilder};
6464
use tokio_stream::StreamExt;
6565

6666
use crate::MyContract;
6767

6868
async fn run_scanner(
69-
ws_url: alloy::transports::http::reqwest::Url,
69+
ws_url: &str,
7070
contract: alloy::primitives::Address,
7171
) -> Result<(), Box<dyn std::error::Error>> {
72+
// Connect to provider
73+
let provider = ProviderBuilder::new().connect(ws_url).await?;
74+
let robust_provider = RobustProviderBuilder::new(provider).build().await?;
75+
7276
// Configure scanner with custom batch size (optional)
7377
let mut scanner = EventScannerBuilder::live()
7478
.max_block_range(500) // Process up to 500 blocks per batch
75-
.connect_ws::<Ethereum>(ws_url).await?;
79+
.connect(robust_provider);
7680

7781
// Register an event listener
7882
let filter = EventFilter::new()
@@ -109,48 +113,56 @@ async fn run_scanner(
109113

110114
### Building a Scanner
111115

112-
`EventScannerBuilder` provides mode-specific constructors and a functions to configure settings before connecting.
113-
Once configured, connect using one of:
116+
`EventScannerBuilder` provides mode-specific constructors and functions to configure settings before connecting.
117+
Once configured, connect using:
114118

115-
- `connect_ws::<Ethereum>(ws_url)`
116-
- `connect_ipc::<Ethereum>(path)`
117-
- `connect::<Ethereum>(provider)`
119+
- `connect(provider)` - Connect using a `RobustProvider` wrapping your alloy provider or using an alloy provider directly
118120

119121
This will connect the `EventScanner` and allow you to create event streams and start scanning in various [modes](#scanning-modes).
120122

121123
```rust
124+
use alloy::providers::{Provider, ProviderBuilder};
125+
use event_scanner::robust_provider::RobustProviderBuilder;
126+
127+
// Connect to provider (example with WebSocket)
128+
let provider = ProviderBuilder::new().connect("ws://localhost:8545").await?;
129+
122130
// Live streaming mode
123131
let scanner = EventScannerBuilder::live()
124132
.max_block_range(500) // Optional: set max blocks per read (default: 1000)
125133
.block_confirmations(12) // Optional: set block confirmations (default: 12)
126-
.connect_ws::<Ethereum>(ws_url).await?;
134+
.connect(provider.clone());
127135

128136
// Historical block range mode
129137
let scanner = EventScannerBuilder::historic()
130138
.from_block(1_000_000)
131139
.to_block(2_000_000)
132140
.max_block_range(500)
133-
.connect_ws::<Ethereum>(ws_url).await?;
141+
.connect(provider.clone());
142+
143+
// we can also wrap the provider in a RobustProvider
144+
// for more advanced configurations like retries and fallbacks
145+
let robust_provider = RobustProviderBuilder::new(provider).build().await?;
134146

135147
// Latest events mode
136148
let scanner = EventScannerBuilder::latest(100)
137149
// .from_block(1_000_000) // Optional: set start of search range
138150
// .to_block(2_000_000) // Optional: set end of search range
139151
.max_block_range(500)
140-
.connect_ws::<Ethereum>(ws_url).await?;
152+
.connect(robust_provider.clone());
141153

142154
// Sync from block then switch to live mode
143155
let scanner = EventScannerBuilder::sync()
144156
.from_block(100)
145157
.max_block_range(500)
146158
.block_confirmations(12)
147-
.connect_ws::<Ethereum>(ws_url).await?;
159+
.connect(robust_provider.clone());
148160

149161
// Sync the latest 60 events then switch to live mode
150162
let scanner = EventScannerBuilder::sync()
151163
.from_latest(60)
152164
.block_confirmations(12)
153-
.connect_ws::<Ethereum>(ws_url).await?;
165+
.connect(robust_provider);
154166
```
155167

156168
Invoking `scanner.start()` starts the scanner in the specified mode.

examples/historical_scanning/main.rs

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
1-
use alloy::{network::Ethereum, providers::ProviderBuilder, sol, sol_types::SolEvent};
1+
use alloy::{providers::ProviderBuilder, sol, sol_types::SolEvent};
22
use alloy_node_bindings::Anvil;
33

4-
use event_scanner::{EventFilter, EventScannerBuilder, Message};
4+
use event_scanner::{
5+
EventFilter, EventScannerBuilder, Message, robust_provider::RobustProviderBuilder,
6+
};
57
use tokio_stream::StreamExt;
68
use tracing::{error, info};
79
use tracing_subscriber::EnvFilter;
@@ -38,9 +40,11 @@ async fn main() -> anyhow::Result<()> {
3840

3941
let anvil = Anvil::new().block_time_f64(0.1).try_spawn()?;
4042
let wallet = anvil.wallet();
41-
let provider =
42-
ProviderBuilder::new().wallet(wallet.unwrap()).connect(anvil.endpoint().as_str()).await?;
43-
let counter_contract = Counter::deploy(provider).await?;
43+
let provider = ProviderBuilder::new()
44+
.wallet(wallet.unwrap())
45+
.connect(anvil.ws_endpoint_url().as_str())
46+
.await?;
47+
let counter_contract = Counter::deploy(provider.clone()).await?;
4448

4549
let contract_address = counter_contract.address();
4650

@@ -50,8 +54,14 @@ async fn main() -> anyhow::Result<()> {
5054

5155
let _ = counter_contract.increase().send().await?.get_receipt().await?;
5256

53-
let mut scanner =
54-
EventScannerBuilder::historic().connect_ws::<Ethereum>(anvil.ws_endpoint_url()).await?;
57+
let robust_provider = RobustProviderBuilder::new(provider)
58+
.max_timeout(std::time::Duration::from_secs(30))
59+
.max_retries(5)
60+
.min_delay(std::time::Duration::from_millis(500))
61+
.build()
62+
.await?;
63+
64+
let mut scanner = EventScannerBuilder::historic().connect(robust_provider).await?;
5565

5666
let mut stream = scanner.subscribe(increase_filter);
5767

examples/latest_events_scanning/main.rs

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
1-
use alloy::{network::Ethereum, providers::ProviderBuilder, sol, sol_types::SolEvent};
1+
use alloy::{providers::ProviderBuilder, sol, sol_types::SolEvent};
22
use alloy_node_bindings::Anvil;
3-
use event_scanner::{EventFilter, EventScannerBuilder, Message};
3+
use event_scanner::{
4+
EventFilter, EventScannerBuilder, Message, robust_provider::RobustProviderBuilder,
5+
};
46
use tokio_stream::StreamExt;
57
use tracing::{error, info};
68
use tracing_subscriber::EnvFilter;
@@ -37,18 +39,26 @@ async fn main() -> anyhow::Result<()> {
3739

3840
let anvil = Anvil::new().block_time_f64(0.5).try_spawn()?;
3941
let wallet = anvil.wallet();
40-
let provider =
41-
ProviderBuilder::new().wallet(wallet.unwrap()).connect(anvil.endpoint().as_str()).await?;
42-
let counter_contract = Counter::deploy(provider).await?;
42+
let provider = ProviderBuilder::new()
43+
.wallet(wallet.unwrap())
44+
.connect(anvil.ws_endpoint_url().as_str())
45+
.await?;
46+
let counter_contract = Counter::deploy(provider.clone()).await?;
4347

4448
let contract_address = counter_contract.address();
4549

4650
let increase_filter = EventFilter::new()
4751
.contract_address(*contract_address)
4852
.event(Counter::CountIncreased::SIGNATURE);
4953

50-
let mut scanner =
51-
EventScannerBuilder::latest(5).connect_ws::<Ethereum>(anvil.ws_endpoint_url()).await?;
54+
let robust_provider = RobustProviderBuilder::new(provider)
55+
.max_timeout(std::time::Duration::from_secs(30))
56+
.max_retries(5)
57+
.min_delay(std::time::Duration::from_millis(500))
58+
.build()
59+
.await?;
60+
61+
let mut scanner = EventScannerBuilder::latest(5).connect(robust_provider).await?;
5262

5363
let mut stream = scanner.subscribe(increase_filter);
5464

examples/live_scanning/main.rs

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
1-
use alloy::{network::Ethereum, providers::ProviderBuilder, sol, sol_types::SolEvent};
1+
use alloy::{providers::ProviderBuilder, sol, sol_types::SolEvent};
22
use alloy_node_bindings::Anvil;
3-
use event_scanner::{EventFilter, EventScannerBuilder, Message};
3+
use event_scanner::{
4+
EventFilter, EventScannerBuilder, Message, robust_provider::RobustProviderBuilder,
5+
};
46

57
use tokio_stream::StreamExt;
68
use tracing::{error, info};
@@ -38,18 +40,26 @@ async fn main() -> anyhow::Result<()> {
3840

3941
let anvil = Anvil::new().block_time(1).try_spawn()?;
4042
let wallet = anvil.wallet();
41-
let provider =
42-
ProviderBuilder::new().wallet(wallet.unwrap()).connect(anvil.endpoint().as_str()).await?;
43-
let counter_contract = Counter::deploy(provider).await?;
43+
let provider = ProviderBuilder::new()
44+
.wallet(wallet.unwrap())
45+
.connect(anvil.ws_endpoint_url().as_str())
46+
.await?;
47+
let counter_contract = Counter::deploy(provider.clone()).await?;
4448

4549
let contract_address = counter_contract.address();
4650

4751
let increase_filter = EventFilter::new()
4852
.contract_address(*contract_address)
4953
.event(Counter::CountIncreased::SIGNATURE);
5054

51-
let mut scanner =
52-
EventScannerBuilder::live().connect_ws::<Ethereum>(anvil.ws_endpoint_url()).await?;
55+
let robust_provider = RobustProviderBuilder::new(provider)
56+
.max_timeout(std::time::Duration::from_secs(30))
57+
.max_retries(5)
58+
.min_delay(std::time::Duration::from_millis(500))
59+
.build()
60+
.await?;
61+
62+
let mut scanner = EventScannerBuilder::live().connect(robust_provider).await?;
5363

5464
let mut stream = scanner.subscribe(increase_filter);
5565

examples/sync_from_block_scanning/main.rs

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
use std::time::Duration;
22

3-
use alloy::{network::Ethereum, providers::ProviderBuilder, sol, sol_types::SolEvent};
3+
use alloy::{providers::ProviderBuilder, sol, sol_types::SolEvent};
44
use alloy_node_bindings::Anvil;
5-
use event_scanner::{EventFilter, EventScannerBuilder, Message};
5+
use event_scanner::{
6+
EventFilter, EventScannerBuilder, Message, robust_provider::RobustProviderBuilder,
7+
};
68
use tokio::time::sleep;
79
use tokio_stream::StreamExt;
810
use tracing::{error, info};
@@ -40,9 +42,11 @@ async fn main() -> anyhow::Result<()> {
4042

4143
let anvil = Anvil::new().block_time(1).try_spawn()?;
4244
let wallet = anvil.wallet();
43-
let provider =
44-
ProviderBuilder::new().wallet(wallet.unwrap()).connect(anvil.endpoint().as_str()).await?;
45-
let counter_contract = Counter::deploy(provider).await?;
45+
let provider = ProviderBuilder::new()
46+
.wallet(wallet.unwrap())
47+
.connect(anvil.ws_endpoint_url().as_str())
48+
.await?;
49+
let counter_contract = Counter::deploy(provider.clone()).await?;
4650

4751
let contract_address = counter_contract.address();
4852

@@ -56,11 +60,15 @@ async fn main() -> anyhow::Result<()> {
5660
info!("Historical event {} created", i + 1);
5761
}
5862

59-
let mut scanner = EventScannerBuilder::sync()
60-
.from_block(0)
61-
.connect_ws::<Ethereum>(anvil.ws_endpoint_url())
63+
let robust_provider = RobustProviderBuilder::new(provider)
64+
.max_timeout(Duration::from_secs(30))
65+
.max_retries(5)
66+
.min_delay(Duration::from_millis(500))
67+
.build()
6268
.await?;
6369

70+
let mut scanner = EventScannerBuilder::sync().from_block(0).connect(robust_provider).await?;
71+
6472
let mut stream = scanner.subscribe(increase_filter);
6573

6674
info!("Starting sync scanner...");

examples/sync_from_latest_scanning/main.rs

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
1-
use alloy::{network::Ethereum, providers::ProviderBuilder, sol, sol_types::SolEvent};
1+
use alloy::{providers::ProviderBuilder, sol, sol_types::SolEvent};
22
use alloy_node_bindings::Anvil;
3-
use event_scanner::{EventFilter, EventScannerBuilder, Message};
3+
use event_scanner::{
4+
EventFilter, EventScannerBuilder, Message, robust_provider::RobustProviderBuilder,
5+
};
46

57
use tokio_stream::StreamExt;
68
use tracing::{error, info};
@@ -38,21 +40,27 @@ async fn main() -> anyhow::Result<()> {
3840

3941
let anvil = Anvil::new().block_time_f64(0.5).try_spawn()?;
4042
let wallet = anvil.wallet();
41-
let provider =
42-
ProviderBuilder::new().wallet(wallet.unwrap()).connect(anvil.endpoint().as_str()).await?;
43-
let counter_contract = Counter::deploy(provider).await?;
43+
let provider = ProviderBuilder::new()
44+
.wallet(wallet.unwrap())
45+
.connect(anvil.ws_endpoint_url().as_str())
46+
.await?;
47+
let counter_contract = Counter::deploy(provider.clone()).await?;
4448

4549
let contract_address = counter_contract.address();
4650

4751
let increase_filter = EventFilter::new()
4852
.contract_address(*contract_address)
4953
.event(Counter::CountIncreased::SIGNATURE);
5054

51-
let mut client = EventScannerBuilder::sync()
52-
.from_latest(5)
53-
.connect_ws::<Ethereum>(anvil.ws_endpoint_url())
55+
let robust_provider = RobustProviderBuilder::new(provider)
56+
.max_timeout(std::time::Duration::from_secs(30))
57+
.max_retries(5)
58+
.min_delay(std::time::Duration::from_millis(500))
59+
.build()
5460
.await?;
5561

62+
let mut client = EventScannerBuilder::sync().from_latest(5).connect(robust_provider).await?;
63+
5664
let mut stream = client.subscribe(increase_filter);
5765

5866
for _ in 0..10 {

0 commit comments

Comments
 (0)