Skip to content

Commit ebab5c3

Browse files
committed
Add Iceberg format version 3 table support
1 parent 418e326 commit ebab5c3

File tree

5 files changed

+329
-9
lines changed

5 files changed

+329
-9
lines changed

presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java

Lines changed: 67 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@
112112
import org.apache.iceberg.Snapshot;
113113
import org.apache.iceberg.SortOrder;
114114
import org.apache.iceberg.Table;
115+
import org.apache.iceberg.TableMetadata;
115116
import org.apache.iceberg.TableProperties;
116117
import org.apache.iceberg.Transaction;
117118
import org.apache.iceberg.UpdatePartitionSpec;
@@ -188,6 +189,7 @@
188189
import static com.facebook.presto.iceberg.IcebergTableType.CHANGELOG;
189190
import static com.facebook.presto.iceberg.IcebergTableType.DATA;
190191
import static com.facebook.presto.iceberg.IcebergTableType.EQUALITY_DELETES;
192+
import static com.facebook.presto.iceberg.IcebergUtil.MAX_FORMAT_VERSION_FOR_ROW_LEVEL_OPERATIONS;
191193
import static com.facebook.presto.iceberg.IcebergUtil.MIN_FORMAT_VERSION_FOR_DELETE;
192194
import static com.facebook.presto.iceberg.IcebergUtil.getColumns;
193195
import static com.facebook.presto.iceberg.IcebergUtil.getColumnsForWrite;
@@ -357,6 +359,43 @@ public Optional<IcebergProcedureContext> getProcedureContext()
357359
return this.procedureContext;
358360
}
359361

