Skip to content

Commit 389cd77

Browse files
pepebndc0xNeshi
andauthored
Block range scanner return rangeinclusive instead of range (#55)
Co-authored-by: Nenad <[email protected]>
1 parent 958504a commit 389cd77

File tree

2 files changed

+32
-33
lines changed

2 files changed

+32
-33
lines changed

src/block_range_scanner.rs

Lines changed: 28 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
//!
33
//! ```rust,no_run
44
//! use alloy::{eips::BlockNumberOrTag, network::Ethereum, primitives::BlockNumber};
5-
//! use std::ops::Range;
5+
//! use std::ops::RangeInclusive;
66
//! use tokio_stream::{StreamExt, wrappers::ReceiverStream};
77
//!
88
//! use alloy::transports::http::reqwest::Url;
@@ -28,13 +28,12 @@
2828
//! // Create client to send subscribe command to block scanner
2929
//! let client: BlockRangeScannerClient = block_range_scanner.run()?;
3030
//!
31-
//! let mut receiver: ReceiverStream<Result<Range<BlockNumber>, BlockRangeScannerError>> =
32-
//! client
33-
//! .subscribe(
34-
//! BlockNumberOrTag::Latest,
35-
//! None, // just subscribe to new blocks
36-
//! )
37-
//! .await?;
31+
//! let mut receiver = client
32+
//! .subscribe(
33+
//! BlockNumberOrTag::Latest,
34+
//! None, // just subscribe to new blocks
35+
//! )
36+
//! .await?;
3837
//!
3938
//! while let Some(result) = receiver.next().await {
4039
//! match result {
@@ -68,7 +67,7 @@
6867
//! }
6968
//! ```
7069
71-
use std::ops::Range;
70+
use std::ops::RangeInclusive;
7271

7372
use tokio::sync::{mpsc, oneshot};
7473
use tokio_stream::wrappers::ReceiverStream;
@@ -138,7 +137,7 @@ pub enum Error {
138137
#[derive(Debug)]
139138
pub enum Command {
140139
Subscribe {
141-
sender: mpsc::Sender<Result<Range<BlockNumber>, Error>>,
140+
sender: mpsc::Sender<Result<RangeInclusive<BlockNumber>, Error>>,
142141
start_height: BlockNumberOrTag,
143142
end_height: Option<BlockNumberOrTag>,
144143
response: oneshot::Sender<Result<(), Error>>,
@@ -297,7 +296,7 @@ impl<N: Network> ConnectedBlockRangeScanner<N> {
297296
struct Service<N: Network> {
298297
config: Config,
299298
provider: RootProvider<N>,
300-
subscriber: Option<mpsc::Sender<Result<Range<BlockNumber>, Error>>>,
299+
subscriber: Option<mpsc::Sender<Result<RangeInclusive<BlockNumber>, Error>>>,
301300
current: Option<BlockHashAndNumber>,
302301
websocket_connected: bool,
303302
processed_count: u64,
@@ -372,7 +371,7 @@ impl<N: Network> Service<N> {
372371

373372
async fn handle_subscribe(
374373
&mut self,
375-
sender: mpsc::Sender<Result<Range<BlockNumber>, Error>>,
374+
sender: mpsc::Sender<Result<RangeInclusive<BlockNumber>, Error>>,
376375
start_height: BlockNumberOrTag,
377376
end_height: Option<BlockNumberOrTag>,
378377
) -> Result<(), Error> {
@@ -425,7 +424,7 @@ impl<N: Network> Service<N> {
425424
let cutoff = sync_end_block.header().number();
426425
let ws_task = tokio::spawn(async move {
427426
if end_height.is_none() {
428-
Self::websocket_buffer_task(cutoff, provider, buffer_tx).await;
427+
Self::websocket_buffer_task(cutoff + 1, provider, buffer_tx).await;
429428
}
430429
});
431430

@@ -478,7 +477,7 @@ impl<N: Network> Service<N> {
478477
let batch_end_block =
479478
self.provider.get_block_by_number(batch_to.into()).await?.expect("should be valid");
480479

481-
self.send_to_subscriber(Ok(self.current.as_ref().unwrap().number..batch_to)).await;
480+
self.send_to_subscriber(Ok(self.current.as_ref().unwrap().number..=batch_to)).await;
482481

483482
self.current = Some(BlockHashAndNumber::from_header::<N>(batch_end_block.header()));
484483

@@ -536,7 +535,7 @@ impl<N: Network> Service<N> {
536535
async fn websocket_buffer_task<P: Provider<N>>(
537536
mut current: BlockNumber,
538537
provider: P,
539-
buffer_sender: mpsc::Sender<Range<BlockNumber>>,
538+
buffer_sender: mpsc::Sender<RangeInclusive<BlockNumber>>,
540539
) {
541540
match Self::get_block_subscription(&provider).await {
542541
Ok(mut ws_stream) => {
@@ -548,9 +547,8 @@ impl<N: Network> Service<N> {
548547
continue;
549548
}
550549

551-
// we add 1 to include the latest block
552-
#[allow(clippy::range_plus_one)]
553-
if let Err(e) = buffer_sender.send(current..header_resp.number() + 1).await {
550+
// RangeInclusive already includes the end block
551+
if let Err(e) = buffer_sender.send(current..=header_resp.number()).await {
554552
error!("Buffer channel closed, stopping buffer task: {e}");
555553

556554
return;
@@ -567,16 +565,16 @@ impl<N: Network> Service<N> {
567565
}
568566

569567
async fn process_buffered_messages(
570-
mut buffer_rx: mpsc::Receiver<Range<BlockNumber>>,
571-
sender: mpsc::Sender<Result<Range<BlockNumber>, Error>>,
568+
mut buffer_rx: mpsc::Receiver<RangeInclusive<BlockNumber>>,
569+
sender: mpsc::Sender<Result<RangeInclusive<BlockNumber>, Error>>,
572570
cutoff: BlockNumber,
573571
) {
574572
let mut processed = 0;
575573
let mut discarded = 0;
576574

577575
// Process all buffered messages
578576
while let Some(range) = buffer_rx.recv().await {
579-
let (start, end) = (range.start, range.end);
577+
let (start, end) = (*range.start(), *range.end());
580578
if start >= cutoff {
581579
if sender.send(Ok(range)).await.is_err() {
582580
warn!("Subscriber channel closed, cleaning up");
@@ -587,7 +585,7 @@ impl<N: Network> Service<N> {
587585
discarded += cutoff - start;
588586

589587
let start = cutoff;
590-
if sender.send(Ok(start..end)).await.is_err() {
588+
if sender.send(Ok(start..=end)).await.is_err() {
591589
warn!("Subscriber channel closed, cleaning up");
592590
return;
593591
}
@@ -609,7 +607,7 @@ impl<N: Network> Service<N> {
609607
Ok(ws_stream)
610608
}
611609

612-
async fn send_to_subscriber(&mut self, result: Result<Range<BlockNumber>, Error>) {
610+
async fn send_to_subscriber(&mut self, result: Result<RangeInclusive<BlockNumber>, Error>) {
613611
if let Some(ref sender) = self.subscriber {
614612
if sender.send(result).await.is_err() {
615613
self.subscriber = None;
@@ -658,7 +656,7 @@ impl BlockRangeScannerClient {
658656
/// # Arguments
659657
///
660658
/// * `start_height` - The block number to start from.
661-
/// * `end_height` - The block number to end at.
659+
/// * `end_height` - The block number to end at (inclusive).
662660
///
663661
/// # Errors
664662
///
@@ -667,7 +665,7 @@ impl BlockRangeScannerClient {
667665
&self,
668666
start_height: BlockNumberOrTag,
669667
end_height: Option<BlockNumberOrTag>,
670-
) -> Result<ReceiverStream<Result<Range<BlockNumber>, Error>>, Error> {
668+
) -> Result<ReceiverStream<Result<RangeInclusive<BlockNumber>, Error>>, Error> {
671669
let (blocks_sender, blocks_receiver) = mpsc::channel(MAX_BUFFERED_MESSAGES);
672670
let (response_tx, response_rx) = oneshot::channel();
673671

@@ -764,14 +762,14 @@ mod tests {
764762
while let Some(result) = receiver.next().await {
765763
match result {
766764
Ok(range) => {
767-
println!("Received block range: {} - {}", range.start, range.end);
765+
println!("Received block range: [{range:?}]");
768766
if block_range_start == 0 {
769-
block_range_start = range.start;
767+
block_range_start = *range.start();
770768
}
771769

772-
assert_eq!(block_range_start, range.start);
773-
assert!(range.end >= range.start);
774-
block_range_start = range.end;
770+
assert_eq!(block_range_start, *range.start());
771+
assert!(*range.end() >= *range.start());
772+
block_range_start = *range.end() + 1;
775773
}
776774
Err(e) => {
777775
panic!("Received error from subscription: {e}");

src/event_scanner.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -181,8 +181,8 @@ impl<N: Network> EventScanner<N> {
181181
while let Some(range) = stream.next().await {
182182
match range {
183183
Ok(range) => {
184-
let from_block = range.start;
185-
let to_block = range.end;
184+
let from_block = *range.start();
185+
let to_block = *range.end();
186186
info!(from_block, to_block, "processing block range");
187187
self.process_block_range(from_block, to_block, &event_channels).await?;
188188
}
@@ -216,7 +216,8 @@ impl<N: Network> EventScanner<N> {
216216
});
217217
}
218218

219-
/// Fetches logs for the supplied block range and forwards them to the callback channels.
219+
/// Fetches logs for the supplied inclusive block range [`from_block..=to_block`] and forwards
220+
/// them to the callback channels.
220221
async fn process_block_range(
221222
&self,
222223
from_block: u64,

0 commit comments

Comments
 (0)