-
Notifications
You must be signed in to change notification settings - Fork 5.5k
feat: Add initial support for Iceberg format version 3 #27021
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -112,6 +112,7 @@ | |
| import org.apache.iceberg.Snapshot; | ||
| import org.apache.iceberg.SortOrder; | ||
| import org.apache.iceberg.Table; | ||
| import org.apache.iceberg.TableMetadata; | ||
| import org.apache.iceberg.TableProperties; | ||
| import org.apache.iceberg.Transaction; | ||
| import org.apache.iceberg.UpdatePartitionSpec; | ||
|
|
@@ -188,6 +189,7 @@ | |
| import static com.facebook.presto.iceberg.IcebergTableType.CHANGELOG; | ||
| import static com.facebook.presto.iceberg.IcebergTableType.DATA; | ||
| import static com.facebook.presto.iceberg.IcebergTableType.EQUALITY_DELETES; | ||
| import static com.facebook.presto.iceberg.IcebergUtil.MAX_FORMAT_VERSION_FOR_ROW_LEVEL_OPERATIONS; | ||
| import static com.facebook.presto.iceberg.IcebergUtil.MIN_FORMAT_VERSION_FOR_DELETE; | ||
| import static com.facebook.presto.iceberg.IcebergUtil.getColumns; | ||
| import static com.facebook.presto.iceberg.IcebergUtil.getColumnsForWrite; | ||
|
|
@@ -357,6 +359,48 @@ public Optional<IcebergProcedureContext> getProcedureContext() | |
| return this.procedureContext; | ||
| } | ||
|
|
||
| protected static void validateTableForPresto(BaseTable table, Optional<Long> tableSnapshotId) | ||
| { | ||
| Snapshot snapshot; | ||
| try { | ||
| snapshot = tableSnapshotId | ||
| .map(table::snapshot) | ||
| .orElse(table.currentSnapshot()); | ||
| } | ||
| catch (RuntimeException e) { | ||
| // If the snapshot cannot be retrieved (e.g. metadata is missing), we cannot validate the table. | ||
| // Returning here allows operations that do not strictly require the snapshot (like DROP TABLE) to proceed. | ||
| return; | ||
| } | ||
|
|
||
| if (snapshot == null) { | ||
| // empty table, nothing to validate | ||
| return; | ||
| } | ||
|
|
||
| TableMetadata metadata = table.operations().current(); | ||
| if (metadata.formatVersion() < 3) { | ||
| return; | ||
| } | ||
|
|
||
| Schema schema = metadata.schemasById().get(snapshot.schemaId()); | ||
| if (schema == null) { | ||
| schema = metadata.schema(); | ||
| } | ||
|
|
||
| // Reject schema default values (initial-default / write-default) | ||
| for (Types.NestedField field : schema.columns()) { | ||
|
Comment on lines
+391
to
+392
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. suggestion: Column default validation only considers top-level columns and may miss nested fields.
|
||
| if (field.initialDefault() != null || field.writeDefault() != null) { | ||
| throw new PrestoException(NOT_SUPPORTED, "Iceberg v3 column default values are not supported"); | ||
| } | ||
| } | ||
|
|
||
| // Reject Iceberg table encryption | ||
| if (!metadata.encryptionKeys().isEmpty() || snapshot.keyId() != null || metadata.properties().containsKey("encryption.key-id")) { | ||
| throw new PrestoException(NOT_SUPPORTED, "Iceberg table encryption is not supported"); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * This class implements the default implementation for getTableLayoutForConstraint which will be used in the case of a Java Worker | ||
| */ | ||
|
|
@@ -832,10 +876,17 @@ public ConnectorMergeTableHandle beginMerge(ConnectorSession session, ConnectorT | |
| Table icebergTable = getIcebergTable(session, icebergTableHandle.getSchemaTableName()); | ||
| int formatVersion = ((BaseTable) icebergTable).operations().current().formatVersion(); | ||
|
|
||
| if (formatVersion < MIN_FORMAT_VERSION_FOR_DELETE || | ||
| !Optional.ofNullable(icebergTable.properties().get(TableProperties.UPDATE_MODE)) | ||
| .map(mode -> mode.equals(MERGE_ON_READ.modeName())) | ||
| .orElse(false)) { | ||
| if (formatVersion < MIN_FORMAT_VERSION_FOR_DELETE) { | ||
| throw new PrestoException(ICEBERG_INVALID_FORMAT_VERSION, | ||
|
Comment on lines
+879
to
+880
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. suggestion: The error message for low format versions mentions update mode before it is validated, which can be misleading. The exception text currently states both a format-version requirement and an update-mode requirement, but this branch only validates the format version; the update mode could still be valid. Consider separating these into distinct errors (one for an unsupported format version, one for an invalid update mode) so users can see exactly which precondition failed. Suggested implementation:
Table icebergTable = getIcebergTable(session, icebergTableHandle.getSchemaTableName());
int formatVersion = ((BaseTable) icebergTable).operations().current().formatVersion();
if (formatVersion < MIN_FORMAT_VERSION_FOR_DELETE) {
throw new PrestoException(
ICEBERG_INVALID_FORMAT_VERSION,
"Iceberg table updates require at least format version 2");
}
if (!Optional.ofNullable(icebergTable.properties().get(TableProperties.UPDATE_MODE))
.map(mode -> mode.equals(MERGE_ON_READ.modeName()))
.orElse(false)) {
throw new PrestoException(
ICEBERG_INVALID_METADATA,
format("Iceberg table updates require table property %s to be set to %s",
TableProperties.UPDATE_MODE,
MERGE_ON_READ.modeName()));
}
/**These edits assume:
|
||
| "Iceberg table updates require at least format version 2 and update mode must be merge-on-read"); | ||
| } | ||
| if (formatVersion > MAX_FORMAT_VERSION_FOR_ROW_LEVEL_OPERATIONS) { | ||
| throw new PrestoException(NOT_SUPPORTED, | ||
| format("Iceberg table updates for format version %s are not supported yet", formatVersion)); | ||
| } | ||
| if (!Optional.ofNullable(icebergTable.properties().get(TableProperties.UPDATE_MODE)) | ||
| .map(mode -> mode.equals(MERGE_ON_READ.modeName())) | ||
|
Comment on lines
+879
to
+888
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. suggestion: Error codes/messages for unsupported format versions are inconsistent between beginMerge and beginUpdate.
Suggested implementation:
if (formatVersion < MIN_FORMAT_VERSION_FOR_DELETE) {
throw new PrestoException(
ICEBERG_INVALID_FORMAT_VERSION,
format(
"Iceberg row-level operations are only supported for table format versions in range [%s, %s]. Found: %s",
MIN_FORMAT_VERSION_FOR_DELETE,
MAX_FORMAT_VERSION_FOR_ROW_LEVEL_OPERATIONS,
formatVersion));
}
if (formatVersion > MAX_FORMAT_VERSION_FOR_ROW_LEVEL_OPERATIONS) {
throw new PrestoException(
ICEBERG_INVALID_FORMAT_VERSION,
format(
"Iceberg row-level operations are only supported for table format versions in range [%s, %s]. Found: %s",
MIN_FORMAT_VERSION_FOR_DELETE,
MAX_FORMAT_VERSION_FOR_ROW_LEVEL_OPERATIONS,
formatVersion));
}
To fully align messages and error codes across all row-level operations (merge/update/delete), you should:
|
||
| .orElse(false)) { | ||
| throw new PrestoException(ICEBERG_INVALID_FORMAT_VERSION, | ||
| "Iceberg table updates require at least format version 2 and update mode must be merge-on-read"); | ||
| } | ||
|
|
@@ -1174,6 +1225,8 @@ public ConnectorInsertTableHandle beginInsert(ConnectorSession session, Connecto | |
| verify(table.getIcebergTableName().getTableType() == DATA, "only the data table can have data inserted"); | ||
| Table icebergTable = getIcebergTable(session, table.getSchemaTableName()); | ||
| validateTableMode(session, icebergTable); | ||
| BaseTable baseTable = (BaseTable) icebergTable; | ||
| validateTableForPresto(baseTable, Optional.ofNullable(baseTable.currentSnapshot()).map(Snapshot::snapshotId)); | ||
|
|
||
| return beginIcebergTableInsert(session, table, icebergTable); | ||
| } | ||
|
|
@@ -1261,6 +1314,12 @@ public IcebergTableHandle getTableHandle(ConnectorSession session, SchemaTableNa | |
| }) | ||
| .orElseGet(() -> resolveSnapshotIdByName(table, name)); | ||
|
|
||
| // Validate unsupported v3 features (column defaults, encryption) before | ||
| // proceeding | ||
| if (table instanceof BaseTable) { | ||
| validateTableForPresto((BaseTable) table, tableSnapshotId); | ||
| } | ||
|
|
||
| // Getting an Iceberg table's schema, properties, and location will fail | ||
| // when filesystem metadata is missing. | ||
| // See https://github.com/prestodb/presto/pull/21181 | ||
|
|
@@ -1364,6 +1423,10 @@ public ConnectorDeleteTableHandle beginDelete(ConnectorSession session, Connecto | |
| if (formatVersion < MIN_FORMAT_VERSION_FOR_DELETE) { | ||
| throw new PrestoException(NOT_SUPPORTED, format("This connector only supports delete where one or more partitions are deleted entirely for table versions older than %d", MIN_FORMAT_VERSION_FOR_DELETE)); | ||
| } | ||
| if (formatVersion > MAX_FORMAT_VERSION_FOR_ROW_LEVEL_OPERATIONS) { | ||
| throw new PrestoException(NOT_SUPPORTED, | ||
| format("Iceberg table updates for format version %s are not supported yet", formatVersion)); | ||
| } | ||
| if (getDeleteMode(icebergTable) == RowLevelOperationMode.COPY_ON_WRITE) { | ||
| throw new PrestoException(NOT_SUPPORTED, "This connector only supports delete where one or more partitions are deleted entirely. Configure write.delete.mode table property to allow row level deletions."); | ||
| } | ||
|
|
@@ -1620,11 +1683,17 @@ public ConnectorTableHandle beginUpdate(ConnectorSession session, ConnectorTable | |
| IcebergTableHandle handle = (IcebergTableHandle) tableHandle; | ||
| Table icebergTable = getIcebergTable(session, handle.getSchemaTableName()); | ||
| int formatVersion = ((BaseTable) icebergTable).operations().current().formatVersion(); | ||
| if (formatVersion < MIN_FORMAT_VERSION_FOR_DELETE || | ||
| !Optional.ofNullable(icebergTable.properties().get(TableProperties.UPDATE_MODE)) | ||
| .map(mode -> mode.equals(MERGE_ON_READ.modeName())) | ||
| .orElse(false)) { | ||
| throw new RuntimeException("Iceberg table updates require at least format version 2 and update mode must be merge-on-read"); | ||
| if (formatVersion < MIN_FORMAT_VERSION_FOR_DELETE) { | ||
| throw new PrestoException(NOT_SUPPORTED, "Iceberg table updates require at least format version 2"); | ||
| } | ||
| if (formatVersion > MAX_FORMAT_VERSION_FOR_ROW_LEVEL_OPERATIONS) { | ||
| throw new PrestoException(NOT_SUPPORTED, | ||
| format("Iceberg table updates for format version %s are not supported yet", formatVersion)); | ||
| } | ||
| if (!Optional.ofNullable(icebergTable.properties().get(TableProperties.UPDATE_MODE)) | ||
| .map(mode -> mode.equals(MERGE_ON_READ.modeName())) | ||
| .orElse(false)) { | ||
| throw new PrestoException(NOT_SUPPORTED, "Iceberg table updates require update mode to be merge-on-read"); | ||
| } | ||
| validateTableMode(session, icebergTable); | ||
| transaction = icebergTable.newTransaction(); | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -37,6 +37,7 @@ | |
| import com.google.common.collect.ImmutableMap; | ||
| import com.google.common.collect.ImmutableSet; | ||
| import io.airlift.slice.Slice; | ||
| import org.apache.iceberg.BaseTable; | ||
| import org.apache.iceberg.DataFile; | ||
| import org.apache.iceberg.DataFiles; | ||
| import org.apache.iceberg.DeleteFile; | ||
|
|
@@ -69,6 +70,7 @@ | |
| import static com.facebook.presto.iceberg.ExpressionConverter.toIcebergExpression; | ||
| import static com.facebook.presto.iceberg.IcebergAbstractMetadata.getSupportedSortFields; | ||
| import static com.facebook.presto.iceberg.IcebergSessionProperties.getCompressionCodec; | ||
| import static com.facebook.presto.iceberg.IcebergUtil.MAX_FORMAT_VERSION_FOR_ROW_LEVEL_OPERATIONS; | ||
| import static com.facebook.presto.iceberg.IcebergUtil.getColumns; | ||
| import static com.facebook.presto.iceberg.IcebergUtil.getFileFormat; | ||
| import static com.facebook.presto.iceberg.PartitionSpecConverter.toPrestoPartitionSpec; | ||
|
|
@@ -124,6 +126,16 @@ private ConnectorDistributedProcedureHandle beginCallDistributedProcedure(Connec | |
| Table icebergTable = procedureContext.getTable(); | ||
| IcebergTableHandle tableHandle = layoutHandle.getTable(); | ||
|
|
||
| // Validate format version for OPTIMIZE operation | ||
| int formatVersion = ((BaseTable) icebergTable).operations().current().formatVersion(); | ||
| if (formatVersion > MAX_FORMAT_VERSION_FOR_ROW_LEVEL_OPERATIONS) { | ||
|
Comment on lines
+129
to
+131
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. suggestion: Reusing MAX_FORMAT_VERSION_FOR_ROW_LEVEL_OPERATIONS for OPTIMIZE may conflate different capability boundaries. Since OPTIMIZE/rewrite-data-files is not actually a row-level operation, tying it to MAX_FORMAT_VERSION_FOR_ROW_LEVEL_OPERATIONS couples two limits that may need to change independently; consider a dedicated MAX_FORMAT_VERSION_FOR_OPTIMIZE constant. Suggested implementation:
// Validate format version for OPTIMIZE operation
int formatVersion = ((BaseTable) icebergTable).operations().current().formatVersion();
if (formatVersion > MAX_FORMAT_VERSION_FOR_OPTIMIZE) {
throw new PrestoException(NOT_SUPPORTED,
format("OPTIMIZE is not supported for Iceberg table format version > %d. Table %s format version is %s.",
MAX_FORMAT_VERSION_FOR_OPTIMIZE,
icebergTable.name(),
formatVersion));
}
|
||
| throw new PrestoException(NOT_SUPPORTED, | ||
| format("OPTIMIZE is not supported for Iceberg table format version > %d. Table %s format version is %s.", | ||
| MAX_FORMAT_VERSION_FOR_ROW_LEVEL_OPERATIONS, | ||
| icebergTable.name(), | ||
| formatVersion)); | ||
| } | ||
|
|
||
| SortOrder sortOrder = icebergTable.sortOrder(); | ||
| List<String> sortFieldStrings = ImmutableList.of(); | ||
| if (sortOrderIndex.isPresent()) { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
issue (bug_risk): Catching RuntimeException broadly here may hide unexpected failures during snapshot retrieval.
Swallowing all RuntimeExceptions here will also hide programming/configuration errors that should surface or at least be observable. Consider catching a narrower, Iceberg-specific exception for the expected failure mode, or at minimum log the exception before returning so operational debugging remains possible.