Skip to content

Commit e162af7

Browse files
authored
fix: Historic and Sync Respect Blocks Read Per Epoch Config (#119)
1 parent c58deac commit e162af7

File tree

2 files changed

+146
-35
lines changed

2 files changed

+146
-35
lines changed

src/block_range_scanner.rs

Lines changed: 145 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,9 @@ pub enum BlockRangeScannerError {
133133

134134
#[error("WebSocket connection failed after {0} attempts")]
135135
WebSocketConnectionFailed(usize),
136+
137+
#[error("Block not found, block number: {0}")]
138+
BlockNotFound(BlockNumberOrTag),
136139
}
137140

138141
impl From<reqwest::Error> for BlockRangeScannerError {
@@ -318,7 +321,7 @@ struct Service<N: Network> {
318321
config: Config,
319322
provider: RootProvider<N>,
320323
subscriber: Option<mpsc::Sender<BlockRangeMessage>>,
321-
current: BlockHashAndNumber,
324+
next_start_block: BlockHashAndNumber,
322325
websocket_connected: bool,
323326
processed_count: u64,
324327
error_count: u64,
@@ -334,7 +337,7 @@ impl<N: Network> Service<N> {
334337
config,
335338
provider,
336339
subscriber: None,
337-
current: BlockHashAndNumber::default(),
340+
next_start_block: BlockHashAndNumber::default(),
338341
websocket_connected: false,
339342
processed_count: 0,
340343
error_count: 0,
@@ -573,32 +576,47 @@ impl<N: Network> Service<N> {
573576
) -> Result<(), BlockRangeScannerError> {
574577
let mut batch_count = 0;
575578

576-
self.current = BlockHashAndNumber::from_header::<N>(start.header());
579+
self.next_start_block = BlockHashAndNumber::from_header::<N>(start.header());
577580

578-
while self.current.number < end.header().number() {
581+
// must be <= to include the edge case when start == end (i.e. return the single block
582+
// range)
583+
while self.next_start_block.number <= end.header().number() {
579584
self.ensure_current_not_reorged().await?;
580585

581-
let batch_to = self
582-
.current
586+
let batch_end_block_number = self
587+
.next_start_block
583588
.number
584-
.saturating_add(self.config.blocks_read_per_epoch as u64)
589+
.saturating_add(self.config.blocks_read_per_epoch as u64 - 1)
585590
.min(end.header().number());
586591

587-
// safe unwrap since we've checked end block exists
588-
let batch_end_block = self
589-
.provider
590-
.get_block_by_number(batch_to.into())
591-
.await?
592-
.expect("end of the batch should already be ensured to exist");
593-
594-
self.send_to_subscriber(BlockRangeMessage::Data(self.current.number..=batch_to)).await;
595-
596-
self.current = BlockHashAndNumber::from_header::<N>(batch_end_block.header());
592+
self.send_to_subscriber(BlockRangeMessage::Data(
593+
self.next_start_block.number..=batch_end_block_number,
594+
))
595+
.await;
597596

598597
batch_count += 1;
599598
if batch_count % 10 == 0 {
600599
debug!(batch_count = batch_count, "Processed historical batches");
601600
}
601+
602+
if batch_end_block_number == end.header().number() {
603+
break;
604+
}
605+
606+
let next_start_block_number = (batch_end_block_number + 1).into();
607+
let next_start_block =
608+
match self.provider.get_block_by_number(next_start_block_number).await {
609+
Ok(block) => {
610+
block.expect("block number is less than 'end', so it should exist")
611+
}
612+
Err(e) => {
613+
error!(error = %e, "Failed to get block by number");
614+
let e: BlockRangeScannerError = e.into();
615+
self.send_to_subscriber(BlockRangeMessage::Error(e.clone())).await;
616+
return Err(e);
617+
}
618+
};
619+
self.next_start_block = BlockHashAndNumber::from_header::<N>(next_start_block.header());
602620
}
603621

604622
info!(batch_count = batch_count, "Historical sync completed");
@@ -714,7 +732,7 @@ impl<N: Network> Service<N> {
714732
}
715733

716734
async fn ensure_current_not_reorged(&mut self) -> Result<(), BlockRangeScannerError> {
717-
let current_block = self.provider.get_block_by_hash(self.current.hash).await?;
735+
let current_block = self.provider.get_block_by_hash(self.next_start_block.hash).await?;
718736
if current_block.is_some() {
719737
return Ok(());
720738
}
@@ -724,7 +742,7 @@ impl<N: Network> Service<N> {
724742

725743
async fn rewind_on_reorg_detected(&mut self) -> Result<(), BlockRangeScannerError> {
726744
let mut new_current_height =
727-
self.current.number.saturating_sub(self.config.reorg_rewind_depth);
745+
self.next_start_block.number.saturating_sub(self.config.reorg_rewind_depth);
728746

729747
let head = self.provider.get_block_number().await?;
730748
if head < new_current_height {
@@ -741,12 +759,12 @@ impl<N: Network> Service<N> {
741759
)))?;
742760

743761
info!(
744-
old_current = self.current.number,
762+
old_current = self.next_start_block.number,
745763
new_current = current.number,
746764
"Rewind on reorg detected"
747765
);
748766

749-
self.current = current;
767+
self.next_start_block = current;
750768

751769
Ok(())
752770
}
@@ -837,18 +855,18 @@ impl BlockRangeScannerClient {
837855
/// # Errors
838856
///
839857
/// * `BlockRangeScannerError::ServiceShutdown` - if the service is already shutting down.
840-
pub async fn stream_historical(
858+
pub async fn stream_historical<N: Into<BlockNumberOrTag>>(
841859
&self,
842-
start_height: BlockNumberOrTag,
843-
end_height: BlockNumberOrTag,
860+
start_height: N,
861+
end_height: N,
844862
) -> Result<ReceiverStream<BlockRangeMessage>, BlockRangeScannerError> {
845863
let (blocks_sender, blocks_receiver) = mpsc::channel(MAX_BUFFERED_MESSAGES);
846864
let (response_tx, response_rx) = oneshot::channel();
847865

848866
let command = Command::StreamHistorical {
849867
sender: blocks_sender,
850-
start_height,
851-
end_height,
868+
start_height: start_height.into(),
869+
end_height: end_height.into(),
852870
response: response_tx,
853871
};
854872

@@ -873,13 +891,16 @@ impl BlockRangeScannerClient {
873891
/// * `BlockRangeScannerError::ServiceShutdown` - if the service is already shutting down.
874892
pub async fn stream_from(
875893
&self,
876-
start_height: BlockNumberOrTag,
894+
start_height: impl Into<BlockNumberOrTag>,
877895
) -> Result<ReceiverStream<BlockRangeMessage>, BlockRangeScannerError> {
878896
let (blocks_sender, blocks_receiver) = mpsc::channel(MAX_BUFFERED_MESSAGES);
879897
let (response_tx, response_rx) = oneshot::channel();
880898

881-
let command =
882-
Command::StreamFrom { sender: blocks_sender, start_height, response: response_tx };
899+
let command = Command::StreamFrom {
900+
sender: blocks_sender,
901+
start_height: start_height.into(),
902+
response: response_tx,
903+
};
883904

884905
self.command_sender
885906
.send(command)
@@ -944,11 +965,46 @@ mod tests {
944965
};
945966
use alloy_node_bindings::Anvil;
946967
use serde_json::{Value, json};
947-
use tokio::{sync::mpsc, time::timeout};
968+
use tokio::{
969+
sync::mpsc::{self, Receiver},
970+
time::timeout,
971+
};
948972
use tokio_stream::StreamExt;
949973

950974
use super::*;
951975

976+
// Trait to enable receiver-type-agnostic range receival
977+
trait RangeReceiver {
978+
async fn next_range(&mut self) -> Option<BlockRangeMessage>;
979+
}
980+
981+
impl RangeReceiver for ReceiverStream<BlockRangeMessage> {
982+
async fn next_range(&mut self) -> Option<BlockRangeMessage> {
983+
self.next().await
984+
}
985+
}
986+
987+
impl RangeReceiver for Receiver<BlockRangeMessage> {
988+
async fn next_range(&mut self) -> Option<BlockRangeMessage> {
989+
self.recv().await
990+
}
991+
}
992+
993+
macro_rules! assert_next_range {
994+
($recv:expr, None) => {
995+
let next = $recv.next_range().await;
996+
assert!(next.is_none());
997+
};
998+
($recv:expr, $range:expr) => {
999+
let next = $recv.next_range().await;
1000+
if let Some(BlockRangeMessage::Data(range)) = next {
1001+
assert_eq!($range, range);
1002+
} else {
1003+
panic!("expected block range, got: {next:?}");
1004+
}
1005+
};
1006+
}
1007+
9521008
fn test_config() -> Config {
9531009
Config { blocks_read_per_epoch: 5, reorg_rewind_depth: 5, block_confirmations: 0 }
9541010
}
@@ -1321,7 +1377,8 @@ mod tests {
13211377
let original_height = 10;
13221378
let original_hash = keccak256(b"original block");
13231379
let original_block = mock_block(original_height, original_hash);
1324-
service.current = BlockHashAndNumber::from_header::<Ethereum>(original_block.header());
1380+
service.next_start_block =
1381+
BlockHashAndNumber::from_header::<Ethereum>(original_block.header());
13251382

13261383
let expected_rewind_height = original_height - config.reorg_rewind_depth;
13271384
let expected_rewind_hash = keccak256(b"rewound block");
@@ -1337,13 +1394,69 @@ mod tests {
13371394

13381395
service.ensure_current_not_reorged().await?;
13391396

1340-
let current = service.current;
1397+
let current = service.next_start_block;
13411398
assert_eq!(current.number, expected_rewind_height, "should rewind by reorg_rewind_depth");
13421399
assert_eq!(current.hash, expected_rewind_hash, "should use hash of block at rewind height");
13431400

13441401
Ok(())
13451402
}
13461403

1404+
#[tokio::test]
1405+
async fn historic_mode_respects_blocks_read_per_epoch() -> anyhow::Result<()> {
1406+
let anvil = Anvil::new().try_spawn()?;
1407+
1408+
let provider = ProviderBuilder::new().connect(anvil.endpoint().as_str()).await?;
1409+
1410+
provider.anvil_mine(Option::Some(100), Option::None).await?;
1411+
1412+
let client = BlockRangeScanner::new()
1413+
.with_blocks_read_per_epoch(5)
1414+
.connect_ws::<Ethereum>(anvil.ws_endpoint_url())
1415+
.await?
1416+
.run()?;
1417+
1418+
// ranges where each batch is of max blocks per epoch size
1419+
let mut stream = client.stream_historical(0, 19).await?;
1420+
assert_next_range!(stream, 0..=4);
1421+
assert_next_range!(stream, 5..=9);
1422+
assert_next_range!(stream, 10..=14);
1423+
assert_next_range!(stream, 15..=19);
1424+
assert_next_range!(stream, None);
1425+
1426+
// ranges where last batch is smaller than blocks per epoch
1427+
let mut stream = client.stream_historical(93, 99).await?;
1428+
assert_next_range!(stream, 93..=97);
1429+
assert_next_range!(stream, 98..=99);
1430+
assert_next_range!(stream, None);
1431+
1432+
// range where blocks per epoch is larger than the number of blocks in the range
1433+
let mut stream = client.stream_historical(3, 5).await?;
1434+
assert_next_range!(stream, 3..=5);
1435+
assert_next_range!(stream, None);
1436+
1437+
// single item range
1438+
let mut stream = client.stream_historical(3, 3).await?;
1439+
assert_next_range!(stream, 3..=3);
1440+
assert_next_range!(stream, None);
1441+
1442+
// range where blocks per epoch is larger than the number of blocks on chain
1443+
let client = BlockRangeScanner::new()
1444+
.with_blocks_read_per_epoch(200)
1445+
.connect_ws::<Ethereum>(anvil.ws_endpoint_url())
1446+
.await?
1447+
.run()?;
1448+
1449+
let mut stream = client.stream_historical(0, 20).await?;
1450+
assert_next_range!(stream, 0..=20);
1451+
assert_next_range!(stream, None);
1452+
1453+
let mut stream = client.stream_historical(0, 99).await?;
1454+
assert_next_range!(stream, 0..=99);
1455+
assert_next_range!(stream, None);
1456+
1457+
Ok(())
1458+
}
1459+
13471460
#[tokio::test]
13481461
async fn buffered_messages_trim_ranges_prior_to_cutoff() -> anyhow::Result<()> {
13491462
let cutoff = 50;

tests/historic_to_live/basic.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use event_scanner::{
1111
};
1212
use tokio::{
1313
sync::Mutex,
14-
time::{Duration, sleep, timeout},
14+
time::{Duration, timeout},
1515
};
1616
use tokio_stream::StreamExt;
1717

@@ -47,8 +47,6 @@ async fn replays_historical_then_switches_to_live() -> anyhow::Result<()> {
4747
client.start_scanner(BlockNumberOrTag::Number(first_historical_block), None).await
4848
});
4949

50-
sleep(Duration::from_millis(200)).await;
51-
5250
for _ in 0..live_events {
5351
contract.increase().send().await?.watch().await?;
5452
}

0 commit comments

Comments
 (0)