Skip to content

Commit d32373b

Browse files
committed
Add last_valid_hash to reorg messages
- Pass through block hash from server in reorg detection - Include last_valid_hash in Kafka reorg messages - Update integration tests to verify new field
1 parent 91a1a66 commit d32373b

File tree

3 files changed

+8
-3
lines changed

3 files changed

+8
-3
lines changed

src/amp/loaders/implementations/kafka_loader.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,7 @@ def _handle_reorg(self, invalidation_ranges: List[BlockRange], table_name: str,
151151
'network': invalidation_range.network,
152152
'start_block': invalidation_range.start,
153153
'end_block': invalidation_range.end,
154+
'last_valid_hash': invalidation_range.hash,
154155
}
155156

156157
self._producer.send(

src/amp/streaming/reorg.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@ def _detect_reorg(self, current_ranges: List[BlockRange]) -> List[BlockRange]:
107107
network=current_range.network,
108108
start=current_range.start,
109109
end=max(current_range.end, prev_range.end),
110+
hash=prev_range.hash,
110111
)
111112
invalidation_ranges.append(invalidation)
112113

tests/integration/test_kafka_loader.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -112,8 +112,8 @@ def test_handle_reorg(self, kafka_test_config):
112112
topic_name = 'test_reorg_topic'
113113

114114
invalidation_ranges = [
115-
BlockRange(network='ethereum', start=100, end=200),
116-
BlockRange(network='polygon', start=500, end=600),
115+
BlockRange(network='ethereum', start=100, end=200, hash='0xabc123'),
116+
BlockRange(network='polygon', start=500, end=600, hash='0xdef456'),
117117
]
118118

119119
with loader:
@@ -138,13 +138,15 @@ def test_handle_reorg(self, kafka_test_config):
138138
assert msg1.value['network'] == 'ethereum'
139139
assert msg1.value['start_block'] == 100
140140
assert msg1.value['end_block'] == 200
141+
assert msg1.value['last_valid_hash'] == '0xabc123'
141142

142143
msg2 = messages[1]
143144
assert msg2.key == b'reorg:polygon'
144145
assert msg2.value['_type'] == 'reorg'
145146
assert msg2.value['network'] == 'polygon'
146147
assert msg2.value['start_block'] == 500
147148
assert msg2.value['end_block'] == 600
149+
assert msg2.value['last_valid_hash'] == '0xdef456'
148150

149151
def test_streaming_with_reorg(self, kafka_test_config):
150152
loader = KafkaLoader(kafka_test_config)
@@ -165,7 +167,7 @@ def test_streaming_with_reorg(self, kafka_test_config):
165167
)
166168

167169
reorg_response = ResponseBatch.reorg_batch(
168-
invalidation_ranges=[BlockRange(network='ethereum', start=110, end=200)]
170+
invalidation_ranges=[BlockRange(network='ethereum', start=110, end=200, hash='0xdef456')]
169171
)
170172

171173
response3 = ResponseBatch.data_batch(
@@ -211,6 +213,7 @@ def test_streaming_with_reorg(self, kafka_test_config):
211213
assert reorg_messages[0].value['network'] == 'ethereum'
212214
assert reorg_messages[0].value['start_block'] == 110
213215
assert reorg_messages[0].value['end_block'] == 200
216+
assert reorg_messages[0].value['last_valid_hash'] == '0xdef456'
214217

215218
data_ids = [msg.value['id'] for msg in data_messages]
216219
assert data_ids == [1, 2, 3, 4, 5, 6]

0 commit comments

Comments
 (0)