Skip to content

Commit 16128db

Browse files
authored
feat: Support conversions (#164)
1 parent a24ba66 commit 16128db

File tree

14 files changed

+369
-202
lines changed

14 files changed

+369
-202
lines changed

README.md

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ Create an event stream for the given event filters registered with the `EventSca
6060

6161
```rust
6262
use alloy::{network::Ethereum, providers::{Provider, ProviderBuilder}, sol_types::SolEvent};
63-
use event_scanner::{EventFilter, EventScannerBuilder, Message, robust_provider::RobustProvider};
63+
use event_scanner::{EventFilter, EventScannerBuilder, Message, robust_provider::RobustProviderBuilder};
6464
use tokio_stream::StreamExt;
6565

6666
use crate::MyContract;
@@ -71,7 +71,7 @@ async fn run_scanner(
7171
) -> Result<(), Box<dyn std::error::Error>> {
7272
// Connect to provider
7373
let provider = ProviderBuilder::new().connect(ws_url).await?;
74-
let robust_provider = RobustProvider::new(provider);
74+
let robust_provider = RobustProviderBuilder::new(provider).build().await?;
7575

7676
// Configure scanner with custom batch size (optional)
7777
let mut scanner = EventScannerBuilder::live()
@@ -116,30 +116,33 @@ async fn run_scanner(
116116
`EventScannerBuilder` provides mode-specific constructors and functions to configure settings before connecting.
117117
Once configured, connect using:
118118

119-
- `connect(robust_provider)` - Connect using a `RobustProvider` wrapping your alloy provider
119+
- `connect(provider)` - Connect using a `RobustProvider` wrapping your alloy provider or using an alloy provider directly
120120

121121
This will connect the `EventScanner` and allow you to create event streams and start scanning in various [modes](#scanning-modes).
122122

123123
```rust
124124
use alloy::providers::{Provider, ProviderBuilder};
125-
use event_scanner::robust_provider::RobustProvider;
125+
use event_scanner::robust_provider::RobustProviderBuilder;
126126

127127
// Connect to provider (example with WebSocket)
128128
let provider = ProviderBuilder::new().connect("ws://localhost:8545").await?;
129-
let robust_provider = RobustProvider::new(provider);
130129

131130
// Live streaming mode
132131
let scanner = EventScannerBuilder::live()
133132
.max_block_range(500) // Optional: set max blocks per read (default: 1000)
134133
.block_confirmations(12) // Optional: set block confirmations (default: 12)
135-
.connect(robust_provider.clone());
134+
.connect(provider.clone());
136135

137136
// Historical block range mode
138137
let scanner = EventScannerBuilder::historic()
139138
.from_block(1_000_000)
140139
.to_block(2_000_000)
141140
.max_block_range(500)
142-
.connect(robust_provider.clone());
141+
.connect(provider.clone());
142+
143+
// we can also wrap the provider in a RobustProvider
144+
// for more advanced configurations like retries and fallbacks
145+
let robust_provider = RobustProviderBuilder::new(provider).build().await?;
143146

144147
// Latest events mode
145148
let scanner = EventScannerBuilder::latest(100)

examples/historical_scanning/main.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
use alloy::{providers::ProviderBuilder, sol, sol_types::SolEvent};
22
use alloy_node_bindings::Anvil;
33

4-
use event_scanner::{EventFilter, EventScannerBuilder, Message, robust_provider::RobustProvider};
4+
use event_scanner::{
5+
EventFilter, EventScannerBuilder, Message, robust_provider::RobustProviderBuilder,
6+
};
57
use tokio_stream::StreamExt;
68
use tracing::{error, info};
79
use tracing_subscriber::EnvFilter;
@@ -52,12 +54,14 @@ async fn main() -> anyhow::Result<()> {
5254

5355
let _ = counter_contract.increase().send().await?.get_receipt().await?;
5456

55-
let robust_provider = RobustProvider::new(provider)
57+
let robust_provider = RobustProviderBuilder::new(provider)
5658
.max_timeout(std::time::Duration::from_secs(30))
5759
.max_retries(5)
58-
.min_delay(std::time::Duration::from_millis(500));
60+
.min_delay(std::time::Duration::from_millis(500))
61+
.build()
62+
.await?;
5963

60-
let mut scanner = EventScannerBuilder::historic().connect(robust_provider);
64+
let mut scanner = EventScannerBuilder::historic().connect(robust_provider).await?;
6165

6266
let mut stream = scanner.subscribe(increase_filter);
6367

examples/latest_events_scanning/main.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
use alloy::{providers::ProviderBuilder, sol, sol_types::SolEvent};
22
use alloy_node_bindings::Anvil;
3-
use event_scanner::{EventFilter, EventScannerBuilder, Message, robust_provider::RobustProvider};
3+
use event_scanner::{
4+
EventFilter, EventScannerBuilder, Message, robust_provider::RobustProviderBuilder,
5+
};
46
use tokio_stream::StreamExt;
57
use tracing::{error, info};
68
use tracing_subscriber::EnvFilter;
@@ -49,12 +51,14 @@ async fn main() -> anyhow::Result<()> {
4951
.contract_address(*contract_address)
5052
.event(Counter::CountIncreased::SIGNATURE);
5153

52-
let robust_provider = RobustProvider::new(provider)
54+
let robust_provider = RobustProviderBuilder::new(provider)
5355
.max_timeout(std::time::Duration::from_secs(30))
5456
.max_retries(5)
55-
.min_delay(std::time::Duration::from_millis(500));
57+
.min_delay(std::time::Duration::from_millis(500))
58+
.build()
59+
.await?;
5660

57-
let mut scanner = EventScannerBuilder::latest(5).connect(robust_provider);
61+
let mut scanner = EventScannerBuilder::latest(5).connect(robust_provider).await?;
5862

5963
let mut stream = scanner.subscribe(increase_filter);
6064

examples/live_scanning/main.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
use alloy::{providers::ProviderBuilder, sol, sol_types::SolEvent};
22
use alloy_node_bindings::Anvil;
3-
use event_scanner::{EventFilter, EventScannerBuilder, Message, robust_provider::RobustProvider};
3+
use event_scanner::{
4+
EventFilter, EventScannerBuilder, Message, robust_provider::RobustProviderBuilder,
5+
};
46

57
use tokio_stream::StreamExt;
68
use tracing::{error, info};
@@ -50,12 +52,14 @@ async fn main() -> anyhow::Result<()> {
5052
.contract_address(*contract_address)
5153
.event(Counter::CountIncreased::SIGNATURE);
5254

53-
let robust_provider = RobustProvider::new(provider)
55+
let robust_provider = RobustProviderBuilder::new(provider)
5456
.max_timeout(std::time::Duration::from_secs(30))
5557
.max_retries(5)
56-
.min_delay(std::time::Duration::from_millis(500));
58+
.min_delay(std::time::Duration::from_millis(500))
59+
.build()
60+
.await?;
5761

58-
let mut scanner = EventScannerBuilder::live().connect(robust_provider);
62+
let mut scanner = EventScannerBuilder::live().connect(robust_provider).await?;
5963

6064
let mut stream = scanner.subscribe(increase_filter);
6165

examples/sync_from_block_scanning/main.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@ use std::time::Duration;
22

33
use alloy::{providers::ProviderBuilder, sol, sol_types::SolEvent};
44
use alloy_node_bindings::Anvil;
5-
use event_scanner::{EventFilter, EventScannerBuilder, Message, robust_provider::RobustProvider};
5+
use event_scanner::{
6+
EventFilter, EventScannerBuilder, Message, robust_provider::RobustProviderBuilder,
7+
};
68
use tokio::time::sleep;
79
use tokio_stream::StreamExt;
810
use tracing::{error, info};
@@ -58,12 +60,14 @@ async fn main() -> anyhow::Result<()> {
5860
info!("Historical event {} created", i + 1);
5961
}
6062

61-
let robust_provider = RobustProvider::new(provider)
63+
let robust_provider = RobustProviderBuilder::new(provider)
6264
.max_timeout(Duration::from_secs(30))
6365
.max_retries(5)
64-
.min_delay(Duration::from_millis(500));
66+
.min_delay(Duration::from_millis(500))
67+
.build()
68+
.await?;
6569

66-
let mut scanner = EventScannerBuilder::sync().from_block(0).connect(robust_provider);
70+
let mut scanner = EventScannerBuilder::sync().from_block(0).connect(robust_provider).await?;
6771

6872
let mut stream = scanner.subscribe(increase_filter);
6973

examples/sync_from_latest_scanning/main.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
use alloy::{providers::ProviderBuilder, sol, sol_types::SolEvent};
22
use alloy_node_bindings::Anvil;
3-
use event_scanner::{EventFilter, EventScannerBuilder, Message, robust_provider::RobustProvider};
3+
use event_scanner::{
4+
EventFilter, EventScannerBuilder, Message, robust_provider::RobustProviderBuilder,
5+
};
46

57
use tokio_stream::StreamExt;
68
use tracing::{error, info};
@@ -50,12 +52,14 @@ async fn main() -> anyhow::Result<()> {
5052
.contract_address(*contract_address)
5153
.event(Counter::CountIncreased::SIGNATURE);
5254

53-
let robust_provider = RobustProvider::new(provider)
55+
let robust_provider = RobustProviderBuilder::new(provider)
5456
.max_timeout(std::time::Duration::from_secs(30))
5557
.max_retries(5)
56-
.min_delay(std::time::Duration::from_millis(500));
58+
.min_delay(std::time::Duration::from_millis(500))
59+
.build()
60+
.await?;
5761

58-
let mut client = EventScannerBuilder::sync().from_latest(5).connect(robust_provider);
62+
let mut client = EventScannerBuilder::sync().from_latest(5).connect(robust_provider).await?;
5963

6064
let mut stream = client.subscribe(increase_filter);
6165

src/block_range_scanner.rs

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
//! BlockRangeScanner, BlockRangeScannerClient, DEFAULT_BLOCK_CONFIRMATIONS,
1313
//! DEFAULT_MAX_BLOCK_RANGE, Message,
1414
//! },
15-
//! robust_provider::RobustProvider,
15+
//! robust_provider::RobustProviderBuilder,
1616
//! };
1717
//! use tokio::time::Duration;
1818
//! use tracing::{error, info};
@@ -24,8 +24,8 @@
2424
//!
2525
//! // Configuration
2626
//! let provider = ProviderBuilder::new().connect("ws://localhost:8546").await?;
27-
//! let robust_provider = RobustProvider::new(provider);
28-
//! let block_range_scanner = BlockRangeScanner::new().connect(robust_provider);
27+
//! let robust_provider = RobustProviderBuilder::new(provider).build().await?;
28+
//! let block_range_scanner = BlockRangeScanner::new().connect(robust_provider).await?;
2929
//!
3030
//! // Create client to send subscribe command to block scanner
3131
//! let client: BlockRangeScannerClient = block_range_scanner.run()?;
@@ -69,7 +69,7 @@ use tokio_stream::{StreamExt, wrappers::ReceiverStream};
6969
use crate::{
7070
ScannerMessage,
7171
error::ScannerError,
72-
robust_provider::{Error as RobustProviderError, RobustProvider},
72+
robust_provider::{Error as RobustProviderError, IntoRobustProvider, RobustProvider},
7373
types::{ScannerStatus, TryStream},
7474
};
7575
use alloy::{
@@ -148,9 +148,16 @@ impl BlockRangeScanner {
148148
}
149149

150150
/// Connects to an existing provider
151-
#[must_use]
152-
pub fn connect<N: Network>(self, provider: RobustProvider<N>) -> ConnectedBlockRangeScanner<N> {
153-
ConnectedBlockRangeScanner { provider, max_block_range: self.max_block_range }
151+
///
152+
/// # Errors
153+
///
154+
/// Returns an error if the provider connection fails.
155+
pub async fn connect<N: Network>(
156+
self,
157+
provider: impl IntoRobustProvider<N>,
158+
) -> Result<ConnectedBlockRangeScanner<N>, ScannerError> {
159+
let provider = provider.into_robust_provider().await?;
160+
Ok(ConnectedBlockRangeScanner { provider, max_block_range: self.max_block_range })
154161
}
155162
}
156163

0 commit comments

Comments
 (0)