Skip to content

Commit 77eabd5

Browse files
committed
feat: add fallback provider logic to event scanners
1 parent 6bab724 commit 77eabd5

File tree

2 files changed

+140
-31
lines changed

2 files changed

+140
-31
lines changed

src/block_range_scanner.rs

Lines changed: 34 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,9 @@ impl BlockRangeScanner {
217217
let provider =
218218
RootProvider::<N>::new(ClientBuilder::default().ws(WsConnect::new(ws_url)).await?);
219219

220-
let fallback_providers = self.connect_all_fallbacks::<N>().await?;
220+
let fallback_providers =
221+
Self::connect_all_fallbacks::<N>(&self.fallback_ws_urls, &self.fallback_ipc_paths)
222+
.await?;
221223

222224
Ok(self.connect_with_fallbacks(provider, fallback_providers))
223225
}
@@ -237,7 +239,9 @@ impl BlockRangeScanner {
237239
let provider =
238240
RootProvider::<N>::new(ClientBuilder::default().ipc(IpcConnect::new(ipc_path)).await?);
239241

240-
let fallback_providers = self.connect_all_fallbacks::<N>().await?;
242+
let fallback_providers =
243+
Self::connect_all_fallbacks::<N>(&self.fallback_ws_urls, &self.fallback_ipc_paths)
244+
.await?;
241245

242246
Ok(self.connect_with_fallbacks(provider, fallback_providers))
243247
}
@@ -253,37 +257,16 @@ impl BlockRangeScanner {
253257
self,
254258
provider: RootProvider<N>,
255259
) -> Result<ConnectedBlockRangeScanner<N>, RpcError<TransportErrorKind>> {
256-
let fallback_providers = self.connect_all_fallbacks::<N>().await?;
260+
let fallback_providers =
261+
Self::connect_all_fallbacks::<N>(&self.fallback_ws_urls, &self.fallback_ipc_paths)
262+
.await?;
257263

258264
Ok(self.connect_with_fallbacks(provider, fallback_providers))
259265
}
260266

261-
/// Establishes connections to all configured fallback providers (both WebSocket and IPC).
262-
///
263-
/// # Errors
264-
///
265-
/// Returns an error if any fallback connection fails
266-
async fn connect_all_fallbacks<N: Network>(
267-
&self,
268-
) -> Result<Vec<RootProvider<N>>, RpcError<TransportErrorKind>> {
269-
let mut fallback_providers = Vec::new();
270-
271-
for url in &self.fallback_ws_urls {
272-
let client = ClientBuilder::default().ws(WsConnect::new(url.clone())).await?;
273-
fallback_providers.push(RootProvider::<N>::new(client));
274-
}
275-
276-
for path in &self.fallback_ipc_paths {
277-
let client = ClientBuilder::default().ipc(IpcConnect::new(path.clone())).await?;
278-
fallback_providers.push(RootProvider::<N>::new(client));
279-
}
280-
281-
Ok(fallback_providers)
282-
}
283-
284267
/// Connects to an existing provider with fallback providers
285268
#[must_use]
286-
fn connect_with_fallbacks<N: Network>(
269+
pub fn connect_with_fallbacks<N: Network>(
287270
self,
288271
provider: RootProvider<N>,
289272
fallback_providers: Vec<RootProvider<N>>,
@@ -302,6 +285,30 @@ impl BlockRangeScanner {
302285
max_block_range: self.max_block_range,
303286
}
304287
}
288+
289+
/// Establishes connections to all configured fallback providers (both WebSocket and IPC).
290+
///
291+
/// # Errors
292+
///
293+
/// Returns an error if any fallback connection fails
294+
async fn connect_all_fallbacks<N: Network>(
295+
fallback_ws_urls: &Vec<Url>,
296+
fallback_ipc_paths: &Vec<String>,
297+
) -> Result<Vec<RootProvider<N>>, RpcError<TransportErrorKind>> {
298+
let mut fallback_providers = Vec::new();
299+
300+
for url in fallback_ws_urls {
301+
let client = ClientBuilder::default().ws(WsConnect::new(url.clone())).await?;
302+
fallback_providers.push(RootProvider::<N>::new(client));
303+
}
304+
305+
for path in fallback_ipc_paths {
306+
let client = ClientBuilder::default().ipc(IpcConnect::new(path.clone())).await?;
307+
fallback_providers.push(RootProvider::<N>::new(client));
308+
}
309+
310+
Ok(fallback_providers)
311+
}
305312
}
306313

