
Commit 71de79f

Add kafka streaming loader app

1 parent 1a2c152 commit 71de79f

File tree

3 files changed: +336 −0 lines changed

  apps/kafka_streaming_loader.py
  apps/queries/erc20_transfers_streaming.sql
  apps/test_kafka_query.py

apps/kafka_streaming_loader.py

Lines changed: 186 additions & 0 deletions
@@ -0,0 +1,186 @@
#!/usr/bin/env python3
"""
Kafka streaming loader with label joining.
Continuously loads ERC20 transfers to Kafka with token metadata.
"""
import argparse
import json
import os
import time
from pathlib import Path

from amp.client import Client
from amp.loaders.types import LabelJoinConfig
from kafka import KafkaConsumer


def consume_messages(kafka_brokers: str, topic: str, max_messages: int = 10):
    """Consume and print messages from Kafka topic for testing."""
    print(f'\n{"="*60}')
    print('Consuming messages from Kafka')
    print(f'{"="*60}\n')
    print(f'Topic: {topic}')
    print(f'Brokers: {kafka_brokers}')
    print(f'Max messages: {max_messages}\n')

    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=kafka_brokers,
        auto_offset_reset='earliest',
        value_deserializer=lambda x: json.loads(x.decode('utf-8')),
        consumer_timeout_ms=5000,
        group_id='kafka-streaming-loader-consumer',
        enable_auto_commit=True,
    )

    def format_address(addr):
        """Convert binary address to hex string."""
        if addr is None:
            return None
        if isinstance(addr, str):
            return addr
        if isinstance(addr, bytes):
            return '0x' + addr.hex()
        return addr

    msg_count = 0
    for message in consumer:
        msg_count += 1
        data = message.value
        print(f'Message {msg_count}:')
        print(f'  block_num: {data.get("block_num")}')
        print(f'  token_address: {format_address(data.get("token_address"))}')
        print(f'  symbol: {data.get("symbol")}')
        print(f'  name: {data.get("name")}')
        print(f'  decimals: {data.get("decimals")}')
        print(f'  value: {data.get("value")}')
        print(f'  from_address: {format_address(data.get("from_address"))}')
        print(f'  to_address: {format_address(data.get("to_address"))}')
        print()

        if msg_count >= max_messages:
            break

    consumer.close()
    print(f'Consumed {msg_count} messages from Kafka topic "{topic}"')


def main(
    kafka_brokers: str,
    topic: str,
    label_csv: str,
    amp_server: str,
    query_file: str,
    consume_mode: bool = False,
    consume_max: int = 10,
):
    if consume_mode:
        consume_messages(kafka_brokers, topic, consume_max)
        return

    print(f'Connecting to Amp server: {amp_server}')
    client = Client(amp_server)

    label_path = Path(label_csv)
    if not label_path.exists():
        raise FileNotFoundError(f'Label CSV not found: {label_csv}')

    client.configure_label('tokens', str(label_path))
    print(f'Loaded {len(client.label_manager.get_label("tokens"))} tokens from {label_csv}')

    kafka_config = {
        'bootstrap_servers': kafka_brokers,
        'client_id': 'amp-kafka-loader',
    }
    client.configure_connection('kafka', 'kafka', kafka_config)

    query_path = Path(query_file)
    if not query_path.exists():
        raise FileNotFoundError(f'Query file not found: {query_file}')

    with open(query_path) as f:
        query = f.read()

    label_config = LabelJoinConfig(
        label_name='tokens',
        label_key_column='token_address',
        stream_key_column='token_address',
    )

    print('Starting Kafka streaming loader')
    print(f'Kafka brokers: {kafka_brokers}')
    print(f'Topic: {topic}')
    print('Press Ctrl+C to stop\n')

    total_rows = 0
    batch_count = 0

    for result in client.sql(query).load(
        connection='kafka',
        destination=topic,
        stream=True,
        label_config=label_config,
    ):
        if result.success:
            total_rows += result.rows_loaded
            batch_count += 1
            print(f'Batch {batch_count}: {result.rows_loaded} rows in {result.duration:.2f}s (total: {total_rows})')
        else:
            print(f'Error: {result.error}')


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Stream ERC20 transfers to Kafka with token labels')
    parser.add_argument(
        '--kafka-brokers',
        default=os.getenv('KAFKA_BOOTSTRAP_SERVERS', 'localhost:9092'),
        help='Kafka bootstrap servers (default: localhost:9092 or KAFKA_BOOTSTRAP_SERVERS env var)',
    )
    parser.add_argument('--topic', default='erc20_transfers', help='Kafka topic name (default: erc20_transfers)')
    parser.add_argument(
        '--label-csv',
        default='data/eth_mainnet_token_metadata.csv',
        help='Path to token metadata CSV (default: data/eth_mainnet_token_metadata.csv)',
    )
    parser.add_argument(
        '--amp-server',
        default=os.getenv('AMP_SERVER_URL', 'grpc://34.27.238.174:80'),
        help='Amp server URL (default: grpc://34.27.238.174:80 or AMP_SERVER_URL env var)',
    )
    parser.add_argument(
        '--query-file',
        default='apps/queries/erc20_transfers.sql',
        help='Path to SQL query file (default: apps/queries/erc20_transfers.sql)',
    )
    parser.add_argument(
        '--consume',
        action='store_true',
        help='Consume mode: read and print messages from Kafka topic (for testing)',
    )
    parser.add_argument(
        '--consume-max',
        type=int,
        default=10,
        help='Maximum messages to consume in consume mode (default: 10)',
    )

    args = parser.parse_args()

    try:
        main(
            kafka_brokers=args.kafka_brokers,
            topic=args.topic,
            label_csv=args.label_csv,
            amp_server=args.amp_server,
            query_file=args.query_file,
            consume_mode=args.consume,
            consume_max=args.consume_max,
        )
    except KeyboardInterrupt:
        print('\n\nInterrupted by user')
    except Exception as e:
        print(f'\n\nError: {e}')
        import traceback

        traceback.print_exc()
        raise
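
For reference, a typical invocation first streams transfers, then reads a few messages back to verify the enriched payloads. Both commands use only flags defined by the script's argparse above; the broker address assumes a local Kafka:

  python apps/kafka_streaming_loader.py --kafka-brokers localhost:9092 --topic erc20_transfers
  python apps/kafka_streaming_loader.py --consume --consume-max 5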
apps/queries/erc20_transfers_streaming.sql

Lines changed: 43 additions & 0 deletions
@@ -0,0 +1,43 @@
-- ERC20 Transfer Events Query for Streaming (from specific block)
--
-- This query decodes ERC20 Transfer events from raw Ethereum logs.
-- Filters to only include blocks >= {start_block} for streaming from recent data.
--
-- Required columns for parallel loading:
--   - block_num: Used for partitioning across workers
--
-- Label join column (if using --label-csv):
--   - token_address: Binary address of the ERC20 token contract
--
-- Example usage:
--   python apps/kafka_streaming_loader.py \
--     --query-file apps/queries/erc20_transfers_streaming.sql \
--     --start-block 21000000

select
    pc.block_num,
    pc.block_hash,
    pc.timestamp,
    pc.tx_hash,
    pc.tx_index,
    pc.log_index,
    pc.address as token_address,
    pc.dec['from'] as from_address,
    pc.dec['to'] as to_address,
    pc.dec['value'] as value
from (
    select
        l.block_num,
        l.block_hash,
        l.tx_hash,
        l.tx_index,
        l.log_index,
        l.timestamp,
        l.address,
        evm_decode(l.topic1, l.topic2, l.topic3, l.data, 'Transfer(address indexed from, address indexed to, uint256 value)') as dec
    from eth_firehose.logs l
    where
        l.topic0 = evm_topic('Transfer(address indexed from, address indexed to, uint256 value)') and
        l.topic3 IS NULL and
        l.block_num >= {start_block}
) pc
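
Note that kafka_streaming_loader.py reads the query file verbatim, so the {start_block} placeholder above has to be substituted before the text reaches client.sql(). A minimal sketch of that step, assuming str.format is safe because {start_block} is the only brace expression in the file (render_query is a hypothetical helper, not part of this commit):

from pathlib import Path

def render_query(path: str, start_block: int) -> str:
    # Hypothetical helper: fill the {start_block} placeholder in a SQL
    # template file. str.format works only while {start_block} is the
    # sole brace expression in the file; any other { or } would raise.
    template = Path(path).read_text()
    return template.format(start_block=start_block)

query = render_query('apps/queries/erc20_transfers_streaming.sql', 21_000_000)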

apps/test_kafka_query.py

Lines changed: 107 additions & 0 deletions
@@ -0,0 +1,107 @@
#!/usr/bin/env python3
"""
Test ERC20 query with label joining.
"""
import os

from amp.client import Client
from amp.loaders.types import LabelJoinConfig

# Connect to Amp server
server_url = os.getenv('AMP_SERVER_URL', 'grpc://34.27.238.174:80')
print(f'Connecting to {server_url}...')
client = Client(server_url)
print('✅ Connected!')

# Simple ERC20 transfer query
transfer_sig = 'Transfer(address indexed from, address indexed to, uint256 value)'
query = f"""
SELECT
    block_num,
    tx_hash,
    address as token_address,
    evm_decode(topic1, topic2, topic3, data, '{transfer_sig}')['from'] as from_address,
    evm_decode(topic1, topic2, topic3, data, '{transfer_sig}')['to'] as to_address,
    evm_decode(topic1, topic2, topic3, data, '{transfer_sig}')['value'] as value
FROM eth_firehose.logs
WHERE topic0 = evm_topic('{transfer_sig}')
    AND topic3 IS NULL
LIMIT 10
"""

print('\nRunning query...')
result = client.get_sql(query, read_all=True)

print(f'Got {result.num_rows} rows')
print(f'Columns: {result.schema.names}')

print('Testing label join')


csv_path = 'data/eth_mainnet_token_metadata.csv'
client.configure_label('tokens', csv_path)
print(f'Loaded {len(client.label_manager.get_label("tokens"))} tokens from CSV')

label_config = LabelJoinConfig(
    label_name='tokens',
    label_key_column='token_address',
    stream_key_column='token_address',
)

print('Configured label join: will add symbol, name, decimals columns')


print('Loading to Kafka with labels')


kafka_config = {
    'bootstrap_servers': os.getenv('KAFKA_BOOTSTRAP_SERVERS', 'localhost:9092'),
    'client_id': 'amp-erc20-loader',
}
client.configure_connection('kafka', 'kafka', kafka_config)

results = list(
    client.sql(query).load(
        connection='kafka',
        destination='erc20_transfers',
        label_config=label_config,
    )
)

total_rows = sum(r.rows_loaded for r in results if r.success)
print(f'\nLoaded {total_rows} enriched rows to Kafka topic "erc20_transfers"')

print('\n' + '='*60)
print('Reading back from Kafka')
print('='*60)

from kafka import KafkaConsumer
import json
import time

time.sleep(1)

consumer = KafkaConsumer(
    'erc20_transfers',
    bootstrap_servers=kafka_config['bootstrap_servers'],
    auto_offset_reset='earliest',
    value_deserializer=lambda x: json.loads(x.decode('utf-8')),
    consumer_timeout_ms=5000,
)

print('\nConsuming messages from topic "erc20_transfers":\n')
msg_count = 0
for message in consumer:
    msg_count += 1
    data = message.value
    print(f'Message {msg_count}:')
    print(f'  token_address: {data.get("token_address")}')
    print(f'  symbol: {data.get("symbol")}')
    print(f'  name: {data.get("name")}')
    print(f'  decimals: {data.get("decimals")}')
    print(f'  value: {data.get("value")}')
    print(f'  from_address: {data.get("from_address")}')
    print()

consumer.close()
print(f'Read {msg_count} messages from Kafka')
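
The test script takes no arguments; both endpoints come from the environment variables used above, falling back to the defaults shown. A run against a local broker might look like this (both addresses are assumptions for your environment):

  AMP_SERVER_URL=grpc://34.27.238.174:80 KAFKA_BOOTSTRAP_SERVERS=localhost:9092 python apps/test_kafka_query.py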
