1. Background and Motivation
1.1 Current State
The Druid Iceberg extension (druid-iceberg-extensions) supports ingesting data from Iceberg
tables via IcebergInputSource. Internally it:
- Connects to an Iceberg catalog (Hive, Glue, REST, or local)
- Lists data files up to the current (or a specified) snapshot
- Passes those raw file paths to a native Druid InputSource via warehouseSource
- Druid reads the Parquet files directly using the configured InputFormat
This works correctly for Iceberg v1 tables because v1 only supports append operations —
every file in the table is a live data file.
1.2 The Problem
Iceberg v2 introduced row-level deletes. A v2 table can contain two additional file types
alongside data files:
| File Type | Content | Purpose |
|---|---|---|
| Positional Delete File | (file_path, row_position) pairs | Delete the row at position N in file F |
| Equality Delete File | Column value sets | Delete any row where column values match |
When Druid reads an Iceberg v2 table today, it only reads the data files and completely
ignores delete files. As a result, deleted rows are silently included in ingested Druid segments.
1.3 Example
Snapshot 1 (append): data-001.parquet → rows: order_id=1, order_id=2, order_id=3
Snapshot 2 (delete): eq-delete-001.parquet → "delete where order_id = 2"
Today (v1-only behaviour):
Druid ingests all three rows including order_id=2, which was deleted in Iceberg.
Expected (v2 behaviour):
Druid should ingest only order_id=1 and order_id=3.
2. Proposed Design
In the V2 path, the Iceberg input source is used to obtain FileScanTask objects (data
file paths + associated delete file paths + table schema). That metadata is serialized into the
per-worker task spec. Workers open the associated data files and apply the position-delete and equality-delete files before converting the records to Druid InputRows.
2.1 High-Level Architecture
Proposed (v2-aware) flow:
IcebergInputSource
│
├─► IcebergCatalog.extractFileScanTasksWithSchema()
│ └─► tableScan.planFiles() → List<FileScanTask> + table Schema
│
└─► Are there deletes?
│
├─NO──► [V1 path] extract paths → warehouseSource.create(paths) (unchanged)
│
└─YES─► [V2 path] encode tasks as V2 splits
└─► creates an IcebergFileTaskInputSource object per split:
carries: data file path, delete file paths,
serialized table schema,
warehouseSource (for worker-side file access)
IcebergFileTaskInputSource
└─► IcebergNativeRecordReader
├─► Deserialize table schema from JSON
├─► Open files via WarehouseFileIO
├─► Read delete files, apply deletes row by row
└─► IcebergRecordConverter → MapBasedInputRow
2.2 Differentiate between V1/V2 during catalog read
At scan planning time, IcebergInputSource.retrieveIcebergDatafiles() checks whether any
returned FileScanTask carries delete files. Based on this, the following paths can be followed:
- No delete files (V1 path): file paths are extracted and handed to the existing warehouseSource input source, identical to today's behaviour.
- Delete files present (V2 path): tasks are encoded as V2 splits, each carrying the data file metadata, delete file metadata, and the serialized table schema. warehouseSource is passed through to each IcebergFileTaskInputSource for use on the worker.
Please note that a v2-format table that has never had any rows deleted (no delete files) goes through the V1 path.
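The branch above can be sketched as follows. This is an illustrative stand-in, not the extension's actual code: FileTask models just the two fields of Iceberg's FileScanTask that the decision needs, and the class and method names are hypothetical.

```java
import java.util.List;

// Illustrative sketch of the V1/V2 branch taken during scan planning.
// FileTask stands in for Iceberg's FileScanTask; only the data file path
// and its attached delete files matter for the decision.
public class ScanPathSelector {
    // A data file path plus the delete files scan planning attached to it.
    record FileTask(String dataFilePath, List<String> deleteFilePaths) {}

    // The V2 path is taken only if at least one task carries delete files.
    static boolean hasDeletes(List<FileTask> tasks) {
        return tasks.stream().anyMatch(t -> !t.deleteFilePaths().isEmpty());
    }

    static String choosePath(List<FileTask> tasks) {
        return hasDeletes(tasks) ? "V2" : "V1";
    }
}
```

Because the check runs over the planned tasks rather than the table's format version, a v2 table without delete files naturally falls through to the unchanged V1 path.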
2.3 Handling Delete files
A new InputSourceReader implementation, IcebergNativeRecordReader, reads each assigned Iceberg data file and applies any associated position-delete and equality-delete files before converting records to Druid InputRows.
Step 1: Deserialize table schema
The Iceberg Schema is deserialized from the JSON string embedded in
IcebergFileTaskInputSource using SchemaParser.fromJson(tableSchemaJson). This is the schema
that was captured from IcebergInputSource.
Step 2: Collect positional deletes
Each positional delete file is read using Iceberg's
DeleteSchemaUtil.pathPosSchema(). Only rows matching the current data file path are retained:
pos-delete-001.parquet:
file_path="data-001.parquet", pos=1
file_path="data-001.parquet", pos=3
file_path="data-002.parquet", pos=0 ← different file, ignored
Result: deletedPositions = {1, 3}
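The filtering in Step 2 can be sketched with plain collections; the class name is hypothetical, and (file_path, pos) pairs stand in for the rows Iceberg returns under DeleteSchemaUtil.pathPosSchema().

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Sketch of Step 2: keep only delete positions whose file_path matches the
// data file currently being read; deletes aimed at other files are ignored.
public class PositionalDeletes {
    static Set<Long> collect(List<Map.Entry<String, Long>> posDeleteRows, String currentDataFile) {
        return posDeleteRows.stream()
            .filter(e -> e.getKey().equals(currentDataFile)) // drop rows for other files
            .map(Map.Entry::getValue)
            .collect(Collectors.toSet());
    }
}
```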
Step 3: Collect equality deletes
Each equality delete file is read using a schema projected to
its equality fields (identified by field IDs in the delete file metadata):
eq-delete-001.parquet (equality field: order_id, field ID=1):
order_id=1002
order_id=1005
Result: equalityDeleteSets = [ {fieldNames=["order_id"], keys={{"order_id":1002}, {"order_id":1005}}} ]
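The equalityDeleteSets structure above can be modeled as a record holding the projected field names plus the set of key tuples; a data row matches when its values on those fields equal one of the tuples. The names here are illustrative, not the extension's actual types.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Sketch of Step 3: one equality delete set per delete file, carrying the
// equality field names and the key tuples read from that file.
public class EqualityDeletes {
    record EqualityDeleteSet(List<String> fieldNames, Set<Map<String, Object>> keys) {
        // Project the row onto the equality fields, then look the tuple up.
        boolean matches(Map<String, Object> row) {
            Map<String, Object> projected = new HashMap<>();
            for (String f : fieldNames) {
                projected.put(f, row.get(f));
            }
            return keys.contains(projected);
        }
    }
}
```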
Step 4: Stream data file and apply both delete sets
For each record:
- If its 0-indexed position is in deletedPositions → skip.
- Otherwise check each equality delete set → skip if matched.
- Surviving records are passed to IcebergRecordConverter.
data-001.parquet (read via WarehouseFileIO):
pos=0 order_id=1001 → KEEP
pos=1 order_id=1002 → SKIP (positional delete)
pos=2 order_id=1003 → KEEP
pos=3 order_id=1004 → SKIP (positional delete)
pos=4 order_id=1005 → SKIP (equality delete)
Output: rows for order_id=1001 and order_id=1003
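Step 4 can be sketched as a single pass over the file's records; maps stand in for Iceberg GenericRecords, predicates stand in for the equality delete sets, and the class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;

// Sketch of Step 4: walk the data file in order, dropping rows hit by either
// a positional delete (by 0-indexed position) or any equality delete set.
public class DeleteFilter {
    static List<Map<String, Object>> apply(
            List<Map<String, Object>> records,                        // rows in file order
            Set<Long> deletedPositions,                               // from Step 2
            List<Predicate<Map<String, Object>>> equalityDeletes) {   // from Step 3
        List<Map<String, Object>> survivors = new ArrayList<>();
        for (int pos = 0; pos < records.size(); pos++) {
            if (deletedPositions.contains((long) pos)) continue;      // positional delete
            Map<String, Object> row = records.get(pos);
            if (equalityDeletes.stream().anyMatch(p -> p.test(row))) continue; // equality delete
            survivors.add(row);                                       // passed both checks
        }
        return survivors;
    }
}
```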
Step 5: Record-to-InputRow Conversion
IcebergRecordConverter converts an Iceberg GenericRecord to a Druid InputRow in two steps:
- Record → Map: iterate Schema.columns(), call record.getField(name), apply type conversion.
- Map → InputRow: use TimestampSpec.extractTimestamp() and DimensionsSpec to build a MapBasedInputRow. If the dimensions list is empty (auto mode), all columns except the timestamp column are used as dimensions.
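The Map → InputRow half of the conversion can be sketched in simplified form; SimpleInputRow stands in for Druid's MapBasedInputRow, and the timestamp handling is reduced to a plain column lookup rather than the full TimestampSpec machinery.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Sketch of Step 5: pull the timestamp out by column name, and in "auto"
// dimension mode treat every remaining column as a dimension.
public class RowConverter {
    record SimpleInputRow(long timestampMillis, List<String> dimensions, Map<String, Object> event) {}

    static SimpleInputRow toInputRow(Map<String, Object> event, String timestampColumn, List<String> declaredDims) {
        long ts = ((Number) event.get(timestampColumn)).longValue();
        List<String> dims = declaredDims;
        if (dims.isEmpty()) { // auto mode: all columns except the timestamp
            dims = new ArrayList<>();
            for (String col : event.keySet()) {
                if (!col.equals(timestampColumn)) dims.add(col);
            }
        }
        return new SimpleInputRow(ts, dims, event);
    }
}
```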
No changes to the ingestion spec are required. Existing specs continue to work unchanged.
warehouseSource is used actively in both the V1 and V2 paths.
3. Limitations
3.1 Materializing all records from Datafile in Memory per Task
IcebergNativeRecordReader currently materializes all surviving records from a data file into a
List<InputRow> before returning the iterator. For very large data files this increases worker
heap pressure. Streaming row-by-row can be a candidate for follow-up optimization.
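One possible shape for that follow-up is a lazily filtering iterator, sketched here with plain Java types; this is not the current implementation, only an illustration of how survivors could be yielded without materializing a List.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.function.Predicate;

// Sketch of a streaming alternative: wrap the source iterator and apply the
// delete checks lazily, so surviving rows are never collected up front.
public class FilteringIterator<T> implements Iterator<T> {
    private final Iterator<T> source;
    private final Predicate<T> keep; // true = row survived all delete checks
    private T next;
    private boolean hasNext;

    public FilteringIterator(Iterator<T> source, Predicate<T> keep) {
        this.source = source;
        this.keep = keep;
        advance();
    }

    // Pull from the source until a surviving row is found or it is exhausted.
    private void advance() {
        hasNext = false;
        while (source.hasNext()) {
            T candidate = source.next();
            if (keep.test(candidate)) {
                next = candidate;
                hasNext = true;
                return;
            }
        }
    }

    @Override public boolean hasNext() { return hasNext; }

    @Override public T next() {
        if (!hasNext) throw new NoSuchElementException();
        T result = next;
        advance();
        return result;
    }
}
```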
3.2 Already-Ingested Druid Segments Are Not Corrected
This proposal only fixes new ingestion runs, ensuring that rows deleted in Iceberg are no longer
included when Druid reads the table going forward.
It does not correct Druid segments that have already been ingested. If a row was ingested into Druid at time T₁ and then deleted from Iceberg at
time T₂, the Druid segment written at T₁ still contains that row.
A full re-ingestion would produce the correct result, but incremental / append-based ingestion can accumulate stale segments for any rows that were deleted or updated in Iceberg between runs.
This gap will be addressed in a follow-up Phase 2 proposal, which can perform an Iceberg snapshot diff to detect which Druid time intervals are affected by Iceberg deletes/updates and issue targeted MSQ REPLACE
tasks to overwrite those intervals with the current, correct Iceberg state.