Skip to content

Commit 26c0fce

Browse files
committed
Merge branch 'main' into abstract-loop
2 parents 500405e + 0bfd0c9 commit 26c0fce

File tree

8 files changed

+219
-124
lines changed

8 files changed

+219
-124
lines changed

src/block_range_scanner.rs

Lines changed: 90 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ use tokio_stream::wrappers::ReceiverStream;
6969
use crate::{
7070
ScannerError, ScannerMessage,
7171
block_range_scanner::sync_handler::SyncHandler,
72-
robust_provider::{IntoRobustProvider, RobustProvider, provider::Error as RobustProviderError},
72+
robust_provider::{IntoRobustProvider, RobustProvider},
7373
types::{IntoScannerResult, Notification, ScannerResult, TryStream},
7474
};
7575

@@ -445,17 +445,6 @@ impl<N: Network> Service<N> {
445445
}
446446

447447
/// Streams blocks in reverse order from `from` to `to`.
448-
///
449-
/// The `from` block is assumed to be greater than or equal to the `to` block.
450-
///
451-
/// # Reorg Handling
452-
///
453-
/// Reorg checks are only performed when the tip is above the current finalized
454-
/// block height.
455-
///
456-
/// # Errors
457-
///
458-
/// Returns an error if the stream fails
459448
async fn stream_rewind(
460449
from: N::BlockResponse,
461450
to: N::BlockResponse,
@@ -515,6 +504,8 @@ impl<N: Network> Service<N> {
515504
return;
516505
}
517506
}
507+
508+
batch_from = batch_to - 1;
518509
}
519510

520511
info!(batch_count = iter.batch_count(), "Rewind completed");
@@ -539,7 +530,7 @@ impl<N: Network> Service<N> {
539530
"Reorg detected"
540531
);
541532

542-
if !sender.try_stream(Notification::ReorgDetected { common_ancestor_block }).await {
533+
if !sender.try_stream(Notification::ReorgDetected { common_ancestor }).await {
543534
return false;
544535
}
545536

@@ -567,6 +558,63 @@ impl<N: Network> Service<N> {
567558

568559
true
569560
}
561+
562+
/// Handles re-scanning of reorged blocks.
563+
///
564+
/// Returns `true` on success, `false` if stream closed or terminal error occurred.
565+
async fn handle_reorg_rescan(
566+
tip: &mut N::BlockResponse,
567+
common_ancestor: N::BlockResponse,
568+
max_block_range: u64,
569+
sender: &mpsc::Sender<BlockScannerResult>,
570+
provider: &RobustProvider<N>,
571+
) -> bool {
572+
let tip_number = tip.header().number();
573+
let common_ancestor = common_ancestor.header().number();
574+
info!(
575+
block_number = %tip_number,
576+
hash = %tip.header().hash(),
577+
common_ancestor = %common_ancestor,
578+
"Reorg detected"
579+
);
580+
581+
if !sender.try_stream(Notification::ReorgDetected { common_ancestor }).await {
582+
return false;
583+
}
584+
585+
// Get the new tip block (same height as original tip, but new hash)
586+
*tip = match provider.get_block_by_number(tip_number.into()).await {
587+
Ok(block) => block,
588+
Err(e) => {
589+
if matches!(e, crate::robust_provider::Error::BlockNotFound(_)) {
590+
error!("Unexpected error: pre-reorg chain tip should exist on a reorged chain");
591+
} else {
592+
error!(error = %e, "Terminal RPC call error, shutting down");
593+
}
594+
_ = sender.try_stream(e).await;
595+
return false;
596+
}
597+
};
598+
599+
// Re-scan only the affected range (from common_ancestor + 1 up to tip)
600+
let rescan_from = common_ancestor + 1;
601+
602+
let mut rescan_batch_start = rescan_from;
603+
while rescan_batch_start <= tip_number {
604+
let rescan_batch_end = (rescan_batch_start + max_block_range - 1).min(tip_number);
605+
606+
if !sender.try_stream(rescan_batch_start..=rescan_batch_end).await {
607+
return false;
608+
}
609+
610+
if rescan_batch_end == tip_number {
611+
break;
612+
}
613+
rescan_batch_start = rescan_batch_end + 1;
614+
}
615+
616+
true
617+
}
570618
}
571619

