Skip to content

Commit cdcc2cc

Browse files
authored
Fixes for finalized block processing in historic mode (#225)
1 parent f8fc694 commit cdcc2cc

File tree

3 files changed

+79
-31
lines changed

3 files changed

+79
-31
lines changed

src/block_range_scanner/common.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -349,8 +349,8 @@ pub(crate) async fn stream_historical_range<N: Network>(
349349
// no reorg check for finalized blocks
350350
let mut batch_start = start;
351351
let finalized_batch_end = finalized.min(end);
352-
while batch_start < finalized_batch_end {
353-
let batch_end = batch_start.saturating_add(max_block_range - 1).min(end);
352+
while batch_start <= finalized_batch_end {
353+
let batch_end = batch_start.saturating_add(max_block_range - 1).min(finalized_batch_end);
354354

355355
if !sender.try_stream(batch_start..=batch_end).await {
356356
return None; // channel closed

src/types.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,30 @@ pub enum Notification {
2525
SwitchingToLive,
2626

2727
/// Emitted when a blockchain reorganization is detected during scanning.
28+
///
29+
/// When a reorg occurs, the scanner adjusts its position to re-stream events from the
30+
/// canonical chain state. The specific behavior depends on the scanning mode (see individual
31+
/// scanner mode documentation for details).
32+
///
33+
/// # Redundant Notifications
34+
///
35+
/// Due to the asynchronous nature of block scanning and log fetching, you may occasionally
36+
/// receive this notification even after the reorg has already been accounted for. This happens
37+
/// when:
38+
///
39+
/// 1. `BlockRangeScanner` validates and emits a block range
40+
/// 2. A reorg occurs on the chain
41+
/// 3. `EventScanner` fetches logs for that range, but the RPC provider returns logs from the
42+
/// post-reorg chain state (the provider's view has already updated)
43+
/// 4. `BlockRangeScanner` detects the reorg on its next check and emits
44+
/// `Notification::ReorgDetected` with a new range starting from the first reorged block
45+
/// 5. `EventScanner` re-fetches logs for this range, which may return duplicate logs already
46+
/// delivered in step 3 (the new range might also extend beyond the original range)
47+
///
48+
/// **How to handle**: This is a benign race condition. Your application should be designed to
49+
/// handle duplicate logs idempotently (e.g., using transaction hashes or log indices as
50+
/// deduplication keys). The scanner prioritizes correctness by ensuring all logs from the
51+
/// canonical chain are delivered, even if it means occasional duplicates during reorgs.
2852
ReorgDetected,
2953

3054
/// Emitted during the latest events phase when no matching logs are found in the

tests/block_range_scanner.rs

Lines changed: 53 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -188,16 +188,16 @@ async fn historical_emits_correction_range_when_reorg_below_end() -> anyhow::Res
188188
.stream_historical(BlockNumberOrTag::Number(0), BlockNumberOrTag::Number(end_num))
189189
.await?;
190190

191-
let depth = 15;
192-
_ = provider.anvil_reorg(ReorgOptions { depth, tx_block_pairs: vec![] }).await;
193-
_ = provider.anvil_mine(Some(20), None).await;
194-
195191
assert_next!(stream, 0..=29);
196-
assert_next!(stream, 30..=59);
197-
assert_next!(stream, 60..=89);
198-
assert_next!(stream, 90..=110);
192+
assert_next!(stream, 30..=56);
193+
assert_next!(stream, 57..=86);
194+
assert_next!(stream, 87..=110);
195+
let mut stream = assert_empty!(stream);
196+
197+
_ = provider.anvil_reorg(ReorgOptions { depth: 15, tx_block_pairs: vec![] }).await;
198+
199199
assert_next!(stream, Notification::ReorgDetected);
200-
assert_next!(stream, 105..=110);
200+
assert_next!(stream, 106..=110);
201201
assert_closed!(stream);
202202

203203
Ok(())
@@ -211,32 +211,55 @@ async fn historical_emits_correction_range_when_end_num_reorgs() -> anyhow::Resu
211211

212212
provider.anvil_mine(Some(120), None).await?;
213213

214-
let end_num = 120;
215-
216214
let client =
217215
BlockRangeScanner::new().max_block_range(30).connect(provider.clone()).await?.run()?;
218216

219-
let mut stream = client
220-
.stream_historical(BlockNumberOrTag::Number(0), BlockNumberOrTag::Number(end_num))
221-
.await?;
222-
223-
let pre_reorg_mine = 20;
224-
_ = provider.anvil_mine(Some(pre_reorg_mine), None).await;
225-
let depth = pre_reorg_mine + 1;
226-
_ = provider.anvil_reorg(ReorgOptions { depth, tx_block_pairs: vec![] }).await;
227-
_ = provider.anvil_mine(Some(20), None).await;
217+
let mut stream =
218+
client.stream_historical(BlockNumberOrTag::Number(0), BlockNumberOrTag::Latest).await?;
228219

229220
assert_next!(stream, 0..=29);
230-
assert_next!(stream, 30..=59);
231-
assert_next!(stream, 60..=89);
232-
assert_next!(stream, 90..=120);
221+
assert_next!(stream, 30..=56);
222+
assert_next!(stream, 57..=86);
223+
assert_next!(stream, 87..=116);
224+
assert_next!(stream, 117..=120);
225+
let mut stream = assert_empty!(stream);
226+
227+
_ = provider.anvil_reorg(ReorgOptions { depth: 1, tx_block_pairs: vec![] }).await;
228+
233229
assert_next!(stream, Notification::ReorgDetected);
234230
assert_next!(stream, 120..=120);
235231
assert_closed!(stream);
236232

237233
Ok(())
238234
}
239235

236+
#[tokio::test]
237+
#[ignore = "enable when ack-channels are introduced: https://github.com/OpenZeppelin/Event-Scanner/issues/218"]
238+
async fn historical_reorg_occurring_immediately_after_finalized_processing_is_ignored()
239+
-> anyhow::Result<()> {
240+
let anvil = Anvil::new().try_spawn()?;
241+
let provider = ProviderBuilder::new().connect(anvil.ws_endpoint_url().as_str()).await?;
242+
243+
provider.anvil_mine(Some(11), None).await?;
244+
245+
let client =
246+
BlockRangeScanner::new().max_block_range(10).connect(provider.clone()).await?.run()?;
247+
248+
let mut stream =
249+
client.stream_historical(BlockNumberOrTag::Earliest, BlockNumberOrTag::Latest).await?;
250+
251+
assert_next!(stream, 0..=0);
252+
let mut stream = assert_empty!(stream);
253+
254+
_ = provider.anvil_reorg(ReorgOptions { depth: 3, tx_block_pairs: vec![] }).await;
255+
256+
// no Notification::ReorgDetected is streamed
257+
assert_next!(stream, 1..=11);
258+
assert_closed!(stream);
259+
260+
Ok(())
261+
}
262+
240263
#[tokio::test]
241264
async fn historic_mode_respects_blocks_read_per_epoch() -> anyhow::Result<()> {
242265
let anvil = Anvil::new().try_spawn()?;
@@ -279,7 +302,8 @@ async fn historic_mode_respects_blocks_read_per_epoch() -> anyhow::Result<()> {
279302
assert_closed!(stream);
280303

281304
let mut stream = client.stream_historical(0, 99).await?;
282-
assert_next!(stream, 0..=99);
305+
assert_next!(stream, 0..=36);
306+
assert_next!(stream, 37..=99);
283307
assert_closed!(stream);
284308

285309
Ok(())
@@ -295,15 +319,15 @@ async fn historic_mode_normalises_start_and_end_block() -> anyhow::Result<()> {
295319
let client = BlockRangeScanner::new().max_block_range(5).connect(provider).await?.run()?;
296320

297321
let mut stream = client.stream_historical(10, 0).await?;
298-
assert_next!(stream, 0..=4);
299-
assert_next!(stream, 5..=9);
300-
assert_next!(stream, 10..=10);
322+
assert_next!(stream, 0..=0);
323+
assert_next!(stream, 1..=5);
324+
assert_next!(stream, 6..=10);
301325
assert_closed!(stream);
302326

303327
let mut stream = client.stream_historical(0, 10).await?;
304-
assert_next!(stream, 0..=4);
305-
assert_next!(stream, 5..=9);
306-
assert_next!(stream, 10..=10);
328+
assert_next!(stream, 0..=0);
329+
assert_next!(stream, 1..=5);
330+
assert_next!(stream, 6..=10);
307331
assert_closed!(stream);
308332

309333
Ok(())

0 commit comments

Comments
 (0)