Skip to content

Commit e2239c9

Browse files
committed
Merge branch 'main' into historical-reorgs
2 parents df24123 + 553f485 commit e2239c9

File tree

5 files changed

+148
-40
lines changed

5 files changed

+148
-40
lines changed

Cargo.lock

Lines changed: 3 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ authors = ["OpenZeppelin"]
77
edition = "2024"
88
license = "AGPL-3.0-only"
99
repository = "https://github.com/OpenZeppelin/Event-Scanner"
10-
version = "0.2.0-alpha"
10+
version = "0.2.1-alpha"
1111

1212
[workspace.lints.clippy]
1313
pedantic = "warn"

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ Add `event-scanner` to your `Cargo.toml`:
5252

5353
```toml
5454
[dependencies]
55-
event-scanner = "0.2.0-alpha"
55+
event-scanner = "0.2.1-alpha"
5656
```
5757

5858
Create an event stream for the given event filters registered with the `EventScanner`:

src/block_range_scanner.rs

Lines changed: 142 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,9 @@ pub enum BlockRangeScannerError {
135135

136136
#[error("WebSocket connection failed after {0} attempts")]
137137
WebSocketConnectionFailed(usize),
138+
139+
#[error("Block not found, block number: {0}")]
140+
BlockNotFound(BlockNumberOrTag),
138141
}
139142

