Skip to content

Commit c73cb15

Browse files
apurva-metafacebook-github-bot
authored andcommitted
feat: [prestissimo][iceberg] Wire PUFFIN file format through C++ protocol and connector layer (#27394)
Summary: This is the C++ counterpart to the Java PUFFIN support diff. It wires the PUFFIN file format through the Prestissimo protocol and connector conversion layer so that Iceberg V3 deletion vector files can be deserialized and handled by native workers. Changes: 1. Adds PUFFIN to the C++ protocol FileFormat enum and its JSON serialization table in presto_protocol_iceberg.{h,cpp}. 2. Handles PUFFIN in toVeloxFileFormat() in IcebergPrestoToVeloxConnector.cpp, mapping it to DWRF as a placeholder since DeletionVectorReader reads raw binary and does not use the DWRF/Parquet reader infrastructure. == RELEASE NOTES == General Changes * Upgrade Apache Iceberg library from 1.10.0 to 1.10.1. Hive Connector Changes * Add Iceberg V3 deletion vector (DV) support using Puffin-encoded roaring�bitmaps, including a DV reader, writer, page sink, and compaction procedure. * Add Iceberg equality delete file reader with sequence number conflict�resolution per the Iceberg V2+ spec: equality deletes skip when�deleteFileSeqNum <= dataFileSeqNum; positional deletes and DVs skip when�deleteFileSeqNum < dataFileSeqNum; sequence number 0 (V1 legacy) never skips. * Wire dataSequenceNumber through the Presto protocol layer (Java → C++)�to enable server-side sequence number conflict resolution for all delete�file types. * Add PUFFIN file format support for deletion vector discovery, enabling�the coordinator to locate DV files during split creation. * Add Iceberg V3 deletion vector write path with DV page sink and�rewrite_delete_files compaction procedure for DV maintenance. * Add nanosecond timestamp (TIMESTAMP_NANO) type support for Iceberg V3�tables. * Add Variant type support for Iceberg V3, enabling semi-structured data�columns in Iceberg tables. * Eagerly collect delete files during split creation with improved logging�for easier debugging of Iceberg delete file resolution. * Improve IcebergSplitReader error handling and fix test file handle leaks. * Add end-to-end integration tests for Iceberg V3 covering snapshot�lifecycle (INSERT, DELETE with equality/positional/DV deletes, UPDATE,�MERGE, time-travel) and all 99 TPC-DS queries. Differential Revision: D97531555
1 parent d24f722 commit c73cb15

File tree

3 files changed

+30
-9
lines changed

3 files changed

+30
-9
lines changed

presto-native-execution/presto_cpp/main/connectors/IcebergPrestoToVeloxConnector.cpp

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,14 @@ velox::dwio::common::FileFormat toVeloxFileFormat(
4242
return velox::dwio::common::FileFormat::ORC;
4343
} else if (format == protocol::iceberg::FileFormat::PARQUET) {
4444
return velox::dwio::common::FileFormat::PARQUET;
45+
} else if (format == protocol::iceberg::FileFormat::PUFFIN) {
46+
// PUFFIN is used for Iceberg V3 deletion vectors. The DeletionVectorReader
47+
// reads raw binary from the file and does not use the DWRF/Parquet reader,
48+
// so we map PUFFIN to DWRF as a placeholder — the format value is not
49+
// actually used by the reader. This mapping is only safe for deletion
50+
// vector files; if PUFFIN is encountered for other file content types,
51+
// the DV routing logic in toHiveIcebergSplit() must reclassify it first.
52+
return velox::dwio::common::FileFormat::DWRF;
4553
}
4654
VELOX_UNSUPPORTED("Unsupported file format: {}", fmt::underlying(format));
4755
}
@@ -173,7 +181,7 @@ IcebergPrestoToVeloxConnector::toVeloxSplit(
173181
const protocol::ConnectorId& catalogId,
174182
const protocol::ConnectorSplit* connectorSplit,
175183
const protocol::SplitContext* splitContext) const {
176-
auto icebergSplit =
184+
const auto* icebergSplit =
177185
dynamic_cast<const protocol::iceberg::IcebergSplit*>(connectorSplit);
178186
VELOX_CHECK_NOT_NULL(
179187
icebergSplit, "Unexpected split type {}", connectorSplit->_type);
@@ -196,14 +204,27 @@ IcebergPrestoToVeloxConnector::toVeloxSplit(
196204
std::vector<velox::connector::hive::iceberg::IcebergDeleteFile> deletes;
197205
deletes.reserve(icebergSplit->deletes.size());
198206
for (const auto& deleteFile : icebergSplit->deletes) {
199-
std::unordered_map<int32_t, std::string> lowerBounds(
207+
const std::unordered_map<int32_t, std::string> lowerBounds(
200208
deleteFile.lowerBounds.begin(), deleteFile.lowerBounds.end());
201209

202-
std::unordered_map<int32_t, std::string> upperBounds(
210+
const std::unordered_map<int32_t, std::string> upperBounds(
203211
deleteFile.upperBounds.begin(), deleteFile.upperBounds.end());
204212

205-
velox::connector::hive::iceberg::IcebergDeleteFile icebergDeleteFile(
206-
toVeloxFileContent(deleteFile.content),
213+
// Iceberg V3 deletion vectors arrive from the coordinator as
214+
// POSITION_DELETES with PUFFIN format. Reclassify them as
215+
// kDeletionVector so that IcebergSplitReader routes them to
216+
// DeletionVectorReader instead of PositionalDeleteFileReader.
217+
velox::connector::hive::iceberg::FileContent veloxContent =
218+
toVeloxFileContent(deleteFile.content);
219+
if (veloxContent ==
220+
velox::connector::hive::iceberg::FileContent::kPositionalDeletes &&
221+
deleteFile.format == protocol::iceberg::FileFormat::PUFFIN) {
222+
veloxContent =
223+
velox::connector::hive::iceberg::FileContent::kDeletionVector;
224+
}
225+
226+
const velox::connector::hive::iceberg::IcebergDeleteFile icebergDeleteFile(
227+
veloxContent,
207228
deleteFile.path,
208229
toVeloxFileFormat(deleteFile.format),
209230
deleteFile.recordCount,
@@ -218,8 +239,7 @@ IcebergPrestoToVeloxConnector::toVeloxSplit(
218239

219240

220241
std::unordered_map<std::string, std::string> infoColumns = {
221-
{"$data_sequence_number",
222-
std::to_string(dataSequenceNumber)},
242+
{"$data_sequence_number", std::to_string(dataSequenceNumber)},
223243
{"$path", icebergSplit->path}};
224244

225245
return std::make_unique<velox::connector::hive::iceberg::HiveIcebergSplit>(

presto-native-execution/presto_cpp/presto_protocol/connector/iceberg/presto_protocol_iceberg.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -306,7 +306,8 @@ static const std::pair<FileFormat, json> FileFormat_enum_table[] =
306306
{FileFormat::ORC, "ORC"},
307307
{FileFormat::PARQUET, "PARQUET"},
308308
{FileFormat::AVRO, "AVRO"},
309-
{FileFormat::METADATA, "METADATA"}};
309+
{FileFormat::METADATA, "METADATA"},
310+
{FileFormat::PUFFIN, "PUFFIN"}};
310311
void to_json(json& j, const FileFormat& e) {
311312
static_assert(std::is_enum<FileFormat>::value, "FileFormat must be an enum!");
312313
const auto* it = std::find_if(

presto-native-execution/presto_cpp/presto_protocol/connector/iceberg/presto_protocol_iceberg.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ extern void to_json(json& j, const FileContent& e);
8787
extern void from_json(const json& j, FileContent& e);
8888
} // namespace facebook::presto::protocol::iceberg
8989
namespace facebook::presto::protocol::iceberg {
90-
enum class FileFormat { ORC, PARQUET, AVRO, METADATA };
90+
enum class FileFormat { ORC, PARQUET, AVRO, METADATA, PUFFIN };
9191
extern void to_json(json& j, const FileFormat& e);
9292
extern void from_json(const json& j, FileFormat& e);
9393
} // namespace facebook::presto::protocol::iceberg

0 commit comments

Comments
 (0)