Skip to content

Commit cdb1f28

Browse files
committed
Add cross-restart reorg detection using persistent state
- Initialize ReorgAwareStream from resume_watermark to restore block hashes - Add hash mismatch detection: compare stored hash with server's prev_hash - Pass resume_watermark from client.py to ReorgAwareStream - Add 6 unit tests for cross-restart reorg detection scenarios This enables detecting chain reorgs that occur while the process is down by comparing the stored block hash (from state store) with the server's prev_hash.
1 parent 8ce25f1 commit cdb1f28

File tree

3 files changed

+146
-11
lines changed

3 files changed

+146
-11
lines changed

src/amp/client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -817,7 +817,7 @@ def query_and_load_streaming(
817817

818818
# Optionally wrap with reorg detection
819819
if with_reorg_detection:
820-
stream_iterator = ReorgAwareStream(stream_iterator)
820+
stream_iterator = ReorgAwareStream(stream_iterator, resume_watermark=resume_watermark)
821821
self.logger.info('Reorg detection enabled for streaming query')
822822

823823
# Start continuous loading with checkpoint support

src/amp/streaming/reorg.py

Lines changed: 43 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from typing import Dict, Iterator, List
77

88
from .iterator import StreamingResultIterator
9-
from .types import BlockRange, ResponseBatch
9+
from .types import BlockRange, ResponseBatch, ResumeWatermark
1010

1111

1212
class ReorgAwareStream:
@@ -16,20 +16,32 @@ class ReorgAwareStream:
1616
This class monitors the block ranges in consecutive batches to detect chain
1717
reorganizations (reorgs). When a reorg is detected, a ResponseBatch with
1818
is_reorg=True is emitted containing the invalidation ranges.
19+
20+
Supports cross-restart reorg detection by initializing from a resume watermark
21+
that contains the last known block hashes from persistent state.
1922
"""
2023

21-
def __init__(self, stream_iterator: StreamingResultIterator):
24+
def __init__(self, stream_iterator: StreamingResultIterator, resume_watermark: ResumeWatermark = None):
2225
"""
2326
Initialize the reorg-aware stream.
2427
2528
Args:
2629
stream_iterator: The underlying streaming result iterator
30+
resume_watermark: Optional watermark from persistent state (LMDB) containing
31+
last known block ranges with hashes for cross-restart reorg detection
2732
"""
2833
self.stream_iterator = stream_iterator
29-
# Track the latest range for each network
3034
self.prev_ranges_by_network: Dict[str, BlockRange] = {}
3135
self.logger = logging.getLogger(__name__)
3236

37+
if resume_watermark:
38+
for block_range in resume_watermark.ranges:
39+
self.prev_ranges_by_network[block_range.network] = block_range
40+
self.logger.debug(
41+
f'Initialized reorg detection for {block_range.network} '
42+
f'from block {block_range.end} hash {block_range.hash}'
43+
)
44+
3345
def __iter__(self) -> Iterator[ResponseBatch]:
3446
"""Return iterator instance"""
3547
return self
@@ -89,9 +101,9 @@ def _detect_reorg(self, current_ranges: List[BlockRange]) -> List[BlockRange]:
89101
"""
90102
Detect reorganizations by comparing current ranges with previous ranges.
91103
92-
A reorg is detected when:
93-
- A range starts at or before the end of the previous range for the same network
94-
- The range is different from the previous range
104+
A reorg is detected when either:
105+
1. Block number overlap: current range starts at or before previous range end
106+
2. Hash mismatch: server's prev_hash doesn't match our stored hash (cross-restart detection)
95107
96108
Args:
97109
current_ranges: Block ranges from the current batch
@@ -102,18 +114,39 @@ def _detect_reorg(self, current_ranges: List[BlockRange]) -> List[BlockRange]:
102114
invalidation_ranges = []
103115

104116
for current_range in current_ranges:
105-
# Get the previous range for this network
106117
prev_range = self.prev_ranges_by_network.get(current_range.network)
107118

108119
if prev_range:
109-
# Check if this indicates a reorg
120+
is_reorg = False
121+
122+
# Detection 1: Block number overlap (original logic)
110123
if current_range != prev_range and current_range.start <= prev_range.end:
111-
# Reorg detected - create invalidation range
112-
# Invalidate from the start of the current range to the max end
124+
is_reorg = True
125+
self.logger.info(
126+
f'Reorg detected via block overlap: {current_range.network} '
127+
f'current start {current_range.start} <= prev end {prev_range.end}'
128+
)
129+
130+
# Detection 2: Hash mismatch (cross-restart detection)
131+
# Server sends prev_hash = hash of block before current range
132+
# If it doesn't match our stored hash, chain has changed
133+
elif (
134+
current_range.prev_hash is not None
135+
and prev_range.hash is not None
136+
and current_range.prev_hash != prev_range.hash
137+
):
138+
is_reorg = True
139+
self.logger.info(
140+
f'Reorg detected via hash mismatch: {current_range.network} '
141+
f'server prev_hash {current_range.prev_hash} != stored hash {prev_range.hash}'
142+
)
143+
144+
if is_reorg:
113145
invalidation = BlockRange(
114146
network=current_range.network,
115147
start=current_range.start,
116148
end=max(current_range.end, prev_range.end),
149+
hash=prev_range.hash,
117150
)
118151
invalidation_ranges.append(invalidation)
119152

tests/unit/test_streaming_types.py

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -619,3 +619,105 @@ class MockIterator:
619619
stream = ReorgAwareStream(MockIterator())
620620

621621
assert stream._is_duplicate_batch([]) == False
622+
623+
def test_init_from_resume_watermark(self):
624+
"""Test initialization from resume watermark for cross-restart reorg detection"""
625+
626+
class MockIterator:
627+
pass
628+
629+
watermark = ResumeWatermark(
630+
ranges=[
631+
BlockRange(network='ethereum', start=100, end=200, hash='0xabc123'),
632+
BlockRange(network='polygon', start=50, end=150, hash='0xdef456'),
633+
]
634+
)
635+
636+
stream = ReorgAwareStream(MockIterator(), resume_watermark=watermark)
637+
638+
assert 'ethereum' in stream.prev_ranges_by_network
639+
assert 'polygon' in stream.prev_ranges_by_network
640+
assert stream.prev_ranges_by_network['ethereum'].hash == '0xabc123'
641+
assert stream.prev_ranges_by_network['polygon'].hash == '0xdef456'
642+
643+
def test_detect_reorg_hash_mismatch(self):
644+
"""Test reorg detection via hash mismatch (cross-restart detection)"""
645+
646+
class MockIterator:
647+
pass
648+
649+
stream = ReorgAwareStream(MockIterator())
650+
651+
stream.prev_ranges_by_network = {
652+
'ethereum': BlockRange(network='ethereum', start=100, end=200, hash='0xoriginal'),
653+
}
654+
655+
current_ranges = [
656+
BlockRange(network='ethereum', start=201, end=300, prev_hash='0xdifferent'),
657+
]
658+
659+
invalidations = stream._detect_reorg(current_ranges)
660+
661+
assert len(invalidations) == 1
662+
assert invalidations[0].network == 'ethereum'
663+
assert invalidations[0].hash == '0xoriginal'
664+
665+
def test_detect_reorg_hash_match_no_reorg(self):
666+
"""Test no reorg when hashes match across restart"""
667+
668+
class MockIterator:
669+
pass
670+
671+
stream = ReorgAwareStream(MockIterator())
672+
673+
stream.prev_ranges_by_network = {
674+
'ethereum': BlockRange(network='ethereum', start=100, end=200, hash='0xsame'),
675+
}
676+
677+
current_ranges = [
678+
BlockRange(network='ethereum', start=201, end=300, prev_hash='0xsame'),
679+
]
680+
681+
invalidations = stream._detect_reorg(current_ranges)
682+
683+
assert len(invalidations) == 0
684+
685+
def test_detect_reorg_hash_mismatch_with_none_prev_hash(self):
686+
"""Test no reorg detection when server prev_hash is None (genesis block)"""
687+
688+
class MockIterator:
689+
pass
690+
691+
stream = ReorgAwareStream(MockIterator())
692+
693+
stream.prev_ranges_by_network = {
694+
'ethereum': BlockRange(network='ethereum', start=0, end=0, hash='0xgenesis'),
695+
}
696+
697+
current_ranges = [
698+
BlockRange(network='ethereum', start=1, end=100, prev_hash=None),
699+
]
700+
701+
invalidations = stream._detect_reorg(current_ranges)
702+
703+
assert len(invalidations) == 0
704+
705+
def test_detect_reorg_hash_mismatch_with_none_stored_hash(self):
706+
"""Test no reorg detection when stored hash is None"""
707+
708+
class MockIterator:
709+
pass
710+
711+
stream = ReorgAwareStream(MockIterator())
712+
713+
stream.prev_ranges_by_network = {
714+
'ethereum': BlockRange(network='ethereum', start=100, end=200, hash=None),
715+
}
716+
717+
current_ranges = [
718+
BlockRange(network='ethereum', start=201, end=300, prev_hash='0xsome_hash'),
719+
]
720+
721+
invalidations = stream._detect_reorg(current_ranges)
722+
723+
assert len(invalidations) == 0

0 commit comments

Comments
 (0)