Commit 46b7bdf

implement a simple kafka loader

1 parent 3d5c462

6 files changed: +137 −15 lines

.test.env

Whitespace-only changes.

notebooks/test_loaders.py

Lines changed: 76 additions & 9 deletions

@@ -1,6 +1,6 @@
 import marimo

-__generated_with = "0.14.16"
+__generated_with = "0.17.0"

 app = marimo.App(width="full")


@@ -20,7 +20,7 @@ def _():

 @app.cell
 def _(Client):
-    client = Client('grpc://127.0.0.1')
+    client = Client('grpc://34.122.177.97:80')
     client.configure_connection('my_pg', 'postgresql', {'host': 'localhost', 'database': 'loaders_testing', 'user': 'username', 'password': 'pass', 'port': '5432'})
     client.configure_connection('my_redis', 'redis', {'host': 'localhost', 'port': 6379, 'password': 'mypassword'})
     return (client,)
@@ -89,15 +89,15 @@ def _(mo):
             create_table=True,
         )
         """,
-        name='_',
+        name="_"
     )


 @app.cell
 def _(psql_load_results):
     for p_result in psql_load_results:
         print(p_result)
-    return (p_result,)
+    return


 @app.cell(hide_code=True)
@@ -120,7 +120,7 @@ def _(client):
 def _(redis_load_results):
     for r_result in redis_load_results:
         print(r_result)
-    return (r_result,)
+    return


 @app.cell(hide_code=True)
@@ -149,7 +149,7 @@ def _(client):
     else:
         # Single result
         print(f'Total: {result.rows_loaded} rows')
-    return batch_result, result
+    return (batch_result,)


 @app.cell
@@ -291,7 +291,7 @@ def _(lmdb_load_result):
 def _(batch_result, lmdb_load_result):
     for lmdb_batch_result in lmdb_load_result:
         print(f'Batch: {batch_result.rows_loaded} rows')
-    return (lmdb_batch_result,)
+    return


 @app.cell
@@ -325,7 +325,7 @@ def _(env):
     myList = [ key for key, _ in txn.cursor() ]
     print(myList)
     print(len(myList))
-    return myList, txn
+    return


 @app.cell
@@ -340,7 +340,74 @@ def _(env, pa):
     batch = reader.read_next_batch()

     print(batch)
-    return batch, key, open_txn, reader, value
+    return
+
+
+@app.cell(hide_code=True)
+def _(mo):
+    mo.md(r"""# Kafka""")
+    return
+
+
+@app.cell
+def _(client):
+    # Configure Kafka connection
+    client.configure_connection(
+        'my_kafka',
+        'kafka',
+        {
+            'bootstrap_servers': 'localhost:9092',
+            'client_id': 'amp-test-client',
+            'key_field': 'id'
+        }
+    )
+    return
+
+
+@app.cell
+def _(client):
+    # Load data to Kafka topic
+    kafka_load_results = client.sql('select * from eth_firehose.logs limit 100').load(
+        'my_kafka',
+        'test_logs',
+        create_table=True,
+    )
+    return (kafka_load_results,)
+
+
+@app.cell
+def _(kafka_load_results):
+    # Check results
+    for k_result in kafka_load_results:
+        print(f'Kafka batch: {k_result.rows_loaded} rows loaded, duration: {k_result.duration:.2f}s')
+    return (k_result,)
+
+
+@app.cell
+def _():
+    from kafka import KafkaConsumer
+    import json
+
+    consumer = KafkaConsumer(
+        'test_logs',
+        bootstrap_servers='localhost:9092',
+        auto_offset_reset='earliest',
+        consumer_timeout_ms=3000,
+        value_deserializer=lambda m: json.loads(m.decode('utf-8'))
+    )
+
+    messages = list(consumer)
+    consumer.close()
+
+    print(f"Total messages in Kafka: {len(messages)}")
+    print(f"\nFirst message:")
+    if messages:
+        msg = messages[0].value
+        print(f"  Block: {msg.get('block_num')}")
+        print(f"  Timestamp: {msg.get('timestamp')}")
+        print(f"  Address: {msg.get('address')}")
+
+    return


 @app.cell
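
Since the connection above sets 'key_field': 'id', each produced message should carry the row's id as its message key; when the selected rows lack an id column, _extract_message_key (see kafka_loader.py below) falls back to a null key. A minimal sketch for spot-checking keys, reusing the notebook's consumer settings:

# Sketch: inspect message keys produced under key_field='id'.
from kafka import KafkaConsumer

key_consumer = KafkaConsumer(
    'test_logs',
    bootstrap_servers='localhost:9092',
    auto_offset_reset='earliest',
    consumer_timeout_ms=3000,
)
for record in key_consumer:
    # record.key is the UTF-8 encoded id, or None if the row had no 'id' field
    print(record.key)
    break
key_consumer.close()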

src/amp/client.py

Lines changed: 14 additions & 2 deletions

@@ -228,7 +228,13 @@ def _load_table(
         except Exception as e:
             self.logger.error(f'Failed to load table: {e}')
             return LoadResult(
-                rows_loaded=0, duration=0.0, table_name=table_name, loader_type=loader, success=False, error=str(e)
+                rows_loaded=0,
+                duration=0.0,
+                ops_per_second=0.0,
+                table_name=table_name,
+                loader_type=loader,
+                success=False,
+                error=str(e),
             )

     def _load_stream(
@@ -248,7 +254,13 @@ def _load_stream(
         except Exception as e:
             self.logger.error(f'Failed to load stream: {e}')
             yield LoadResult(
-                rows_loaded=0, duration=0.0, table_name=table_name, loader_type=loader, success=False, error=str(e)
+                rows_loaded=0,
+                duration=0.0,
+                ops_per_second=0.0,
+                table_name=table_name,
+                loader_type=loader,
+                success=False,
+                error=str(e),
             )

     def query_and_load_streaming(
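
Both error paths now pass ops_per_second=0.0 explicitly, which suggests LoadResult requires that field. A hypothetical sketch of the shape these call sites imply (the real definition lives elsewhere in src/amp and is not part of this diff):

# Hypothetical LoadResult shape inferred from the call sites above.
from dataclasses import dataclass
from typing import Optional

@dataclass
class LoadResult:
    rows_loaded: int
    duration: float
    ops_per_second: float  # presumably newly required, hence the explicit 0.0 on the error path
    table_name: str
    loader_type: str
    success: bool = True
    error: Optional[str] = None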

src/amp/loaders/implementations/iceberg_loader.py

Lines changed: 2 additions & 2 deletions

@@ -258,7 +258,7 @@ def _validate_schema_compatibility(self, iceberg_table: IcebergTable, arrow_sche
         # Evolution mode: evolve schema to accommodate new fields
         self._evolve_schema_if_needed(iceberg_table, iceberg_schema, arrow_schema)

-    def _validate_schema_strict(self, iceberg_schema: IcebergSchema, arrow_schema: pa.Schema) -> None:
+    def _validate_schema_strict(self, iceberg_schema: 'IcebergSchema', arrow_schema: pa.Schema) -> None:
         """Validate schema compatibility in strict mode (no evolution)"""
         iceberg_field_names = {field.name for field in iceberg_schema.fields}
         arrow_field_names = {field.name for field in arrow_schema}
@@ -279,7 +279,7 @@ def _validate_schema_strict(self, iceberg_schema: IcebergSchema, arrow_schema: p
         self.logger.debug('Schema validation passed in strict mode')

     def _evolve_schema_if_needed(
-        self, iceberg_table: IcebergTable, iceberg_schema: IcebergSchema, arrow_schema: pa.Schema
+        self, iceberg_table: IcebergTable, iceberg_schema: 'IcebergSchema', arrow_schema: pa.Schema
     ) -> None:
         """Evolve the Iceberg table schema to accommodate new Arrow schema fields"""
         try:
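
Quoting 'IcebergSchema' turns the annotation into a forward reference, so the method signatures no longer need that name bound at runtime. One plausible motivation (an assumption; the module's import block is not shown in this diff) is that the schema type is imported only for type checking while the table type remains a runtime import:

# Sketch of an import layout that would require quoting IcebergSchema
# but not IcebergTable (hypothetical; not taken from this commit).
from typing import TYPE_CHECKING

from pyiceberg.table import Table as IcebergTable  # needed at runtime

if TYPE_CHECKING:
    # Visible only to type checkers, so runtime annotations must be strings.
    from pyiceberg.schema import Schema as IcebergSchema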

src/amp/loaders/implementations/kafka_loader.py

Lines changed: 31 additions & 1 deletion

@@ -60,7 +60,37 @@ def disconnect(self) -> None:
         self.logger.info('Disconnected from Kafka')

     def _load_batch_impl(self, batch: pa.RecordBatch, table_name: str, **kwargs) -> int:
-        pass
+        if not self._producer:
+            raise RuntimeError('Producer not connected. Call connect() first.')
+
+        data_dict = batch.to_pydict()
+        num_rows = batch.num_rows
+
+        if num_rows == 0:
+            return 0
+
+        for i in range(num_rows):
+            row = {field: values[i] for field, values in data_dict.items()}
+
+            key = self._extract_message_key(row)
+
+            self._producer.send(topic=table_name, key=key, value=row)
+
+        self._producer.flush()
+
+        self.logger.debug(f'Sent {num_rows} messages to topic {table_name}')
+
+        return num_rows
+
+    def _extract_message_key(self, row: Dict[str, Any]) -> Optional[bytes]:
+        if not self.config.key_field or self.config.key_field not in row:
+            return None
+
+        key_value = row[self.config.key_field]
+        if key_value is None:
+            return None
+
+        return str(key_value).encode('utf-8')

     def _handle_reorg(self, invalidation_ranges: List[BlockRange], table_name: str) -> None:
         pass
tests/integration/test_kafka_loader.py

Lines changed: 14 additions & 1 deletion

@@ -1,3 +1,4 @@
+import pyarrow as pa
 import pytest

 try:
@@ -9,7 +10,6 @@
 @pytest.mark.integration
 @pytest.mark.kafka
 class TestKafkaLoaderIntegration:
-
     def test_loader_connection(self, kafka_test_config):
         loader = KafkaLoader(kafka_test_config)

@@ -29,3 +29,16 @@ def test_context_manager(self, kafka_test_config):
             assert loader._producer is not None

         assert loader._is_connected == False
+
+    def test_load_batch(self, kafka_test_config):
+        loader = KafkaLoader(kafka_test_config)
+
+        batch = pa.RecordBatch.from_pydict(
+            {'id': [1, 2, 3], 'name': ['alice', 'bob', 'charlie'], 'value': [100, 200, 300]}
+        )
+
+        with loader:
+            result = loader.load_batch(batch, 'test_topic')
+
+        assert result.success == True
+        assert result.rows_loaded == 3
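
The tests rely on a kafka_test_config fixture that is not part of this diff. Since the loader reads self.config.key_field, the fixture presumably yields an attribute-style config object; a hypothetical conftest.py sketch (the project's real config class may differ):

# Hypothetical fixture; field names mirror the notebook's Kafka connection.
from types import SimpleNamespace

import pytest

@pytest.fixture
def kafka_test_config():
    return SimpleNamespace(
        bootstrap_servers='localhost:9092',
        client_id='amp-test-client',
        key_field='id',
    )

With a broker reachable on localhost:9092, the new test can then be selected by its markers, e.g. pytest -m kafka tests/integration/test_kafka_loader.py.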
