Skip to content

Commit 5696afd

Browse files
authored
sync::from_latest and latest Modes Should Return Notification::NoPastLogsFound Instead Of Empty Vec (#201)
1 parent c1b8e35 commit 5696afd

File tree

8 files changed

+142
-22
lines changed

8 files changed

+142
-22
lines changed

src/event_scanner/scanner/common.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use std::ops::RangeInclusive;
22

33
use crate::{
4-
ScannerMessage,
4+
Notification, ScannerMessage,
55
block_range_scanner::{BlockScannerResult, MAX_BUFFERED_MESSAGES},
66
event_scanner::{EventScannerResult, filter::EventFilter, listener::EventListener},
77
robust_provider::{Error as RobustProviderError, RobustProvider},
@@ -120,10 +120,15 @@ pub fn spawn_log_consumers<N: Network>(
120120
}
121121

122122
if let ConsumerMode::CollectLatest { .. } = mode {
123-
if !collected.is_empty() {
124-
collected.reverse(); // restore chronological order
123+
if collected.is_empty() {
124+
info!("No logs found");
125+
_ = sender.try_stream(Notification::NoPastLogsFound).await;
126+
return;
125127
}
126128

129+
info!(count = collected.len(), "Logs found");
130+
collected.reverse(); // restore chronological order
131+
127132
info!("Sending collected logs to consumer");
128133
_ = sender.try_stream(collected).await;
129134
}

src/event_scanner/scanner/historic.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,17 +51,17 @@ impl EventScannerBuilder<Historic> {
5151
}
5252
};
5353

54+
if from_num > latest_block {
55+
Err(ScannerError::BlockExceedsLatest("from_block", from_num, latest_block))?;
56+
}
57+
5458
let to_num = match scanner.config.to_block {
5559
BlockId::Number(to_block) => to_block.as_number().unwrap_or(0),
5660
BlockId::Hash(to_hash) => {
5761
provider.get_block_by_hash(to_hash.into()).await?.header().number()
5862
}
5963
};
6064

61-
if from_num > latest_block {
62-
Err(ScannerError::BlockExceedsLatest("from_block", from_num, latest_block))?;
63-
}
64-
6565
if to_num > latest_block {
6666
Err(ScannerError::BlockExceedsLatest("to_block", to_num, latest_block))?;
6767
}

src/event_scanner/scanner/latest.rs

Lines changed: 90 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,8 @@
1-
use alloy::{eips::BlockId, network::Network};
1+
use alloy::{
2+
consensus::BlockHeader,
3+
eips::BlockId,
4+
network::{BlockResponse, Network},
5+
};
26

37
use super::common::{ConsumerMode, handle_stream};
48
use crate::{
@@ -45,13 +49,28 @@ impl EventScannerBuilder<LatestEvents> {
4549
let scanner = self.build(provider).await?;
4650

4751
let provider = scanner.block_range_scanner.provider();
52+
let latest_block = provider.get_block_number().await?;
53+
54+
let from_num = match scanner.config.from_block {
55+
BlockId::Number(from_block) => from_block.as_number().unwrap_or(0),
56+
BlockId::Hash(from_hash) => {
57+
provider.get_block_by_hash(from_hash.into()).await?.header().number()
58+
}
59+
};
4860

49-
if let BlockId::Hash(from_hash) = scanner.config.from_block {
50-
provider.get_block_by_hash(from_hash.into()).await?;
61+
if from_num > latest_block {
62+
Err(ScannerError::BlockExceedsLatest("from_block", from_num, latest_block))?;
5163
}
5264

53-
if let BlockId::Hash(to_hash) = scanner.config.to_block {
54-
provider.get_block_by_hash(to_hash.into()).await?;
65+
let to_num = match scanner.config.to_block {
66+
BlockId::Number(to_block) => to_block.as_number().unwrap_or(0),
67+
BlockId::Hash(to_hash) => {
68+
provider.get_block_by_hash(to_hash.into()).await?.header().number()
69+
}
70+
};
71+
72+
if to_num > latest_block {
73+
Err(ScannerError::BlockExceedsLatest("to_block", to_num, latest_block))?;
5574
}
5675

5776
Ok(scanner)
@@ -281,4 +300,70 @@ mod tests {
281300

282301
assert!(result.is_ok());
283302
}
303+
304+
#[tokio::test]
305+
async fn test_from_block_above_latest_returns_error() {
306+
let anvil = Anvil::new().try_spawn().unwrap();
307+
let provider = ProviderBuilder::new().connect_http(anvil.endpoint_url());
308+
309+
let latest_block = provider.get_block_number().await.unwrap();
310+
311+
let result = EventScannerBuilder::latest(1)
312+
.from_block(latest_block + 100)
313+
.to_block(latest_block)
314+
.connect(provider)
315+
.await;
316+
317+
match result {
318+
Err(ScannerError::BlockExceedsLatest("from_block", max, latest)) => {
319+
assert_eq!(max, latest_block + 100);
320+
assert_eq!(latest, latest_block);
321+
}
322+
_ => panic!("Expected BlockExceedsLatest error"),
323+
}
324+
}
325+
326+
#[tokio::test]
327+
async fn test_to_block_above_latest_returns_error() {
328+
let anvil = Anvil::new().try_spawn().unwrap();
329+
let provider = ProviderBuilder::new().connect_http(anvil.endpoint_url());
330+
331+
let latest_block = provider.get_block_number().await.unwrap();
332+
333+
let result = EventScannerBuilder::latest(1)
334+
.from_block(0)
335+
.to_block(latest_block + 100)
336+
.connect(provider)
337+
.await;
338+
339+
match result {
340+
Err(ScannerError::BlockExceedsLatest("to_block", max, latest)) => {
341+
assert_eq!(max, latest_block + 100);
342+
assert_eq!(latest, latest_block);
343+
}
344+
_ => panic!("Expected BlockExceedsLatest error"),
345+
}
346+
}
347+
348+
#[tokio::test]
349+
async fn test_to_and_from_block_above_latest_returns_error() {
350+
let anvil = Anvil::new().try_spawn().unwrap();
351+
let provider = ProviderBuilder::new().connect_http(anvil.endpoint_url());
352+
353+
let latest_block = provider.get_block_number().await.unwrap();
354+
355+
let result = EventScannerBuilder::latest(1)
356+
.from_block(latest_block + 50)
357+
.to_block(latest_block + 100)
358+
.connect(provider)
359+
.await;
360+
361+
match result {
362+
Err(ScannerError::BlockExceedsLatest("from_block", max, latest)) => {
363+
assert_eq!(max, latest_block + 50);
364+
assert_eq!(latest, latest_block);
365+
}
366+
_ => panic!("Expected BlockExceedsLatest error for 'from_block'"),
367+
}
368+
}
284369
}

src/event_scanner/scanner/mod.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -299,6 +299,13 @@ impl EventScannerBuilder<Unspecified> {
299299
/// - **Default range**: By default, scans from `Earliest` to `Latest` block
300300
/// - **Reorg handling**: Periodically checks the tip to detect reorgs during the scan
301301
///
302+
/// # Notifications
303+
///
304+
/// The scanner emits the following notification before delivering log data:
305+
///
306+
/// - **[`Notification::NoPastLogsFound`][no_logs]**: Emitted when no matching logs are found in
307+
/// the scanned range.
308+
///
302309
/// # Arguments
303310
///
304311
/// * `count` - Maximum number of recent events to collect per listener (must be greater than 0)
@@ -323,6 +330,7 @@ impl EventScannerBuilder<Unspecified> {
323330
/// [start]: EventScanner::start
324331
/// [sync_from_latest]: EventScannerBuilder::from_latest
325332
/// [reorg]: crate::Notification::ReorgDetected
333+
/// [no_logs]: crate::Notification::NoPastLogsFound
326334
#[must_use]
327335
pub fn latest(count: usize) -> EventScannerBuilder<LatestEvents> {
328336
EventScannerBuilder::<LatestEvents>::new(count)

src/event_scanner/scanner/sync/mod.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,17 @@ impl EventScannerBuilder<Synchronize> {
7979
/// block confirmations
8080
/// - **Continuous operation**: Live phase continues indefinitely until the scanner is dropped
8181
///
82+
/// # Notifications
83+
///
84+
/// During the **latest events phase**, the scanner can emit the following notification
85+
/// before transitioning to live mode:
86+
///
87+
/// - **[`Notification::NoPastLogsFound`][no_logs]**: Emitted when no matching logs are found in
88+
/// the scanned range
89+
///
90+
/// After the latest events phase completes, [`Notification::SwitchingToLive`][switch_to_live]
91+
/// is emitted before transitioning to the live streaming phase.
92+
///
8293
/// # Arguments
8394
///
8495
/// * `count` - Maximum number of recent events to collect per listener before switching to live
@@ -102,6 +113,7 @@ impl EventScannerBuilder<Synchronize> {
102113
/// [start]: crate::event_scanner::EventScanner::start
103114
/// [reorg]: crate::types::Notification::ReorgDetected
104115
/// [switch_to_live]: crate::types::Notification::SwitchingToLive
116+
/// [no_logs]: crate::types::Notification::NoPastLogsFound
105117
#[must_use]
106118
pub fn from_latest(self, count: usize) -> EventScannerBuilder<SyncFromLatestEvents> {
107119
EventScannerBuilder::<SyncFromLatestEvents>::new(count)

src/types.rs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,31 @@ use tracing::{info, warn};
55

66
use crate::ScannerError;
77

8-
#[derive(Debug, Clone)]
8+
/// Messages streamed by the scanner to subscribers.
9+
///
10+
/// Each message represents either data or a notification about the scanner's state or behavior.
11+
#[derive(Copy, Debug, Clone)]
912
pub enum ScannerMessage<T: Clone> {
13+
/// Data streamed to the subscriber.
1014
Data(T),
15+
16+
/// Notification about scanner state changes or important events.
1117
Notification(Notification),
1218
}
1319

20+
/// Notifications emitted by the scanner to signal state changes or important events.
1421
#[derive(Copy, Debug, Clone, PartialEq)]
1522
pub enum Notification {
23+
/// Emitted when transitioning from the latest events phase to live streaming mode
24+
/// in sync scanners.
1625
SwitchingToLive,
26+
27+
/// Emitted when a blockchain reorganization is detected during scanning.
1728
ReorgDetected,
29+
30+
/// Emitted during the latest events phase when no matching logs are found in the
31+
/// scanned range.
32+
NoPastLogsFound,
1833
}
1934

2035
impl<T: Clone> From<Notification> for ScannerMessage<T> {

tests/latest_events/basic.rs

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,11 @@ use alloy::{
33
};
44

55
use crate::common::{TestCounter, deploy_counter, setup_common, setup_latest_scanner};
6-
use event_scanner::{EventFilter, EventScannerBuilder, assert_closed, assert_next};
6+
use event_scanner::{EventFilter, EventScannerBuilder, Notification, assert_closed, assert_next};
77

88
#[tokio::test]
99
async fn exact_count_returns_last_events_in_order() -> anyhow::Result<()> {
10-
let count = 5;
11-
let setup = setup_latest_scanner(None, None, count, None, None).await?;
10+
let setup = setup_latest_scanner(None, None, 5, None, None).await?;
1211
let contract = setup.contract;
1312
let scanner = setup.scanner;
1413
let mut stream = setup.stream;
@@ -17,7 +16,6 @@ async fn exact_count_returns_last_events_in_order() -> anyhow::Result<()> {
1716
contract.increase().send().await?.watch().await?;
1817
}
1918

20-
// Ask for the latest 5
2119
scanner.start().await?;
2220

2321
assert_next!(
@@ -72,9 +70,7 @@ async fn no_events_returns_empty() -> anyhow::Result<()> {
7270

7371
scanner.start().await?;
7472

75-
let expected: &[TestCounter::CountIncreased] = &[];
76-
77-
assert_next!(stream, expected);
73+
assert_next!(stream, Notification::NoPastLogsFound);
7874
assert_closed!(stream);
7975

8076
Ok(())

tests/sync/from_latest.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -132,8 +132,7 @@ async fn no_historical_only_live_streams() -> anyhow::Result<()> {
132132
scanner.start().await?;
133133

134134
// Latest is empty
135-
let expected: &[TestCounter::CountIncreased] = &[];
136-
assert_next!(stream, expected);
135+
assert_next!(stream, Notification::NoPastLogsFound);
137136
assert_next!(stream, Notification::SwitchingToLive);
138137
let mut stream = assert_empty!(stream);
139138

0 commit comments

Comments
 (0)