Skip to content

Commit c770308

Browse files
committed
Add --reorg-topic option for separate reorg message topic
Allows reorg messages to be sent to a different Kafka topic than data messages. This is required for the Snowflake Kafka connector, which enforces a strict schema per topic. Defaults to the same topic as data messages if not specified.
1 parent 3b16f82 commit c770308

File tree

3 files changed

+60
-6
lines changed

3 files changed

+60
-6
lines changed

apps/kafka_streaming_loader.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ def main(
8484
max_retries: int = 5,
8585
retry_delay: float = 1.0,
8686
kafka_config: dict = None,
87+
reorg_topic: str = None,
8788
):
8889
def connect():
8990
return Client(amp_server, auth=auth, auth_token=auth_token)
@@ -105,6 +106,8 @@ def connect():
105106
'client_id': 'amp-kafka-loader',
106107
'state': {'enabled': True, 'storage': 'lmdb', 'data_dir': state_dir},
107108
}
109+
if reorg_topic:
110+
connection_config['reorg_topic'] = reorg_topic
108111
if kafka_config:
109112
connection_config.update(kafka_config)
110113
client.configure_connection('kafka', 'kafka', connection_config)
@@ -146,6 +149,7 @@ def stream_batches():
146149
parser.add_argument('--amp-server', default=os.getenv('AMP_SERVER_URL', 'grpc://127.0.0.1:1602'))
147150
parser.add_argument('--kafka-brokers', default='localhost:9092')
148151
parser.add_argument('--topic', required=True)
152+
parser.add_argument('--reorg-topic', help='Separate topic for reorg messages (default: same as --topic)')
149153
parser.add_argument('--query-file', required=True)
150154
parser.add_argument(
151155
'--raw-dataset', required=True, help='Dataset name for the raw dataset of the chain (e.g., anvil, eth_firehose)'
@@ -200,6 +204,7 @@ def stream_batches():
200204
max_retries=args.max_retries,
201205
retry_delay=args.retry_delay,
202206
kafka_config=kafka_config or None,
207+
reorg_topic=args.reorg_topic,
203208
)
204209
except KeyboardInterrupt:
205210
logger.info('Stopped by user')

src/amp/loaders/implementations/kafka_loader.py

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,10 @@ class KafkaConfig:
1515
bootstrap_servers: str
1616
client_id: str = 'amp-kafka-loader'
1717
key_field: Optional[str] = 'id'
18+
reorg_topic: Optional[str] = None
1819

1920

20-
KAFKA_CONFIG_FIELDS = {'bootstrap_servers', 'client_id', 'key_field'}
21+
KAFKA_CONFIG_FIELDS = {'bootstrap_servers', 'client_id', 'key_field', 'reorg_topic'}
2122
RESERVED_CONFIG_FIELDS = {'resilience', 'state', 'checkpoint', 'idempotency'}
2223

2324

@@ -126,14 +127,14 @@ def _extract_message_key(self, row: Dict[str, Any]) -> Optional[bytes]:
126127

127128
def _handle_reorg(self, invalidation_ranges: List[BlockRange], table_name: str, connection_name: str) -> None:
128129
"""
129-
Handle blockchain reorganization by sending reorg events to the same topic.
130+
Handle blockchain reorganization by sending reorg events to Kafka.
130131
131132
Reorg events are sent as special messages with _type='reorg' so consumers
132133
can detect and handle invalidated block ranges.
133134
134135
Args:
135136
invalidation_ranges: List of block ranges to invalidate
136-
table_name: The Kafka topic name to send reorg events to
137+
table_name: The Kafka topic name (used if reorg_topic not configured)
137138
connection_name: Connection identifier (unused for Kafka, but required by base class)
138139
"""
139140
if not invalidation_ranges:
@@ -143,6 +144,8 @@ def _handle_reorg(self, invalidation_ranges: List[BlockRange], table_name: str,
143144
self.logger.warning('Producer not connected, skipping reorg handling')
144145
return
145146

147+
reorg_topic = self.config.reorg_topic or table_name
148+
146149
self._producer.begin_transaction()
147150
try:
148151
for invalidation_range in invalidation_ranges:
@@ -155,16 +158,16 @@ def _handle_reorg(self, invalidation_ranges: List[BlockRange], table_name: str,
155158
}
156159

157160
self._producer.send(
158-
topic=table_name, key=f'reorg:{invalidation_range.network}'.encode('utf-8'), value=reorg_message
161+
topic=reorg_topic, key=f'reorg:{invalidation_range.network}'.encode('utf-8'), value=reorg_message
159162
)
160163

161164
self.logger.info(
162-
f'Sent reorg event to {table_name}: '
165+
f'Sent reorg event to {reorg_topic}: '
163166
f'{invalidation_range.network} blocks {invalidation_range.start}-{invalidation_range.end}'
164167
)
165168

166169
self._producer.commit_transaction()
167-
self.logger.info(f'Committed {len(invalidation_ranges)} reorg events to {table_name}')
170+
self.logger.info(f'Committed {len(invalidation_ranges)} reorg events to {reorg_topic}')
168171

169172
except Exception as e:
170173
self._producer.abort_transaction()

tests/integration/test_kafka_loader.py

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,52 @@ def test_handle_reorg(self, kafka_test_config):
148148
assert msg2.value['end_block'] == 600
149149
assert msg2.value['last_valid_hash'] == '0xdef456'
150150

151+
def test_handle_reorg_separate_topic(self, kafka_test_config):
152+
config_with_reorg_topic = {
153+
**kafka_test_config,
154+
'reorg_topic': 'test_reorg_events',
155+
}
156+
loader = KafkaLoader(config_with_reorg_topic)
157+
data_topic = 'test_data_topic_separate'
158+
159+
batch = pa.RecordBatch.from_pydict({'id': [1, 2], 'value': [100, 200]})
160+
invalidation_ranges = [
161+
BlockRange(network='ethereum', start=100, end=200, hash='0xabc123'),
162+
]
163+
164+
with loader:
165+
loader.load_batch(batch, data_topic)
166+
loader._handle_reorg(invalidation_ranges, data_topic, 'test_connection')
167+
168+
data_consumer = KafkaConsumer(
169+
data_topic,
170+
bootstrap_servers=kafka_test_config['bootstrap_servers'],
171+
auto_offset_reset='earliest',
172+
consumer_timeout_ms=5000,
173+
value_deserializer=lambda m: json.loads(m.decode('utf-8')),
174+
)
175+
data_messages = list(data_consumer)
176+
data_consumer.close()
177+
178+
assert len(data_messages) == 2
179+
assert all(msg.value['_type'] == 'data' for msg in data_messages)
180+
181+
reorg_consumer = KafkaConsumer(
182+
'test_reorg_events',
183+
bootstrap_servers=kafka_test_config['bootstrap_servers'],
184+
auto_offset_reset='earliest',
185+
consumer_timeout_ms=5000,
186+
value_deserializer=lambda m: json.loads(m.decode('utf-8')),
187+
)
188+
reorg_messages = list(reorg_consumer)
189+
reorg_consumer.close()
190+
191+
assert len(reorg_messages) == 1
192+
assert reorg_messages[0].value['_type'] == 'reorg'
193+
assert reorg_messages[0].value['network'] == 'ethereum'
194+
assert reorg_messages[0].value['start_block'] == 100
195+
assert reorg_messages[0].value['end_block'] == 200
196+
151197
def test_streaming_with_reorg(self, kafka_test_config):
152198
loader = KafkaLoader(kafka_test_config)
153199
topic_name = 'test_streaming_topic'

0 commit comments

Comments (0)