Commit be24774

Add a simple notebook to test kafka streaming
1 parent 2a6ce1c commit be24774

1 file changed: +164 -0

notebooks/kafka_streaming.py

@@ -0,0 +1,164 @@
import marimo

__generated_with = "0.17.0"
app = marimo.App(width="medium")


@app.cell
def _():
    import marimo as mo
    from amp.client import Client
    return Client, mo


@app.cell(hide_code=True)
def _(mo):
    mo.md(
        r"""
        # Kafka Streaming Example

        This notebook demonstrates continuous streaming from Flight SQL to Kafka with reorg detection.
        """
    )
    return


@app.cell(hide_code=True)
def _(mo):
    mo.md(r"""## Setup""")
    return


@app.cell
def _(Client):
    client = Client('grpc://34.122.177.97:80')
    return (client,)


@app.cell
def _(client):
    client.configure_connection(
        'my_kafka',
        'kafka',
        {
            'bootstrap_servers': 'localhost:9092',
            'client_id': 'amp-streaming-client',
            'key_field': 'block_num',
        },
    )
    return
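

@app.cell
def _():
    # Illustrative addition, not part of the original commit: a quick sanity
    # check that the Kafka broker configured above is reachable before the
    # stream starts. A minimal sketch using kafka-python, assuming the same
    # localhost:9092 broker as in the connection config. The alias keeps the
    # import cell-local so it doesn't collide with the import in the
    # verification cell below.
    from kafka import KafkaConsumer as _KafkaConsumer

    _probe = _KafkaConsumer(bootstrap_servers='localhost:9092')
    print(f'Broker reachable, {len(_probe.topics())} topics visible')
    _probe.close()
    return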


@app.cell(hide_code=True)
def _(mo):
    mo.md(
        r"""
        ## Streaming Query

        This query is loaded with `stream=True`, so new blocks are streamed continuously as they arrive.
        The loader will automatically handle blockchain reorganizations.
        """
    )
    return


@app.cell
def _(client):
    streaming_results = client.sql(
        '''
        SELECT
            block_num,
            block_hash,
            timestamp,
            tx_hash,
            log_index,
            address,
            topic0
        FROM eth_firehose.logs
        WHERE block_num >= 20000000
        '''
    ).load(
        'my_kafka',
        'eth_logs_stream',
        stream=True,
        create_table=True,
    )
    return (streaming_results,)


@app.cell(hide_code=True)
def _(mo):
    mo.md(
        r"""
        ## Monitor Stream

        This cell continuously prints results as they arrive. Press Ctrl+C (or interrupt the cell) to stop.
        """
    )
    return


@app.cell
def _(streaming_results):
    batch_count = 0
    reorg_count = 0

    for result in streaming_results:
        if result.is_reorg:
            reorg_count += 1
            print(f'🔄 REORG #{reorg_count}: {len(result.invalidation_ranges)} ranges invalidated')
            for r in result.invalidation_ranges:
                print(f'  Network: {r.network}, Blocks: {r.start}-{r.end}')
        else:
            batch_count += 1
            print(f'✅ Batch #{batch_count}: {result.rows_loaded} rows, {result.ops_per_second:.0f} rows/sec')

            if 'block_ranges' in result.metadata:
                for br in result.metadata['block_ranges']:
                    print(f'  Blocks: {br["start"]}-{br["end"]} ({br["network"]})')
    return
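

@app.cell
def _():
    # Illustrative helper, not part of the original commit: the monitor loop
    # above factored into a reusable function with an optional bound, so it
    # can stop after a fixed number of batches instead of waiting for an
    # interrupt. It relies only on result fields the cell above already uses
    # (is_reorg, invalidation_ranges, rows_loaded).
    def monitor(results, max_batches=None):
        batch_count = reorg_count = 0
        for result in results:
            if result.is_reorg:
                reorg_count += 1
                print(f'REORG: {len(result.invalidation_ranges)} ranges invalidated')
            else:
                batch_count += 1
                print(f'Batch #{batch_count}: {result.rows_loaded} rows')
            if max_batches is not None and batch_count >= max_batches:
                break
        return batch_count, reorg_count

    # Example: monitor(streaming_results, max_batches=5)
    return (monitor,)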


@app.cell(hide_code=True)
def _(mo):
    mo.md(r"""## Verify Messages in Kafka""")
    return


@app.cell
def _():
    from kafka import KafkaConsumer
    import json

    consumer = KafkaConsumer(
        'eth_logs_stream',
        bootstrap_servers='localhost:9092',
        auto_offset_reset='earliest',
        consumer_timeout_ms=5000,
        value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    )

    # Read up to ten messages from the start of the topic, then stop.
    messages = []
    for message in consumer:
        messages.append(message.value)
        if len(messages) >= 10:
            break

    consumer.close()

    print(f"First {len(messages)} messages:")
    for i, msg in enumerate(messages):
        print(f"\n[{i+1}] Block {msg.get('block_num')} at {msg.get('timestamp')}")
        print(f"  TX: {msg.get('tx_hash')}")
        print(f"  Address: {msg.get('address')}")
    return
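

@app.cell
def _():
    # Illustrative check, not part of the original commit: the connection
    # config above sets 'key_field': 'block_num', which presumably keys each
    # Kafka message by its block number. A minimal sketch with kafka-python
    # that inspects the key of the first available message, assuming the same
    # broker and topic as above.
    from kafka import KafkaConsumer as _KeyConsumer

    _kc = _KeyConsumer(
        'eth_logs_stream',
        bootstrap_servers='localhost:9092',
        auto_offset_reset='earliest',
        consumer_timeout_ms=5000,
    )
    for _record in _kc:
        # Keys arrive as raw bytes; decode for display.
        _key = _record.key.decode('utf-8') if _record.key else None
        print(f'partition={_record.partition} offset={_record.offset} key={_key}')
        break
    _kc.close()
    return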


@app.cell
def _():
    return


if __name__ == "__main__":
    app.run()
