
Commit ffe2219

Merge branch 'main' into start-typestate

2 parents: 8b79a56 + cdcc2cc

5 files changed: 175 additions, 51 deletions

src/block_range_scanner.rs (6 additions, 3 deletions)

```diff
@@ -370,14 +370,17 @@ impl<N: Network> Service<N> {
             _ => (start_block_num, end_block_num),
         };
 
-        info!(start_block = start_block_num, end_block = end_block_num, "Syncing historical data");
+        info!(
+            start_block = start_block_num,
+            end_block = end_block_num,
+            "Normalized the block range"
+        );
 
         tokio::spawn(async move {
             let mut reorg_handler =
                 ReorgHandler::new(provider.clone(), past_blocks_storage_capacity);
 
-            common::stream_block_range(
-                start_block_num,
+            _ = common::stream_historical_range(
                 start_block_num,
                 end_block_num,
                 max_block_range,
```
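The `_ =` at the new call site is there because `stream_historical_range` is marked `#[must_use]` and returns `Option<()>`, where `None` only means the output channel was closed; inside the spawned task there is nothing left to do in that case, so the value is discarded explicitly. A minimal sketch of the pattern, with a hypothetical stub in place of the real function:

```rust
// Hypothetical stub standing in for the return contract of stream_historical_range.
#[must_use = "None means the output channel closed"]
fn stream_stub() -> Option<()> {
    None // pretend the receiver was dropped
}

fn main() {
    // `_ =` acknowledges the #[must_use] value; ignoring it silently
    // would produce an unused-result warning.
    _ = stream_stub();
}
```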

src/block_range_scanner/common.rs (80 additions, 13 deletions)

```diff
@@ -9,6 +9,7 @@ use crate::{
 };
 use alloy::{
     consensus::BlockHeader,
+    eips::BlockNumberOrTag,
     network::{BlockResponse, Network},
     primitives::BlockNumber,
 };
@@ -135,9 +136,12 @@ async fn initialize_live_streaming_state<N: Network>(
 
     let confirmed = incoming_block_num.saturating_sub(block_confirmations);
 
+    // The minimum common ancestor is the block before the stream start
+    let min_common_ancestor = stream_start.saturating_sub(1);
+
     // Catch up on any confirmed blocks between stream_start and the confirmed tip
-    let previous_batch_end = stream_block_range(
-        stream_start,
+    let previous_batch_end = stream_range_with_reorg_handling(
+        min_common_ancestor,
         stream_start,
         confirmed,
         max_block_range,
@@ -289,8 +293,11 @@ async fn stream_next_batch<N: Network>(
         return true;
     }
 
-    state.previous_batch_end = stream_block_range(
-        stream_start,
+    // The minimum common ancestor is the block before the stream start
+    let min_common_ancestor = stream_start.saturating_sub(1);
+
+    state.previous_batch_end = stream_range_with_reorg_handling(
+        min_common_ancestor,
         state.batch_start,
         batch_end_num,
         max_block_range,
```
```diff
@@ -319,9 +326,72 @@ struct LiveStreamingState<N: Network> {
     previous_batch_end: Option<N::BlockResponse>,
 }
 
-/// Assumes that `min_block <= next_start_block <= end`.
-pub(crate) async fn stream_block_range<N: Network>(
-    min_block: BlockNumber,
+#[must_use]
+pub(crate) async fn stream_historical_range<N: Network>(
+    start: BlockNumber,
+    end: BlockNumber,
+    max_block_range: u64,
+    sender: &mpsc::Sender<BlockScannerResult>,
+    provider: &RobustProvider<N>,
+    reorg_handler: &mut ReorgHandler<N>,
+) -> Option<()> {
+    info!("Getting finalized block number");
+    let finalized = match provider.get_block_number_by_id(BlockNumberOrTag::Finalized.into()).await
+    {
+        Ok(block) => block,
+        Err(e) => {
+            error!(error = %e, "Failed to get finalized block");
+            _ = sender.try_stream(e).await;
+            return None;
+        }
+    };
+
+    // no reorg check for finalized blocks
+    let mut batch_start = start;
+    let finalized_batch_end = finalized.min(end);
+    while batch_start <= finalized_batch_end {
+        let batch_end = batch_start.saturating_add(max_block_range - 1).min(finalized_batch_end);
+
+        if !sender.try_stream(batch_start..=batch_end).await {
+            return None; // channel closed
+        }
+
+        batch_start = batch_end + 1;
+    }
+
+    // covers case when `end <= finalized`
+    if batch_start > end {
+        return Some(()); // we're done
+    }
+
+    // we have non-finalized block numbers to stream, a reorg can occur
+
+    // Possible minimal common ancestors when a reorg occurs:
+    // * start > finalized -> the common ancestor we care about is the block before `start`, that's
+    //   where the stream should restart -> this is why we used `start - 1`
+    // * start == finalized -> `start` should never be re-streamed on reorgs; stream should restart
+    //   on `start + 1`
+    // * start < finalized -> if we got here, then `end > finalized`; on reorg, we should only
+    //   re-stream non-finalized blocks
+    let min_common_ancestor = (start.saturating_sub(1)).max(finalized);
+
+    stream_range_with_reorg_handling(
+        min_common_ancestor,
+        batch_start,
+        end,
+        max_block_range,
+        sender,
+        provider,
+        reorg_handler,
+    )
+    .await?;
+
+    Some(())
+}
+
+/// Assumes that `min_common_ancestor <= next_start_block <= end`, performs no internal checks.
+pub(crate) async fn stream_range_with_reorg_handling<N: Network>(
+    min_common_ancestor: BlockNumber,
     mut next_start_block: BlockNumber,
     end: BlockNumber,
     max_block_range: u64,
```
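The three bullet cases in the comment above collapse into the single expression `start.saturating_sub(1).max(finalized)`. A minimal standalone sketch of that rule; only the formula comes from the diff, the block numbers are illustrative:

```rust
// Lower bound below which the reorg-handling stream must never restart.
fn min_common_ancestor(start: u64, finalized: u64) -> u64 {
    start.saturating_sub(1).max(finalized)
}

fn main() {
    // start > finalized: the ancestor is the block before `start`.
    assert_eq!(min_common_ancestor(100, 90), 99);
    // start == finalized: `start` itself is final, so the ancestor is `start`
    // and a reorg restart would begin at `start + 1`.
    assert_eq!(min_common_ancestor(90, 90), 90);
    // start < finalized: only non-finalized blocks may ever be re-streamed,
    // so the ancestor is the finalized tip.
    assert_eq!(min_common_ancestor(80, 90), 90);
}
```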
```diff
@@ -364,13 +434,10 @@ pub(crate) async fn stream_block_range<N: Network>(
             if !sender.try_stream(Notification::ReorgDetected).await {
                 return None;
             }
-            if common_ancestor.header().number() < min_block {
-                min_block
-            } else {
-                common_ancestor.header().number() + 1
-            }
+
+            min_common_ancestor.max(common_ancestor.header().number()) + 1
         } else {
-            batch_end_num.saturating_add(1)
+            batch_end_num + 1
         };
 
         if next_start_block > end {
```
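The new one-liner replaces the old four-line branch: after `ReorgDetected`, streaming resumes at the block after whichever is higher, the detected common ancestor or `min_common_ancestor`. A minimal sketch with illustrative numbers (not taken from the diff):

```rust
// Restart point after a detected reorg.
fn restart_block(min_common_ancestor: u64, common_ancestor: u64) -> u64 {
    min_common_ancestor.max(common_ancestor) + 1
}

fn main() {
    // Shallow reorg: the fork point (105) is above the lower bound (95),
    // so the next batch restarts at 106.
    assert_eq!(restart_block(95, 105), 106);
    // Deep reorg: the fork point (90) is below the lower bound (95), so the
    // restart is clamped to 96 and blocks at or below the bound are never
    // re-streamed.
    assert_eq!(restart_block(95, 90), 96);
}
```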

src/block_range_scanner/sync_handler.rs (12 additions, 6 deletions)

```diff
@@ -127,7 +127,10 @@ impl<N: Network> SyncHandler<N> {
         )
         .await
         {
-            Ok(start_block) => start_block,
+            Ok(Some(start_block)) => start_block,
+            Ok(None) => {
+                return; // channel closed
+            }
             Err(e) => {
                 error!(error = %e, "Error during historical catchup, shutting down");
                 _ = sender.try_stream(e).await;
@@ -158,18 +161,21 @@ impl<N: Network> SyncHandler<N> {
         sender: &mpsc::Sender<BlockScannerResult>,
         provider: &RobustProvider<N>,
         reorg_handler: &mut ReorgHandler<N>,
-    ) -> Result<BlockNumber, ScannerError> {
+    ) -> Result<Option<BlockNumber>, ScannerError> {
         while start_block < confirmed_tip {
-            common::stream_block_range(
-                start_block,
+            if common::stream_historical_range(
                 start_block,
                 confirmed_tip,
                 max_block_range,
                 sender,
                 provider,
                 reorg_handler,
             )
-            .await;
+            .await
+            .is_none()
+            {
+                return Ok(None);
+            }
 
             let latest = provider.get_block_number().await?;
 
@@ -179,7 +185,7 @@ impl<N: Network> SyncHandler<N> {
 
         info!("Historical catchup complete, ready to transition to live");
 
-        Ok(start_block)
+        Ok(Some(start_block))
     }
 
     /// Subscribes to live blocks and begins streaming
```
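The new `Result<Option<BlockNumber>, ScannerError>` return separates a graceful stop (the subscriber dropped its receiver, `Ok(None)`) from a real failure (`Err`). A minimal sketch of that calling convention, using placeholder types rather than the crate's own:

```rust
// Placeholder aliases for illustration only.
type BlockNumber = u64;
type ScannerError = String;

fn handle_catchup(result: Result<Option<BlockNumber>, ScannerError>) -> Option<BlockNumber> {
    match result {
        // Catchup finished: continue into live streaming from this block.
        Ok(Some(start_block)) => Some(start_block),
        // Channel closed: the consumer went away, shut down quietly.
        Ok(None) => None,
        // Genuine error: report it, then shut down.
        Err(e) => {
            eprintln!("error during historical catchup: {e}");
            None
        }
    }
}

fn main() {
    assert_eq!(handle_catchup(Ok(Some(42))), Some(42));
    assert_eq!(handle_catchup(Ok(None)), None);
    assert_eq!(handle_catchup(Err("rpc unreachable".to_string())), None);
}
```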

src/types.rs (24 additions, 0 deletions)

```diff
@@ -25,6 +25,30 @@ pub enum Notification {
     SwitchingToLive,
 
     /// Emitted when a blockchain reorganization is detected during scanning.
+    ///
+    /// When a reorg occurs, the scanner adjusts its position to re-stream events from the
+    /// canonical chain state. The specific behavior depends on the scanning mode (see individual
+    /// scanner mode documentation for details).
+    ///
+    /// # Redundant Notifications
+    ///
+    /// Due to the asynchronous nature of block scanning and log fetching, you may occasionally
+    /// receive this notification even after the reorg has already been accounted for. This happens
+    /// when:
+    ///
+    /// 1. `BlockRangeScanner` validates and emits a block range
+    /// 2. A reorg occurs on the chain
+    /// 3. `EventScanner` fetches logs for that range, but the RPC provider returns logs from the
+    ///    post-reorg chain state (the provider's view has already updated)
+    /// 4. `BlockRangeScanner` detects the reorg on its next check and emits
+    ///    `Notification::ReorgDetected` with a new range starting from the first reorged block
+    /// 5. `EventScanner` re-fetches logs for this range, which may return duplicate logs already
+    ///    delivered in step 3 (the new range might also extend beyond the original range)
+    ///
+    /// **How to handle**: This is a benign race condition. Your application should be designed to
+    /// handle duplicate logs idempotently (e.g., using transaction hashes or log indices as
+    /// deduplication keys). The scanner prioritizes correctness by ensuring all logs from the
+    /// canonical chain are delivered, even if it means occasional duplicates during reorgs.
     ReorgDetected,
 
     /// Emitted during the latest events phase when no matching logs are found in the
```
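A minimal sketch of the idempotent handling the doc comment recommends. The `DeliveredLog` type and its fields are placeholders, not part of the crate; in practice the deduplication key would come from whatever unique identifiers your log type exposes (typically transaction hash plus log index):

```rust
use std::collections::HashSet;

// Placeholder log type for illustration.
struct DeliveredLog {
    tx_hash: [u8; 32],
    log_index: u64,
}

#[derive(Default)]
struct LogSink {
    seen: HashSet<([u8; 32], u64)>,
}

impl LogSink {
    /// Returns true only the first time a (tx_hash, log_index) pair is seen,
    /// so duplicates re-streamed after `ReorgDetected` are skipped.
    fn should_process(&mut self, log: &DeliveredLog) -> bool {
        self.seen.insert((log.tx_hash, log.log_index))
    }
}

fn main() {
    let mut sink = LogSink::default();
    let log = DeliveredLog { tx_hash: [0u8; 32], log_index: 3 };
    assert!(sink.should_process(&log)); // first delivery: process it
    assert!(!sink.should_process(&log)); // duplicate after a reorg re-stream: skip it
}
```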

tests/block_range_scanner.rs (53 additions, 29 deletions)

```diff
@@ -188,16 +188,16 @@ async fn historical_emits_correction_range_when_reorg_below_end() -> anyhow::Res
         .stream_historical(BlockNumberOrTag::Number(0), BlockNumberOrTag::Number(end_num))
         .await?;
 
-    let depth = 15;
-    _ = provider.anvil_reorg(ReorgOptions { depth, tx_block_pairs: vec![] }).await;
-    _ = provider.anvil_mine(Some(20), None).await;
-
     assert_next!(stream, 0..=29);
-    assert_next!(stream, 30..=59);
-    assert_next!(stream, 60..=89);
-    assert_next!(stream, 90..=110);
+    assert_next!(stream, 30..=56);
+    assert_next!(stream, 57..=86);
+    assert_next!(stream, 87..=110);
+    let mut stream = assert_empty!(stream);
+
+    _ = provider.anvil_reorg(ReorgOptions { depth: 15, tx_block_pairs: vec![] }).await;
+
     assert_next!(stream, Notification::ReorgDetected);
-    assert_next!(stream, 105..=110);
+    assert_next!(stream, 106..=110);
     assert_closed!(stream);
 
     Ok(())
```
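The expected ranges in this test follow from the new batching rule in `stream_historical_range`: batches never straddle the finalized tip. A minimal sketch of that arithmetic; the finalized height of 56 is an assumption made to match the assertions, it is not stated in the diff:

```rust
// Reproduce the batch boundaries asserted above, assuming the node reported
// block 56 as finalized when the historical stream started.
fn batches(start: u64, end: u64, finalized: u64, max_block_range: u64) -> Vec<(u64, u64)> {
    let mut out = Vec::new();
    let mut next = start;
    // Finalized portion: batches are capped at the finalized tip, so the last
    // one may be shorter than `max_block_range`.
    let finalized_end = finalized.min(end);
    while next <= finalized_end {
        let batch_end = (next + max_block_range - 1).min(finalized_end);
        out.push((next, batch_end));
        next = batch_end + 1;
    }
    // Non-finalized portion: fixed-size batches up to `end`.
    while next <= end {
        let batch_end = (next + max_block_range - 1).min(end);
        out.push((next, batch_end));
        next = batch_end + 1;
    }
    out
}

fn main() {
    // Matches the assertions in the test: 0..=29, 30..=56, 57..=86, 87..=110.
    assert_eq!(batches(0, 110, 56, 30), vec![(0, 29), (30, 56), (57, 86), (87, 110)]);
}
```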
```diff
@@ -211,32 +211,55 @@ async fn historical_emits_correction_range_when_end_num_reorgs() -> anyhow::Resu
 
     provider.anvil_mine(Some(120), None).await?;
 
-    let end_num = 120;
-
     let client =
         BlockRangeScanner::new().max_block_range(30).connect(provider.clone()).await?.run()?;
 
-    let mut stream = client
-        .stream_historical(BlockNumberOrTag::Number(0), BlockNumberOrTag::Number(end_num))
-        .await?;
-
-    let pre_reorg_mine = 20;
-    _ = provider.anvil_mine(Some(pre_reorg_mine), None).await;
-    let depth = pre_reorg_mine + 1;
-    _ = provider.anvil_reorg(ReorgOptions { depth, tx_block_pairs: vec![] }).await;
-    _ = provider.anvil_mine(Some(20), None).await;
+    let mut stream =
+        client.stream_historical(BlockNumberOrTag::Number(0), BlockNumberOrTag::Latest).await?;
 
     assert_next!(stream, 0..=29);
-    assert_next!(stream, 30..=59);
-    assert_next!(stream, 60..=89);
-    assert_next!(stream, 90..=120);
+    assert_next!(stream, 30..=56);
+    assert_next!(stream, 57..=86);
+    assert_next!(stream, 87..=116);
+    assert_next!(stream, 117..=120);
+    let mut stream = assert_empty!(stream);
+
+    _ = provider.anvil_reorg(ReorgOptions { depth: 1, tx_block_pairs: vec![] }).await;
+
     assert_next!(stream, Notification::ReorgDetected);
     assert_next!(stream, 120..=120);
     assert_closed!(stream);
 
     Ok(())
 }
 
+#[tokio::test]
+#[ignore = "enable when ack-channels are introduced: https://github.com/OpenZeppelin/Event-Scanner/issues/218"]
+async fn historical_reorg_occurring_immediately_after_finalized_processing_is_ignored()
+-> anyhow::Result<()> {
+    let anvil = Anvil::new().try_spawn()?;
+    let provider = ProviderBuilder::new().connect(anvil.ws_endpoint_url().as_str()).await?;
+
+    provider.anvil_mine(Some(11), None).await?;
+
+    let client =
+        BlockRangeScanner::new().max_block_range(10).connect(provider.clone()).await?.run()?;
+
+    let mut stream =
+        client.stream_historical(BlockNumberOrTag::Earliest, BlockNumberOrTag::Latest).await?;
+
+    assert_next!(stream, 0..=0);
+    let mut stream = assert_empty!(stream);
+
+    _ = provider.anvil_reorg(ReorgOptions { depth: 3, tx_block_pairs: vec![] }).await;
+
+    // no Notification::ReorgDetected is streamed
+    assert_next!(stream, 1..=11);
+    assert_closed!(stream);
+
+    Ok(())
+}
+
 #[tokio::test]
 async fn historic_mode_respects_blocks_read_per_epoch() -> anyhow::Result<()> {
     let anvil = Anvil::new().try_spawn()?;
```
```diff
@@ -279,7 +302,8 @@ async fn historic_mode_respects_blocks_read_per_epoch() -> anyhow::Result<()> {
     assert_closed!(stream);
 
     let mut stream = client.stream_historical(0, 99).await?;
-    assert_next!(stream, 0..=99);
+    assert_next!(stream, 0..=36);
+    assert_next!(stream, 37..=99);
     assert_closed!(stream);
 
     Ok(())
@@ -295,15 +319,15 @@ async fn historic_mode_normalises_start_and_end_block() -> anyhow::Result<()> {
     let client = BlockRangeScanner::new().max_block_range(5).connect(provider).await?.run()?;
 
     let mut stream = client.stream_historical(10, 0).await?;
-    assert_next!(stream, 0..=4);
-    assert_next!(stream, 5..=9);
-    assert_next!(stream, 10..=10);
+    assert_next!(stream, 0..=0);
+    assert_next!(stream, 1..=5);
+    assert_next!(stream, 6..=10);
     assert_closed!(stream);
 
     let mut stream = client.stream_historical(0, 10).await?;
-    assert_next!(stream, 0..=4);
-    assert_next!(stream, 5..=9);
-    assert_next!(stream, 10..=10);
+    assert_next!(stream, 0..=0);
+    assert_next!(stream, 1..=5);
+    assert_next!(stream, 6..=10);
     assert_closed!(stream);
 
     Ok(())
```
