
Commit 8859d59

[Kernel][#4] Introduce IcebergWriterCompatV3 (delta-io#4774)
#### Which Delta project/connector is this regarding?

- [ ] Spark
- [ ] Standalone
- [ ] Flink
- [x] Kernel
- [ ] Other (fill in here)

## Description

Implement IcebergWriterCompatV3. We skipped v2 so that the writer-compat version number aligns with IcebergCompatV3, which maps to Iceberg format v3.

## How was this patch tested?

Unit tests.

## Does this PR introduce _any_ user-facing changes?

Yes. Introduces IcebergWriterCompatV3.
1 parent d95d2a8 commit 8859d59
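Before the file-by-file hunks below, a hedged sketch of what enabling this feature could look like from an engine's side. It is a minimal illustration, not code from this commit: it assumes the pre-existing Kernel public APIs (`Table`, `TransactionBuilder`, `DefaultEngine` from kernel-defaults), the table path and engine name are made up, and only `TableConfig.ICEBERG_WRITER_COMPAT_V3_ENABLED` is taken from the diffs in this commit. The validators wired up in `TransactionBuilderImpl` below are expected to enforce or fill in the dependent `icebergCompatV3` settings.

```java
// Hedged sketch: opt a new table into icebergWriterCompatV3 at create time.
import java.util.Collections;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;

import io.delta.kernel.Operation;
import io.delta.kernel.Table;
import io.delta.kernel.Transaction;
import io.delta.kernel.TransactionCommitResult;
import io.delta.kernel.defaults.engine.DefaultEngine;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.internal.TableConfig;
import io.delta.kernel.types.IntegerType;
import io.delta.kernel.types.StructType;
import io.delta.kernel.utils.CloseableIterable;

public class CreateWriterCompatV3TableSketch {
  public static void main(String[] args) {
    Engine engine = DefaultEngine.create(new Configuration());

    // Only the writer-compat property is set here; the metadata validators added in this
    // commit are expected to validate/update the dependent icebergCompatV3 configuration.
    Map<String, String> properties =
        Collections.singletonMap(TableConfig.ICEBERG_WRITER_COMPAT_V3_ENABLED.getKey(), "true");

    Transaction txn =
        Table.forPath(engine, "/tmp/writer-compat-v3-table") // hypothetical path
            .createTransactionBuilder(engine, "example-engine", Operation.CREATE_TABLE)
            .withSchema(engine, new StructType().add("id", IntegerType.INTEGER))
            .withTableProperties(engine, properties)
            .build(engine);

    // An empty commit just creates the table with the requested properties.
    TransactionCommitResult result = txn.commit(engine, CloseableIterable.emptyIterable());
    System.out.println("Created table at version " + result.getVersion());
  }
}
```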

7 files changed: +846 -46 lines changed

kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionBuilderImpl.java

Lines changed: 11 additions & 0 deletions
```diff
@@ -41,6 +41,7 @@
 import io.delta.kernel.internal.icebergcompat.IcebergCompatV3MetadataValidatorAndUpdater;
 import io.delta.kernel.internal.icebergcompat.IcebergUniversalFormatMetadataValidatorAndUpdater;
 import io.delta.kernel.internal.icebergcompat.IcebergWriterCompatV1MetadataValidatorAndUpdater;
+import io.delta.kernel.internal.icebergcompat.IcebergWriterCompatV3MetadataValidatorAndUpdater;
 import io.delta.kernel.internal.lang.Lazy;
 import io.delta.kernel.internal.metrics.SnapshotMetrics;
 import io.delta.kernel.internal.metrics.SnapshotQueryContext;
@@ -481,6 +482,16 @@ protected Tuple2<Optional<Protocol>, Optional<Metadata>> validateAndUpdateProtoc
       newMetadata = icebergWriterCompatV1;
     }
 
+    Optional<Metadata> icebergWriterCompatV3 =
+        IcebergWriterCompatV3MetadataValidatorAndUpdater
+            .validateAndUpdateIcebergWriterCompatV3Metadata(
+                isCreateOrReplace,
+                newMetadata.orElse(baseMetadata),
+                newProtocol.orElse(baseProtocol));
+    if (icebergWriterCompatV3.isPresent()) {
+      newMetadata = icebergWriterCompatV3;
+    }
+
     // TODO: refactor this method to use a single validator and updater.
     Optional<Metadata> icebergCompatV2Metadata =
         IcebergCompatV2MetadataValidatorAndUpdater.validateAndUpdateIcebergCompatV2Metadata(
```
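The new block follows the same idiom as the other compat validators in this builder: each validator optionally returns an updated `Metadata`, and `newMetadata` is only replaced when an update is present, while `newMetadata.orElse(baseMetadata)` feeds the latest known state into the next validator. A minimal, self-contained sketch of that chaining idiom in plain Java, with strings standing in for `Metadata` and hypothetical validator logic:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

// Self-contained illustration of the "validate and maybe update" chaining used above.
// Strings stand in for Metadata; each validator returns Optional.empty() when no update is needed.
public class ValidatorChainSketch {

  private static final List<Function<String, Optional<String>>> VALIDATORS =
      Arrays.asList(
          metadata ->
              metadata.contains("icebergWriterCompatV3") // hypothetical check
                  ? Optional.of(metadata + ",icebergCompatV3") // hypothetical dependent update
                  : Optional.empty(),
          metadata -> Optional.empty()); // a validator that leaves the metadata untouched

  static String validateAndUpdate(String baseMetadata) {
    Optional<String> newMetadata = Optional.empty();
    for (Function<String, Optional<String>> validator : VALIDATORS) {
      // Mirrors `newMetadata.orElse(baseMetadata)`: always feed the latest metadata forward.
      Optional<String> updated = validator.apply(newMetadata.orElse(baseMetadata));
      if (updated.isPresent()) {
        newMetadata = updated; // mirrors `newMetadata = icebergWriterCompatV3;`
      }
    }
    return newMetadata.orElse(baseMetadata);
  }

  public static void main(String[] args) {
    System.out.println(validateAndUpdate("icebergWriterCompatV3"));
  }
}
```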

kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/GenerateIcebergCompatActionUtils.java

Lines changed: 127 additions & 0 deletions
```diff
@@ -34,6 +34,7 @@
 import io.delta.kernel.internal.data.TransactionStateRow;
 import io.delta.kernel.internal.fs.Path;
 import io.delta.kernel.internal.icebergcompat.IcebergCompatV2MetadataValidatorAndUpdater;
+import io.delta.kernel.internal.icebergcompat.IcebergCompatV3MetadataValidatorAndUpdater;
 import io.delta.kernel.statistics.DataFileStatistics;
 import io.delta.kernel.types.StructType;
 import io.delta.kernel.utils.CloseableIterable;
@@ -107,6 +108,57 @@ public static Row generateIcebergCompatWriterV1AddAction(
         transactionState, fileStatus, partitionValues, dataChange, Collections.emptyMap());
   }
 
+  /**
+   * Create an add action {@link Row} that can be passed to {@link Transaction#commit(Engine,
+   * CloseableIterable)} from an Iceberg add.
+   *
+   * @param transactionState the transaction state from the built transaction
+   * @param fileStatus the file status to create the add with (contains path, time, size, and stats)
+   * @param partitionValues the partition values for the add
+   * @param dataChange whether or not the add constitutes a dataChange (i.e. append vs. compaction)
+   * @param tags key-value metadata to be attached to the add action
+   * @return add action row that can be included in the transaction
+   * @throws UnsupportedOperationException if icebergWriterCompatV3 is not enabled
+   * @throws UnsupportedOperationException if maxRetries != 0 in the transaction
+   * @throws KernelException if stats are not present (required for icebergCompatV3)
+   * @throws UnsupportedOperationException if the table is partitioned (currently unsupported)
+   */
+  public static Row generateIcebergCompatWriterV3AddAction(
+      Row transactionState,
+      DataFileStatus fileStatus,
+      Map<String, Literal> partitionValues,
+      boolean dataChange,
+      Map<String, String> tags) {
+    Map<String, String> configuration = TransactionStateRow.getConfiguration(transactionState);
+
+    /* ----- Validate that this is a valid usage of this API ----- */
+    validateIcebergWriterCompatV3Enabled(configuration);
+    validateMaxRetriesSetToZero(transactionState);
+
+    /* ----- Validate this is valid write given the table's protocol & configurations ----- */
+    checkState(
+        TableConfig.ICEBERG_COMPAT_V3_ENABLED.fromMetadata(configuration),
+        "icebergCompatV3 not enabled despite icebergWriterCompatV3 enabled");
+    // We require field `numRecords` when icebergCompatV3 is enabled
+    IcebergCompatV3MetadataValidatorAndUpdater.validateDataFileStatus(fileStatus);
+
+    /* --- Validate and update partitionValues ---- */
+    // Currently we don't support partitioned tables; fail here
+    blockPartitionedTables(transactionState, partitionValues);
+
+    URI tableRoot = new Path(TransactionStateRow.getTablePath(transactionState)).toUri();
+    // This takes care of relativizing the file path and serializing the file statistics
+    AddFile addFile =
+        AddFile.convertDataFileStatus(
+            TransactionStateRow.getPhysicalSchema(transactionState),
+            tableRoot,
+            fileStatus,
+            partitionValues,
+            dataChange,
+            tags);
+    return SingleAction.createAddFileSingleAction(addFile.toRow());
+  }
+
   /**
    * Create a remove action {@link Row} that can be passed to {@link Transaction#commit(Engine,
    * CloseableIterable)} from an Iceberg remove.
@@ -134,13 +186,64 @@ public static Row generateIcebergCompatWriterV1RemoveAction(
     validateIcebergWriterCompatV1Enabled(config);
     validateMaxRetriesSetToZero(transactionState);
 
+    /* ----- Validate this is valid write given the table's protocol & configurations ----- */
+    // We only allow removes with dataChange=false when appendOnly=true
+    blockUpdatingAppendOnlyTables(dataChange, transactionState, config);
+
+    /* --- Validate and update partitionValues ---- */
+    // Currently we don't support partitioned tables; fail here
+    blockPartitionedTables(transactionState, partitionValues);
+
+    URI tableRoot = new Path(TransactionStateRow.getTablePath(transactionState)).toUri();
+    // This takes care of relativizing the file path and serializing the file statistics
+    Row removeFileRow =
+        convertRemoveDataFileStatus(
+            TransactionStateRow.getPhysicalSchema(transactionState),
+            tableRoot,
+            fileStatus,
+            partitionValues,
+            dataChange);
+    return SingleAction.createRemoveFileSingleAction(removeFileRow);
+  }
+
+  /**
+   * Create a remove action {@link Row} that can be passed to {@link Transaction#commit(Engine,
+   * CloseableIterable)} from an Iceberg remove.
+   *
+   * @param transactionState the transaction state from the built transaction
+   * @param fileStatus the file status to create the remove with (contains path, time, size, and
+   *     stats)
+   * @param partitionValues the partition values for the remove
+   * @param dataChange whether or not the remove constitutes a dataChange (i.e. delete vs.
+   *     compaction)
+   * @return remove action row that can be committed to the transaction
+   * @throws UnsupportedOperationException if icebergWriterCompatV3 is not enabled
+   * @throws UnsupportedOperationException if maxRetries != 0 in the transaction
+   * @throws KernelException if the table is an append-only table and dataChange=true
+   * @throws UnsupportedOperationException if the table is partitioned (currently unsupported)
+   */
+  public static Row generateIcebergCompatWriterV3RemoveAction(
+      Row transactionState,
+      DataFileStatus fileStatus,
+      Map<String, Literal> partitionValues,
+      boolean dataChange) {
+    Map<String, String> config = TransactionStateRow.getConfiguration(transactionState);
+
+    /* ----- Validate that this is a valid usage of this API ----- */
+    validateIcebergWriterCompatV3Enabled(config);
+    validateMaxRetriesSetToZero(transactionState);
+
     /* ----- Validate this is valid write given the table's protocol & configurations ----- */
     // We only allow removes with dataChange=false when appendOnly=true
     if (dataChange && TableConfig.APPEND_ONLY_ENABLED.fromMetadata(config)) {
       throw DeltaErrors.cannotModifyAppendOnlyTable(
           TransactionStateRow.getTablePath(transactionState));
     }
 
+    /* ----- Validate this is valid write given the table's protocol & configurations ----- */
+    // We only allow removes with dataChange=false when appendOnly=true
+    blockUpdatingAppendOnlyTables(dataChange, transactionState, config);
+
     /* --- Validate and update partitionValues ---- */
     // Currently we don't support partitioned tables; fail here
     blockPartitionedTables(transactionState, partitionValues);
@@ -177,6 +280,21 @@ private static void validateIcebergWriterCompatV1Enabled(Map<String, String> con
     }
   }
 
+  /**
+   * Validates that table feature `icebergWriterCompatV3` is enabled. We restrict usage of these
+   * APIs to require that this table feature is enabled to prevent any unsafe usage due to the table
+   * features that are blocked via `icebergWriterCompatV3`.
+   */
+  private static void validateIcebergWriterCompatV3Enabled(Map<String, String> config) {
+    if (!TableConfig.ICEBERG_WRITER_COMPAT_V3_ENABLED.fromMetadata(config)) {
+      throw new UnsupportedOperationException(
+          String.format(
+              "APIs within GenerateIcebergCompatActionUtils are only supported on tables with"
+                  + " '%s' set to true",
+              TableConfig.ICEBERG_WRITER_COMPAT_V3_ENABLED.getKey()));
+    }
+  }
+
   /**
    * Throws an exception if `maxRetries` was not set to 0 in the transaction. We restrict these APIs
    * to require `maxRetries = 0` since conflict resolution is not supported for operations other
@@ -192,6 +310,15 @@ private static void validateMaxRetriesSetToZero(Row transactionState) {
     }
   }
 
+  private static void blockUpdatingAppendOnlyTables(
+      boolean dataChange, Row transactionState, Map<String, String> config) {
+    // We only allow removes with dataChange=false when appendOnly=true
+    if (dataChange && TableConfig.APPEND_ONLY_ENABLED.fromMetadata(config)) {
+      throw DeltaErrors.cannotModifyAppendOnlyTable(
+          TransactionStateRow.getTablePath(transactionState));
+    }
+  }
+
   private static void blockPartitionedTables(
       Row transactionState, Map<String, Literal> partitionValues) {
     if (!TransactionStateRow.getPartitionColumnsList(transactionState).isEmpty()) {
```
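Putting the two new public helpers together, here is a hedged sketch of the intended call pattern for an unpartitioned append. It is not part of the commit: it assumes a transaction already built with icebergWriterCompatV3 enabled and `maxRetries = 0`, takes the `DataFileStatus` as given (e.g. produced by the engine after writing a Parquet file, with `numRecords` statistics), and relies on the pre-existing Kernel utilities `Utils.toCloseableIterator` and `CloseableIterable.inMemoryIterable`.

```java
// Hedged usage sketch for the APIs added above; not code from this commit.
import java.util.Collections;
import java.util.List;

import io.delta.kernel.Transaction;
import io.delta.kernel.TransactionCommitResult;
import io.delta.kernel.data.Row;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.internal.actions.GenerateIcebergCompatActionUtils;
import io.delta.kernel.internal.util.Utils;
import io.delta.kernel.utils.CloseableIterable;
import io.delta.kernel.utils.DataFileStatus;

public class IcebergWriterCompatV3AppendSketch {

  /** Commits one already-written data file to an unpartitioned icebergWriterCompatV3 table. */
  static TransactionCommitResult appendOneFile(
      Engine engine, Transaction txn, DataFileStatus writtenFile) {
    Row transactionState = txn.getTransactionState(engine);

    // The file status must carry statistics (numRecords), since icebergCompatV3 requires them.
    Row addAction =
        GenerateIcebergCompatActionUtils.generateIcebergCompatWriterV3AddAction(
            transactionState,
            writtenFile,
            Collections.emptyMap(), // unpartitioned: no partition values
            true /* dataChange */,
            Collections.emptyMap() /* tags */);

    List<Row> actions = Collections.singletonList(addAction);
    return txn.commit(
        engine, CloseableIterable.inMemoryIterable(Utils.toCloseableIterator(actions.iterator())));
  }
}
```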

kernel/kernel-api/src/main/java/io/delta/kernel/internal/icebergcompat/IcebergWriterCompatMetadataValidatorAndUpdater.java

Lines changed: 54 additions & 0 deletions
```diff
@@ -27,6 +27,7 @@
 import io.delta.kernel.internal.util.SchemaIterable;
 import io.delta.kernel.types.*;
 import java.util.*;
+import java.util.stream.Stream;
 
 /**
  * Contains interfaces and common utility classes performing the validations and updates necessary
@@ -55,6 +56,49 @@ abstract class IcebergWriterCompatMetadataValidatorAndUpdater
           ColumnMapping.updateColumnMappingMetadataIfNeeded(
               inputContext.newMetadata, inputContext.isCreatingNewTable));
 
+  /**
+   * Creates an IcebergCompatRequiredTablePropertyEnforcer for enabling a specific Iceberg
+   * compatibility version. The enforcer ensures the property is set to "true" and delegates
+   * validation to the appropriate metadata validator.
+   *
+   * @param tableConfigProperty the table configuration property to enforce
+   * @param postProcessor the version-specific validation and metadata update processor
+   * @return configured enforcer for the specified Iceberg compatibility version
+   */
+  protected static IcebergCompatRequiredTablePropertyEnforcer<Boolean> createIcebergCompatEnforcer(
+      TableConfig<Boolean> tableConfigProperty, PostMetadataProcessor postProcessor) {
+    return new IcebergCompatRequiredTablePropertyEnforcer<>(
+        tableConfigProperty, (value) -> value, "true", postProcessor);
+  }
+
+  /**
+   * Common set of allowed table features shared across all Iceberg writer compatibility versions.
+   * This includes the incompatible legacy features (invariants, changeDataFeed, checkConstraints,
+   * identityColumns, generatedColumns) because they may be present in the table protocol even when
+   * they are not in use. In later checks we validate that these incompatible features are inactive
+   * in the table. See the protocol spec for more details.
+   */
+  protected static final Set<TableFeature> COMMON_ALLOWED_FEATURES =
+      Stream.of(
+              // Incompatible, but not active, legacy table features
+              INVARIANTS_W_FEATURE,
+              CHANGE_DATA_FEED_W_FEATURE,
+              CONSTRAINTS_W_FEATURE,
+              IDENTITY_COLUMNS_W_FEATURE,
+              GENERATED_COLUMNS_W_FEATURE,
+              // Compatible table features
+              APPEND_ONLY_W_FEATURE,
+              COLUMN_MAPPING_RW_FEATURE,
+              DOMAIN_METADATA_W_FEATURE,
+              VACUUM_PROTOCOL_CHECK_RW_FEATURE,
+              CHECKPOINT_V2_RW_FEATURE,
+              IN_COMMIT_TIMESTAMP_W_FEATURE,
+              CLUSTERING_W_FEATURE,
+              TIMESTAMP_NTZ_RW_FEATURE,
+              TYPE_WIDENING_RW_FEATURE,
+              TYPE_WIDENING_RW_PREVIEW_FEATURE)
+          .collect(toSet());
+
   protected static IcebergCompatCheck createUnsupportedFeaturesCheck(
       IcebergWriterCompatMetadataValidatorAndUpdater instance) {
     return (inputContext) -> {
@@ -209,6 +253,16 @@ protected static IcebergCompatCheck createUnsupportedFeaturesCheck(
     }
   };
 
+  protected static final List<IcebergCompatCheck> COMMON_CHECKS =
+      Arrays.asList(
+          UNSUPPORTED_TYPES_CHECK,
+          PHYSICAL_NAMES_MATCH_FIELD_IDS_CHECK,
+          INVARIANTS_INACTIVE_CHECK,
+          CHANGE_DATA_FEED_INACTIVE_CHECK,
+          CHECK_CONSTRAINTS_INACTIVE_CHECK,
+          IDENTITY_COLUMNS_INACTIVE_CHECK,
+          GENERATED_COLUMNS_INACTIVE_CHECK);
+
   @Override
   abstract String compatFeatureName();
```
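The V3 counterpart class (IcebergWriterCompatV3MetadataValidatorAndUpdater) is among the seven files in this commit, but its hunks are not shown in this excerpt. Purely as an illustration of how a version-specific subclass is meant to consume the shared members introduced above (the V1 refactor below follows the same shape), a hypothetical composition could look like the fragment here. The V3 feature-constant names and the IcebergCompatV3 post-processor call are assumptions, not lines taken from the commit.

```java
// Hypothetical fragment, not the actual V3 class from this commit. It only illustrates how
// createIcebergCompatEnforcer, COMMON_ALLOWED_FEATURES, and COMMON_CHECKS are meant to be reused.
// ICEBERG_COMPAT_V3_W_FEATURE and ICEBERG_WRITER_COMPAT_V3 are assumed constant names.
private static final IcebergCompatRequiredTablePropertyEnforcer<Boolean> ICEBERG_COMPAT_V3_ENABLED =
    createIcebergCompatEnforcer(
        TableConfig.ICEBERG_COMPAT_V3_ENABLED,
        (inputContext) ->
            // Assumed to mirror the V2 post-processor used by the V1 class below.
            IcebergCompatV3MetadataValidatorAndUpdater.validateAndUpdateIcebergCompatV3Metadata(
                inputContext.isCreatingNewTable,
                inputContext.newMetadata,
                inputContext.newProtocol));

private static final Set<TableFeature> ALLOWED_TABLE_FEATURES =
    Stream.concat(
            COMMON_ALLOWED_FEATURES.stream(),
            Stream.of(ICEBERG_COMPAT_V3_W_FEATURE, ICEBERG_WRITER_COMPAT_V3))
        .collect(toSet());

@Override
List<IcebergCompatCheck> icebergCompatChecks() {
  return Stream.concat(Stream.of(createUnsupportedFeaturesCheck(this)), COMMON_CHECKS.stream())
      .collect(toList());
}
```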

kernel/kernel-api/src/main/java/io/delta/kernel/internal/icebergcompat/IcebergWriterCompatV1MetadataValidatorAndUpdater.java

Lines changed: 20 additions & 44 deletions
```diff
@@ -107,45 +107,29 @@ public static Optional<Metadata> validateAndUpdateIcebergWriterCompatV1Metadata(
   private static final IcebergWriterCompatV1MetadataValidatorAndUpdater INSTANCE =
       new IcebergWriterCompatV1MetadataValidatorAndUpdater();
 
-  private static final IcebergCompatRequiredTablePropertyEnforcer ICEBERG_COMPAT_V2_ENABLED =
-      new IcebergCompatRequiredTablePropertyEnforcer<>(
-          TableConfig.ICEBERG_COMPAT_V2_ENABLED,
-          (value) -> value,
-          "true",
-          (inputContext) ->
-              IcebergCompatV2MetadataValidatorAndUpdater.validateAndUpdateIcebergCompatV2Metadata(
-                  inputContext.isCreatingNewTable,
-                  inputContext.newMetadata,
-                  inputContext.newProtocol));
+  /**
+   * Enforcer for Iceberg compatibility V2 (required by V1). Ensures the ICEBERG_COMPAT_V2_ENABLED
+   * property is set to "true" and delegates validation to the V2 metadata validator.
+   */
+  private static final IcebergCompatRequiredTablePropertyEnforcer<Boolean>
+      ICEBERG_COMPAT_V2_ENABLED =
+          createIcebergCompatEnforcer(
+              TableConfig.ICEBERG_COMPAT_V2_ENABLED,
+              (inputContext) ->
+                  IcebergCompatV2MetadataValidatorAndUpdater
+                      .validateAndUpdateIcebergCompatV2Metadata(
+                          inputContext.isCreatingNewTable,
+                          inputContext.newMetadata,
+                          inputContext.newProtocol));
 
   /**
-   * Current set of allowed table features. This may evolve as the protocol evolves. This includes
-   * the incompatible legacy features (invariants, changeDataFeed, checkConstraints,
-   * identityColumns, generatedColumns) because they may be present in the table protocol even when
-   * they are not in use. In later checks we validate that these incompatible features are inactive
-   * in the table. See the protocol spec for more details.
+   * Current set of allowed table features for Iceberg writer compat V1. This combines the common
+   * features with V1-specific features (ICEBERG_COMPAT_V2_W_FEATURE, ICEBERG_WRITER_COMPAT_V1).
    */
   private static Set<TableFeature> ALLOWED_TABLE_FEATURES =
-      Stream.of(
-              // Incompatible legacy table features
-              INVARIANTS_W_FEATURE,
-              CHANGE_DATA_FEED_W_FEATURE,
-              CONSTRAINTS_W_FEATURE,
-              IDENTITY_COLUMNS_W_FEATURE,
-              GENERATED_COLUMNS_W_FEATURE,
-              // Compatible table features
-              APPEND_ONLY_W_FEATURE,
-              COLUMN_MAPPING_RW_FEATURE,
-              ICEBERG_COMPAT_V2_W_FEATURE,
-              ICEBERG_WRITER_COMPAT_V1,
-              DOMAIN_METADATA_W_FEATURE,
-              VACUUM_PROTOCOL_CHECK_RW_FEATURE,
-              CHECKPOINT_V2_RW_FEATURE,
-              IN_COMMIT_TIMESTAMP_W_FEATURE,
-              CLUSTERING_W_FEATURE,
-              TIMESTAMP_NTZ_RW_FEATURE,
-              TYPE_WIDENING_RW_FEATURE,
-              TYPE_WIDENING_RW_PREVIEW_FEATURE)
+      Stream.concat(
+              COMMON_ALLOWED_FEATURES.stream(),
+              Stream.of(ICEBERG_COMPAT_V2_W_FEATURE, ICEBERG_WRITER_COMPAT_V1))
           .collect(toSet());
 
   @Override
@@ -177,15 +161,7 @@ protected Set<TableFeature> getAllowedTableFeatures() {
 
   @Override
   List<IcebergCompatCheck> icebergCompatChecks() {
-    return Stream.of(
-            createUnsupportedFeaturesCheck(this), // Pass 'this' instance
-            UNSUPPORTED_TYPES_CHECK,
-            PHYSICAL_NAMES_MATCH_FIELD_IDS_CHECK,
-            INVARIANTS_INACTIVE_CHECK,
-            CHANGE_DATA_FEED_INACTIVE_CHECK,
-            CHECK_CONSTRAINTS_INACTIVE_CHECK,
-            IDENTITY_COLUMNS_INACTIVE_CHECK,
-            GENERATED_COLUMNS_INACTIVE_CHECK)
+    return Stream.concat(Stream.of(createUnsupportedFeaturesCheck(this)), COMMON_CHECKS.stream())
         .collect(toList());
   }
 }
```
