|
| 1 | +import marimo |
| 2 | + |
| 3 | +__generated_with = "0.17.0" |
| 4 | +app = marimo.App(width="medium") |
| 5 | + |
| 6 | + |
| 7 | +@app.cell |
| 8 | +def _(): |
| 9 | + import marimo as mo |
| 10 | + from amp.client import Client |
| 11 | + return Client, mo |
| 12 | + |
| 13 | + |
| 14 | +@app.cell(hide_code=True) |
| 15 | +def _(mo): |
| 16 | + mo.md( |
| 17 | + r""" |
| 18 | + # Kafka Streaming Example |
| 19 | +
|
| 20 | + This notebook demonstrates continuous streaming from Flight SQL to Kafka with reorg detection. |
| 21 | + """ |
| 22 | + ) |
| 23 | + return |
| 24 | + |
| 25 | + |
| 26 | +@app.cell(hide_code=True) |
| 27 | +def _(mo): |
| 28 | + mo.md(r"""## Setup""") |
| 29 | + return |
| 30 | + |
| 31 | + |
| 32 | +@app.cell |
| 33 | +def _(Client): |
| 34 | + client = Client('grpc://34.122.177.97:80') |
| 35 | + return (client,) |
| 36 | + |
| 37 | + |
| 38 | +@app.cell |
| 39 | +def _(client): |
| 40 | + client.configure_connection( |
| 41 | + 'my_kafka', |
| 42 | + 'kafka', |
| 43 | + { |
| 44 | + 'bootstrap_servers': 'localhost:9092', |
| 45 | + 'client_id': 'amp-streaming-client', |
| 46 | + 'key_field': 'block_num' |
| 47 | + } |
| 48 | + ) |
| 49 | + return |
| 50 | + |
| 51 | + |
| 52 | +@app.cell(hide_code=True) |
| 53 | +def _(mo): |
| 54 | + mo.md( |
| 55 | + r""" |
| 56 | + ## Streaming Query |
| 57 | +
|
| 58 | + This query uses `SETTINGS stream = true` to continuously stream new blocks as they arrive. |
| 59 | + The loader will automatically handle blockchain reorganizations. |
| 60 | + """ |
| 61 | + ) |
| 62 | + return |
| 63 | + |
| 64 | + |
| 65 | +@app.cell |
| 66 | +def _(client): |
| 67 | + streaming_results = client.sql( |
| 68 | + ''' |
| 69 | + SELECT |
| 70 | + block_num, |
| 71 | + block_hash, |
| 72 | + timestamp, |
| 73 | + tx_hash, |
| 74 | + log_index, |
| 75 | + address, |
| 76 | + topic0 |
| 77 | + FROM eth_firehose.logs |
| 78 | + WHERE block_num >= 20000000 |
| 79 | + ''' |
| 80 | + ).load( |
| 81 | + 'my_kafka', |
| 82 | + 'eth_logs_stream', |
| 83 | + stream=True, |
| 84 | + create_table=True, |
| 85 | + ) |
| 86 | + return (streaming_results,) |
| 87 | + |
| 88 | + |
| 89 | +@app.cell(hide_code=True) |
| 90 | +def _(mo): |
| 91 | + mo.md( |
| 92 | + r""" |
| 93 | + ## Monitor Stream |
| 94 | +
|
| 95 | + This cell will continuously print results as they arrive. Press Ctrl+C to stop. |
| 96 | + """ |
| 97 | + ) |
| 98 | + return |
| 99 | + |
| 100 | + |
| 101 | +@app.cell |
| 102 | +def _(streaming_results): |
| 103 | + batch_count = 0 |
| 104 | + reorg_count = 0 |
| 105 | + |
| 106 | + for result in streaming_results: |
| 107 | + if result.is_reorg: |
| 108 | + reorg_count += 1 |
| 109 | + print(f'🔄 REORG #{reorg_count}: {len(result.invalidation_ranges)} ranges invalidated') |
| 110 | + for r in result.invalidation_ranges: |
| 111 | + print(f' Network: {r.network}, Blocks: {r.start}-{r.end}') |
| 112 | + else: |
| 113 | + batch_count += 1 |
| 114 | + print(f'✅ Batch #{batch_count}: {result.rows_loaded} rows, {result.ops_per_second:.0f} rows/sec') |
| 115 | + |
| 116 | + if 'block_ranges' in result.metadata: |
| 117 | + for br in result.metadata['block_ranges']: |
| 118 | + print(f' Blocks: {br["start"]}-{br["end"]} ({br["network"]})') |
| 119 | + return |
| 120 | + |
| 121 | + |
| 122 | +@app.cell(hide_code=True) |
| 123 | +def _(mo): |
| 124 | + mo.md(r"""## Verify Messages in Kafka""") |
| 125 | + return |
| 126 | + |
| 127 | + |
| 128 | +@app.cell |
| 129 | +def _(): |
| 130 | + from kafka import KafkaConsumer |
| 131 | + import json |
| 132 | + |
| 133 | + consumer = KafkaConsumer( |
| 134 | + 'eth_logs_stream', |
| 135 | + bootstrap_servers='localhost:9092', |
| 136 | + auto_offset_reset='earliest', |
| 137 | + consumer_timeout_ms=5000, |
| 138 | + value_deserializer=lambda m: json.loads(m.decode('utf-8')) |
| 139 | + ) |
| 140 | + |
| 141 | + messages = [] |
| 142 | + for message in consumer: |
| 143 | + messages.append(message.value) |
| 144 | + if len(messages) >= 10: |
| 145 | + break |
| 146 | + |
| 147 | + consumer.close() |
| 148 | + |
| 149 | + print(f"Latest {len(messages)} messages:") |
| 150 | + for i, msg in enumerate(messages): |
| 151 | + print(f"\n[{i+1}] Block {msg.get('block_num')} at {msg.get('timestamp')}") |
| 152 | + print(f" TX: {msg.get('tx_hash')}") |
| 153 | + print(f" Address: {msg.get('address')}") |
| 154 | + return |
| 155 | + |
| 156 | + |
| 157 | + |
| 158 | +@app.cell |
| 159 | +def _(): |
| 160 | + return |
| 161 | + |
| 162 | + |
| 163 | +if __name__ == "__main__": |
| 164 | + app.run() |
0 commit comments