140143
impl From<reqwest::Error> for BlockRangeScannerError {
@@ -330,7 +333,7 @@ struct Service<N: Network> {
330333
config: Config,
331334
provider: RootProvider<N>,
332335
subscriber: Option<mpsc::Sender<BlockRangeMessage>>,
333-
current: BlockHashAndNumber,
336+
next_start_block: BlockHashAndNumber,
334337
websocket_connected: bool,
335338
processed_count: u64,
336339
error_count: u64,
@@ -346,7 +349,7 @@ impl<N: Network> Service<N> {
346349
config,
347350
provider,
348351
subscriber: None,
349-
current: BlockHashAndNumber::default(),
352+
next_start_block: BlockHashAndNumber::default(),
350353
websocket_connected: false,
351354
processed_count: 0,
352355
error_count: 0,
@@ -609,36 +612,51 @@ impl<N: Network> Service<N> {
609612
) -> Result<(), BlockRangeScannerError> {
610613
let mut batch_count = 0;
611614

612-
self.current = BlockHashAndNumber::from_header::<N>(start.header());
615+
self.next_start_block = BlockHashAndNumber::from_header::<N>(start.header());
613616

614617
println!("lock");
615618
#[cfg(test)]
616619
lock_historical_for_testing().await;
617620

618-
while self.current.number < end.header().number() {
621+
// must be <= to include the edge case when start == end (i.e. return the single block
622+
// range)
623+
while self.next_start_block.number <= end.header().number() {
619624
self.ensure_current_not_reorged().await?;
620625

621-
let batch_to = self
622-
.current
626+
let batch_end_block_number = self
627+
.next_start_block
623628
.number
624-
.saturating_add(self.config.blocks_read_per_epoch as u64)
629+
.saturating_add(self.config.blocks_read_per_epoch as u64 - 1)
625630
.min(end.header().number());
626631

627-
// safe unwrap since we've checked end block exists
628-
let batch_end_block = self
629-
.provider
630-
.get_block_by_number(batch_to.into())
631-
.await?
632-
.expect("end of the batch should already be ensured to exist");
633-
634-
self.send_to_subscriber(BlockRangeMessage::Data(self.current.number..=batch_to)).await;
635-
636-
self.current = BlockHashAndNumber::from_header::<N>(batch_end_block.header());
632+
self.send_to_subscriber(BlockRangeMessage::Data(
633+
self.next_start_block.number..=batch_end_block_number,
634+
))
635+
.await;
637636

638637
batch_count += 1;
639638
if batch_count % 10 == 0 {
640639
debug!(batch_count = batch_count, "Processed historical batches");
641640
}
641+
642+
if batch_end_block_number == end.header().number() {
643+
break;
644+
}
645+
646+
let next_start_block_number = (batch_end_block_number + 1).into();
647+
let next_start_block =
648+
match self.provider.get_block_by_number(next_start_block_number).await {
649+
Ok(block) => {
650+
block.expect("block number is less than 'end', so it should exist")
651+
}
652+
Err(e) => {
653+
error!(error = %e, "Failed to get block by number");
654+
let e: BlockRangeScannerError = e.into();
655+
self.send_to_subscriber(BlockRangeMessage::Error(e.clone())).await;
656+
return Err(e);
657+
}
658+
};
659+
self.next_start_block = BlockHashAndNumber::from_header::<N>(next_start_block.header());
642660
}
643661

644662
info!(batch_count = batch_count, "Historical sync completed");
@@ -754,7 +772,7 @@ impl<N: Network> Service<N> {
754772
}
755773

756774
async fn ensure_current_not_reorged(&mut self) -> Result<(), BlockRangeScannerError> {
757-
let current_block = self.provider.get_block_by_hash(self.current.hash).await?;
775+
let current_block = self.provider.get_block_by_hash(self.next_start_block.hash).await?;
758776
if current_block.is_some() {
759777
return Ok(());
760778
}
@@ -764,7 +782,7 @@ impl<N: Network> Service<N> {
764782

765783
async fn rewind_on_reorg_detected(&mut self) -> Result<(), BlockRangeScannerError> {
766784
let mut new_current_height =
767-
self.current.number.saturating_sub(self.config.reorg_rewind_depth);
785+
self.next_start_block.number.saturating_sub(self.config.reorg_rewind_depth);
768786

769787
let head = self.provider.get_block_number().await?;
770788
if head < new_current_height {
@@ -781,12 +799,12 @@ impl<N: Network> Service<N> {
781799
)))?;
782800

783801
info!(
784-
old_current = self.current.number,
802+
old_current = self.next_start_block.number,
785803
new_current = current.number,
786804
"Rewind on reorg detected"
787805
);
788806

789-
self.current = current;
807+
self.next_start_block = current;
790808

791809
Ok(())
792810
}
@@ -960,18 +978,18 @@ impl BlockRangeScannerClient {
960978
/// # Errors
961979
///
962980
/// * `BlockRangeScannerError::ServiceShutdown` - if the service is already shutting down.
963-
pub async fn stream_historical(
981+
pub async fn stream_historical<N: Into<BlockNumberOrTag>>(
964982
&self,
965-
start_height: BlockNumberOrTag,
966-
end_height: BlockNumberOrTag,
983+
start_height: N,
984+
end_height: N,
967985
) -> Result<ReceiverStream<BlockRangeMessage>, BlockRangeScannerError> {
968986
let (blocks_sender, blocks_receiver) = mpsc::channel(MAX_BUFFERED_MESSAGES);
969987
let (response_tx, response_rx) = oneshot::channel();
970988

971989
let command = Command::StreamHistorical {
972990
sender: blocks_sender,
973-
start_height,
974-
end_height,
991+
start_height: start_height.into(),
992+
end_height: end_height.into(),
975993
response: response_tx,
976994
};
977995

@@ -996,13 +1014,16 @@ impl BlockRangeScannerClient {
9961014
/// * `BlockRangeScannerError::ServiceShutdown` - if the service is already shutting down.
9971015
pub async fn stream_from(
9981016
&self,
999-
start_height: BlockNumberOrTag,
1017+
start_height: impl Into<BlockNumberOrTag>,
10001018
) -> Result<ReceiverStream<BlockRangeMessage>, BlockRangeScannerError> {
10011019
let (blocks_sender, blocks_receiver) = mpsc::channel(MAX_BUFFERED_MESSAGES);
10021020
let (response_tx, response_rx) = oneshot::channel();
10031021

1004-
let command =
1005-
Command::StreamFrom { sender: blocks_sender, start_height, response: response_tx };
1022+
let command = Command::StreamFrom {
1023+
sender: blocks_sender,
1024+
start_height: start_height.into(),
1025+
response: response_tx,
1026+
};
10061027

10071028
self.command_sender
10081029
.send(command)
@@ -1055,7 +1076,7 @@ impl BlockRangeScannerClient {
10551076
mod tests {
10561077

10571078
use std::time::Duration;
1058-
use tokio::{join, time::timeout};
1079+
use tokio::{join, sync::mpsc::Receiver, time::timeout};
10591080

10601081
use alloy::{
10611082
network::Ethereum,
@@ -1074,6 +1095,38 @@ mod tests {
10741095

10751096
use super::*;
10761097

1098+
// Trait to enable receiver-type-agnostic range receival
1099+
trait RangeReceiver {
1100+
async fn next_range(&mut self) -> Option<BlockRangeMessage>;
1101+
}
1102+
1103+
impl RangeReceiver for ReceiverStream<BlockRangeMessage> {
1104+
async fn next_range(&mut self) -> Option<BlockRangeMessage> {
1105+
self.next().await
1106+
}
1107+
}
1108+
1109+
impl RangeReceiver for Receiver<BlockRangeMessage> {
1110+
async fn next_range(&mut self) -> Option<BlockRangeMessage> {
1111+
self.recv().await
1112+
}
1113+
}
1114+
1115+
macro_rules! assert_next_range {
1116+
($recv:expr, None) => {
1117+
let next = $recv.next_range().await;
1118+
assert!(next.is_none());
1119+
};
1120+
($recv:expr, $range:expr) => {
1121+
let next = $recv.next_range().await;
1122+
if let Some(BlockRangeMessage::Data(range)) = next {
1123+
assert_eq!($range, range);
1124+
} else {
1125+
panic!("expected block range, got: {next:?}");
1126+
}
1127+
};
1128+
}
1129+
10771130
fn test_config() -> Config {
10781131
Config { blocks_read_per_epoch: 5, reorg_rewind_depth: 5, block_confirmations: 0 }
10791132
}
@@ -1446,7 +1499,8 @@ mod tests {
14461499
let original_height = 10;
14471500
let original_hash = keccak256(b"original block");
14481501
let original_block = mock_block(original_height, original_hash);
1449-
service.current = BlockHashAndNumber::from_header::<Ethereum>(original_block.header());
1502+
service.next_start_block =
1503+
BlockHashAndNumber::from_header::<Ethereum>(original_block.header());
14501504

14511505
let expected_rewind_height = original_height - config.reorg_rewind_depth;
14521506
let expected_rewind_hash = keccak256(b"rewound block");
@@ -1462,13 +1516,69 @@ mod tests {
14621516

14631517
service.ensure_current_not_reorged().await?;
14641518

1465-
let current = service.current;
1519+
let current = service.next_start_block;
14661520
assert_eq!(current.number, expected_rewind_height, "should rewind by reorg_rewind_depth");
14671521
assert_eq!(current.hash, expected_rewind_hash, "should use hash of block at rewind height");
14681522

14691523
Ok(())
14701524
}
14711525

1526+
#[tokio::test]
1527+
async fn historic_mode_respects_blocks_read_per_epoch() -> anyhow::Result<()> {
1528+
let anvil = Anvil::new().try_spawn()?;
1529+
1530+
let provider = ProviderBuilder::new().connect(anvil.endpoint().as_str()).await?;
1531+
1532+
provider.anvil_mine(Option::Some(100), Option::None).await?;
1533+
1534+
let client = BlockRangeScanner::new()
1535+
.with_blocks_read_per_epoch(5)
1536+
.connect_ws::<Ethereum>(anvil.ws_endpoint_url())
1537+
.await?
1538+
.run()?;
1539+
1540+
// ranges where each batch is of max blocks per epoch size
1541+
let mut stream = client.stream_historical(0, 19).await?;
1542+
assert_next_range!(stream, 0..=4);
1543+
assert_next_range!(stream, 5..=9);
1544+
assert_next_range!(stream, 10..=14);
1545+
assert_next_range!(stream, 15..=19);
1546+
assert_next_range!(stream, None);
1547+
1548+
// ranges where last batch is smaller than blocks per epoch
1549+
let mut stream = client.stream_historical(93, 99).await?;
1550+
assert_next_range!(stream, 93..=97);
1551+
assert_next_range!(stream, 98..=99);
1552+
assert_next_range!(stream, None);
1553+
1554+
// range where blocks per epoch is larger than the number of blocks in the range
1555+
let mut stream = client.stream_historical(3, 5).await?;
1556+
assert_next_range!(stream, 3..=5);
1557+
assert_next_range!(stream, None);
1558+
1559+
// single item range
1560+
let mut stream = client.stream_historical(3, 3).await?;
1561+
assert_next_range!(stream, 3..=3);
1562+
assert_next_range!(stream, None);
1563+
1564+
// range where blocks per epoch is larger than the number of blocks on chain
1565+
let client = BlockRangeScanner::new()
1566+
.with_blocks_read_per_epoch(200)
1567+
.connect_ws::<Ethereum>(anvil.ws_endpoint_url())
1568+
.await?
1569+
.run()?;
1570+
1571+
let mut stream = client.stream_historical(0, 20).await?;
1572+
assert_next_range!(stream, 0..=20);
1573+
assert_next_range!(stream, None);
1574+
1575+
let mut stream = client.stream_historical(0, 99).await?;
1576+
assert_next_range!(stream, 0..=99);
1577+
assert_next_range!(stream, None);
1578+
1579+
Ok(())
1580+
}
1581+
14721582
#[tokio::test]
14731583
async fn buffered_messages_trim_ranges_prior_to_cutoff() -> anyhow::Result<()> {
14741584
let cutoff = 50;

tests/historic_to_live/basic.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use event_scanner::{
1111
};
1212
use tokio::{
1313
sync::Mutex,
14-
time::{Duration, sleep, timeout},
14+
time::{Duration, timeout},
1515
};
1616
use tokio_stream::StreamExt;
1717

@@ -47,8 +47,6 @@ async fn replays_historical_then_switches_to_live() -> anyhow::Result<()> {
4747
client.start_scanner(BlockNumberOrTag::Number(first_historical_block), None).await
4848
});
4949

50-
sleep(Duration::from_millis(200)).await;
51-
5250
for _ in 0..live_events {
5351
contract.increase().send().await?.watch().await?;
5452
}

0 commit comments

Comments
 (0)