Commit 8539dc7

Add a simple Kafka consumer script
1 parent 11ad52f commit 8539dc7

2 files changed: +76 −1 lines

apps/kafka_consumer.py

Lines changed: 75 additions & 0 deletions
@@ -0,0 +1,75 @@
#!/usr/bin/env python3
"""Simple Kafka consumer script to print messages from a topic in real-time.

Messages are consumed from a consumer group, so subsequent runs will only show
new messages. Press Ctrl+C to exit cleanly.

Usage:
    python kafka_consumer.py [topic] [broker] [group_id]

Examples:
    python kafka_consumer.py
    python kafka_consumer.py anvil_logs
    python kafka_consumer.py anvil_logs localhost:9092
    python kafka_consumer.py anvil_logs localhost:9092 my-group
"""

import json
import sys
from datetime import datetime

from kafka import KafkaConsumer

topic = sys.argv[1] if len(sys.argv) > 1 else 'anvil_logs'
broker = sys.argv[2] if len(sys.argv) > 2 else 'localhost:9092'
group_id = sys.argv[3] if len(sys.argv) > 3 else 'kafka-consumer-cli'

print(f'Consuming from: {broker} -> topic: {topic}')
print(f'Consumer group: {group_id}')
print(f'Started at: {datetime.now().strftime("%H:%M:%S")}')
print('-' * 80)

consumer = KafkaConsumer(
    topic,
    bootstrap_servers=broker,
    group_id=group_id,
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
)

msg_count = 0
data_count = 0
reorg_count = 0

try:
    for msg in consumer:
        msg_count += 1
        msg_type = msg.value.get('_type', 'unknown')

        if msg_type == 'data':
            data_count += 1
            print(f'\nMessage #{msg_count} [DATA] - Key: {msg.key.decode() if msg.key else "None"}')
            print(f'Offset: {msg.offset} | Partition: {msg.partition}')

            for k, v in msg.value.items():
                if k != '_type':
                    print(f'{k}: {v}')

        elif msg_type == 'reorg':
            reorg_count += 1
            print(f'\nMessage #{msg_count} [REORG] - Key: {msg.key.decode() if msg.key else "None"}')
            print(f'Network: {msg.value.get("network")}')
            print(f'Blocks: {msg.value.get("start_block")} -> {msg.value.get("end_block")}')

        else:
            print(f'\nMessage #{msg_count} [UNKNOWN]')
            print(json.dumps(msg.value, indent=2))

        print(f'\nTotal: {msg_count} msgs | Data: {data_count} | Reorgs: {reorg_count}')
        print('-' * 80)

except KeyboardInterrupt:
    print('\n\nStopped')
finally:
    consumer.close()
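
For reference, the consumer above keys its behavior off a '_type' field in each JSON value ('data' or 'reorg'). Below is a minimal producer sketch that emits messages in those shapes; it is illustrative only and not part of this commit, and the topic, keys, and field values are assumptions inferred from what the consumer reads.

#!/usr/bin/env python3
"""Illustrative producer for the message shapes kafka_consumer.py expects.

Not part of this commit; topic, broker, keys, and field values are assumed.
"""
import json

from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    key_serializer=lambda k: k.encode('utf-8'),
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
)

# A 'data' message: the consumer prints every key except '_type'.
producer.send('anvil_logs', key='block-123', value={
    '_type': 'data',
    'network': 'mainnet',   # hypothetical field value
    'block_number': 123,    # hypothetical field
})

# A 'reorg' message: the consumer prints the network and the block range.
producer.send('anvil_logs', key='reorg-120', value={
    '_type': 'reorg',
    'network': 'mainnet',
    'start_block': 120,
    'end_block': 123,
})

producer.flush()
producer.close()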

apps/kafka_streaming_loader.py

Lines changed: 1 addition & 1 deletion
@@ -63,7 +63,7 @@ def main(
        query = f.read()

    if start_block is None:
-        start_block = get_latest_block(client, raw_dataset) - 10
+        start_block = get_latest_block(client, raw_dataset)

    print(f'Starting from block {start_block}')