572620
pub struct BlockRangeScannerClient {
@@ -679,14 +727,41 @@ impl BlockRangeScannerClient {
679727

680728
/// Streams blocks in reverse order from `start_id` to `end_id`.
681729
///
730+
/// The `start_id` block is assumed to be greater than or equal to the `end_id` block.
731+
/// Blocks are streamed in batches, where each batch is ordered from lower to higher
732+
/// block numbers (chronological order within each batch), but batches themselves
733+
/// progress from newer to older blocks.
734+
///
682735
/// # Arguments
683736
///
684-
/// * `start_id` - The starting block id (defaults to Latest if None).
685-
/// * `end_id` - The ending block id (defaults to Earliest if None).
737+
/// * `start_id` - The starting block id (higher block number).
738+
/// * `end_id` - The ending block id (lower block number).
739+
///
740+
/// # Reorg Handling
741+
///
742+
/// Reorg checks are only performed when the specified block range tip is above the
743+
/// current finalized block height. When a reorg is detected:
744+
///
745+
/// 1. A [`Notification::ReorgDetected`] is emitted with the common ancestor block
746+
/// 2. The scanner fetches the new tip block at the same height
747+
/// 3. Reorged blocks are re-streamed in chronological order (from `common_ancestor + 1` up to
748+
/// the new tip)
749+
/// 4. The reverse scan continues from where it left off
750+
///
751+
/// If the range tip is at or below the finalized block, no reorg checks are
752+
/// performed since finalized blocks cannot be reorganized.
753+
///
754+
/// # Note
755+
///
756+
/// The reason reorged blocks are streamed in chronological order is to make it easier to handle
757+
/// reorgs in [`EventScannerBuilder::latest`][latest mode] mode, i.e. to prepend reorged blocks
758+
/// to the result collection, which must maintain chronological order.
686759
///
687760
/// # Errors
688761
///
689762
/// * `ScannerError::ServiceShutdown` - if the service is already shutting down.
763+
///
764+
/// [latest mode]: crate::EventScannerBuilder::latest
690765
pub async fn rewind(
691766
&self,
692767
start_id: impl Into<BlockId>,

src/block_range_scanner/common.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -253,8 +253,7 @@ async fn handle_reorg_detected<N: Network>(
253253
) -> bool {
254254
let ancestor_num = common_ancestor.header().number();
255255

256-
if !sender.try_stream(Notification::ReorgDetected { common_ancestor_block: ancestor_num }).await
257-
{
256+
if !sender.try_stream(Notification::ReorgDetected { common_ancestor: ancestor_num }).await {
258257
return false;
259258
}
260259

@@ -427,11 +426,11 @@ pub(crate) async fn stream_range_with_reorg_handling<N: Network>(
427426
};
428427

429428
if let Some(common_ancestor) = reorged_opt {
430-
let common_ancestor_block = common_ancestor.header().number();
431-
if !sender.try_stream(Notification::ReorgDetected { common_ancestor_block }).await {
429+
let common_ancestor = common_ancestor.header().number();
430+
if !sender.try_stream(Notification::ReorgDetected { common_ancestor }).await {
432431
return None;
433432
}
434-
iter.reset_to((common_ancestor_block + 1).max(min_common_ancestor));
433+
iter.reset_to((common_ancestor + 1).max(min_common_ancestor));
435434
}
436435

437436
last_batch_end = Some(batch_end);

src/event_scanner/scanner/common.rs

Lines changed: 47 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -193,21 +193,27 @@ pub fn spawn_log_consumers_in_collection_mode<N: Network>(
193193
}
194194
}
195195
Ok(ScannerMessage::Notification(Notification::ReorgDetected {
196-
common_ancestor_block,
196+
common_ancestor,
197197
})) => {
198198
info!(
199-
common_ancestor_block = common_ancestor_block,
199+
common_ancestor = common_ancestor,
200200
"Received ReorgDetected notification"
201201
);
202202

203203
// Invalidate logs from reorged blocks
204204
// Logs are ordered newest -> oldest, so skip logs with
205205
// block_number > common_ancestor at the front
206+
// NOTE: Pending logs are not supported therefore this filter
207+
// works for now (may need to update once they are). Tracked in
208+
// <https://github.com/OpenZeppelin/Event-Scanner/issues/244>
206209
let before_count = collected.len();
207210
collected = collected
208211
.into_iter()
209212
.skip_while(|log| {
210-
log.block_number.is_some_and(|n| n > common_ancestor_block)
213+
// Pending blocks aren't supported therefore this filter
214+
// works for now (may need to update once they are).
215+
// Tracked in <https://github.com/OpenZeppelin/Event-Scanner/issues/244>
216+
log.block_number.is_some_and(|n| n > common_ancestor)
211217
})
212218
.collect();
213219
let removed_count = before_count - collected.len();
@@ -220,7 +226,7 @@ pub fn spawn_log_consumers_in_collection_mode<N: Network>(
220226
}
221227

222228
// Track reorg state for proper log ordering
223-
reorg_ancestor = Some(common_ancestor_block);
229+
reorg_ancestor = Some(common_ancestor);
224230

225231
// Don't forward the notification to the user in CollectLatest mode
226232
// since logs haven't been sent yet
@@ -265,15 +271,15 @@ pub fn spawn_log_consumers_in_collection_mode<N: Network>(
265271

266272
/// Collects logs into the buffer, either prepending (reorg recovery) or appending (normal).
267273
/// Returns `true` if collection is complete (reached count limit).
268-
fn collect_logs(collected: &mut Vec<Log>, logs: Vec<Log>, count: usize, prepend: bool) -> bool {
274+
fn collect_logs<T>(collected: &mut Vec<T>, logs: Vec<T>, count: usize, prepend: bool) -> bool {
269275
if prepend {
270276
// Reorg rescan ranges are sent in ascending order (oldest → latest), opposite to normal
271277
// rewind which sends descending (latest → oldest). This means each successive reorg batch
272278
// contains newer blocks, so we always prepend at position 0 to maintain newest-first order.
273279
// Example: reorg rescan sends 86..=95 then 96..=100
274280
// - First batch (86..=95): prepend → [95, 94, ..., 86]
275281
// - Second batch (96..=100): prepend → [100, 99, ..., 96, 95, 94, ..., 86]
276-
let new_logs: Vec<_> = logs.into_iter().rev().take(count).collect();
282+
let new_logs = logs.into_iter().rev().take(count);
277283
let keep = count.saturating_sub(new_logs.len());
278284
collected.truncate(keep);
279285
collected.splice(..0, new_logs);
@@ -328,119 +334,102 @@ async fn get_logs<N: Network>(
328334
#[cfg(test)]
329335
mod tests {
330336
use super::*;
331-
use alloy::primitives::{Address, B256, Bytes, LogData};
332-
333-
fn make_log(block_number: u64) -> Log {
334-
Log {
335-
inner: alloy::primitives::Log {
336-
address: Address::ZERO,
337-
data: LogData::new(vec![], Bytes::new()).unwrap(),
338-
},
339-
block_hash: Some(B256::ZERO),
340-
block_number: Some(block_number),
341-
block_timestamp: None,
342-
transaction_hash: Some(B256::ZERO),
343-
transaction_index: Some(0),
344-
log_index: Some(0),
345-
removed: false,
346-
}
347-
}
348-
349-
fn block_numbers(logs: &[Log]) -> Vec<u64> {
350-
logs.iter().filter_map(|l| l.block_number).collect()
351-
}
352337

353338
#[test]
354339
fn collect_logs_appends_in_reverse_order() {
355340
let mut collected = vec![];
356-
let new_logs = vec![make_log(10), make_log(11), make_log(12)];
341+
let new_logs = vec![10, 11, 12];
357342

358343
let done = collect_logs(&mut collected, new_logs, 5, false);
359344

360345
assert!(!done);
346+
// logs are reversed (newest first): 12, 11, 10
347+
assert_eq!(collected, vec![12, 11, 10]);
348+
}
349+
350+
#[test]
351+
fn collect_logs_prepends_in_reverse_order() {
352+
let mut collected = vec![];
353+
let new_logs = vec![10, 11, 12];
354+
355+
let done = collect_logs(&mut collected, new_logs, 5, true);
361356

357+
assert!(!done);
362358
// logs are reversed (newest first): 12, 11, 10
363-
assert_eq!(block_numbers(&collected), vec![12, 11, 10]);
359+
assert_eq!(collected, vec![12, 11, 10]);
364360
}
365361

366362
#[test]
367363
fn collect_logs_stops_at_count() {
368-
let mut collected = vec![make_log(15), make_log(14)];
369-
let new_logs = vec![make_log(10), make_log(11), make_log(12), make_log(13)];
364+
let mut collected = vec![15, 14];
365+
let new_logs = vec![10, 11, 12, 13];
370366

371367
let done = collect_logs(&mut collected, new_logs, 5, false);
372368

373369
assert!(done);
374-
375370
// takes only 3 more (count=5, had 2), reversed: 13, 12, 11
376-
assert_eq!(block_numbers(&collected), vec![15, 14, 13, 12, 11]);
371+
assert_eq!(collected, vec![15, 14, 13, 12, 11]);
377372
}
378373

379374
#[test]
380375
fn collect_logs_prepends_during_reorg_recovery() {
381376
// Had logs from blocks 75, 70
382377
// Reorg at block 80, now getting replacement logs for 85, 90
383-
let mut collected = vec![make_log(75), make_log(70)];
384-
let new_logs = vec![make_log(85), make_log(90)];
378+
let mut collected = vec![75, 70];
379+
let new_logs = vec![85, 90];
385380

386381
let done = collect_logs(&mut collected, new_logs, 5, true);
387382

388383
assert!(!done);
389-
390384
// prepended (reversed): 90, 85, then existing: 75, 70
391-
assert_eq!(block_numbers(&collected), vec![90, 85, 75, 70]);
385+
assert_eq!(collected, vec![90, 85, 75, 70]);
392386
}
393387

394388
#[test]
395-
fn collect_logs_prepend_takes_all_and_truncates() {
389+
fn collect_logs_prioritizes_prepended_logs_when_truncating() {
396390
// Had 4 logs, count=5, prepending 3 new logs
397-
let mut collected = vec![make_log(75), make_log(70), make_log(65), make_log(60)];
398-
let new_logs = vec![make_log(85), make_log(90), make_log(95)];
391+
let mut collected = vec![75, 70, 65, 60];
392+
let new_logs = vec![85, 90, 95];
399393

400394
let done = collect_logs(&mut collected, new_logs, 5, true);
401395

402396
assert!(done);
403397
// All 3 new logs prepended (reversed: 95,90,85)
404398
// [95, 90, 85, 75, 70] (60 dropped as oldest)
405-
assert_eq!(block_numbers(&collected), vec![95, 90, 85, 75, 70]);
406-
}
399+
assert_eq!(collected, vec![95, 90, 85, 75, 70]);
407400

408-
#[test]
409-
fn reorg_scenario_remove_3_add_4() {
410-
// count=5, had 5 logs, reorg removes 3, leaving 2
411-
// Then 4 replacement logs arrive
412-
let mut collected = vec![make_log(85), make_log(80)];
413-
let new_logs = vec![make_log(88), make_log(92), make_log(96), make_log(100)];
401+
// edge case: more incoming logs than collected
402+
let mut collected = vec![75, 70, 65, 60];
403+
let new_logs = vec![85, 90, 95, 100, 105];
414404

415405
let done = collect_logs(&mut collected, new_logs, 5, true);
416406

417407
assert!(done);
418-
// [100, 96, 92, 88, 85] (80 dropped)
419-
assert_eq!(block_numbers(&collected), vec![100, 96, 92, 88, 85]);
408+
// [105, 100, 95, 90, 85] (all old collected logs dropped)
409+
assert_eq!(collected, vec![105, 100, 95, 90, 85]);
420410
}
421411

422412
#[test]
423-
fn collect_logs_returns_true_when_already_at_count() {
424-
let mut collected = vec![make_log(100), make_log(99), make_log(98)];
425-
let new_logs = vec![make_log(90)];
413+
fn collect_logs_ignores_new_logs_for_appending_when_already_at_count() {
414+
let mut collected = vec![100, 99, 98];
415+
let new_logs = vec![90];
426416

427417
let done = collect_logs(&mut collected, new_logs, 3, false);
428418

429419
assert!(done);
430-
// no change since already at count
431-
assert_eq!(block_numbers(&collected), vec![100, 99, 98]);
420+
assert_eq!(collected, vec![100, 99, 98]);
432421
}
433422

434423
#[test]
435424
fn collect_logs_prepend_respects_count_limit() {
436425
// count=3, have 1, prepending 4 logs
437-
let mut collected = vec![make_log(70)];
438-
let new_logs = vec![make_log(80), make_log(85), make_log(90), make_log(95)];
426+
let mut collected = vec![70];
427+
let new_logs = vec![80, 85, 90, 95];
439428

440429
let done = collect_logs(&mut collected, new_logs, 3, true);
441430

442431
assert!(done);
443432

444-
assert_eq!(block_numbers(&collected), vec![95, 90, 85]);
433+
assert_eq!(collected, vec![95, 90, 85]);
445434
}
446435
}

0 commit comments

Comments
 (0)