307314
pub struct ConnectedBlockRangeScanner<N: Network> {

src/event_scanner/scanner/mod.rs

Lines changed: 106 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -365,9 +365,62 @@ impl EventScannerBuilder<SyncFromBlock> {
365365
}
366366

367367
impl<M> EventScannerBuilder<M> {
368+
/// Adds a fallback WebSocket URL to the scanner.
369+
///
370+
/// The WebSocket connection will be established when calling the `connect` methods.
371+
/// Multiple fallback providers can be added by calling this method multiple times.
372+
///
373+
/// # Example
374+
///
375+
/// ```no_run
376+
/// # use alloy::network::Ethereum;
377+
/// # use event_scanner::EventScannerBuilder;
378+
/// #
379+
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
380+
/// # let ws_url = "ws://localhost:8545".parse()?;
381+
/// # let fallback_url = "ws://fallback:8545".parse()?;
382+
/// let scanner = EventScannerBuilder::historic()
383+
/// .fallback_ws(fallback_url)
384+
/// .connect_ws::<Ethereum>(ws_url)
385+
/// .await?;
386+
/// # Ok(())
387+
/// # }
388+
/// ```
389+
#[must_use]
390+
pub fn fallback_ws(mut self, url: Url) -> Self {
391+
self.block_range_scanner = self.block_range_scanner.fallback_ws(url);
392+
self
393+
}
394+
395+
/// Adds a fallback IPC path to the scanner.
396+
///
397+
/// The IPC connection will be established when calling the `connect` methods.
398+
/// Multiple fallback providers can be added by calling this method multiple times.
399+
///
400+
/// # Example
401+
///
402+
/// ```no_run
403+
/// # use alloy::network::Ethereum;
404+
/// # use event_scanner::EventScannerBuilder;
405+
/// #
406+
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
407+
/// # let ws_url = "ws://localhost:8545".parse()?;
408+
/// let scanner = EventScannerBuilder::historic()
409+
/// .fallback_ipc("/tmp/fallback.ipc".to_string())
410+
/// .connect_ws::<Ethereum>(ws_url)
411+
/// .await?;
412+
/// # Ok(())
413+
/// # }
414+
/// ```
415+
#[must_use]
416+
pub fn fallback_ipc(mut self, path: String) -> Self {
417+
self.block_range_scanner = self.block_range_scanner.fallback_ipc(path);
418+
self
419+
}
420+
368421
/// Connects to the provider via WebSocket.
369422
///
370-
/// Final builder method: consumes the builder and returns the built [`HistoricEventScanner`].
423+
/// Final builder method: consumes the builder and returns the built [`EventScanner`].
371424
///
372425
/// # Errors
373426
///
@@ -379,7 +432,7 @@ impl<M> EventScannerBuilder<M> {
379432

380433
/// Connects to the provider via IPC.
381434
///
382-
/// Final builder method: consumes the builder and returns the built [`HistoricEventScanner`].
435+
/// Final builder method: consumes the builder and returns the built [`EventScanner`].
383436
///
384437
/// # Errors
385438
///
@@ -394,18 +447,31 @@ impl<M> EventScannerBuilder<M> {
394447

395448
/// Connects to an existing provider.
396449
///
397-
/// Final builder method: consumes the builder and returns the built [`HistoricEventScanner`].
450+
/// Final builder method: consumes the builder and returns the built [`EventScanner`].
398451
///
399452
/// # Errors
400453
///
401-
/// Returns an error if the connection fails
454+
/// Returns an error if any fallback connection fails
402455
pub async fn connect<N: Network>(
403456
self,
404457
provider: RootProvider<N>,
405458
) -> TransportResult<EventScanner<M, N>> {
406459
let block_range_scanner = self.block_range_scanner.connect::<N>(provider).await?;
407460
Ok(EventScanner { config: self.config, block_range_scanner, listeners: Vec::new() })
408461
}
462+
463+
/// Connects to an existing provider with fallback providers
464+
///
465+
/// Final builder method: consumes the builder and returns the built [`EventScanner`].
466+
pub fn connect_with_fallbacks<N: Network>(
467+
self,
468+
provider: RootProvider<N>,
469+
fallback_providers: Vec<RootProvider<N>>,
470+
) -> EventScanner<M, N> {
471+
let block_range_scanner =
472+
self.block_range_scanner.connect_with_fallbacks::<N>(provider, fallback_providers);
473+
EventScanner { config: self.config, block_range_scanner, listeners: Vec::new() }
474+
}
409475
}
410476

411477
impl<M, N: Network> EventScanner<M, N> {
@@ -485,4 +551,40 @@ mod tests {
485551

486552
Ok(())
487553
}
554+
555+
#[test]
556+
fn test_scanner_builder_fallback_methods() {
557+
let ws_url: Url = "ws://fallback:8545".parse().unwrap();
558+
let ipc_path = "/tmp/fallback.ipc".to_string();
559+
560+
let builder = EventScannerBuilder::historic()
561+
.fallback_ws(ws_url.clone())
562+
.fallback_ipc(ipc_path.clone());
563+
564+
assert_eq!(builder.block_range_scanner.fallback_ws_urls.len(), 1);
565+
assert_eq!(builder.block_range_scanner.fallback_ws_urls[0], ws_url);
566+
assert_eq!(builder.block_range_scanner.fallback_ipc_paths.len(), 1);
567+
assert_eq!(builder.block_range_scanner.fallback_ipc_paths[0], ipc_path);
568+
}
569+
570+
#[test]
571+
fn test_scanner_builder_multiple_fallbacks() {
572+
let ws_url1: Url = "ws://fallback1:8545".parse().unwrap();
573+
let ws_url2: Url = "ws://fallback2:8545".parse().unwrap();
574+
let ipc_path1 = "/tmp/fallback1.ipc".to_string();
575+
let ipc_path2 = "/tmp/fallback2.ipc".to_string();
576+
577+
let builder = EventScannerBuilder::live()
578+
.fallback_ws(ws_url1.clone())
579+
.fallback_ws(ws_url2.clone())
580+
.fallback_ipc(ipc_path1.clone())
581+
.fallback_ipc(ipc_path2.clone());
582+
583+
assert_eq!(builder.block_range_scanner.fallback_ws_urls.len(), 2);
584+
assert_eq!(builder.block_range_scanner.fallback_ws_urls[0], ws_url1);
585+
assert_eq!(builder.block_range_scanner.fallback_ws_urls[1], ws_url2);
586+
assert_eq!(builder.block_range_scanner.fallback_ipc_paths.len(), 2);
587+
assert_eq!(builder.block_range_scanner.fallback_ipc_paths[0], ipc_path1);
588+
assert_eq!(builder.block_range_scanner.fallback_ipc_paths[1], ipc_path2);
589+
}
488590
}

0 commit comments

Comments
 (0)