Skip to content

Commit b48a74e

Browse files
committed
feat: add reorg support
1 parent de1b8fb commit b48a74e

File tree

1 file changed

+135
-91
lines changed

1 file changed

+135
-91
lines changed

src/block_range_scanner.rs

Lines changed: 135 additions & 91 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ use alloy::{
7575
consensus::BlockHeader,
7676
eips::BlockNumberOrTag,
7777
network::{BlockResponse, Network, primitives::HeaderResponse},
78-
primitives::{BlockHash, BlockNumber},
78+
primitives::{B256, BlockHash, BlockNumber},
7979
providers::{Provider, RootProvider},
8080
pubsub::Subscription,
8181
rpc::client::ClientBuilder,
@@ -95,7 +95,8 @@ pub const DEFAULT_BLOCK_CONFIRMATIONS: u64 = 0;
9595

9696
pub const MAX_BUFFERED_MESSAGES: usize = 50000;
9797

98-
pub const DEFAULT_REORG_REWIND_DEPTH: u64 = 0;
98+
// Maximum amount of reorged blocks on Ethereum (after this amount of block confirmations, a block is considered final)
99+
pub const DEFAULT_REORG_REWIND_DEPTH: u64 = 64;
99100

100101
// // State sync aware retry settings
101102
// const STATE_SYNC_RETRY_INTERVAL: Duration = Duration::from_secs(30);
@@ -212,7 +213,7 @@ struct Config {
212213

213214
pub struct BlockRangeScanner {
214215
blocks_read_per_epoch: usize,
215-
reorg_rewind_depth: u64,
216+
max_reorg_depth: u64,
216217
block_confirmations: u64,
217218
}
218219

@@ -227,7 +228,7 @@ impl BlockRangeScanner {
227228
pub fn new() -> Self {
228229
Self {
229230
blocks_read_per_epoch: DEFAULT_BLOCKS_READ_PER_EPOCH,
230-
reorg_rewind_depth: DEFAULT_REORG_REWIND_DEPTH,
231+
max_reorg_depth: DEFAULT_REORG_REWIND_DEPTH,
231232
block_confirmations: DEFAULT_BLOCK_CONFIRMATIONS,
232233
}
233234
}
@@ -240,7 +241,7 @@ impl BlockRangeScanner {
240241

241242
#[must_use]
242243
pub fn with_reorg_rewind_depth(mut self, reorg_rewind_depth: u64) -> Self {
243-
self.reorg_rewind_depth = reorg_rewind_depth;
244+
self.max_reorg_depth = reorg_rewind_depth;
244245
self
245246
}
246247

@@ -290,7 +291,7 @@ impl BlockRangeScanner {
290291
provider,
291292
config: Config {
292293
blocks_read_per_epoch: self.blocks_read_per_epoch,
293-
reorg_rewind_depth: self.reorg_rewind_depth,
294+
reorg_rewind_depth: self.max_reorg_depth,
294295
block_confirmations: self.block_confirmations,
295296
},
296297
})
@@ -591,50 +592,88 @@ impl<N: Network> Service<N> {
591592
.provider
592593
.get_block_by_number(start_height)
593594
.await?
594-
.ok_or(BlockRangeScannerError::BlockNotFound(start_height))?
595-
.header()
596-
.number();
595+
.ok_or(BlockRangeScannerError::BlockNotFound(start_height))?;
597596
let end_block = self
598597
.provider
599598
.get_block_by_number(end_height)
600599
.await?
601-
.ok_or(BlockRangeScannerError::BlockNotFound(end_height))?
602-
.header()
603-
.number();
600+
.ok_or(BlockRangeScannerError::BlockNotFound(end_height))?;
604601

605-
let block_range = match start_block.cmp(&end_block) {
606-
Ordering::Greater => end_block..=start_block,
607-
_ => start_block..=end_block,
608-
};
602+
// normalize block range
603+
let (start_block, end_block) =
604+
match start_block.header().number().cmp(&end_block.header().number()) {
605+
Ordering::Greater => (end_block, start_block),
606+
_ => (start_block, end_block),
607+
};
609608

610-
self.stream_rewind(block_range).await;
609+
self.stream_rewind(start_block, end_block).await?;
611610

612611
_ = self.subscriber.take();
613612

614613
Ok(())
615614
}
616615

617-
async fn stream_rewind(&mut self, block_range: RangeInclusive<BlockNumber>) {
616+
async fn stream_rewind(
617+
&mut self,
618+
from: N::BlockResponse,
619+
to: N::BlockResponse,
620+
) -> Result<(), BlockRangeScannerError> {
618621
let mut batch_count = 0;
619622
let blocks_read_per_epoch = self.config.blocks_read_per_epoch;
620623

624+
// for checking whether reorg occurred
625+
let mut to_hash = to.header().hash();
626+
627+
let from = from.header().number();
628+
let to = to.header().number();
629+
let stream_end = from;
630+
621631
// we're iterating in reverse
622-
let stream_end = *block_range.start();
623-
let range_iter = block_range.rev().step_by(blocks_read_per_epoch);
632+
let mut batch_from = to;
624633

625-
for batch_end in range_iter {
626-
let batch_start =
627-
batch_end.saturating_sub(blocks_read_per_epoch as u64 - 1).max(stream_end);
634+
while batch_from >= stream_end {
635+
let batch_to =
636+
batch_from.saturating_sub(blocks_read_per_epoch as u64 - 1).max(stream_end);
628637

629-
self.send_to_subscriber(BlockRangeMessage::Data(batch_start..=batch_end)).await;
638+
// stream the range regularly, i.e. from smaller block number to greater
639+
self.send_to_subscriber(BlockRangeMessage::Data(batch_to..=batch_from)).await;
630640

631641
batch_count += 1;
632642
if batch_count % 10 == 0 {
633643
debug!(batch_count = batch_count, "Processed rewind batches");
634644
}
645+
646+
// check early if end of stream achieved to avoid subtraction overflow when `stream_end == 0`
647+
if batch_to == stream_end {
648+
break;
649+
}
650+
651+
if self.reorg_detected(to_hash).await? {
652+
info!(block_number = %to, hash = %to_hash, "Reorg detected");
653+
self.send_to_subscriber(BlockRangeMessage::Status(ScannerStatus::ReorgDetected))
654+
.await;
655+
// restart rewind
656+
batch_from = to;
657+
// store the updated end block hash
658+
to_hash = self
659+
.provider
660+
.get_block_by_number(to.into())
661+
.await?
662+
.expect("Chain should have the same height post-reorg")
663+
.header()
664+
.hash();
665+
} else {
666+
batch_from = batch_to - 1;
667+
}
635668
}
636669

637670
info!(batch_count = batch_count, "Rewind completed");
671+
672+
Ok(())
673+
}
674+
675+
async fn reorg_detected(&self, hash_to_check: B256) -> Result<bool, BlockRangeScannerError> {
676+
Ok(self.provider.get_block_by_hash(hash_to_check).await?.is_none())
638677
}
639678

640679
async fn sync_historical_data(
@@ -1128,7 +1167,7 @@ mod tests {
11281167
let scanner = BlockRangeScanner::new();
11291168

11301169
assert_eq!(scanner.blocks_read_per_epoch, DEFAULT_BLOCKS_READ_PER_EPOCH);
1131-
assert_eq!(scanner.reorg_rewind_depth, DEFAULT_REORG_REWIND_DEPTH);
1170+
assert_eq!(scanner.max_reorg_depth, DEFAULT_REORG_REWIND_DEPTH);
11321171
assert_eq!(scanner.block_confirmations, DEFAULT_BLOCK_CONFIRMATIONS);
11331172
}
11341173

@@ -1729,117 +1768,122 @@ mod tests {
17291768

17301769
#[tokio::test]
17311770
async fn rewind_single_batch_when_epoch_larger_than_range() -> anyhow::Result<()> {
1732-
let asserter = Asserter::new();
1733-
let provider = mocked_provider(asserter);
1771+
let anvil = Anvil::new().try_spawn()?;
17341772

1735-
let mut config = test_config();
1736-
config.blocks_read_per_epoch = 100;
1737-
let (mut service, _cmd) = Service::new(config, provider);
1773+
let provider = ProviderBuilder::new().connect(anvil.endpoint().as_str()).await?;
17381774

1739-
let (tx, mut rx) = mpsc::channel(16);
1740-
service.subscriber = Some(tx);
1775+
provider.anvil_mine(Option::Some(150), Option::None).await?;
17411776

1742-
// Range length is 51, epoch is 100 -> single batch [100..=150]
1743-
service.stream_rewind(100..=150).await;
1744-
service.handle_unsubscribe();
1777+
let client = BlockRangeScanner::new()
1778+
.with_blocks_read_per_epoch(100)
1779+
.connect_ws::<Ethereum>(anvil.ws_endpoint_url())
1780+
.await?
1781+
.run()?;
17451782

1746-
assert_next!(rx, 100..=150);
1747-
assert_next!(rx, None);
1783+
let mut stream = client.rewind(100, 150).await?;
1784+
1785+
// Range length is 51, epoch is 100 -> single batch [100..=150]
1786+
assert_next!(stream, 100..=150);
1787+
assert_next!(stream, None);
17481788

17491789
Ok(())
17501790
}
17511791

17521792
#[tokio::test]
17531793
async fn rewind_exact_multiple_of_epoch_creates_full_batches_in_reverse() -> anyhow::Result<()>
17541794
{
1755-
let asserter = Asserter::new();
1756-
let provider = mocked_provider(asserter);
1795+
let anvil = Anvil::new().try_spawn()?;
17571796

1758-
let mut config = test_config();
1759-
config.blocks_read_per_epoch = 5;
1760-
let (mut service, _cmd) = Service::new(config, provider);
1797+
let provider = ProviderBuilder::new().connect(anvil.endpoint().as_str()).await?;
17611798

1762-
let (tx, mut rx) = mpsc::channel(16);
1763-
service.subscriber = Some(tx);
1799+
provider.anvil_mine(Option::Some(15), Option::None).await?;
17641800

1765-
// 0..=14 with epoch 5 -> [10..=14, 5..=9, 0..=4]
1766-
service.stream_rewind(0..=14).await;
1767-
service.handle_unsubscribe();
1801+
let client = BlockRangeScanner::new()
1802+
.with_blocks_read_per_epoch(5)
1803+
.connect_ws::<Ethereum>(anvil.ws_endpoint_url())
1804+
.await?
1805+
.run()?;
1806+
1807+
let mut stream = client.rewind(0, 14).await?;
17681808

1769-
assert_next!(rx, 10..=14);
1770-
assert_next!(rx, 5..=9);
1771-
assert_next!(rx, 0..=4);
1772-
assert_next!(rx, None);
1809+
// 0..=14 with epoch 5 -> [10..=14, 5..=9, 0..=4]
1810+
assert_next!(stream, 10..=14);
1811+
assert_next!(stream, 5..=9);
1812+
assert_next!(stream, 0..=4);
1813+
assert_next!(stream, None);
17731814

17741815
Ok(())
17751816
}
17761817

17771818
#[tokio::test]
17781819
async fn rewind_with_remainder_trims_first_batch_to_stream_start() -> anyhow::Result<()> {
1779-
let asserter = Asserter::new();
1780-
let provider = mocked_provider(asserter);
1820+
let anvil = Anvil::new().try_spawn()?;
17811821

1782-
let mut config = test_config();
1783-
config.blocks_read_per_epoch = 4;
1784-
let (mut service, _cmd) = Service::new(config, provider);
1822+
let provider = ProviderBuilder::new().connect(anvil.endpoint().as_str()).await?;
17851823

1786-
let (tx, mut rx) = mpsc::channel(16);
1787-
service.subscriber = Some(tx);
1824+
provider.anvil_mine(Option::Some(15), Option::None).await?;
17881825

1789-
// 3..=12 with epoch 4 -> ends: 12,8,4 -> batches: [9..=12, 5..=8, 3..=4]
1790-
service.stream_rewind(3..=12).await;
1791-
service.handle_unsubscribe();
1826+
let client = BlockRangeScanner::new()
1827+
.with_blocks_read_per_epoch(4)
1828+
.connect_ws::<Ethereum>(anvil.ws_endpoint_url())
1829+
.await?
1830+
.run()?;
1831+
1832+
let mut stream = client.rewind(3, 12).await?;
17921833

1793-
assert_next!(rx, 9..=12);
1794-
assert_next!(rx, 5..=8);
1795-
assert_next!(rx, 3..=4);
1796-
assert_next!(rx, None);
1834+
// 3..=12 with epoch 4 -> ends: 12,8,4 -> batches: [9..=12, 5..=8, 3..=4]
1835+
assert_next!(stream, 9..=12);
1836+
assert_next!(stream, 5..=8);
1837+
assert_next!(stream, 3..=4);
1838+
assert_next!(stream, None);
17971839

17981840
Ok(())
17991841
}
18001842

18011843
#[tokio::test]
18021844
async fn rewind_single_block_range() -> anyhow::Result<()> {
1803-
let asserter = Asserter::new();
1804-
let provider = mocked_provider(asserter);
1845+
let anvil = Anvil::new().try_spawn()?;
18051846

1806-
let mut config = test_config();
1807-
config.blocks_read_per_epoch = 5;
1808-
let (mut service, _cmd) = Service::new(config, provider);
1847+
let provider = ProviderBuilder::new().connect(anvil.endpoint().as_str()).await?;
18091848

1810-
let (tx, mut rx) = mpsc::channel(4);
1811-
service.subscriber = Some(tx);
1849+
provider.anvil_mine(Option::Some(15), Option::None).await?;
18121850

1813-
service.stream_rewind(7..=7).await;
1814-
service.handle_unsubscribe();
1851+
let client = BlockRangeScanner::new()
1852+
.with_blocks_read_per_epoch(5)
1853+
.connect_ws::<Ethereum>(anvil.ws_endpoint_url())
1854+
.await?
1855+
.run()?;
1856+
1857+
let mut stream = client.rewind(7, 7).await?;
18151858

1816-
assert_next!(rx, 7..=7);
1817-
assert_next!(rx, None);
1859+
assert_next!(stream, 7..=7);
1860+
assert_next!(stream, None);
18181861

18191862
Ok(())
18201863
}
18211864

18221865
#[tokio::test]
18231866
async fn rewind_epoch_of_one_sends_each_block_in_reverse_order() -> anyhow::Result<()> {
1824-
let asserter = Asserter::new();
1825-
let provider = mocked_provider(asserter);
1867+
let anvil = Anvil::new().try_spawn()?;
18261868

1827-
let mut config = test_config();
1828-
config.blocks_read_per_epoch = 1;
1829-
let (mut service, _cmd) = Service::new(config, provider);
1869+
let provider = ProviderBuilder::new().connect(anvil.endpoint().as_str()).await?;
18301870

1831-
let (tx, mut rx) = mpsc::channel(16);
1832-
service.subscriber = Some(tx);
1871+
provider.anvil_mine(Option::Some(15), Option::None).await?;
18331872

1834-
// 5..=8 with epoch 1 -> [8..=8, 7..=7, 6..=6, 5..=5]
1835-
service.stream_rewind(5..=8).await;
1836-
service.handle_unsubscribe();
1873+
let client = BlockRangeScanner::new()
1874+
.with_blocks_read_per_epoch(1)
1875+
.connect_ws::<Ethereum>(anvil.ws_endpoint_url())
1876+
.await?
1877+
.run()?;
18371878

1838-
assert_next!(rx, 8..=8);
1839-
assert_next!(rx, 7..=7);
1840-
assert_next!(rx, 6..=6);
1841-
assert_next!(rx, 5..=5);
1842-
assert_next!(rx, None);
1879+
let mut stream = client.rewind(5, 8).await?;
1880+
1881+
// 5..=8 with epoch 1 -> [8..=8, 7..=7, 6..=6, 5..=5]
1882+
assert_next!(stream, 8..=8);
1883+
assert_next!(stream, 7..=7);
1884+
assert_next!(stream, 6..=6);
1885+
assert_next!(stream, 5..=5);
1886+
assert_next!(stream, None);
18431887

18441888
Ok(())
18451889
}

0 commit comments

Comments
 (0)