Description
I have been thinking about use cases for quite a while and came to the conclusion that there is one more killer feature.
Imagine I have 1000 topics and I want to write them into a Lakehouse.
Is it really the right approach to execute tributary_scan_topic() 1000 times? I guess not. The topics should be processed in parallel.
The most interesting use case is to copy all of Kafka into DuckDB.
I would think we should have subscribe(), poll() and commit() stored procedures.
Step one is to subscribe to a set of topics: subscribe(['^BRONZE-.*', 'topic1', 'topic2']).
This executes a subscribe via the Kafka API, moving the offsets to the currently stored committed offsets. Obviously we have to specify the connection parameters somewhere, like hosts, consumer group, etc.
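To make the intended semantics concrete, here is a minimal sketch of what subscribe() could map to on the Kafka side, using the confluent_kafka Python client; the broker address, consumer group name and configuration values are assumptions, not part of the extension today.

```python
from confluent_kafka import Consumer

consumer = Consumer({
    "bootstrap.servers": "broker:9092",   # assumption: would come from the extension's connection parameters
    "group.id": "duckdb-lakehouse",       # assumption: consumer group that owns the committed offsets
    "enable.auto.commit": False,          # commits are issued explicitly by commit()
    "auto.offset.reset": "earliest",      # start at the beginning if no committed offset exists yet
})

# Entries starting with '^' are regex subscriptions, plain names are matched literally.
consumer.subscribe(["^BRONZE-.*", "topic1", "topic2"])
```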
Then a poll(1000 sec, 1_000_000 rows, 200 sec idle) is executed, which collects data for up to 1000 seconds or 1 million rows, or until no data has arrived for 200 seconds, whichever comes first.
This poll() calls the Kafka poll API in a loop; for each record it determines the target table, creates the table if needed, and inserts the record. The table name is derived from either the topic name, the schema name, or the concatenation of both, just like the different Confluent subject naming strategies.
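A rough sketch of that poll loop, again in Python for illustration only; decode_avro(), table_name_for(), ensure_table() and insert_record() are hypothetical placeholders (two of them are sketched further below), not the extension's actual implementation.

```python
import time

def poll_into_duckdb(consumer, con, max_seconds=1000, max_rows=1_000_000, idle_seconds=200):
    """Collect records until the time budget, the row budget, or the idle timeout is hit."""
    deadline = time.monotonic() + max_seconds
    last_data = time.monotonic()
    rows = 0
    while time.monotonic() < deadline and rows < max_rows:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            if time.monotonic() - last_data > idle_seconds:
                break                               # no data for the idle window -> stop
            continue
        if msg.error():
            continue                                # error handling elided in this sketch
        last_data = time.monotonic()
        record = decode_avro(msg)                   # hypothetical Avro / Schema Registry decoder
        table = table_name_for(msg, record)         # topic name, schema name, or a concat of both
        ensure_table(con, table, record)            # hypothetical: create the table if needed (see below)
        insert_record(con, table, msg, record)      # hypothetical: insert row plus Kafka metadata columns
        rows += 1
    return rows
```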
Note: I am using Avro and the schema subject name strategy, meaning one topic can contain data for two DuckDB tables.
If a record is found for a new schema version of the same subject, the additional columns are added to the existing table. For example: record1 was for the SALES table with schema_id=123 and had 10 columns; record2 is for the same subject with schema_id=124 and has 11 columns, so an ALTER TABLE ADD COLUMN is executed.
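A possible way to handle that schema evolution step in DuckDB, assuming record_fields is a mapping from the new schema version's column names to DuckDB types (the mapping from Avro types is itself an assumption):

```python
def ensure_columns(con, table, record_fields):
    """Add any column of the new schema version that the existing table does not have yet."""
    existing = {row[0] for row in con.execute(
        "SELECT column_name FROM information_schema.columns WHERE table_name = ?",
        [table]).fetchall()}
    for name, ddl_type in record_fields.items():
        if name not in existing:
            # e.g. schema_id=124 brought an 11th column -> ALTER TABLE SALES ADD COLUMN ...
            con.execute(f'ALTER TABLE "{table}" ADD COLUMN "{name}" {ddl_type}')
```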
The table also contains the schema_id, topic and the offset as additional columns, read from the Kafka message object.
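For the staging tables themselves, a sketch of a table creation that includes those metadata columns; the column types are assumptions, and a partition column is added here as an assumption because commit() later needs the partition alongside topic and offset.

```python
def ensure_table(con, table, record_fields):
    """Create the staging table on first sight of a subject, with Kafka metadata columns appended."""
    cols = ", ".join(f'"{name}" {ddl_type}' for name, ddl_type in record_fields.items())
    con.execute(
        f'CREATE TABLE IF NOT EXISTS "{table}" '
        f'({cols}, schema_id INTEGER, topic VARCHAR, "partition" INTEGER, "offset" BIGINT)'
    )
```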
After the poll I have lots of records in many tables, and now it is time to do something with them. For example, my Python code goes through the list of tables; for each one it creates a Parquet file with the data, and after the file has been uploaded successfully it calls commit('topic1', 0, 89327) to move the offset to the highest offset in the table for this topic/partition. Finally, the processed table is truncated to empty it for the next round.
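A sketch of that flush step with the duckdb and confluent_kafka Python packages; the local Parquet path and the partition column are assumptions, and the actual upload is left out.

```python
from confluent_kafka import TopicPartition

def flush_table(con, consumer, table):
    """Export one staging table to Parquet, commit the highest offsets, then empty the table."""
    con.execute(f"COPY (SELECT * FROM \"{table}\") TO '{table}.parquet' (FORMAT PARQUET)")
    # ... upload the file here; only commit after the upload has succeeded ...
    offsets = con.execute(
        f'SELECT topic, "partition", MAX("offset") FROM "{table}" GROUP BY topic, "partition"'
    ).fetchall()
    # Kafka stores the *next* offset to read as the committed offset, hence the +1.
    consumer.commit(offsets=[TopicPartition(t, p, o + 1) for t, p, o in offsets],
                    asynchronous=False)
    con.execute(f'DELETE FROM "{table}"')   # empty the staging table for the next round
```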
From an implementation point of view, the logic should be multi-threaded: multiple threads each call the Kafka APIs for subscribe() and poll(), and commit() is executed by the thread that actually handles that partition.
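As an illustration of that threading model (not the extension's internals): one Kafka consumer per thread, all in the same consumer group, so the group protocol assigns each thread its own partitions and each thread only commits what it processed. run_poll_cycle() is a hypothetical placeholder for the poll/insert/flush/commit cycle above; broker, group and thread count are assumptions.

```python
import threading
from confluent_kafka import Consumer

def worker(thread_id):
    # One consumer per thread; because all consumers share a group.id, Kafka assigns
    # each thread a disjoint set of partitions, so each thread commits only its own offsets.
    consumer = Consumer({
        "bootstrap.servers": "broker:9092",   # assumption
        "group.id": "duckdb-lakehouse",       # assumption
        "enable.auto.commit": False,
    })
    consumer.subscribe(["^BRONZE-.*", "topic1", "topic2"])
    try:
        run_poll_cycle(consumer, thread_id)   # hypothetical: poll, insert, flush, commit
    finally:
        consumer.close()

threads = [threading.Thread(target=worker, args=(i,)) for i in range(4)]  # thread count: assumption
for t in threads:
    t.start()
for t in threads:
    t.join()
```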
Another complication is rebalance events, but frankly I would keep it simple here. If a rebalance happens, the thread reads the data from the currently stored committed offset in Kafka and hence writes some data into the table again. Duplicates can happen, but these are better handled by the developer.
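Keeping it simple could look like this, reusing the consumer from the subscribe() sketch above: the rebalance callbacks only log, consumption resumes from the last committed offsets, and duplicate rows in the staging tables are accepted.

```python
def on_assign(consumer, partitions):
    # After a rebalance, consumption resumes from the last committed offset of each
    # assigned partition, so rows staged but not yet committed will be read again.
    print("assigned:", [(p.topic, p.partition) for p in partitions])

def on_revoke(consumer, partitions):
    # No special handling; deduplication is left to the developer downstream.
    print("revoked:", [(p.topic, p.partition) for p in partitions])

consumer.subscribe(["^BRONZE-.*", "topic1", "topic2"],
                   on_assign=on_assign, on_revoke=on_revoke)
```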