Skip to content

Commit cd5b2e2

Browse files
apurva-metafacebook-github-bot
authored andcommitted
feat: [presto][iceberg] Add Iceberg V3 deletion vector write path with DV page sink and compaction procedure (#27395)
Summary: - Add IcebergDeletionVectorPageSink for writing DV files during table maintenance - Add RewriteDeleteFilesProcedure for DV compaction - Wire DV page sink through IcebergCommonModule, IcebergAbstractMetadata, IcebergPageSourceProvider - Add IcebergUpdateablePageSource for DV-aware page source - Update CommitTaskData, IcebergUtil for DV support - Add test coverage in TestIcebergV3 == RELEASE NOTES == General Changes * Upgrade Apache Iceberg library from 1.10.0 to 1.10.1. Hive Connector Changes * Add Iceberg V3 deletion vector (DV) support using Puffin-encoded roaring�bitmaps, including a DV reader, writer, page sink, and compaction procedure. * Add Iceberg equality delete file reader with sequence number conflict�resolution per the Iceberg V2+ spec: equality deletes skip when�deleteFileSeqNum <= dataFileSeqNum; positional deletes and DVs skip when�deleteFileSeqNum < dataFileSeqNum; sequence number 0 (V1 legacy) never skips. * Wire dataSequenceNumber through the Presto protocol layer (Java → C++)�to enable server-side sequence number conflict resolution for all delete�file types. * Add PUFFIN file format support for deletion vector discovery, enabling�the coordinator to locate DV files during split creation. * Add Iceberg V3 deletion vector write path with DV page sink and�rewrite_delete_files compaction procedure for DV maintenance. * Add nanosecond timestamp (TIMESTAMP_NANO) type support for Iceberg V3�tables. * Add Variant type support for Iceberg V3, enabling semi-structured data�columns in Iceberg tables. * Eagerly collect delete files during split creation with improved logging�for easier debugging of Iceberg delete file resolution. * Improve IcebergSplitReader error handling and fix test file handle leaks. * Add end-to-end integration tests for Iceberg V3 covering snapshot�lifecycle (INSERT, DELETE with equality/positional/DV deletes, UPDATE,�MERGE, time-travel) and all 99 TPC-DS queries. Differential Revision: D97531549
1 parent c73cb15 commit cd5b2e2

File tree

9 files changed

+1534
-37
lines changed

9 files changed

+1534
-37
lines changed

presto-iceberg/src/main/java/com/facebook/presto/iceberg/CommitTaskData.java

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import com.fasterxml.jackson.annotation.JsonProperty;
1818

1919
import java.util.Optional;
20+
import java.util.OptionalLong;
2021

2122
import static java.util.Objects.requireNonNull;
2223

@@ -30,6 +31,9 @@ public class CommitTaskData
3031
private final FileFormat fileFormat;
3132
private final Optional<String> referencedDataFile;
3233
private final FileContent content;
34+
private final OptionalLong contentOffset;
35+
private final OptionalLong contentSizeInBytes;
36+
private final OptionalLong recordCount;
3337

3438
@JsonCreator
3539
public CommitTaskData(
@@ -40,7 +44,10 @@ public CommitTaskData(
4044
@JsonProperty("partitionDataJson") Optional<String> partitionDataJson,
4145
@JsonProperty("fileFormat") FileFormat fileFormat,
4246
@JsonProperty("referencedDataFile") String referencedDataFile,
43-
@JsonProperty("content") FileContent content)
47+
@JsonProperty("content") FileContent content,
48+
@JsonProperty("contentOffset") OptionalLong contentOffset,
49+
@JsonProperty("contentSizeInBytes") OptionalLong contentSizeInBytes,
50+
@JsonProperty("recordCount") OptionalLong recordCount)
4451
{
4552
this.path = requireNonNull(path, "path is null");
4653
this.fileSizeInBytes = fileSizeInBytes;
@@ -50,6 +57,24 @@ public CommitTaskData(
5057
this.fileFormat = requireNonNull(fileFormat, "fileFormat is null");
5158
this.referencedDataFile = Optional.ofNullable(referencedDataFile);
5259
this.content = requireNonNull(content, "content is null");
60+
this.contentOffset = contentOffset != null ? contentOffset : OptionalLong.empty();
61+
this.contentSizeInBytes = contentSizeInBytes != null ? contentSizeInBytes : OptionalLong.empty();
62+
this.recordCount = recordCount != null ? recordCount : OptionalLong.empty();
63+
}
64+
65+
public CommitTaskData(
66+
String path,
67+
long fileSizeInBytes,
68+
MetricsWrapper metrics,
69+
int partitionSpecId,
70+
Optional<String> partitionDataJson,
71+
FileFormat fileFormat,
72+
String referencedDataFile,
73+
FileContent content)
74+
{
75+
this(path, fileSizeInBytes, metrics, partitionSpecId, partitionDataJson,
76+
fileFormat, referencedDataFile, content,
77+
OptionalLong.empty(), OptionalLong.empty(), OptionalLong.empty());
5378
}
5479

5580
@JsonProperty
@@ -99,4 +124,22 @@ public FileContent getContent()
99124
{
100125
return content;
101126
}
127+
128+
@JsonProperty
129+
public OptionalLong getContentOffset()
130+
{
131+
return contentOffset;
132+
}
133+
134+
@JsonProperty
135+
public OptionalLong getContentSizeInBytes()
136+
{
137+
return contentSizeInBytes;
138+
}
139+
140+
@JsonProperty
141+
public OptionalLong getRecordCount()
142+
{
143+
return recordCount;
144+
}
102145
}

presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -405,12 +405,9 @@ protected static void validateTableForPresto(BaseTable table, Optional<Long> tab
405405
schema = metadata.schema();
406406
}
407407

408-
// Reject schema default values (initial-default / write-default)
409-
for (Types.NestedField field : schema.columns()) {
410-
if (field.initialDefault() != null || field.writeDefault() != null) {
411-
throw new PrestoException(NOT_SUPPORTED, "Iceberg v3 column default values are not supported");
412-
}
413-
}
408+
// Iceberg v3 column default values (initial-default / write-default) are supported.
409+
// The Iceberg library handles applying defaults when reading files that were written
410+
// before a column with a default was added via schema evolution.
414411

415412
// Reject Iceberg table encryption
416413
if (!metadata.encryptionKeys().isEmpty() || snapshot.keyId() != null || metadata.properties().containsKey("encryption.key-id")) {
@@ -1524,8 +1521,23 @@ public Optional<ConnectorOutputMetadata> finishDeleteWithOutput(ConnectorSession
15241521
.ofPositionDeletes()
15251522
.withPath(task.getPath())
15261523
.withFileSizeInBytes(task.getFileSizeInBytes())
1527-
.withFormat(FileFormat.fromString(task.getFileFormat().name()))
1528-
.withMetrics(task.getMetrics().metrics());
1524+
.withFormat(FileFormat.fromString(task.getFileFormat().name()));
1525+
1526+
if (task.getFileFormat() == com.facebook.presto.iceberg.FileFormat.PUFFIN) {
1527+
builder.withRecordCount(task.getRecordCount().orElseThrow(() ->
1528+
new VerifyException("recordCount required for deletion vector")));
1529+
builder.withContentOffset(task.getContentOffset().orElseThrow(() ->
1530+
new VerifyException("contentOffset required for deletion vector")));
1531+
builder.withContentSizeInBytes(task.getContentSizeInBytes().orElseThrow(() ->
1532+
new VerifyException("contentSizeInBytes required for deletion vector")));
1533+
}
1534+
else {
1535+
builder.withMetrics(task.getMetrics().metrics());
1536+
}
1537+
1538+
if (task.getReferencedDataFile().isPresent()) {
1539+
builder.withReferencedDataFile(task.getReferencedDataFile().get());
1540+
}
15291541

15301542
if (!spec.fields().isEmpty()) {
15311543
String partitionDataJson = task.getPartitionDataJson()

presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergCommonModule.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import com.facebook.presto.iceberg.procedure.RegisterTableProcedure;
5050
import com.facebook.presto.iceberg.procedure.RemoveOrphanFiles;
5151
import com.facebook.presto.iceberg.procedure.RewriteDataFilesProcedure;
52+
import com.facebook.presto.iceberg.procedure.RewriteDeleteFilesProcedure;
5253
import com.facebook.presto.iceberg.procedure.RewriteManifestsProcedure;
5354
import com.facebook.presto.iceberg.procedure.RollbackToSnapshotProcedure;
5455
import com.facebook.presto.iceberg.procedure.RollbackToTimestampProcedure;
@@ -195,6 +196,7 @@ protected void setup(Binder binder)
195196
procedures.addBinding().toProvider(StatisticsFileCacheInvalidationProcedure.class).in(Scopes.SINGLETON);
196197
procedures.addBinding().toProvider(ManifestFileCacheInvalidationProcedure.class).in(Scopes.SINGLETON);
197198
procedures.addBinding().toProvider(RewriteDataFilesProcedure.class).in(Scopes.SINGLETON);
199+
procedures.addBinding().toProvider(RewriteDeleteFilesProcedure.class).in(Scopes.SINGLETON);
198200
procedures.addBinding().toProvider(RewriteManifestsProcedure.class).in(Scopes.SINGLETON);
199201

200202
// for orc

presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergPageSourceProvider.java

Lines changed: 29 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import com.facebook.presto.iceberg.delete.DeleteFile;
4646
import com.facebook.presto.iceberg.delete.DeleteFilter;
4747
import com.facebook.presto.iceberg.delete.IcebergDeletePageSink;
48+
import com.facebook.presto.iceberg.delete.IcebergDeletionVectorPageSink;
4849
import com.facebook.presto.iceberg.delete.PositionDeleteFilter;
4950
import com.facebook.presto.iceberg.delete.RowPredicate;
5051
import com.facebook.presto.memory.context.AggregatedMemoryContext;
@@ -70,6 +71,7 @@
7071
import com.facebook.presto.parquet.predicate.Predicate;
7172
import com.facebook.presto.parquet.reader.ParquetReader;
7273
import com.facebook.presto.spi.ColumnHandle;
74+
import com.facebook.presto.spi.ConnectorPageSink;
7375
import com.facebook.presto.spi.ConnectorPageSource;
7476
import com.facebook.presto.spi.ConnectorSession;
7577
import com.facebook.presto.spi.ConnectorSplit;
@@ -863,17 +865,33 @@ else if (subColumn.getId() == MERGE_PARTITION_DATA.getId()) {
863865
verify(storageProperties.isPresent(), "storageProperties are null");
864866

865867
LocationProvider locationProvider = getLocationProvider(table.getSchemaTableName(), outputPath.get(), storageProperties.get());
866-
Supplier<IcebergDeletePageSink> deleteSinkSupplier = () -> new IcebergDeletePageSink(
867-
partitionSpec,
868-
split.getPartitionDataJson(),
869-
locationProvider,
870-
fileWriterFactory,
871-
hdfsEnvironment,
872-
hdfsContext,
873-
jsonCodec,
874-
session,
875-
split.getPath(),
876-
split.getFileFormat());
868+
int tableFormatVersion = Integer.parseInt(
869+
storageProperties.get().getOrDefault("format-version", "2"));
870+
Supplier<ConnectorPageSink> deleteSinkSupplier;
871+
if (tableFormatVersion >= 3) {
872+
deleteSinkSupplier = () -> new IcebergDeletionVectorPageSink(
873+
partitionSpec,
874+
split.getPartitionDataJson(),
875+
locationProvider,
876+
hdfsEnvironment,
877+
hdfsContext,
878+
jsonCodec,
879+
session,
880+
split.getPath());
881+
}
882+
else {
883+
deleteSinkSupplier = () -> new IcebergDeletePageSink(
884+
partitionSpec,
885+
split.getPartitionDataJson(),
886+
locationProvider,
887+
fileWriterFactory,
888+
hdfsEnvironment,
889+
hdfsContext,
890+
jsonCodec,
891+
session,
892+
split.getPath(),
893+
split.getFileFormat());
894+
}
877895
boolean storeDeleteFilePath = icebergColumns.contains(DELETE_FILE_PATH_COLUMN_HANDLE);
878896
Supplier<List<DeleteFilter>> deleteFilters = memoize(() -> {
879897
// If equality deletes are optimized into a join they don't need to be applied here

presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUpdateablePageSource.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@
2121
import com.facebook.presto.common.block.RunLengthEncodedBlock;
2222
import com.facebook.presto.hive.HivePartitionKey;
2323
import com.facebook.presto.iceberg.delete.DeleteFilter;
24-
import com.facebook.presto.iceberg.delete.IcebergDeletePageSink;
2524
import com.facebook.presto.iceberg.delete.RowPredicate;
25+
import com.facebook.presto.spi.ConnectorPageSink;
2626
import com.facebook.presto.spi.ConnectorPageSource;
2727
import com.facebook.presto.spi.PrestoException;
2828
import com.facebook.presto.spi.UpdatablePageSource;
@@ -71,8 +71,8 @@ public class IcebergUpdateablePageSource
7171
implements UpdatablePageSource
7272
{
7373
private final ConnectorPageSource delegate;
74-
private final Supplier<IcebergDeletePageSink> deleteSinkSupplier;
75-
private IcebergDeletePageSink positionDeleteSink;
74+
private final Supplier<ConnectorPageSink> deleteSinkSupplier;
75+
private ConnectorPageSink positionDeleteSink;
7676
private final Supplier<Optional<RowPredicate>> deletePredicate;
7777
private final Supplier<List<DeleteFilter>> deleteFilters;
7878

@@ -107,7 +107,7 @@ public IcebergUpdateablePageSource(
107107
ConnectorPageSource delegate,
108108
// represents the columns output by the delegate page source
109109
List<IcebergColumnHandle> delegateColumns,
110-
Supplier<IcebergDeletePageSink> deleteSinkSupplier,
110+
Supplier<ConnectorPageSink> deleteSinkSupplier,
111111
Supplier<Optional<RowPredicate>> deletePredicate,
112112
Supplier<List<DeleteFilter>> deleteFilters,
113113
Supplier<IcebergPageSink> updatedRowPageSinkSupplier,
@@ -295,7 +295,7 @@ public void updateRows(Page page, List<Integer> columnValueAndRowIdChannels)
295295
public CompletableFuture<Collection<Slice>> finish()
296296
{
297297
return Optional.ofNullable(positionDeleteSink)
298-
.map(IcebergDeletePageSink::finish)
298+
.map(ConnectorPageSink::finish)
299299
.orElseGet(() -> completedFuture(ImmutableList.of()))
300300
.thenCombine(
301301
Optional.ofNullable(updatedRowPageSink).map(IcebergPageSink::finish)

presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -221,7 +221,7 @@ public final class IcebergUtil
221221
{
222222
private static final Logger log = Logger.get(IcebergUtil.class);
223223
public static final int MIN_FORMAT_VERSION_FOR_DELETE = 2;
224-
public static final int MAX_FORMAT_VERSION_FOR_ROW_LEVEL_OPERATIONS = 2;
224+
public static final int MAX_FORMAT_VERSION_FOR_ROW_LEVEL_OPERATIONS = 3;
225225
public static final int MAX_SUPPORTED_FORMAT_VERSION = 3;
226226

227227
public static final long DOUBLE_POSITIVE_ZERO = 0x0000000000000000L;

0 commit comments

Comments
 (0)