feat: [presto][iceberg] Add DWRF file format support for Iceberg read and write paths#27401
Open
apurva-meta wants to merge 10 commits intoprestodb:masterfrom
Open
feat: [presto][iceberg] Add DWRF file format support for Iceberg read and write paths#27401apurva-meta wants to merge 10 commits intoprestodb:masterfrom
apurva-meta wants to merge 10 commits intoprestodb:masterfrom
Conversation
…tensibility Summary: - Reformat FileContent enum in presto_protocol_iceberg.h from single-line to multi-line for better readability and future extension. - Add blank line for visual separation before infoColumns initialization. Protocol files are auto-generated from Java sources via chevron. The manual edits here mirror what the generator would produce once the Java changes are landed and the protocol is regenerated. Differential Revision: D97531548
…equality delete conflict resolution Summary: Wire the dataSequenceNumber field from the Java Presto protocol to the C++ Velox connector layer, enabling server-side sequence number conflict resolution for equality delete files. Changes: - Add dataSequenceNumber field to IcebergSplit protocol (Java + C++) - Parse dataSequenceNumber in IcebergPrestoToVeloxConnector and pass it through HiveIcebergSplit to IcebergSplitReader - Add const qualifiers to local variables for code clarity Differential Revision: D97531547
…discovery Summary: Iceberg V3 introduces deletion vectors stored as blobs inside Puffin files. Previously, the coordinator's IcebergSplitSource rejected PUFFIN-format delete files with a NOT_SUPPORTED error, preventing V3 deletion vectors from being discovered and sent to workers. This diff: 1. Adds PUFFIN to the FileFormat enum (both presto-trunk and presto-facebook-trunk) so fromIcebergFileFormat() can convert Iceberg's PUFFIN format to Presto's FileFormat.PUFFIN. 2. Removes the PUFFIN rejection check in presto-trunk's IcebergSplitSource.toIcebergSplit(), allowing deletion vector files to flow through to workers. 3. Updates TestIcebergV3 to verify PUFFIN files are accepted rather than rejected at split enumeration time. The C++ worker-side changes (protocol enum + connector conversion) will follow in a separate diff. Differential Revision: D97531557
…nd connector layer
Summary:
This is the C++ counterpart to the Java PUFFIN support diff. It wires
the PUFFIN file format through the Prestissimo protocol and connector
conversion layer so that Iceberg V3 deletion vector files can be
deserialized and handled by native workers.
Changes:
1. Adds PUFFIN to the C++ protocol FileFormat enum and its JSON
serialization table in presto_protocol_iceberg.{h,cpp}.
2. Handles PUFFIN in toVeloxFileFormat() in
IcebergPrestoToVeloxConnector.cpp, mapping it to DWRF as a
placeholder since DeletionVectorReader reads raw binary and
does not use the DWRF/Parquet reader infrastructure.
Differential Revision: D97531555
…age sink and compaction procedure Summary: - Add IcebergDeletionVectorPageSink for writing DV files during table maintenance - Add RewriteDeleteFilesProcedure for DV compaction - Wire DV page sink through IcebergCommonModule, IcebergAbstractMetadata, IcebergPageSourceProvider - Add IcebergUpdateablePageSource for DV-aware page source - Update CommitTaskData, IcebergUtil for DV support - Add test coverage in TestIcebergV3 Differential Revision: D97531549
…ort for Iceberg V3 Summary: Iceberg V3 introduces nanosecond-precision timestamp types (timestamp_ns and timestamptz_ns). This diff adds support for reading tables with these column types by mapping them to Presto's best available precision (TIMESTAMP_MICROSECONDS for timestamp_ns, TIMESTAMP_WITH_TIME_ZONE for timestamptz_ns). Changes: - TypeConverter: Map TIMESTAMP_NANO to Presto types and ORC types - ExpressionConverter: Fix predicate pushdown for TIMESTAMP_MICROSECONDS precision (was incorrectly converting microseconds as milliseconds) - IcebergUtil: Handle TIMESTAMP_NANO partition values (nanos → micros) - PartitionData: Handle TIMESTAMP_NANO in JSON partition deserialization - PartitionTable: Convert nanosecond partition values to microseconds - TestIcebergV3: Add testNanosecondTimestampSchema integration test Differential Revision: D97531552
Summary:
Add support for Iceberg V3 VARIANT type across the Presto-Iceberg connector type
conversion pipeline, including a binary codec for the Apache Variant spec and
SQL scalar functions for Variant data manipulation.
The VARIANT type represents semi-structured data (JSON) and is mapped to Presto's
unbounded VARCHAR type. This diff provides:
1. Type mapping: VARIANT to/from VARCHAR across all converter layers
2. VariantBinaryCodec: Full encoder/decoder for Apache Variant binary format
- Supports primitives (null, bool, int8/16/32/64, double, string)
- Supports short strings (0-63 bytes) and long strings
- Supports objects (with metadata dictionary) and arrays
- JSON string round-trip: JSON -> binary -> JSON
- Binary format detection (isVariantBinary) and auto-decode (decodeVariantAuto)
- Type introspection from binary headers (getValueTypeName)
3. SQL scalar functions registered via IcebergConnector.getSystemFunctions():
- variant_get(varchar, varchar): Extract field with dot-path and array indexing
(e.g., 'users[0].name', 'address.city')
- variant_keys(varchar): Return top-level object keys as JSON array
- variant_type(varchar): Return JSON type name (object, array, string, number, boolean, null)
- to_variant(varchar): Validate JSON and cast to Variant (Phase 5 CAST support)
- parse_variant(varchar): Validate and normalize through Variant binary codec
- variant_to_json(varchar): Normalize Variant to compact JSON representation
- variant_binary_roundtrip(varchar): Encode to binary and decode back (interop testing)
4. Predicate pushdown: IS NULL/IS NOT NULL works through VARCHAR mapping;
variant_get pushdown tracked as future optimizer rule work
5. Comprehensive tests for the codec, functions, and end-to-end connector behavior
Changes:
- TypeConverter: Map VARIANT to VarcharType (unbounded) in toPrestoType(), and to
ORC STRING type in toOrcType()
- IcebergUtil: Handle VARIANT partition values as string slices in domain creation
- PartitionData: Deserialize VARIANT partition values as text (same as STRING)
- PartitionTable: Convert VariantType partition values to string representation
- VariantBinaryCodec: Full Apache Variant binary spec (v1) encoder/decoder with
binary detection, type introspection, and auto-decode capabilities
- VariantFunctions: 7 SQL scalar functions for Variant data manipulation including
dot-path navigation, array indexing, key enumeration, type introspection, CAST
- IcebergConnector: Register VariantFunctions in getSystemFunctions()
- TestVariantBinaryCodec: 40+ tests covering primitives, objects, arrays, metadata,
binary detection, type names, and auto-decode
- TestVariantFunctions: 40+ tests covering all 7 functions including dot-path,
array indexing, error handling, and edge cases
- TestIcebergV3: Integration tests for VARIANT type including JSON data round-trip
Note: Velox/Prestissimo does not require changes - the VARIANT->VARCHAR type
mapping flows automatically through HiveTypeParser on the C++ side.
Differential Revision: D97531551
Summary: Bump the Iceberg dependency version from 1.10.0 to 1.10.1. This is a patch release containing bug fixes and stability improvements. No API changes or breaking changes. Differential Revision: D97531550
…lifecycle + all 99 TPC-DS queries Summary: Add comprehensive end-to-end tests for the Iceberg V3 stack: 1. Snapshot lifecycle tests (TestIcebergV3.java): - testV3SnapshotTimeTravelById: Time travel to specific snapshots via snapshot ID, verifying correct data visibility across inserts and deletes with DVs - testV3SnapshotsMetadataTable: Query $snapshots metadata table, verify snapshot IDs, parent-child chain, operations (append/delete), and committed_at timestamps - testV3HistoryMetadataTable: Query $history metadata table, verify is_current_ancestor flag and consistency with $snapshots table - testV3RollbackToSnapshot: Rollback to a previous snapshot, verify data reverts and new inserts work after rollback, V3 format preserved - testV3RollbackWithDeletionVectors: Rollback past a DELETE that created PUFFIN DVs, verify deleted rows reappear after rollback - testV3ExpireSnapshots: Generate multiple snapshots, expire old ones retaining only the last, verify data integrity and V3 format after expiration - testV3SnapshotTimeTravelWithPartitioning: Time travel on partitioned V3 table, verify partition pruning at historical snapshots - testV3SnapshotAfterSchemaEvolution: Time travel across schema evolution boundary, verify old snapshots readable with pre-evolution schema 2. Full TPC-DS 99-query coverage (TestIcebergTpcds.java): - Creates all 24 TPC-DS tables as Iceberg Parquet tables via CTAS from tpcds.tiny - Handles CHAR→VARCHAR type conversion (Iceberg does not support CHAR(n)): DESCRIBE each source table, identify CHAR columns, use CAST(TRIM(col) AS VARCHAR) - Loads all 99 official TPC-DS query templates from SQL files (103 total including multi-part variants for Q14, Q23, Q24, Q39) - Validates successful execution of all queries against Iceberg tables, exercising: * Complex multi-table star schema joins (up to 8-way joins) * Aggregations with GROUP BY, HAVING, ROLLUP * Window functions (ROW_NUMBER, RANK, SUM OVER) * CTEs, INTERSECT, UNION ALL, EXISTS, IN subqueries * Predicate pushdown (date ranges, equality, IN lists) * QUERY_MAX_STAGE_COUNT=200 for complex CTE queries (Q14_1 has 102 stages) - Q90 correctly expects Division by zero on tpcds.tiny dataset - 105 total tests: 2 table validation + 103 query tests (all passing) Compilation fixes for upstream V3 stack (cherry-picked from higher diffs): - IcebergErrorCode: Added ICEBERG_WRITER_CLOSE_ERROR(21, EXTERNAL) - IcebergDeletionVectorPageSink: Fixed int→Long cast for Metrics constructor - RewriteDeleteFilesProcedure: Fixed Pair import (util.Pair, not puffin.Pair) - TestIcebergV3: Commented out testV3DefaultValues (not in Iceberg 1.10.1), added throws Exception to 3 methods using CloseableIterable Differential Revision: D97531553
… and write paths Summary: Add end-to-end DWRF support for Iceberg tables. DWRF (a Meta-developed fork of ORC with optimizations like FlatMap encoding and dictionary sharing) is now supported alongside PARQUET for both reading and writing Iceberg data files. Changes: - Add DWRF to the C++ protocol FileFormat enum and JSON serde - Add DWRF→DWRF mapping in IcebergPrestoToVeloxConnector - Remove PARQUET-only restriction in IcebergDataSink write path - Gate Parquet-specific writer options and stats collection by format - Use dynamic file format string in Iceberg commit messages - For DWRF files, produce minimal file statistics (the DWRF reader/writer infrastructure does not provide Parquet-style field-level column stats) - Add DWRF to Java FileFormat enums (presto-trunk and presto-facebook-trunk) Read path: Already supported via SplitReader inheritance — the DWRF reader factory is registered at startup and handles DWRF files transparently. Write path: IcebergDataSink now accepts DWRF as a valid storage format, creates DWRF writers via the registered factory, and produces commit messages with the correct format identifier. == RELEASE NOTES == General Changes * Upgrade Apache Iceberg library from 1.10.0 to 1.10.1. Hive Connector Changes * Add Iceberg V3 deletion vector (DV) support using Puffin-encoded roaring�bitmaps, including a DV reader, writer, page sink, and compaction procedure. * Add Iceberg equality delete file reader with sequence number conflict�resolution per the Iceberg V2+ spec: equality deletes skip when�deleteFileSeqNum <= dataFileSeqNum; positional deletes and DVs skip when�deleteFileSeqNum < dataFileSeqNum; sequence number 0 (V1 legacy) never skips. * Wire dataSequenceNumber through the Presto protocol layer (Java → C++)�to enable server-side sequence number conflict resolution for all delete�file types. * Add PUFFIN file format support for deletion vector discovery, enabling�the coordinator to locate DV files during split creation. * Add Iceberg V3 deletion vector write path with DV page sink and�rewrite_delete_files compaction procedure for DV maintenance. * Add nanosecond timestamp (TIMESTAMP_NANO) type support for Iceberg V3�tables. * Add Variant type support for Iceberg V3, enabling semi-structured data�columns in Iceberg tables. * Eagerly collect delete files during split creation with improved logging�for easier debugging of Iceberg delete file resolution. * Improve IcebergSplitReader error handling and fix test file handle leaks. * Add end-to-end integration tests for Iceberg V3 covering snapshot�lifecycle (INSERT, DELETE with equality/positional/DV deletes, UPDATE,�MERGE, time-travel) and all 99 TPC-DS queries. Differential Revision: D97531546
Contributor
There was a problem hiding this comment.
Sorry @apurva-meta, your pull request is larger than the review limit of 150000 diff characters
|
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary:
Add end-to-end DWRF support for Iceberg tables. DWRF (a Meta-developed fork
of ORC with optimizations like FlatMap encoding and dictionary sharing) is
now supported alongside PARQUET for both reading and writing Iceberg data files.
Changes:
infrastructure does not provide Parquet-style field-level column stats)
Read path: Already supported via SplitReader inheritance — the DWRF reader
factory is registered at startup and handles DWRF files transparently.
Write path: IcebergDataSink now accepts DWRF as a valid storage format,
creates DWRF writers via the registered factory, and produces commit messages
with the correct format identifier.
== RELEASE NOTES ==
General Changes
Hive Connector Changes
Differential Revision: D97531546