Skip to content

Commit 25e6e06

Browse files
authored
Scanner notififcation update (#192)
1 parent 5682aa8 commit 25e6e06

File tree

18 files changed

+77
-73
lines changed

18 files changed

+77
-73
lines changed

README.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -94,8 +94,8 @@ async fn run_scanner(
9494
Message::Data(logs) => {
9595
println!("Received {} logs: {logs:?}", logs.len());
9696
}
97-
Message::Status(status) => {
98-
println!("Status update: {status:?}");
97+
Message::Notification(notification) => {
98+
println!("Notification received: {notification:?}");
9999
}
100100
Message::Error(err) => {
101101
eprintln!("Error: {err}");
@@ -226,7 +226,7 @@ let multi_sigs = EventFilter::new()
226226
The scanner delivers three types of messages through the event stream:
227227

228228
- **`Message::Data(Vec<Log>)`** – Contains a batch of matching event logs. Each log includes the raw event data, transaction hash, block number, and other metadata.
229-
- **`Message::Status(ScannerStatus)`**Status notifications from the scanner:
229+
- **`Message::Notification(Notification)`**Notifications from the scanner:
230230
- **`Message::Error(ScannerError)`** – Error notifications if the scanner encounters issues (e.g., RPC failures, connection problems)
231231

232232
Always handle all message types in your stream processing loop to ensure robust error handling and proper reorg detection.

examples/historical_scanning/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ async fn main() -> anyhow::Result<()> {
7777
Message::Error(e) => {
7878
error!("Received error: {}", e);
7979
}
80-
Message::Status(info) => {
80+
Message::Notification(info) => {
8181
info!("Received info: {:?}", info);
8282
}
8383
}

examples/latest_events_scanning/main.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,8 +78,8 @@ async fn main() -> anyhow::Result<()> {
7878
Message::Error(e) => {
7979
error!("Received error: {}", e);
8080
}
81-
Message::Status(info) => {
82-
info!("Received status: {:?}", info);
81+
Message::Notification(info) => {
82+
info!("Received notification: {:?}", info);
8383
}
8484
}
8585
}

examples/live_scanning/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ async fn main() -> anyhow::Result<()> {
7777
Message::Error(e) => {
7878
error!("Received error: {}", e);
7979
}
80-
Message::Status(info) => {
80+
Message::Notification(info) => {
8181
info!("Received info: {:?}", info);
8282
}
8383
}

examples/sync_from_block_scanning/main.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -101,8 +101,8 @@ async fn main() -> anyhow::Result<()> {
101101
Message::Error(e) => {
102102
error!("Received error: {}", e);
103103
}
104-
Message::Status(info) => {
105-
info!("Received status: {:?}", info);
104+
Message::Notification(info) => {
105+
info!("Received notification: {:?}", info);
106106
}
107107
}
108108

examples/sync_from_latest_scanning/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ async fn main() -> anyhow::Result<()> {
8585
Message::Error(e) => {
8686
error!("Received error: {}", e);
8787
}
88-
Message::Status(info) => {
88+
Message::Notification(info) => {
8989
info!("Received info: {:?}", info);
9090
}
9191
}

src/block_range_scanner.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,8 @@
4747
//! }
4848
//! }
4949
//! }
50-
//! Message::Status(status) => {
51-
//! info!("Received status message: {:?}", status);
50+
//! Message::Notification(notification) => {
51+
//! info!("Received notification: {:?}", notification);
5252
//! }
5353
//! }
5454
//! }
@@ -70,7 +70,7 @@ use crate::{
7070
ScannerMessage,
7171
error::ScannerError,
7272
robust_provider::{Error as RobustProviderError, IntoRobustProvider, RobustProvider},
73-
types::{ScannerStatus, TryStream},
73+
types::{Notification, TryStream},
7474
};
7575
use alloy::{
7676
consensus::BlockHeader,
@@ -436,7 +436,7 @@ impl<N: Network> Service<N> {
436436

437437
info!("Chain tip reached, switching to live");
438438

439-
if !sender.try_stream(ScannerStatus::SwitchingToLive).await {
439+
if !sender.try_stream(Notification::SwitchingToLive).await {
440440
return;
441441
}
442442

@@ -541,7 +541,7 @@ impl<N: Network> Service<N> {
541541
if reorged {
542542
info!(block_number = %from, hash = %tip_hash, "Reorg detected");
543543

544-
if !sender.try_stream(ScannerStatus::ReorgDetected).await {
544+
if !sender.try_stream(Notification::ReorgDetected).await {
545545
break;
546546
}
547547

@@ -624,7 +624,7 @@ impl<N: Network> Service<N> {
624624

625625
if incoming_block_num < range_start {
626626
warn!("Reorg detected: sending forked range");
627-
if !sender.try_stream(ScannerStatus::ReorgDetected).await {
627+
if !sender.try_stream(Notification::ReorgDetected).await {
628628
return;
629629
}
630630

@@ -684,7 +684,7 @@ impl<N: Network> Service<N> {
684684
}
685685
}
686686
other => {
687-
// Could be error or status
687+
// Could be error or notification
688688
if !sender.try_stream(other).await {
689689
break;
690690
}

src/event_scanner/scanner/common.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -133,9 +133,9 @@ pub fn spawn_log_consumers<N: Network>(
133133
break;
134134
}
135135
}
136-
Ok(BlockRangeMessage::Status(status)) => {
137-
info!(status = ?status, "Received status message");
138-
if !sender.try_stream(status).await {
136+
Ok(BlockRangeMessage::Notification(notification)) => {
137+
info!(notification = ?notification, "Received notification");
138+
if !sender.try_stream(notification).await {
139139
break;
140140
}
141141
}

src/event_scanner/scanner/mod.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -167,8 +167,8 @@ impl EventScannerBuilder<Unspecified> {
167167
/// Message::Data(logs) => {
168168
/// println!("Received {} new events", logs.len());
169169
/// }
170-
/// Message::Status(status) => {
171-
/// println!("Status: {:?}", status);
170+
/// Message::Notification(notification) => {
171+
/// println!("Notification received: {:?}", notification);
172172
/// }
173173
/// Message::Error(e) => {
174174
/// eprintln!("Error: {}", e);
@@ -196,12 +196,12 @@ impl EventScannerBuilder<Unspecified> {
196196
/// # Reorg behavior
197197
///
198198
/// When a reorg is detected:
199-
/// 1. Emits [`ScannerStatus::ReorgDetected`][reorg] to all listeners
199+
/// 1. Emits [`Notification::ReorgDetected`][reorg] to all listeners
200200
/// 2. Adjusts the next confirmed range using `block_confirmations`
201201
/// 3. Re-emits events from the corrected confirmed block range
202202
/// 4. Continues streaming from the new chain state
203203
///
204-
/// [reorg]: crate::types::ScannerStatus::ReorgDetected
204+
/// [reorg]: crate::types::Notification::ReorgDetected
205205
#[must_use]
206206
pub fn live() -> EventScannerBuilder<Live> {
207207
EventScannerBuilder::default()
@@ -304,7 +304,7 @@ impl EventScannerBuilder<Unspecified> {
304304
///
305305
/// During the scan, the scanner periodically checks the tip to detect reorgs. On reorg
306306
/// detection:
307-
/// 1. Emits [`ScannerStatus::ReorgDetected`][reorg] to all listeners
307+
/// 1. Emits [`Notification::ReorgDetected`][reorg] to all listeners
308308
/// 2. Resets to the updated tip
309309
/// 3. Restarts the scan from the new tip
310310
/// 4. Continues until `count` events are collected
@@ -319,7 +319,7 @@ impl EventScannerBuilder<Unspecified> {
319319
/// [subscribe]: EventScanner::subscribe
320320
/// [start]: EventScanner::start
321321
/// [sync_from_latest]: EventScannerBuilder::from_latest
322-
/// [reorg]: crate::ScannerStatus::ReorgDetected
322+
/// [reorg]: crate::Notification::ReorgDetected
323323
#[must_use]
324324
pub fn latest(count: usize) -> EventScannerBuilder<LatestEvents> {
325325
EventScannerBuilder::<LatestEvents>::new(count)

src/event_scanner/scanner/sync/from_latest.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use tokio_stream::{StreamExt, wrappers::ReceiverStream};
99
use tracing::info;
1010

1111
use crate::{
12-
EventScannerBuilder, ScannerError, ScannerStatus,
12+
EventScannerBuilder, Notification, ScannerError,
1313
block_range_scanner::Message as BlockRangeMessage,
1414
event_scanner::{
1515
EventScanner,
@@ -106,7 +106,7 @@ impl<N: Network> EventScanner<SyncFromLatestEvents, N> {
106106
// Use a one-off channel for the notification.
107107
let (tx, rx) = mpsc::channel::<BlockRangeMessage>(1);
108108
let stream = ReceiverStream::new(rx);
109-
tx.send(BlockRangeMessage::Status(ScannerStatus::SwitchingToLive))
109+
tx.send(BlockRangeMessage::Notification(Notification::SwitchingToLive))
110110
.await
111111
.expect("receiver exists");
112112

0 commit comments

Comments
 (0)