Skip to content

Commit 3c4ebb1

Browse files
committed
ref: only connect to robust provider
1 parent a717ef4 commit 3c4ebb1

File tree

10 files changed

+81
-63
lines changed

10 files changed

+81
-63
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ Once configured, connect using one of:
113113

114114
- `connect_ws::<Ethereum>(ws_url)`
115115
- `connect_ipc::<Ethereum>(path)`
116-
- `connect::<Ethereum>(provider)`
116+
- `connect::<Ethereum>(robust_provider)`
117117

118118
This will connect the `EventScanner` and allow you to create event streams and start scanning in various [modes](#scanning-modes).
119119

examples/historical_scanning/main.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use alloy::{
66
};
77
use alloy_node_bindings::Anvil;
88

9-
use event_scanner::{EventFilter, EventScannerBuilder, Message};
9+
use event_scanner::{EventFilter, EventScannerBuilder, Message, robust_provider::RobustProvider};
1010
use tokio_stream::StreamExt;
1111
use tracing::{error, info};
1212
use tracing_subscriber::EnvFilter;
@@ -57,8 +57,8 @@ async fn main() -> anyhow::Result<()> {
5757

5858
let _ = counter_contract.increase().send().await?.get_receipt().await?;
5959

60-
let mut scanner =
61-
EventScannerBuilder::historic().connect::<Ethereum>(provider.root().to_owned());
60+
let robust_provider = RobustProvider::new(provider.root().clone());
61+
let mut scanner = EventScannerBuilder::historic().connect::<Ethereum>(robust_provider);
6262

6363
let mut stream = scanner.subscribe(increase_filter);
6464

examples/latest_events_scanning/main.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use alloy::{
55
sol_types::SolEvent,
66
};
77
use alloy_node_bindings::Anvil;
8-
use event_scanner::{EventFilter, EventScannerBuilder, Message};
8+
use event_scanner::{EventFilter, EventScannerBuilder, Message, robust_provider::RobustProvider};
99
use tokio_stream::StreamExt;
1010
use tracing::{error, info};
1111
use tracing_subscriber::EnvFilter;
@@ -54,8 +54,8 @@ async fn main() -> anyhow::Result<()> {
5454
.contract_address(*contract_address)
5555
.event(Counter::CountIncreased::SIGNATURE);
5656

57-
let mut scanner =
58-
EventScannerBuilder::latest(5).connect::<Ethereum>(provider.root().to_owned());
57+
let robust_provider = RobustProvider::new(provider.root().clone());
58+
let mut scanner = EventScannerBuilder::latest(5).connect::<Ethereum>(robust_provider);
5959

6060
let mut stream = scanner.subscribe(increase_filter);
6161

examples/live_scanning/main.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use alloy::{
55
sol_types::SolEvent,
66
};
77
use alloy_node_bindings::Anvil;
8-
use event_scanner::{EventFilter, EventScannerBuilder, Message};
8+
use event_scanner::{EventFilter, EventScannerBuilder, Message, robust_provider::RobustProvider};
99

1010
use tokio_stream::StreamExt;
1111
use tracing::{error, info};
@@ -55,7 +55,8 @@ async fn main() -> anyhow::Result<()> {
5555
.contract_address(*contract_address)
5656
.event(Counter::CountIncreased::SIGNATURE);
5757

58-
let mut scanner = EventScannerBuilder::live().connect::<Ethereum>(provider.root().to_owned());
58+
let robust_provider = RobustProvider::new(provider.root().clone());
59+
let mut scanner = EventScannerBuilder::live().connect::<Ethereum>(robust_provider);
5960

6061
let mut stream = scanner.subscribe(increase_filter);
6162

examples/sync_from_block_scanning/main.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use alloy::{
77
sol_types::SolEvent,
88
};
99
use alloy_node_bindings::Anvil;
10-
use event_scanner::{EventFilter, EventScannerBuilder, Message};
10+
use event_scanner::{EventFilter, EventScannerBuilder, Message, robust_provider::RobustProvider};
1111
use tokio::time::sleep;
1212
use tokio_stream::StreamExt;
1313
use tracing::{error, info};
@@ -63,8 +63,9 @@ async fn main() -> anyhow::Result<()> {
6363
info!("Historical event {} created", i + 1);
6464
}
6565

66+
let robust_provider = RobustProvider::new(provider.root().clone());
6667
let mut scanner =
67-
EventScannerBuilder::sync().from_block(0).connect::<Ethereum>(provider.root().to_owned());
68+
EventScannerBuilder::sync().from_block(0).connect::<Ethereum>(robust_provider);
6869

6970
let mut stream = scanner.subscribe(increase_filter);
7071

examples/sync_from_latest_scanning/main.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use alloy::{
55
sol_types::SolEvent,
66
};
77
use alloy_node_bindings::Anvil;
8-
use event_scanner::{EventFilter, EventScannerBuilder, Message};
8+
use event_scanner::{EventFilter, EventScannerBuilder, Message, robust_provider::RobustProvider};
99

1010
use tokio_stream::StreamExt;
1111
use tracing::{error, info};
@@ -55,8 +55,9 @@ async fn main() -> anyhow::Result<()> {
5555
.contract_address(*contract_address)
5656
.event(Counter::CountIncreased::SIGNATURE);
5757

58+
let robust_provider = RobustProvider::new(provider.root().clone());
5859
let mut client =
59-
EventScannerBuilder::sync().from_latest(5).connect::<Ethereum>(provider.root().to_owned());
60+
EventScannerBuilder::sync().from_latest(5).connect::<Ethereum>(robust_provider);
6061

6162
let mut stream = client.subscribe(increase_filter);
6263

src/block_range_scanner.rs

Lines changed: 37 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -147,20 +147,9 @@ impl BlockRangeScanner {
147147
}
148148

149149
/// Connects to an existing provider
150-
///
151-
/// This method also tries to connect any fallback providers (both WebSocket and IPC)
152-
///
153-
/// # Errors
154-
///
155-
/// Returns an error if any fallback connection fails
156-
pub fn connect<N: Network>(
157-
self,
158-
provider: impl Into<RobustProvider<N>>,
159-
) -> ConnectedBlockRangeScanner<N> {
160-
ConnectedBlockRangeScanner {
161-
provider: provider.into(),
162-
max_block_range: self.max_block_range,
163-
}
150+
#[must_use]
151+
pub fn connect<N: Network>(self, provider: RobustProvider<N>) -> ConnectedBlockRangeScanner<N> {
152+
ConnectedBlockRangeScanner { provider, max_block_range: self.max_block_range }
164153
}
165154
}
166155

@@ -905,11 +894,12 @@ mod tests {
905894
let anvil = Anvil::new().try_spawn()?;
906895
let provider = ProviderBuilder::new().connect(anvil.ws_endpoint_url().as_str()).await?;
907896

897+
let robust_provider = RobustProvider::new(provider.root().clone());
898+
908899
provider.subscribe_blocks();
909900
// --- Zero block confirmations -> stream immediately ---
910901

911-
let client =
912-
BlockRangeScanner::new().connect::<Ethereum>(provider.root().to_owned()).run()?;
902+
let client = BlockRangeScanner::new().connect::<Ethereum>(robust_provider).run()?;
913903

914904
let mut stream = client.stream_live(0).await?;
915905

@@ -953,12 +943,12 @@ mod tests {
953943
let anvil = Anvil::new().try_spawn()?;
954944

955945
let provider = ProviderBuilder::new().connect(anvil.ws_endpoint_url().as_str()).await?;
946+
let robust_provider = RobustProvider::new(provider.root().clone());
956947
provider.anvil_mine(Some(20), None).await?;
957948

958949
let block_confirmations = 5;
959950

960-
let client =
961-
BlockRangeScanner::new().connect::<Ethereum>(provider.root().to_owned()).run()?;
951+
let client = BlockRangeScanner::new().connect::<Ethereum>(robust_provider).run()?;
962952

963953
let stream = client.stream_from(BlockNumberOrTag::Latest, block_confirmations).await?;
964954

@@ -987,11 +977,11 @@ mod tests {
987977
let anvil = Anvil::new().try_spawn()?;
988978

989979
let provider = ProviderBuilder::new().connect(anvil.ws_endpoint_url().as_str()).await?;
980+
let robust_provider = RobustProvider::new(provider.root().clone());
990981

991982
let block_confirmations = 5;
992983

993-
let client =
994-
BlockRangeScanner::new().connect::<Ethereum>(provider.root().to_owned()).run()?;
984+
let client = BlockRangeScanner::new().connect::<Ethereum>(robust_provider).run()?;
995985

996986
let mut receiver = client.stream_live(block_confirmations).await?;
997987

@@ -1029,11 +1019,11 @@ mod tests {
10291019
let anvil = Anvil::new().block_time(1).try_spawn()?;
10301020

10311021
let provider = ProviderBuilder::new().connect(anvil.ws_endpoint_url().as_str()).await?;
1022+
let robust_provider = RobustProvider::new(provider.root().clone());
10321023

10331024
let block_confirmations = 3;
10341025

1035-
let client =
1036-
BlockRangeScanner::new().connect::<Ethereum>(provider.root().to_owned()).run()?;
1026+
let client = BlockRangeScanner::new().connect::<Ethereum>(robust_provider).run()?;
10371027

10381028
let mut receiver = client.stream_live(block_confirmations).await?;
10391029

@@ -1092,14 +1082,15 @@ mod tests {
10921082
async fn historical_emits_correction_range_when_reorg_below_end() -> anyhow::Result<()> {
10931083
let anvil = Anvil::new().try_spawn()?;
10941084
let provider = ProviderBuilder::new().connect(anvil.ws_endpoint_url().as_str()).await?;
1085+
let robust_provider = RobustProvider::new(provider.root().clone());
10951086

10961087
provider.anvil_mine(Some(120), None).await?;
10971088

10981089
let end_num = 110;
10991090

11001091
let client = BlockRangeScanner::new()
11011092
.max_block_range(30)
1102-
.connect::<Ethereum>(provider.root().to_owned())
1093+
.connect::<Ethereum>(robust_provider)
11031094
.run()?;
11041095

11051096
let mut stream = client
@@ -1126,14 +1117,15 @@ mod tests {
11261117
async fn historical_emits_correction_range_when_end_num_reorgs() -> anyhow::Result<()> {
11271118
let anvil = Anvil::new().try_spawn()?;
11281119
let provider = ProviderBuilder::new().connect(anvil.ws_endpoint_url().as_str()).await?;
1120+
let robust_provider = RobustProvider::new(provider.root().clone());
11291121

11301122
provider.anvil_mine(Some(120), None).await?;
11311123

11321124
let end_num = 120;
11331125

11341126
let client = BlockRangeScanner::new()
11351127
.max_block_range(30)
1136-
.connect::<Ethereum>(provider.root().to_owned())
1128+
.connect::<Ethereum>(robust_provider)
11371129
.run()?;
11381130

11391131
let mut stream = client
@@ -1162,12 +1154,13 @@ mod tests {
11621154
let anvil = Anvil::new().try_spawn()?;
11631155

11641156
let provider = ProviderBuilder::new().connect(anvil.ws_endpoint_url().as_str()).await?;
1157+
let robust_provider = RobustProvider::new(provider.root().clone());
11651158

11661159
provider.anvil_mine(Some(100), None).await?;
11671160

11681161
let client = BlockRangeScanner::new()
11691162
.max_block_range(5)
1170-
.connect::<Ethereum>(provider.root().to_owned())
1163+
.connect::<Ethereum>(robust_provider.clone())
11711164
.run()?;
11721165

11731166
// ranges where each batch is of max blocks per epoch size
@@ -1197,7 +1190,7 @@ mod tests {
11971190
// range where blocks per epoch is larger than the number of blocks on chain
11981191
let client = BlockRangeScanner::new()
11991192
.max_block_range(200)
1200-
.connect::<Ethereum>(provider.root().to_owned())
1193+
.connect::<Ethereum>(robust_provider)
12011194
.run()?;
12021195

12031196
let mut stream = client.stream_historical(0, 20).await?;
@@ -1216,11 +1209,12 @@ mod tests {
12161209
let anvil = Anvil::new().try_spawn()?;
12171210

12181211
let provider = ProviderBuilder::new().connect(anvil.ws_endpoint_url().as_str()).await?;
1212+
let robust_provider = RobustProvider::new(provider.root().clone());
12191213
provider.anvil_mine(Some(11), None).await?;
12201214

12211215
let client = BlockRangeScanner::new()
12221216
.max_block_range(5)
1223-
.connect::<Ethereum>(provider.root().to_owned())
1217+
.connect::<Ethereum>(robust_provider)
12241218
.run()?;
12251219

12261220
let mut stream = client.stream_historical(10, 0).await?;
@@ -1333,12 +1327,13 @@ mod tests {
13331327
let anvil = Anvil::new().try_spawn()?;
13341328

13351329
let provider = ProviderBuilder::new().connect(anvil.ws_endpoint_url().as_str()).await?;
1330+
let robust_provider = RobustProvider::new(provider.root().clone());
13361331

13371332
provider.anvil_mine(Some(150), None).await?;
13381333

13391334
let client = BlockRangeScanner::new()
13401335
.max_block_range(100)
1341-
.connect::<Ethereum>(provider.root().to_owned())
1336+
.connect::<Ethereum>(robust_provider)
13421337
.run()?;
13431338

13441339
let mut stream = client.rewind(100, 150).await?;
@@ -1356,12 +1351,13 @@ mod tests {
13561351
let anvil = Anvil::new().try_spawn()?;
13571352

13581353
let provider = ProviderBuilder::new().connect(anvil.ws_endpoint_url().as_str()).await?;
1354+
let robust_provider = RobustProvider::new(provider.root().clone());
13591355

13601356
provider.anvil_mine(Some(15), None).await?;
13611357

13621358
let client = BlockRangeScanner::new()
13631359
.max_block_range(5)
1364-
.connect::<Ethereum>(provider.root().to_owned())
1360+
.connect::<Ethereum>(robust_provider)
13651361
.run()?;
13661362

13671363
let mut stream = client.rewind(0, 14).await?;
@@ -1380,12 +1376,13 @@ mod tests {
13801376
let anvil = Anvil::new().try_spawn()?;
13811377

13821378
let provider = ProviderBuilder::new().connect(anvil.ws_endpoint_url().as_str()).await?;
1379+
let robust_provider = RobustProvider::new(provider.root().clone());
13831380

13841381
provider.anvil_mine(Some(15), None).await?;
13851382

13861383
let client = BlockRangeScanner::new()
13871384
.max_block_range(4)
1388-
.connect::<Ethereum>(provider.root().to_owned())
1385+
.connect::<Ethereum>(robust_provider)
13891386
.run()?;
13901387

13911388
let mut stream = client.rewind(3, 12).await?;
@@ -1404,12 +1401,13 @@ mod tests {
14041401
let anvil = Anvil::new().try_spawn()?;
14051402

14061403
let provider = ProviderBuilder::new().connect(anvil.ws_endpoint_url().as_str()).await?;
1404+
let robust_provider = RobustProvider::new(provider.root().clone());
14071405

14081406
provider.anvil_mine(Some(15), None).await?;
14091407

14101408
let client = BlockRangeScanner::new()
14111409
.max_block_range(5)
1412-
.connect::<Ethereum>(provider.root().to_owned())
1410+
.connect::<Ethereum>(robust_provider)
14131411
.run()?;
14141412

14151413
let mut stream = client.rewind(7, 7).await?;
@@ -1425,12 +1423,13 @@ mod tests {
14251423
let anvil = Anvil::new().try_spawn()?;
14261424

14271425
let provider = ProviderBuilder::new().connect(anvil.ws_endpoint_url().as_str()).await?;
1426+
let robust_provider = RobustProvider::new(provider.root().to_owned());
14281427

14291428
provider.anvil_mine(Some(15), None).await?;
14301429

14311430
let client = BlockRangeScanner::new()
14321431
.max_block_range(1)
1433-
.connect::<Ethereum>(provider.root().to_owned())
1432+
.connect::<Ethereum>(robust_provider)
14341433
.run()?;
14351434

14361435
let mut stream = client.rewind(5, 8).await?;
@@ -1450,12 +1449,13 @@ mod tests {
14501449
let anvil = Anvil::new().try_spawn()?;
14511450

14521451
let provider = ProviderBuilder::new().connect(anvil.ws_endpoint_url().as_str()).await?;
1452+
let robust_provider = RobustProvider::new(provider.root().clone());
14531453
// Mine 20 blocks, so the total number of blocks is 21 (including 0th block)
14541454
provider.anvil_mine(Some(20), None).await?;
14551455

14561456
let client = BlockRangeScanner::new()
14571457
.max_block_range(7)
1458-
.connect::<Ethereum>(provider.root().to_owned())
1458+
.connect::<Ethereum>(robust_provider)
14591459
.run()?;
14601460

14611461
let mut stream =
@@ -1474,12 +1474,13 @@ mod tests {
14741474
let anvil = Anvil::new().try_spawn()?;
14751475

14761476
let provider = ProviderBuilder::new().connect(anvil.ws_endpoint_url().as_str()).await?;
1477+
let robust_provider = RobustProvider::new(provider.root().clone());
14771478
// Ensure blocks at 3 and 15 exist
14781479
provider.anvil_mine(Some(16), None).await?;
14791480

14801481
let client = BlockRangeScanner::new()
14811482
.max_block_range(5)
1482-
.connect::<Ethereum>(provider.root().to_owned())
1483+
.connect::<Ethereum>(robust_provider)
14831484
.run()?;
14841485

14851486
let mut stream = client.rewind(15, 3).await?;
@@ -1504,11 +1505,12 @@ mod tests {
15041505
let anvil = Anvil::new().try_spawn()?;
15051506

15061507
let provider = ProviderBuilder::new().connect(anvil.ws_endpoint_url().as_str()).await?;
1508+
let robust_provider = RobustProvider::new(provider.root().to_owned());
15071509

15081510
// Do not mine up to 999 so start won't exist
15091511
let client = BlockRangeScanner::new()
15101512
.max_block_range(5)
1511-
.connect::<Ethereum>(provider.root().to_owned())
1513+
.connect::<Ethereum>(robust_provider)
15121514
.run()?;
15131515

15141516
let stream = client.rewind(0, 999).await;

0 commit comments

Comments
 (0)