Commit d32e3db

Add kafka streaming loader app
1 parent 37c3281 commit d32e3db

File tree: 3 files changed, +338 -0 lines changed

apps/kafka_streaming_loader.py

Lines changed: 187 additions & 0 deletions
@@ -0,0 +1,187 @@
#!/usr/bin/env python3
"""
Kafka streaming loader with label joining.
Continuously loads ERC20 transfers to Kafka with token metadata.
"""

import argparse
import json
import os
from pathlib import Path

from amp.client import Client
from amp.loaders.types import LabelJoinConfig
from kafka import KafkaConsumer


def consume_messages(kafka_brokers: str, topic: str, max_messages: int = 10):
    """Consume and print messages from a Kafka topic for testing."""
    print(f'\n{"=" * 60}')
    print('Consuming messages from Kafka')
    print(f'{"=" * 60}\n')
    print(f'Topic: {topic}')
    print(f'Brokers: {kafka_brokers}')
    print(f'Max messages: {max_messages}\n')

    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=kafka_brokers,
        auto_offset_reset='earliest',
        value_deserializer=lambda x: json.loads(x.decode('utf-8')),
        consumer_timeout_ms=5000,
        group_id='kafka-streaming-loader-consumer',
        enable_auto_commit=True,
    )

    def format_address(addr):
        """Convert a binary address to a hex string."""
        if addr is None:
            return None
        if isinstance(addr, str):
            return addr
        if isinstance(addr, bytes):
            return '0x' + addr.hex()
        return addr

    msg_count = 0
    for message in consumer:
        msg_count += 1
        data = message.value
        print(f'Message {msg_count}:')
        print(f'  block_num: {data.get("block_num")}')
        print(f'  token_address: {format_address(data.get("token_address"))}')
        print(f'  symbol: {data.get("symbol")}')
        print(f'  name: {data.get("name")}')
        print(f'  decimals: {data.get("decimals")}')
        print(f'  value: {data.get("value")}')
        print(f'  from_address: {format_address(data.get("from_address"))}')
        print(f'  to_address: {format_address(data.get("to_address"))}')
        print()

        if msg_count >= max_messages:
            break

    consumer.close()
    print(f'Consumed {msg_count} messages from Kafka topic "{topic}"')


def main(
    kafka_brokers: str,
    topic: str,
    label_csv: str,
    amp_server: str,
    query_file: str,
    consume_mode: bool = False,
    consume_max: int = 10,
):
    if consume_mode:
        consume_messages(kafka_brokers, topic, consume_max)
        return

    print(f'Connecting to Amp server: {amp_server}')
    client = Client(amp_server)

    label_path = Path(label_csv)
    if not label_path.exists():
        raise FileNotFoundError(f'Label CSV not found: {label_csv}')

    client.configure_label('tokens', str(label_path))
    print(f'Loaded {len(client.label_manager.get_label("tokens"))} tokens from {label_csv}')

    kafka_config = {
        'bootstrap_servers': kafka_brokers,
        'client_id': 'amp-kafka-loader',
    }
    client.configure_connection('kafka', 'kafka', kafka_config)

    query_path = Path(query_file)
    if not query_path.exists():
        raise FileNotFoundError(f'Query file not found: {query_file}')

    with open(query_path) as f:
        query = f.read()

    label_config = LabelJoinConfig(
        label_name='tokens',
        label_key_column='token_address',
        stream_key_column='token_address',
    )

    print('Starting Kafka streaming loader')
    print(f'Kafka brokers: {kafka_brokers}')
    print(f'Topic: {topic}')
    print('Press Ctrl+C to stop\n')

    total_rows = 0
    batch_count = 0

    for result in client.sql(query).load(
        connection='kafka',
        destination=topic,
        stream=True,
        label_config=label_config,
    ):
        if result.success:
            total_rows += result.rows_loaded
            batch_count += 1
            print(f'Batch {batch_count}: {result.rows_loaded} rows in {result.duration:.2f}s (total: {total_rows})')
        else:
            print(f'Error: {result.error}')


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Stream ERC20 transfers to Kafka with token labels')
    parser.add_argument(
        '--kafka-brokers',
        default=os.getenv('KAFKA_BOOTSTRAP_SERVERS', 'localhost:9092'),
        help='Kafka bootstrap servers (default: localhost:9092 or KAFKA_BOOTSTRAP_SERVERS env var)',
    )
    parser.add_argument('--topic', default='erc20_transfers', help='Kafka topic name (default: erc20_transfers)')
    parser.add_argument(
        '--label-csv',
        default='data/eth_mainnet_token_metadata.csv',
        help='Path to token metadata CSV (default: data/eth_mainnet_token_metadata.csv)',
    )
    parser.add_argument(
        '--amp-server',
        default=os.getenv('AMP_SERVER_URL', 'grpc://34.27.238.174:80'),
        help='Amp server URL (default: grpc://34.27.238.174:80 or AMP_SERVER_URL env var)',
    )
    parser.add_argument(
        '--query-file',
        default='apps/queries/erc20_transfers.sql',
        help='Path to SQL query file (default: apps/queries/erc20_transfers.sql)',
    )
    parser.add_argument(
        '--consume',
        action='store_true',
        help='Consume mode: read and print messages from Kafka topic (for testing)',
    )
    parser.add_argument(
        '--consume-max',
        type=int,
        default=10,
        help='Maximum messages to consume in consume mode (default: 10)',
    )

    args = parser.parse_args()

    try:
        main(
            kafka_brokers=args.kafka_brokers,
            topic=args.topic,
            label_csv=args.label_csv,
            amp_server=args.amp_server,
            query_file=args.query_file,
            consume_mode=args.consume,
            consume_max=args.consume_max,
        )
    except KeyboardInterrupt:
        print('\n\nInterrupted by user')
    except Exception as e:
        print(f'\n\nError: {e}')
        import traceback

        traceback.print_exc()
        raise
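
Every flag above carries a default, so a quick local smoke test can lean on them; a minimal sketch, assuming a broker on localhost:9092 and the default CSV and query files in place:

# Produce: stream labeled transfers to the default topic
python apps/kafka_streaming_loader.py --topic erc20_transfers

# Verify: read a few messages back without producing anything
python apps/kafka_streaming_loader.py --consume --consume-max 5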
apps/queries/erc20_transfers_streaming.sql

Lines changed: 43 additions & 0 deletions
@@ -0,0 +1,43 @@
-- ERC20 Transfer Events Query for Streaming (from specific block)
--
-- This query decodes ERC20 Transfer events from raw Ethereum logs.
-- Filters to only include blocks >= {start_block} for streaming from recent data.
--
-- Required columns for parallel loading:
--   - block_num: Used for partitioning across workers
--
-- Label join column (if using --label-csv):
--   - token_address: Binary address of the ERC20 token contract
--
-- Example usage:
--   python apps/kafka_streaming_loader.py \
--     --query-file apps/queries/erc20_transfers_streaming.sql \
--     --start-block 21000000

select
    pc.block_num,
    pc.block_hash,
    pc.timestamp,
    pc.tx_hash,
    pc.tx_index,
    pc.log_index,
    pc.address as token_address,
    pc.dec['from'] as from_address,
    pc.dec['to'] as to_address,
    pc.dec['value'] as value
from (
    select
        l.block_num,
        l.block_hash,
        l.tx_hash,
        l.tx_index,
        l.log_index,
        l.timestamp,
        l.address,
        evm_decode(l.topic1, l.topic2, l.topic3, l.data, 'Transfer(address indexed from, address indexed to, uint256 value)') as dec
    from eth_firehose.logs l
    where
        l.topic0 = evm_topic('Transfer(address indexed from, address indexed to, uint256 value)') and
        l.topic3 IS NULL and
        l.block_num >= {start_block}
) pc
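
Note that the loader script above reads the query file verbatim and defines no --start-block flag, so the {start_block} placeholder in this file has to be substituted before the query is sent. A minimal sketch of one way to do that, assuming literal string substitution; render_query and the hard-coded block number are illustrative, not part of this commit:

from pathlib import Path

def render_query(query_file: str, start_block: int) -> str:
    """Read the SQL template and fill the {start_block} placeholder."""
    template = Path(query_file).read_text()
    return template.replace('{start_block}', str(start_block))

# Hypothetical usage; in the loader this would replace the raw f.read()
query = render_query('apps/queries/erc20_transfers_streaming.sql', 21_000_000)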

apps/test_kafka_query.py

Lines changed: 108 additions & 0 deletions
@@ -0,0 +1,108 @@
#!/usr/bin/env python3
"""
Test ERC20 query with label joining.
"""

import json
import os
import time

from amp.client import Client
from amp.loaders.types import LabelJoinConfig
from kafka import KafkaConsumer

# Connect to Amp server
server_url = os.getenv('AMP_SERVER_URL', 'grpc://34.27.238.174:80')
print(f'Connecting to {server_url}...')
client = Client(server_url)
print('✅ Connected!')

# Simple ERC20 transfer query
transfer_sig = 'Transfer(address indexed from, address indexed to, uint256 value)'
query = f"""
SELECT
    block_num,
    tx_hash,
    address as token_address,
    evm_decode(topic1, topic2, topic3, data, '{transfer_sig}')['from'] as from_address,
    evm_decode(topic1, topic2, topic3, data, '{transfer_sig}')['to'] as to_address,
    evm_decode(topic1, topic2, topic3, data, '{transfer_sig}')['value'] as value
FROM eth_firehose.logs
WHERE topic0 = evm_topic('{transfer_sig}')
  AND topic3 IS NULL
LIMIT 10
"""

print('\nRunning query...')
result = client.get_sql(query, read_all=True)

print(f'Got {result.num_rows} rows')
print(f'Columns: {result.schema.names}')

print('Testing label join')

csv_path = 'data/eth_mainnet_token_metadata.csv'
client.configure_label('tokens', csv_path)
print(f'Loaded {len(client.label_manager.get_label("tokens"))} tokens from CSV')

label_config = LabelJoinConfig(
    label_name='tokens',
    label_key_column='token_address',
    stream_key_column='token_address',
)

print('Configured label join: will add symbol, name, decimals columns')

print('Loading to Kafka with labels')

kafka_config = {
    'bootstrap_servers': os.getenv('KAFKA_BOOTSTRAP_SERVERS', 'localhost:9092'),
    'client_id': 'amp-erc20-loader',
}
client.configure_connection('kafka', 'kafka', kafka_config)

results = list(
    client.sql(query).load(
        connection='kafka',
        destination='erc20_transfers',
        label_config=label_config,
    )
)

total_rows = sum(r.rows_loaded for r in results if r.success)
print(f'\nLoaded {total_rows} enriched rows to Kafka topic "erc20_transfers"')

print('\n' + '=' * 60)
print('Reading back from Kafka')
print('=' * 60)

# Give the broker a moment to settle before reading back
time.sleep(1)

consumer = KafkaConsumer(
    'erc20_transfers',
    bootstrap_servers=kafka_config['bootstrap_servers'],
    auto_offset_reset='earliest',
    value_deserializer=lambda x: json.loads(x.decode('utf-8')),
    consumer_timeout_ms=5000,
)

print('\nConsuming messages from topic "erc20_transfers":\n')
msg_count = 0
for message in consumer:
    msg_count += 1
    data = message.value
    print(f'Message {msg_count}:')
    print(f'  token_address: {data.get("token_address")}')
    print(f'  symbol: {data.get("symbol")}')
    print(f'  name: {data.get("name")}')
    print(f'  decimals: {data.get("decimals")}')
    print(f'  value: {data.get("value")}')
    print(f'  from_address: {data.get("from_address")}')
    print()

consumer.close()
print(f'Read {msg_count} messages from Kafka')
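
Both scripts key the label join on token_address and expect the CSV to supply the symbol, name, and decimals columns that appear in the enriched messages. An illustrative shape for data/eth_mainnet_token_metadata.csv, assuming those exact header names (the real file is not part of this commit's diff):

token_address,symbol,name,decimals
0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48,USDC,USD Coin,6
0xdac17f958d2ee523a2206206994597c13d831ec7,USDT,Tether USD,6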

Comments (0)