
Commit 8164620

implement a Kafka loader
1 parent 7c66375 commit 8164620

9 files changed: +3158, -17 lines

.test.env

Whitespace-only changes.

notebooks/kafka_streaming.py

Lines changed: 122 additions & 0 deletions
import marimo

__generated_with = "0.17.0"
app = marimo.App(width="medium")


@app.cell
def _():
    import marimo as mo
    from amp.client import Client
    return Client, mo


@app.cell(hide_code=True)
def _(mo):
    mo.md(
        r"""
    # Kafka Streaming Example

    This notebook demonstrates continuous streaming from Flight SQL to Kafka with reorg detection.
    """
    )
    return


@app.cell(hide_code=True)
def _(mo):
    mo.md(r"""## Setup""")
    return


@app.cell
def _(Client):
    client = Client('grpc://127.0.0.1:1602')
    return (client,)


@app.cell
def _(client):
    client.configure_connection(
        'my_kafka',
        'kafka',
        {
            'bootstrap_servers': 'localhost:9092',
            'client_id': 'amp-streaming-client',
            'key_field': 'block_num'
        }
    )
    return


@app.cell(hide_code=True)
def _(mo):
    mo.md(
        r"""
    ## Streaming Query

    This query uses `SETTINGS stream = true` to continuously stream new blocks as they arrive.
    The loader automatically handles blockchain reorganizations.
    """
    )
    return


@app.cell
def _(client):
    streaming_results = client.sql(
        '''
        SELECT
            block_num,
            log_index
        FROM anvil.logs
        SETTINGS stream = true
        '''
    ).load(
        'my_kafka',
        'eth_logs_stream',
        stream=True,
        create_table=True,
    )
    return (streaming_results,)


@app.cell(hide_code=True)
def _(mo):
    mo.md(
        r"""
    ## Monitor Stream

    This cell starts a Kafka consumer in a background thread to print messages as they
    arrive, then iterates the loader results to drive the stream.
    """
    )
    return


@app.cell
def _(streaming_results):
    import json
    import threading

    from kafka import KafkaConsumer

    def consume_kafka():
        # Background consumer: prints each message as it lands on the topic.
        consumer = KafkaConsumer(
            'eth_logs_stream',
            bootstrap_servers='localhost:9092',
            auto_offset_reset='latest',
            value_deserializer=lambda m: json.loads(m.decode('utf-8'))
        )
        print("Kafka Consumer started")
        for message in consumer:
            print(f"Consumed: {message.value}")

    consumer_thread = threading.Thread(target=consume_kafka, daemon=True)
    consumer_thread.start()

    # Iterating the loader results drives the producer side of the stream.
    print("Kafka Producer started")
    for result in streaming_results:
        print(f"Produced: {result}")
    return


if __name__ == "__main__":
    app.run()
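
For a quick check outside marimo, a standalone consumer can print each message's key alongside its value. This is a minimal sketch: the topic name and broker address come from the notebook above, while the expectation that keys carry `block_num` is an assumption based on the connection's `key_field` setting.

import json
from kafka import KafkaConsumer

# Standalone check for the topic produced by the notebook above.
consumer = KafkaConsumer(
    'eth_logs_stream',
    bootstrap_servers='localhost:9092',
    auto_offset_reset='earliest',
    consumer_timeout_ms=5000,  # stop iterating after 5s without new messages
    key_deserializer=lambda k: k.decode('utf-8') if k else None,
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
)

for message in consumer:
    # If the loader honors key_field='block_num' (assumption), message.key
    # should hold the block number for each produced row.
    print(f"key={message.key} value={message.value}")
consumer.close()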

notebooks/test_loaders.py

Lines changed: 75 additions & 8 deletions
@@ -1,6 +1,6 @@
 import marimo
 
-__generated_with = "0.14.16"
+__generated_with = "0.17.0"
 app = marimo.App(width="full")
 
 
@@ -89,15 +89,15 @@ def _(mo):
         create_table=True,
     )
     """,
-        name='_',
+        name="_"
     )
 
 
 @app.cell
 def _(psql_load_results):
     for p_result in psql_load_results:
         print(p_result)
-    return (p_result,)
+    return
 
 
 @app.cell(hide_code=True)
@@ -120,7 +120,7 @@ def _(client):
 def _(redis_load_results):
     for r_result in redis_load_results:
         print(r_result)
-    return (r_result,)
+    return
 
 
 @app.cell(hide_code=True)
@@ -149,7 +149,7 @@ def _(client):
     else:
         # Single result
         print(f'Total: {result.rows_loaded} rows')
-    return batch_result, result
+    return (batch_result,)
 
 
 @app.cell
@@ -291,7 +291,7 @@ def _(lmdb_load_result):
 def _(batch_result, lmdb_load_result):
     for lmdb_batch_result in lmdb_load_result:
         print(f'Batch: {batch_result.rows_loaded} rows')
-    return (lmdb_batch_result,)
+    return
 
 
 @app.cell
@@ -325,7 +325,7 @@ def _(env):
     myList = [ key for key, _ in txn.cursor() ]
     print(myList)
     print(len(myList))
-    return myList, txn
+    return
 
 
 @app.cell
@@ -340,7 +340,74 @@ def _(env, pa):
     batch = reader.read_next_batch()
 
     print(batch)
-    return batch, key, open_txn, reader, value
+    return
+
+
+@app.cell(hide_code=True)
+def _(mo):
+    mo.md(r"""# Kafka""")
+    return
+
+
+@app.cell
+def _(client):
+    # Configure Kafka connection
+    client.configure_connection(
+        'my_kafka',
+        'kafka',
+        {
+            'bootstrap_servers': 'localhost:9092',
+            'client_id': 'amp-test-client',
+            'key_field': 'id'
+        }
+    )
+    return
+
+
+@app.cell
+def _(client):
+    # Load data to Kafka topic
+    kafka_load_results = client.sql('select * from eth_firehose.logs limit 100').load(
+        'my_kafka',
+        'test_logs',
+        create_table=True,
+    )
+    return (kafka_load_results,)
+
+
+@app.cell
+def _(kafka_load_results):
+    # Check results
+    for k_result in kafka_load_results:
+        print(f'Kafka batch: {k_result.rows_loaded} rows loaded, duration: {k_result.duration:.2f}s')
+    return (k_result,)
+
+
+@app.cell
+def _():
+    from kafka import KafkaConsumer
+    import json
+
+    consumer = KafkaConsumer(
+        'test_logs',
+        bootstrap_servers='localhost:9092',
+        auto_offset_reset='earliest',
+        consumer_timeout_ms=3000,
+        value_deserializer=lambda m: json.loads(m.decode('utf-8'))
+    )
+
+    messages = list(consumer)
+    consumer.close()
+
+    print(f"Total messages in Kafka: {len(messages)}")
+    print("\nFirst message:")
+    if messages:
+        msg = messages[0].value
+        print(f"  Block: {msg.get('block_num')}")
+        print(f"  Timestamp: {msg.get('timestamp')}")
+        print(f"  Address: {msg.get('address')}")
+
+    return
 
 
 @app.cell

pyproject.toml

Lines changed: 8 additions & 3 deletions
@@ -62,15 +62,20 @@ lmdb = [
     "lmdb>=1.4.0",
 ]
 
+kafka = [
+    "kafka-python>=2.2.15",
+]
+
 all_loaders = [
-    "psycopg2-binary>=2.9.0", # PostgreSQL
-    "redis>=4.5.0", # Redis
-    "deltalake>=1.0.2", # Delta Lake (consistent version)
+    "psycopg2-binary>=2.9.0",  # PostgreSQL
+    "redis>=4.5.0",  # Redis
+    "deltalake>=1.0.2",  # Delta Lake (consistent version)
     "pyiceberg[sql-sqlite]>=0.10.0",  # Apache Iceberg
     "pydantic>=2.0,<2.12",  # PyIceberg 0.10.0 compatibility
     "snowflake-connector-python>=4.0.0",  # Snowflake
     "snowpipe-streaming>=1.0.0",  # Snowpipe Streaming API
     "lmdb>=1.4.0",  # LMDB
+    "kafka-python>=2.2.15",
 ]
 
 test = [
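
With the extra declared, the optional dependency can be verified before running the notebooks; a minimal sketch, assuming the package is installed with the extra enabled (e.g. pip install -e ".[kafka]" - the exact install command is an assumption about this project's layout):

# Verify the optional Kafka dependency resolves; mirrors the guard
# used by the loaders package below.
try:
    import kafka
    print(f"kafka-python {kafka.__version__} available")
except ImportError:
    print("kafka-python missing; install the 'kafka' extra")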

src/amp/loaders/implementations/__init__.py

Lines changed: 7 additions & 6 deletions
@@ -21,7 +21,7 @@
 
 try:
     from .iceberg_loader import IcebergLoader
-except ImportError:
+except Exception:
     IcebergLoader = None
 
 try:
@@ -34,11 +34,10 @@
 except ImportError:
     LMDBLoader = None
 
-# Add any other loaders here
-# try:
-#     from .snowflake_loader import SnowflakeLoader
-# except ImportError:
-#     SnowflakeLoader = None
+try:
+    from .kafka_loader import KafkaLoader
+except ImportError:
+    KafkaLoader = None
 
 __all__ = []
 
@@ -55,3 +54,5 @@
     __all__.append('SnowflakeLoader')
 if LMDBLoader:
     __all__.append('LMDBLoader')
+if KafkaLoader:
+    __all__.append('KafkaLoader')
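
Callers can mirror the notebooks by checking the guarded name before use; a minimal sketch, assuming only what this __init__.py exports:

from amp.loaders.implementations import KafkaLoader

# KafkaLoader is None when kafka-python is not installed (see the
# try/except guard above), so fail fast with a clear message.
if KafkaLoader is None:
    raise RuntimeError("KafkaLoader unavailable: install the 'kafka' extra")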