362+
/**
363+
* Validates that an Iceberg table does not use unsupported v3 features.
364+
* TODO: Remove when Iceberg v3 is fully supported
365+
*/
366+
protected static void validateTableForPresto(BaseTable table, Optional<Long> tableSnapshotId)
367+
{
368+
Snapshot snapshot = tableSnapshotId
369+
.map(table::snapshot)
370+
.orElse(table.currentSnapshot());
371+
if (snapshot == null) {
372+
// empty table, nothing to validate
373+
return;
374+
}
375+
376+
TableMetadata metadata = table.operations().current();
377+
if (metadata.formatVersion() < 3) {
378+
return;
379+
}
380+
381+
Schema schema = metadata.schemasById().get(snapshot.schemaId());
382+
if (schema == null) {
383+
schema = metadata.schema();
384+
}
385+
386+
// Reject schema default values (initial-default / write-default)
387+
for (Types.NestedField field : schema.columns()) {
388+
if (field.initialDefault() != null || field.writeDefault() != null) {
389+
throw new PrestoException(NOT_SUPPORTED, "Iceberg v3 column default values are not supported");
390+
}
391+
}
392+
393+
// Reject Iceberg table encryption
394+
if (!metadata.encryptionKeys().isEmpty() || snapshot.keyId() != null || metadata.properties().containsKey("encryption.key-id")) {
395+
throw new PrestoException(NOT_SUPPORTED, "Iceberg table encryption is not supported");
396+
}
397+
}
398+
360399
/**
361400
* This class implements the default implementation for getTableLayoutForConstraint which will be used in the case of a Java Worker
362401
*/
@@ -832,10 +871,17 @@ public ConnectorMergeTableHandle beginMerge(ConnectorSession session, ConnectorT
832871
Table icebergTable = getIcebergTable(session, icebergTableHandle.getSchemaTableName());
833872
int formatVersion = ((BaseTable) icebergTable).operations().current().formatVersion();
834873

835-
if (formatVersion < MIN_FORMAT_VERSION_FOR_DELETE ||
836-
!Optional.ofNullable(icebergTable.properties().get(TableProperties.UPDATE_MODE))
837-
.map(mode -> mode.equals(MERGE_ON_READ.modeName()))
838-
.orElse(false)) {
874+
if (formatVersion < MIN_FORMAT_VERSION_FOR_DELETE) {
875+
throw new PrestoException(ICEBERG_INVALID_FORMAT_VERSION,
876+
"Iceberg table updates require at least format version 2 and update mode must be merge-on-read");
877+
}
878+
if (formatVersion > MAX_FORMAT_VERSION_FOR_ROW_LEVEL_OPERATIONS) {
879+
throw new PrestoException(NOT_SUPPORTED,
880+
format("Iceberg table updates for format version %s are not supported yet", formatVersion));
881+
}
882+
if (!Optional.ofNullable(icebergTable.properties().get(TableProperties.UPDATE_MODE))
883+
.map(mode -> mode.equals(MERGE_ON_READ.modeName()))
884+
.orElse(false)) {
839885
throw new PrestoException(ICEBERG_INVALID_FORMAT_VERSION,
840886
"Iceberg table updates require at least format version 2 and update mode must be merge-on-read");
841887
}
@@ -1174,6 +1220,8 @@ public ConnectorInsertTableHandle beginInsert(ConnectorSession session, Connecto
11741220
verify(table.getIcebergTableName().getTableType() == DATA, "only the data table can have data inserted");
11751221
Table icebergTable = getIcebergTable(session, table.getSchemaTableName());
11761222
validateTableMode(session, icebergTable);
1223+
BaseTable baseTable = (BaseTable) icebergTable;
1224+
validateTableForPresto(baseTable, Optional.ofNullable(baseTable.currentSnapshot()).map(Snapshot::snapshotId));
11771225

11781226
return beginIcebergTableInsert(session, table, icebergTable);
11791227
}
@@ -1364,6 +1412,10 @@ public ConnectorDeleteTableHandle beginDelete(ConnectorSession session, Connecto
13641412
if (formatVersion < MIN_FORMAT_VERSION_FOR_DELETE) {
13651413
throw new PrestoException(NOT_SUPPORTED, format("This connector only supports delete where one or more partitions are deleted entirely for table versions older than %d", MIN_FORMAT_VERSION_FOR_DELETE));
13661414
}
1415+
if (formatVersion > MAX_FORMAT_VERSION_FOR_ROW_LEVEL_OPERATIONS) {
1416+
throw new PrestoException(NOT_SUPPORTED,
1417+
format("Iceberg table updates for format version %s are not supported yet", formatVersion));
1418+
}
13671419
if (getDeleteMode(icebergTable) == RowLevelOperationMode.COPY_ON_WRITE) {
13681420
throw new PrestoException(NOT_SUPPORTED, "This connector only supports delete where one or more partitions are deleted entirely. Configure write.delete.mode table property to allow row level deletions.");
13691421
}
@@ -1620,11 +1672,17 @@ public ConnectorTableHandle beginUpdate(ConnectorSession session, ConnectorTable
16201672
IcebergTableHandle handle = (IcebergTableHandle) tableHandle;
16211673
Table icebergTable = getIcebergTable(session, handle.getSchemaTableName());
16221674
int formatVersion = ((BaseTable) icebergTable).operations().current().formatVersion();
1623-
if (formatVersion < MIN_FORMAT_VERSION_FOR_DELETE ||
1624-
!Optional.ofNullable(icebergTable.properties().get(TableProperties.UPDATE_MODE))
1625-
.map(mode -> mode.equals(MERGE_ON_READ.modeName()))
1626-
.orElse(false)) {
1627-
throw new RuntimeException("Iceberg table updates require at least format version 2 and update mode must be merge-on-read");
1675+
if (formatVersion < MIN_FORMAT_VERSION_FOR_DELETE) {
1676+
throw new PrestoException(NOT_SUPPORTED, "Iceberg table updates require at least format version 2");
1677+
}
1678+
if (formatVersion > MAX_FORMAT_VERSION_FOR_ROW_LEVEL_OPERATIONS) {
1679+
throw new PrestoException(NOT_SUPPORTED,
1680+
format("Iceberg table updates for format version %s are not supported yet", formatVersion));
1681+
}
1682+
if (!Optional.ofNullable(icebergTable.properties().get(TableProperties.UPDATE_MODE))
1683+
.map(mode -> mode.equals(MERGE_ON_READ.modeName()))
1684+
.orElse(false)) {
1685+
throw new PrestoException(NOT_SUPPORTED, "Iceberg table updates require update mode to be merge-on-read");
16281686
}
16291687
validateTableMode(session, icebergTable);
16301688
transaction = icebergTable.newTransaction();

presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSplitSource.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import com.facebook.presto.spi.ConnectorSession;
1919
import com.facebook.presto.spi.ConnectorSplit;
2020
import com.facebook.presto.spi.ConnectorSplitSource;
21+
import com.facebook.presto.spi.PrestoException;
2122
import com.facebook.presto.spi.SplitWeight;
2223
import com.facebook.presto.spi.connector.ConnectorPartitionHandle;
2324
import com.facebook.presto.spi.schedule.NodeSelectionStrategy;
@@ -46,6 +47,7 @@
4647
import static com.facebook.presto.iceberg.IcebergUtil.getTargetSplitSize;
4748
import static com.facebook.presto.iceberg.IcebergUtil.metadataColumnsMatchPredicates;
4849
import static com.facebook.presto.iceberg.IcebergUtil.partitionDataFromStructLike;
50+
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
4951
import static com.google.common.collect.ImmutableList.toImmutableList;
5052
import static com.google.common.collect.Iterators.limit;
5153
import static java.util.Objects.requireNonNull;
@@ -124,6 +126,13 @@ private ConnectorSplit toIcebergSplit(FileScanTask task)
124126
PartitionSpec spec = task.spec();
125127
Optional<PartitionData> partitionData = partitionDataFromStructLike(spec, task.file().partition());
126128

129+
// Validate no PUFFIN deletion vectors (Iceberg v3 feature not yet supported)
130+
for (org.apache.iceberg.DeleteFile deleteFile : task.deletes()) {
131+
if (deleteFile.format() == org.apache.iceberg.FileFormat.PUFFIN) {
132+
throw new PrestoException(NOT_SUPPORTED, "Iceberg deletion vectors (PUFFIN format) are not supported");
133+
}
134+
}
135+
127136
// TODO: We should leverage residual expression and convert that to TupleDomain.
128137
// The predicate here is used by readers for predicate push down at reader level,
129138
// so when we do not use residual expression, we are just wasting CPU cycles

presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,8 @@ public final class IcebergUtil
215215
{
216216
private static final Logger log = Logger.get(IcebergUtil.class);
217217
public static final int MIN_FORMAT_VERSION_FOR_DELETE = 2;
218+
public static final int MAX_FORMAT_VERSION_FOR_ROW_LEVEL_OPERATIONS = 2;
219+
public static final int MAX_SUPPORTED_FORMAT_VERSION = 3;
218220

219221
public static final long DOUBLE_POSITIVE_ZERO = 0x0000000000000000L;
220222
public static final long DOUBLE_POSITIVE_INFINITE = 0x7ff0000000000000L;

presto-iceberg/src/main/java/com/facebook/presto/iceberg/procedure/RewriteDataFilesProcedure.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import com.google.common.collect.ImmutableMap;
3838
import com.google.common.collect.ImmutableSet;
3939
import io.airlift.slice.Slice;
40+
import org.apache.iceberg.BaseTable;
4041
import org.apache.iceberg.DataFile;
4142
import org.apache.iceberg.DataFiles;
4243
import org.apache.iceberg.DeleteFile;
@@ -69,6 +70,7 @@
6970
import static com.facebook.presto.iceberg.ExpressionConverter.toIcebergExpression;
7071
import static com.facebook.presto.iceberg.IcebergAbstractMetadata.getSupportedSortFields;
7172
import static com.facebook.presto.iceberg.IcebergSessionProperties.getCompressionCodec;
73+
import static com.facebook.presto.iceberg.IcebergUtil.MAX_FORMAT_VERSION_FOR_ROW_LEVEL_OPERATIONS;
7274
import static com.facebook.presto.iceberg.IcebergUtil.getColumns;
7375
import static com.facebook.presto.iceberg.IcebergUtil.getFileFormat;
7476
import static com.facebook.presto.iceberg.PartitionSpecConverter.toPrestoPartitionSpec;
@@ -124,6 +126,16 @@ private ConnectorDistributedProcedureHandle beginCallDistributedProcedure(Connec
124126
Table icebergTable = procedureContext.getTable();
125127
IcebergTableHandle tableHandle = layoutHandle.getTable();
126128

129+
// Validate format version for OPTIMIZE operation
130+
int formatVersion = ((BaseTable) icebergTable).operations().current().formatVersion();
131+
if (formatVersion > MAX_FORMAT_VERSION_FOR_ROW_LEVEL_OPERATIONS) {
132+
throw new PrestoException(NOT_SUPPORTED,
133+
format("OPTIMIZE is not supported for Iceberg table format version > %d. Table %s format version is %s.",
134+
MAX_FORMAT_VERSION_FOR_ROW_LEVEL_OPERATIONS,
135+
icebergTable.name(),
136+
formatVersion));
137+
}
138+
127139
SortOrder sortOrder = icebergTable.sortOrder();
128140
List<String> sortFieldStrings = ImmutableList.of();
129141
if (sortOrderIndex.isPresent()) {

0 commit comments

Comments
 (0)