Skip to content

Commit 8ba62fd

Browse files
committed
merge main
2 parents 0959941 + a520889 commit 8ba62fd

File tree

18 files changed

+264
-80
lines changed

18 files changed

+264
-80
lines changed

.cargo/config.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
[alias]
2+
nt = "nextest run --all-features"

src/block_range_scanner.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -384,7 +384,7 @@ impl<N: Network> Service<N> {
384384
let provider = self.provider.clone();
385385

386386
let (start_block, end_block) =
387-
try_join!(self.provider.get_block(start_id), self.provider.get_block(end_id),)?;
387+
try_join!(self.provider.get_block(start_id), self.provider.get_block(end_id))?;
388388

389389
// normalize block range
390390
let (from, to) = match start_block.header().number().cmp(&end_block.header().number()) {

src/block_range_scanner/sync_handler.rs

Lines changed: 3 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,11 @@
1-
use alloy::{
2-
consensus::BlockHeader,
3-
eips::{BlockId, BlockNumberOrTag},
4-
network::{BlockResponse, Network},
5-
primitives::BlockNumber,
6-
};
1+
use alloy::{eips::BlockId, network::Network, primitives::BlockNumber};
72
use tokio::sync::mpsc;
83
use tracing::{error, info};
94

105
use crate::{
116
Notification, ScannerError,
127
block_range_scanner::{BlockScannerResult, common, reorg_handler::ReorgHandler},
13-
robust_provider::{self, RobustProvider},
8+
robust_provider::RobustProvider,
149
types::TryStream,
1510
};
1611

@@ -69,16 +64,8 @@ impl<N: Network> SyncHandler<N> {
6964

7065
/// Determines whether we need to catch up or can start live immediately
7166
async fn determine_sync_state(&self) -> Result<SyncState, ScannerError> {
72-
let get_start_block = async || -> Result<BlockNumber, robust_provider::Error> {
73-
let block = match self.start_id {
74-
BlockId::Number(BlockNumberOrTag::Number(num)) => num,
75-
_ => self.provider.get_block(self.start_id).await?.header().number(),
76-
};
77-
Ok(block)
78-
};
79-
8067
let (start_block, confirmed_tip) = tokio::try_join!(
81-
get_start_block(),
68+
self.provider.get_block_number_by_id(self.start_id),
8269
self.provider.get_latest_confirmed(self.block_confirmations)
8370
)?;
8471

src/event_scanner/scanner/common.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use std::ops::RangeInclusive;
22

33
use crate::{
4-
ScannerMessage,
4+
Notification, ScannerMessage,
55
block_range_scanner::{BlockScannerResult, MAX_BUFFERED_MESSAGES},
66
event_scanner::{EventScannerResult, filter::EventFilter, listener::EventListener},
77
robust_provider::{Error as RobustProviderError, RobustProvider},
@@ -120,10 +120,15 @@ pub fn spawn_log_consumers<N: Network>(
120120
}
121121

122122
if let ConsumerMode::CollectLatest { .. } = mode {
123-
if !collected.is_empty() {
124-
collected.reverse(); // restore chronological order
123+
if collected.is_empty() {
124+
info!("No logs found");
125+
_ = sender.try_stream(Notification::NoPastLogsFound).await;
126+
return;
125127
}
126128

129+
info!(count = collected.len(), "Logs found");
130+
collected.reverse(); // restore chronological order
131+
127132
info!("Sending collected logs to consumer");
128133
_ = sender.try_stream(collected).await;
129134
}

src/event_scanner/scanner/historic.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,17 +51,17 @@ impl EventScannerBuilder<Historic> {
5151
}
5252
};
5353

54+
if from_num > latest_block {
55+
Err(ScannerError::BlockExceedsLatest("from_block", from_num, latest_block))?;
56+
}
57+
5458
let to_num = match scanner.config.to_block {
5559
BlockId::Number(to_block) => to_block.as_number().unwrap_or(0),
5660
BlockId::Hash(to_hash) => {
5761
provider.get_block_by_hash(to_hash.into()).await?.header().number()
5862
}
5963
};
6064

61-
if from_num > latest_block {
62-
Err(ScannerError::BlockExceedsLatest("from_block", from_num, latest_block))?;
63-
}
64-
6565
if to_num > latest_block {
6666
Err(ScannerError::BlockExceedsLatest("to_block", to_num, latest_block))?;
6767
}

src/event_scanner/scanner/latest.rs

Lines changed: 90 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,8 @@
1-
use alloy::{eips::BlockId, network::Network};
1+
use alloy::{
2+
consensus::BlockHeader,
3+
eips::BlockId,
4+
network::{BlockResponse, Network},
5+
};
26

37
use super::common::{ConsumerMode, handle_stream};
48
use crate::{
@@ -45,13 +49,28 @@ impl EventScannerBuilder<LatestEvents> {
4549
let scanner = self.build(provider).await?;
4650

4751
let provider = scanner.block_range_scanner.provider();
52+
let latest_block = provider.get_block_number().await?;
53+
54+
let from_num = match scanner.config.from_block {
55+
BlockId::Number(from_block) => from_block.as_number().unwrap_or(0),
56+
BlockId::Hash(from_hash) => {
57+
provider.get_block_by_hash(from_hash.into()).await?.header().number()
58+
}
59+
};
4860

49-
if let BlockId::Hash(from_hash) = scanner.config.from_block {
50-
provider.get_block_by_hash(from_hash.into()).await?;
61+
if from_num > latest_block {
62+
Err(ScannerError::BlockExceedsLatest("from_block", from_num, latest_block))?;
5163
}
5264

53-
if let BlockId::Hash(to_hash) = scanner.config.to_block {
54-
provider.get_block_by_hash(to_hash.into()).await?;
65+
let to_num = match scanner.config.to_block {
66+
BlockId::Number(to_block) => to_block.as_number().unwrap_or(0),
67+
BlockId::Hash(to_hash) => {
68+
provider.get_block_by_hash(to_hash.into()).await?.header().number()
69+
}
70+
};
71+
72+
if to_num > latest_block {
73+
Err(ScannerError::BlockExceedsLatest("to_block", to_num, latest_block))?;
5574
}
5675

5776
Ok(scanner)
@@ -281,4 +300,70 @@ mod tests {
281300

282301
assert!(result.is_ok());
283302
}
303+
304+
#[tokio::test]
305+
async fn test_from_block_above_latest_returns_error() {
306+
let anvil = Anvil::new().try_spawn().unwrap();
307+
let provider = ProviderBuilder::new().connect_http(anvil.endpoint_url());
308+
309+
let latest_block = provider.get_block_number().await.unwrap();
310+
311+
let result = EventScannerBuilder::latest(1)
312+
.from_block(latest_block + 100)
313+
.to_block(latest_block)
314+
.connect(provider)
315+
.await;
316+
317+
match result {
318+
Err(ScannerError::BlockExceedsLatest("from_block", max, latest)) => {
319+
assert_eq!(max, latest_block + 100);
320+
assert_eq!(latest, latest_block);
321+
}
322+
_ => panic!("Expected BlockExceedsLatest error"),
323+
}
324+
}
325+
326+
#[tokio::test]
327+
async fn test_to_block_above_latest_returns_error() {
328+
let anvil = Anvil::new().try_spawn().unwrap();
329+
let provider = ProviderBuilder::new().connect_http(anvil.endpoint_url());
330+
331+
let latest_block = provider.get_block_number().await.unwrap();
332+
333+
let result = EventScannerBuilder::latest(1)
334+
.from_block(0)
335+
.to_block(latest_block + 100)
336+
.connect(provider)
337+
.await;
338+
339+
match result {
340+
Err(ScannerError::BlockExceedsLatest("to_block", max, latest)) => {
341+
assert_eq!(max, latest_block + 100);
342+
assert_eq!(latest, latest_block);
343+
}
344+
_ => panic!("Expected BlockExceedsLatest error"),
345+
}
346+
}
347+
348+
#[tokio::test]
349+
async fn test_to_and_from_block_above_latest_returns_error() {
350+
let anvil = Anvil::new().try_spawn().unwrap();
351+
let provider = ProviderBuilder::new().connect_http(anvil.endpoint_url());
352+
353+
let latest_block = provider.get_block_number().await.unwrap();
354+
355+
let result = EventScannerBuilder::latest(1)
356+
.from_block(latest_block + 50)
357+
.to_block(latest_block + 100)
358+
.connect(provider)
359+
.await;
360+
361+
match result {
362+
Err(ScannerError::BlockExceedsLatest("from_block", max, latest)) => {
363+
assert_eq!(max, latest_block + 50);
364+
assert_eq!(latest, latest_block);
365+
}
366+
_ => panic!("Expected BlockExceedsLatest error for 'from_block'"),
367+
}
368+
}
284369
}

src/event_scanner/scanner/mod.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -299,6 +299,13 @@ impl EventScannerBuilder<Unspecified> {
299299
/// - **Default range**: By default, scans from `Earliest` to `Latest` block
300300
/// - **Reorg handling**: Periodically checks the tip to detect reorgs during the scan
301301
///
302+
/// # Notifications
303+
///
304+
/// The scanner emits the following notification before delivering log data:
305+
///
306+
/// - **[`Notification::NoPastLogsFound`][no_logs]**: Emitted when no matching logs are found in
307+
/// the scanned range.
308+
///
302309
/// # Arguments
303310
///
304311
/// * `count` - Maximum number of recent events to collect per listener (must be greater than 0)
@@ -323,6 +330,7 @@ impl EventScannerBuilder<Unspecified> {
323330
/// [start]: EventScanner::start
324331
/// [sync_from_latest]: EventScannerBuilder::from_latest
325332
/// [reorg]: crate::Notification::ReorgDetected
333+
/// [no_logs]: crate::Notification::NoPastLogsFound
326334
#[must_use]
327335
pub fn latest(count: usize) -> EventScannerBuilder<LatestEvents> {
328336
EventScannerBuilder::<LatestEvents>::new(count)

src/event_scanner/scanner/sync/from_latest.rs

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,4 @@
1-
use alloy::{
2-
consensus::BlockHeader,
3-
eips::BlockNumberOrTag,
4-
network::{BlockResponse, Network},
5-
};
1+
use alloy::{eips::BlockNumberOrTag, network::Network};
62

73
use tracing::{error, info};
84

@@ -73,8 +69,7 @@ impl<N: Network> EventScanner<SyncFromLatestEvents, N> {
7369
// This is used to determine the starting point for the rewind stream and the live
7470
// stream. We do this before starting the streams to avoid a race condition
7571
// where the latest block changes while we're setting up the streams.
76-
let latest_block =
77-
provider.get_block_by_number(BlockNumberOrTag::Latest).await?.header().number();
72+
let latest_block = provider.get_block_number().await?;
7873

7974
// Setup rewind and live streams to run in parallel.
8075
let rewind_stream = client.rewind(BlockNumberOrTag::Earliest, latest_block).await?;

src/event_scanner/scanner/sync/mod.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,17 @@ impl EventScannerBuilder<Synchronize> {
7979
/// block confirmations
8080
/// - **Continuous operation**: Live phase continues indefinitely until the scanner is dropped
8181
///
82+
/// # Notifications
83+
///
84+
/// During the **latest events phase**, the scanner can emit the following notification
85+
/// before transitioning to live mode:
86+
///
87+
/// - **[`Notification::NoPastLogsFound`][no_logs]**: Emitted when no matching logs are found in
88+
/// the scanned range
89+
///
90+
/// After the latest events phase completes, [`Notification::SwitchingToLive`][switch_to_live]
91+
/// is emitted before transitioning to the live streaming phase.
92+
///
8293
/// # Arguments
8394
///
8495
/// * `count` - Maximum number of recent events to collect per listener before switching to live
@@ -102,6 +113,7 @@ impl EventScannerBuilder<Synchronize> {
102113
/// [start]: crate::event_scanner::EventScanner::start
103114
/// [reorg]: crate::types::Notification::ReorgDetected
104115
/// [switch_to_live]: crate::types::Notification::SwitchingToLive
116+
/// [no_logs]: crate::types::Notification::NoPastLogsFound
105117
#[must_use]
106118
pub fn from_latest(self, count: usize) -> EventScannerBuilder<SyncFromLatestEvents> {
107119
EventScannerBuilder::<SyncFromLatestEvents>::new(count)

src/robust_provider/provider.rs

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ impl<N: Network> RobustProvider<N> {
4242

4343
/// Fetch a block by [`BlockNumberOrTag`] with retry and timeout.
4444
///
45+
/// This is a wrapper function for [`Provider::get_block_by_number`].
46+
///
4547
/// # Errors
4648
///
4749
/// See [retry errors](#retry-errors).
@@ -65,6 +67,8 @@ impl<N: Network> RobustProvider<N> {
6567

6668
/// Fetch a block number by [`BlockId`] with retry and timeout.
6769
///
70+
/// This is a wrapper function for [`Provider::get_block`].
71+
///
6872
/// # Errors
6973
///
7074
/// See [retry errors](#retry-errors).
@@ -81,6 +85,8 @@ impl<N: Network> RobustProvider<N> {
8185

8286
/// Fetch the latest block number with retry and timeout.
8387
///
88+
/// This is a wrapper function for [`Provider::get_block_number`].
89+
///
8490
/// # Errors
8591
///
8692
/// See [retry errors](#retry-errors).
@@ -98,6 +104,31 @@ impl<N: Network> RobustProvider<N> {
98104
result
99105
}
100106

107+
/// Get the block number for a given block identifier.
108+
///
109+
/// This is a wrapper function for [`Provider::get_block_number_by_id`].
110+
///
111+
/// # Arguments
112+
///
113+
/// * `block_id` - The block identifier to fetch the block number for.
114+
///
115+
/// # Errors
116+
///
117+
/// See [retry errors](#retry-errors).
118+
pub async fn get_block_number_by_id(&self, block_id: BlockId) -> Result<u64, Error> {
119+
info!("get_block_number_by_id called");
120+
let result = self
121+
.retry_with_total_timeout(
122+
move |provider| async move { provider.get_block_number_by_id(block_id).await },
123+
false,
124+
)
125+
.await;
126+
if let Err(e) = &result {
127+
error!(error = %e, "get_block_number_by_id failed");
128+
}
129+
result?.ok_or_else(|| Error::BlockNotFound(block_id))
130+
}
131+
101132
/// Fetch the latest confirmed block number with retry and timeout.
102133
///
103134
/// This method fetches the latest block number and subtracts the specified
@@ -120,6 +151,8 @@ impl<N: Network> RobustProvider<N> {
120151

121152
/// Fetch a block by [`BlockHash`] with retry and timeout.
122153
///
154+
/// This is a wrapper function for [`Provider::get_block_by_hash`].
155+
///
123156
/// # Errors
124157
///
125158
/// See [retry errors](#retry-errors).
@@ -140,6 +173,8 @@ impl<N: Network> RobustProvider<N> {
140173

141174
/// Fetch logs for the given [`Filter`] with retry and timeout.
142175
///
176+
/// This is a wrapper function for [`Provider::get_logs`].
177+
///
143178
/// # Errors
144179
///
145180
/// See [retry errors](#retry-errors).
@@ -159,6 +194,8 @@ impl<N: Network> RobustProvider<N> {
159194

160195
/// Subscribe to new block headers with retry and timeout.
161196
///
197+
/// This is a wrapper function for [`Provider::subscribe_blocks`].
198+
///
162199
/// # Errors
163200
///
164201
/// see [retry errors](#retry-errors).

0 commit comments

Comments
 (0)