Skip to content

Commit 810461e

Browse files
committed
Merge branch 'main' into remove-scanner-error-enum
2 parents fe60984 + 5a19c9a commit 810461e

File tree

9 files changed

+437
-128
lines changed

9 files changed

+437
-128
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -252,7 +252,7 @@ Always handle all message types in your stream processing loop to ensure robust
252252

253253
- `examples/live_scanning` – minimal live-mode scanner using `EventScannerBuilder::live()`
254254
- `examples/historical_scanning` – demonstrates replaying historical data using `EventScannerBuilder::historic()`
255-
- `examples/sync_from_block_scanning` – demonstrates replaying from genesis (block 0) before continuing to stream the latest blocks using `EventScannerBuilder::sync().from_block(block_tag_or_number)`
255+
- `examples/sync_from_block_scanning` – demonstrates replaying from genesis (block 0) before continuing to stream the latest blocks using `EventScannerBuilder::sync().from_block(block_id)`
256256
- `examples/latest_events_scanning` – demonstrates scanning the latest events using `EventScannerBuilder::latest()`
257257
- `examples/sync_from_latest_scanning` – demonstrates scanning the latest events before switching to live mode using `EventScannerBuilder::sync().from_latest(count)`.
258258

src/block_range_scanner.rs

Lines changed: 56 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ use crate::{
7474

7575
use alloy::{
7676
consensus::BlockHeader,
77-
eips::BlockNumberOrTag,
77+
eips::{BlockId, BlockNumberOrTag},
7878
network::{BlockResponse, Network, primitives::HeaderResponse},
7979
primitives::{B256, BlockNumber},
8080
pubsub::Subscription,
@@ -182,21 +182,21 @@ pub enum Command {
182182
response: oneshot::Sender<Result<(), ScannerError>>,
183183
},
184184
StreamHistorical {
185-
sender: mpsc::Sender<Result<Message, ScannerError>>,
186-
start_height: BlockNumberOrTag,
187-
end_height: BlockNumberOrTag,
185+
sender: mpsc::Sender<Message>,
186+
start_id: BlockId,
187+
end_id: BlockId,
188188
response: oneshot::Sender<Result<(), ScannerError>>,
189189
},
190190
StreamFrom {
191-
sender: mpsc::Sender<Result<Message, ScannerError>>,
192-
start_height: BlockNumberOrTag,
191+
sender: mpsc::Sender<Message>,
192+
start_id: BlockId,
193193
block_confirmations: u64,
194194
response: oneshot::Sender<Result<(), ScannerError>>,
195195
},
196196
Rewind {
197-
sender: mpsc::Sender<Result<Message, ScannerError>>,
198-
start_height: BlockNumberOrTag,
199-
end_height: BlockNumberOrTag,
197+
sender: mpsc::Sender<Message>,
198+
start_id: BlockId,
199+
end_id: BlockId,
200200
response: oneshot::Sender<Result<(), ScannerError>>,
201201
},
202202
}
@@ -253,19 +253,19 @@ impl<N: Network> Service<N> {
253253
let result = self.handle_live(block_confirmations, sender).await;
254254
let _ = response.send(result);
255255
}
256-
Command::StreamHistorical { sender, start_height, end_height, response } => {
257-
info!(start_height = ?start_height, end_height = ?end_height, "Starting historical stream");
258-
let result = self.handle_historical(start_height, end_height, sender).await;
256+
Command::StreamHistorical { sender, start_id, end_id, response } => {
257+
info!(start_id = ?start_id, end_id = ?end_id, "Starting historical stream");
258+
let result = self.handle_historical(start_id, end_id, sender).await;
259259
let _ = response.send(result);
260260
}
261-
Command::StreamFrom { sender, start_height, block_confirmations, response } => {
262-
info!(start_height = ?start_height, "Starting streaming from");
263-
let result = self.handle_sync(start_height, block_confirmations, sender).await;
261+
Command::StreamFrom { sender, start_id, block_confirmations, response } => {
262+
info!(start_id = ?start_id, "Starting streaming from");
263+
let result = self.handle_sync(start_id, block_confirmations, sender).await;
264264
let _ = response.send(result);
265265
}
266-
Command::Rewind { sender, start_height, end_height, response } => {
267-
info!(start_height = ?start_height, end_height = ?end_height, "Starting rewind");
268-
let result = self.handle_rewind(start_height, end_height, sender).await;
266+
Command::Rewind { sender, start_id, end_id, response } => {
267+
info!(start_id = ?start_id, end_id = ?end_id, "Starting rewind");
268+
let result = self.handle_rewind(start_id, end_id, sender).await;
269269
let _ = response.send(result);
270270
}
271271
}
@@ -305,16 +305,14 @@ impl<N: Network> Service<N> {
305305

306306
async fn handle_historical(
307307
&mut self,
308-
start_height: BlockNumberOrTag,
309-
end_height: BlockNumberOrTag,
310-
sender: mpsc::Sender<Result<Message, ScannerError>>,
308+
start_id: BlockId,
309+
end_id: BlockId,
310+
sender: mpsc::Sender<Message>,
311311
) -> Result<(), ScannerError> {
312312
let max_block_range = self.max_block_range;
313313

314-
let (start_block, end_block) = tokio::try_join!(
315-
self.provider.get_block_by_number(start_height),
316-
self.provider.get_block_by_number(end_height)
317-
)?;
314+
let (start_block, end_block) =
315+
tokio::try_join!(self.provider.get_block(start_id), self.provider.get_block(end_id))?;
318316

319317
let start_block_num = start_block.header().number();
320318
let end_block_num = end_block.header().number();
@@ -341,17 +339,17 @@ impl<N: Network> Service<N> {
341339

342340
async fn handle_sync(
343341
&mut self,
344-
start_height: BlockNumberOrTag,
342+
start_id: BlockId,
345343
block_confirmations: u64,
346344
sender: mpsc::Sender<Result<Message, ScannerError>>,
347345
) -> Result<(), ScannerError> {
348346
let provider = self.provider.clone();
349347
let max_block_range = self.max_block_range;
350348

351349
let get_start_block = async || -> Result<BlockNumber, ScannerError> {
352-
let block = match start_height {
353-
BlockNumberOrTag::Number(num) => num,
354-
block_tag => provider.get_block_by_number(block_tag).await?.header().number(),
350+
let block = match start_id {
351+
BlockId::Number(BlockNumberOrTag::Number(num)) => num,
352+
_ => provider.get_block(start_id).await?.header().number(),
355353
};
356354
Ok(block)
357355
};
@@ -444,17 +442,15 @@ impl<N: Network> Service<N> {
444442

445443
async fn handle_rewind(
446444
&mut self,
447-
start_height: BlockNumberOrTag,
448-
end_height: BlockNumberOrTag,
449-
sender: mpsc::Sender<Result<Message, ScannerError>>,
445+
start_id: BlockId,
446+
end_id: BlockId,
447+
sender: mpsc::Sender<Message>,
450448
) -> Result<(), ScannerError> {
451449
let max_block_range = self.max_block_range;
452450
let provider = self.provider.clone();
453451

454-
let (start_block, end_block) = try_join!(
455-
self.provider.get_block_by_number(start_height),
456-
self.provider.get_block_by_number(end_height),
457-
)?;
452+
let (start_block, end_block) =
453+
try_join!(self.provider.get_block(start_id), self.provider.get_block(end_id),)?;
458454

459455
// normalize block range
460456
let (from, to) = match start_block.header().number().cmp(&end_block.header().number()) {
@@ -742,28 +738,28 @@ impl BlockRangeScannerClient {
742738
Ok(ReceiverStream::new(blocks_receiver))
743739
}
744740

745-
/// Streams a batch of historical blocks from `start_height` to `end_height`.
741+
/// Streams a batch of historical blocks from `start_id` to `end_id`.
746742
///
747743
/// # Arguments
748744
///
749-
/// * `start_height` - The starting block number or tag.
750-
/// * `end_height` - The ending block number or tag.
745+
/// * `start_id` - The starting block id
746+
/// * `end_id` - The ending block id
751747
///
752748
/// # Errors
753749
///
754750
/// * `ScannerError::ServiceShutdown` - if the service is already shutting down.
755751
pub async fn stream_historical(
756752
&self,
757-
start_height: impl Into<BlockNumberOrTag>,
758-
end_height: impl Into<BlockNumberOrTag>,
759-
) -> Result<ReceiverStream<Result<Message, ScannerError>>, ScannerError> {
753+
start_id: impl Into<BlockId>,
754+
end_id: impl Into<BlockId>,
755+
) -> Result<ReceiverStream<Message>, ScannerError> {
760756
let (blocks_sender, blocks_receiver) = mpsc::channel(MAX_BUFFERED_MESSAGES);
761757
let (response_tx, response_rx) = oneshot::channel();
762758

763759
let command = Command::StreamHistorical {
764760
sender: blocks_sender,
765-
start_height: start_height.into(),
766-
end_height: end_height.into(),
761+
start_id: start_id.into(),
762+
end_id: end_id.into(),
767763
response: response_tx,
768764
};
769765

@@ -774,27 +770,27 @@ impl BlockRangeScannerClient {
774770
Ok(ReceiverStream::new(blocks_receiver))
775771
}
776772

777-
/// Streams blocks starting from `start_height` and transitions to live mode.
773+
/// Streams blocks starting from `start_id` and transitions to live mode.
778774
///
779775
/// # Arguments
780776
///
781-
/// * `start_height` - The starting block number or tag.
777+
/// * `start_id` - The starting block id.
782778
/// * `block_confirmations` - Number of confirmations to apply once in live mode.
783779
///
784780
/// # Errors
785781
///
786782
/// * `ScannerError::ServiceShutdown` - if the service is already shutting down.
787783
pub async fn stream_from(
788784
&self,
789-
start_height: impl Into<BlockNumberOrTag>,
785+
start_id: impl Into<BlockId>,
790786
block_confirmations: u64,
791787
) -> Result<ReceiverStream<Result<Message, ScannerError>>, ScannerError> {
792788
let (blocks_sender, blocks_receiver) = mpsc::channel(MAX_BUFFERED_MESSAGES);
793789
let (response_tx, response_rx) = oneshot::channel();
794790

795791
let command = Command::StreamFrom {
796792
sender: blocks_sender,
797-
start_height: start_height.into(),
793+
start_id: start_id.into(),
798794
block_confirmations,
799795
response: response_tx,
800796
};
@@ -806,28 +802,28 @@ impl BlockRangeScannerClient {
806802
Ok(ReceiverStream::new(blocks_receiver))
807803
}
808804

809-
/// Streams blocks in reverse order from `start_height` to `end_height`.
805+
/// Streams blocks in reverse order from `start_id` to `end_id`.
810806
///
811807
/// # Arguments
812808
///
813-
/// * `start_height` - The starting block number or tag (defaults to Latest if None).
814-
/// * `end_height` - The ending block number or tag (defaults to Earliest if None).
809+
/// * `start_id` - The starting block id (defaults to Latest if None).
810+
/// * `end_id` - The ending block id (defaults to Earliest if None).
815811
///
816812
/// # Errors
817813
///
818814
/// * `ScannerError::ServiceShutdown` - if the service is already shutting down.
819815
pub async fn rewind(
820816
&self,
821-
start_height: impl Into<BlockNumberOrTag>,
822-
end_height: impl Into<BlockNumberOrTag>,
823-
) -> Result<ReceiverStream<Result<Message, ScannerError>>, ScannerError> {
817+
start_id: impl Into<BlockId>,
818+
end_id: impl Into<BlockId>,
819+
) -> Result<ReceiverStream<Message>, ScannerError> {
824820
let (blocks_sender, blocks_receiver) = mpsc::channel(MAX_BUFFERED_MESSAGES);
825821
let (response_tx, response_rx) = oneshot::channel();
826822

827823
let command = Command::Rewind {
828824
sender: blocks_sender,
829-
start_height: start_height.into(),
830-
end_height: end_height.into(),
825+
start_id: start_id.into(),
826+
end_id: end_id.into(),
831827
response: response_tx,
832828
};
833829

@@ -843,7 +839,10 @@ impl BlockRangeScannerClient {
843839
mod tests {
844840
use super::*;
845841
use crate::{assert_closed, assert_next};
846-
use alloy::{eips::BlockId, network::Ethereum};
842+
use alloy::{
843+
eips::{BlockId, BlockNumberOrTag},
844+
network::Ethereum,
845+
};
847846
use tokio::sync::mpsc;
848847

849848
#[test]

0 commit comments

Comments
 (0)