Commit d90b060

[Kernel][#2] Refactor WriterCompat metadata validator and updater (delta-io#4734)
#### Which Delta project/connector is this regarding?

- [ ] Spark
- [ ] Standalone
- [ ] Flink
- [x] Kernel
- [ ] Other (fill in here)

## Description

This PR is refactoring only: it moves all the validation checks from `IcebergWriterCompatV1MetadataValidatorAndUpdater.java` into a new base class, `IcebergWriterCompatMetadataValidatorAndUpdater.java`, so that the later-added `IcebergWriterCompatV3MetadataValidatorAndUpdater.java` can reuse them.

## How was this patch tested?

Existing unit tests, since this change is refactoring only.

## Does this PR introduce _any_ user-facing changes?

No.
1 parent 7e2a01c commit d90b060

File tree

2 files changed: +254 −142 lines changed
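Only the new base class's diff is reproduced below; the companion change to `IcebergWriterCompatV1MetadataValidatorAndUpdater.java` (presumably the remaining +9/−142 lines of this commit) is not shown. As a rough sketch of the shape this refactor implies — the class and hook names come from this commit, but every concrete value below is an illustrative assumption, not the actual V1 implementation:

```java
package io.delta.kernel.internal.icebergcompat;

import io.delta.kernel.internal.TableConfig;
import io.delta.kernel.internal.tablefeatures.TableFeature;
import java.util.*;

// Hypothetical post-refactor V1 subclass: version-specific values live here,
// while the shared enforcers/checks are inherited from the new base class below.
public class IcebergWriterCompatV1MetadataValidatorAndUpdater
    extends IcebergWriterCompatMetadataValidatorAndUpdater {

  @Override
  String compatFeatureName() {
    return "icebergWriterCompatV1"; // assumed feature name
  }

  @Override
  TableConfig<Boolean> requiredDeltaTableProperty() {
    return TableConfig.ICEBERG_WRITER_COMPAT_V1_ENABLED; // assumed constant name
  }

  @Override
  List<IcebergCompatRequiredTablePropertyEnforcer> requiredDeltaTableProperties() {
    return Collections.singletonList(CM_ID_MODE_ENABLED); // shared enforcer from the base class
  }

  @Override
  List<TableFeature> requiredDependencyTableFeatures() {
    return Collections.emptyList(); // illustrative; the real V1 dependency list may differ
  }

  @Override
  List<IcebergCompatCheck> icebergCompatChecks() {
    // Wires together the shared checks defined in the base class.
    return Arrays.asList(
        createUnsupportedFeaturesCheck(this),
        UNSUPPORTED_TYPES_CHECK,
        PHYSICAL_NAMES_MATCH_FIELD_IDS_CHECK,
        INVARIANTS_INACTIVE_CHECK,
        CHANGE_DATA_FEED_INACTIVE_CHECK,
        CHECK_CONSTRAINTS_INACTIVE_CHECK,
        IDENTITY_COLUMNS_INACTIVE_CHECK,
        GENERATED_COLUMNS_INACTIVE_CHECK);
  }

  @Override
  Set<TableFeature> getAllowedTableFeatures() {
    // Illustrative placeholder; the real allow-list enumerates the table
    // features compatible with IcebergWriterCompatV1.
    return new HashSet<>();
  }
}
```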
IcebergWriterCompatMetadataValidatorAndUpdater.java — 245 additions & 0 deletions

@@ -0,0 +1,245 @@

```java
/*
 * Copyright (2025) The Delta Lake Project Authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package io.delta.kernel.internal.icebergcompat;

import static io.delta.kernel.internal.tablefeatures.TableFeatures.*;
import static io.delta.kernel.internal.util.SchemaUtils.concatWithDot;
import static java.util.stream.Collectors.toList;

import io.delta.kernel.internal.DeltaErrors;
import io.delta.kernel.internal.TableConfig;
import io.delta.kernel.internal.tablefeatures.TableFeature;
import io.delta.kernel.internal.tablefeatures.TableFeatures;
import io.delta.kernel.internal.util.ColumnMapping;
import io.delta.kernel.internal.util.SchemaUtils;
import io.delta.kernel.internal.util.Tuple2;
import io.delta.kernel.types.*;
import java.util.*;

/**
 * Contains interfaces and common utility classes performing the validations and updates necessary
 * to support the table feature IcebergWriterCompats when it is enabled by the table properties
 * such as "delta.enableIcebergWriterCompatV3".
 */
abstract class IcebergWriterCompatMetadataValidatorAndUpdater
    extends IcebergCompatMetadataValidatorAndUpdater {
  /////////////////////////////////////////////////////////////////////////////////
  /// Interfaces for defining validations and updates necessary to support       ///
  /// IcebergWriterCompats                                                       ///
  /////////////////////////////////////////////////////////////////////////////////
  public static void validateIcebergWriterCompatChange(
      Map<String, String> oldConfig,
      Map<String, String> newConfig,
      boolean isNewTable,
      TableConfig<Boolean> writerCompatProperty) {
    if (!isNewTable) {
      boolean wasEnabled = writerCompatProperty.fromMetadata(oldConfig);
      boolean isEnabled = writerCompatProperty.fromMetadata(newConfig);
      if (!wasEnabled && isEnabled) {
        throw DeltaErrors.enablingIcebergWriterCompatV1OnExistingTable(
            writerCompatProperty.getKey());
      }
      if (wasEnabled && !isEnabled) {
        throw DeltaErrors.disablingIcebergWriterCompatV1OnExistingTable(
            writerCompatProperty.getKey());
      }
    }
  }

  /**
   * Common property enforcer for Column Mapping ID mode requirement. This is identical across all
   * Writer Compat versions.
   */
  protected static final IcebergCompatRequiredTablePropertyEnforcer CM_ID_MODE_ENABLED =
      new IcebergCompatRequiredTablePropertyEnforcer<>(
          TableConfig.COLUMN_MAPPING_MODE,
          (value) -> ColumnMapping.ColumnMappingMode.ID == value,
          ColumnMapping.ColumnMappingMode.ID.value,
          // We need to update the CM info in the schema here because we check that the physical
          // name is correctly set as part of icebergWriterCompat checks
          (inputContext) ->
              ColumnMapping.updateColumnMappingMetadataIfNeeded(
                  inputContext.newMetadata, inputContext.isCreatingNewTable));

  protected static IcebergCompatCheck createUnsupportedFeaturesCheck(
      IcebergWriterCompatMetadataValidatorAndUpdater instance) {
    return (inputContext) -> {
      Set<TableFeature> allowedTableFeatures = instance.getAllowedTableFeatures();
      if (!allowedTableFeatures.containsAll(
          inputContext.newProtocol.getImplicitlyAndExplicitlySupportedFeatures())) {
        Set<TableFeature> incompatibleFeatures =
            inputContext.newProtocol.getImplicitlyAndExplicitlySupportedFeatures();
        incompatibleFeatures.removeAll(allowedTableFeatures);
        throw DeltaErrors.icebergCompatIncompatibleTableFeatures(
            inputContext.compatFeatureName, incompatibleFeatures);
      }
    };
  }

  /**
   * Checks that there are no unsupported types in the schema. Data types {@link ByteType} and
   * {@link ShortType} are unsupported for IcebergWriterCompatV1 and V3 tables.
   */
  protected static final IcebergCompatCheck UNSUPPORTED_TYPES_CHECK =
      (inputContext) -> {
        List<Tuple2<List<String>, StructField>> matches =
            SchemaUtils.filterRecursively(
                inputContext.newMetadata.getSchema(),
                /* recurseIntoMapAndArrayTypes= */ true,
                /* stopOnFirstMatch = */ false,
                field -> {
                  DataType dataType = field.getDataType();
                  return (dataType instanceof ByteType || dataType instanceof ShortType);
                });

        if (!matches.isEmpty()) {
          throw DeltaErrors.icebergCompatUnsupportedTypeColumns(
              inputContext.compatFeatureName,
              matches.stream().map(tuple -> tuple._2.getDataType()).collect(toList()));
        }
      };

  /**
   * Checks that in the schema column mapping is set up such that the physicalName is equal to
   * "col-[fieldId]". This check assumes column mapping is enabled (and so should be performed
   * after that check).
   */
  protected static final IcebergCompatCheck PHYSICAL_NAMES_MATCH_FIELD_IDS_CHECK =
      (inputContext) -> {
        List<Tuple2<List<String>, StructField>> invalidFields =
            SchemaUtils.filterRecursively(
                inputContext.newMetadata.getSchema(),
                /* recurseIntoMapAndArrayTypes= */ true,
                /* stopOnFirstMatch = */ false,
                field -> {
                  String physicalName = ColumnMapping.getPhysicalName(field);
                  long columnId = ColumnMapping.getColumnId(field);
                  return !physicalName.equals(String.format("col-%s", columnId));
                });
        if (!invalidFields.isEmpty()) {
          List<String> invalidFieldsFormatted =
              invalidFields.stream()
                  .map(
                      pair ->
                          String.format(
                              "%s(physicalName='%s', columnId=%s)",
                              concatWithDot(pair._1),
                              ColumnMapping.getPhysicalName(pair._2),
                              ColumnMapping.getColumnId(pair._2)))
                  .collect(toList());
          throw DeltaErrors.icebergWriterCompatInvalidPhysicalName(invalidFieldsFormatted);
        }
      };

  /**
   * Checks that the table feature `invariants` is not active in the table, meaning there are no
   * invariants stored in the table schema.
   */
  protected static final IcebergCompatCheck INVARIANTS_INACTIVE_CHECK =
      (inputContext) -> {
        // Note - since Kernel currently does not support the table feature `invariants` we will
        // not hit this check for E2E writes since we will fail early due to unsupported write
        // If Kernel starts supporting the feature `invariants` this check will become applicable
        if (TableFeatures.hasInvariants(inputContext.newMetadata.getSchema())) {
          throw DeltaErrors.icebergCompatIncompatibleTableFeatures(
              inputContext.compatFeatureName, Collections.singleton(INVARIANTS_W_FEATURE));
        }
      };

  /**
   * Checks that the table feature `changeDataFeed` is not active in the table, meaning the table
   * property `delta.enableChangeDataFeed` is not enabled.
   */
  protected static final IcebergCompatCheck CHANGE_DATA_FEED_INACTIVE_CHECK =
      (inputContext) -> {
        // Note - since Kernel currently does not support the table feature `changeDataFeed` we
        // will not hit this check for E2E writes since we will fail early due to unsupported
        // write. If Kernel starts supporting the feature `changeDataFeed` this check will become
        // applicable
        if (TableConfig.CHANGE_DATA_FEED_ENABLED.fromMetadata(inputContext.newMetadata)) {
          throw DeltaErrors.icebergCompatIncompatibleTableFeatures(
              inputContext.compatFeatureName, Collections.singleton(CHANGE_DATA_FEED_W_FEATURE));
        }
      };

  /**
   * Checks that the table feature `checkConstraints` is not active in the table, meaning the
   * table has no check constraints stored in its metadata configuration.
   */
  protected static final IcebergCompatCheck CHECK_CONSTRAINTS_INACTIVE_CHECK =
      (inputContext) -> {
        // Note - since Kernel currently does not support the table feature `checkConstraints` we
        // will not hit this check for E2E writes since we will fail early due to unsupported
        // write. If Kernel starts supporting the feature `checkConstraints` this check will
        // become applicable
        if (TableFeatures.hasCheckConstraints(inputContext.newMetadata)) {
          throw DeltaErrors.icebergCompatIncompatibleTableFeatures(
              inputContext.compatFeatureName, Collections.singleton(CONSTRAINTS_W_FEATURE));
        }
      };

  /**
   * Checks that the table feature `identityColumns` is not active in the table, meaning no
   * identity columns exist in the table schema.
   */
  protected static final IcebergCompatCheck IDENTITY_COLUMNS_INACTIVE_CHECK =
      (inputContext) -> {
        // Note - since Kernel currently does not support the table feature `identityColumns` we
        // will not hit this check for E2E writes since we will fail early due to unsupported
        // write. If Kernel starts supporting the feature `identityColumns` this check will
        // become applicable
        if (TableFeatures.hasIdentityColumns(inputContext.newMetadata)) {
          throw DeltaErrors.icebergCompatIncompatibleTableFeatures(
              inputContext.compatFeatureName, Collections.singleton(IDENTITY_COLUMNS_W_FEATURE));
        }
      };

  /**
   * Checks that the table feature `generatedColumns` is not active in the table, meaning no
   * generated columns exist in the table schema.
   */
  protected static final IcebergCompatCheck GENERATED_COLUMNS_INACTIVE_CHECK =
      (inputContext) -> {
        // Note - since Kernel currently does not support the table feature `generatedColumns` we
        // will not hit this check for E2E writes since we will fail early due to unsupported
        // write. If Kernel starts supporting the feature `generatedColumns` this check will
        // become applicable
        if (TableFeatures.hasGeneratedColumns(inputContext.newMetadata)) {
          throw DeltaErrors.icebergCompatIncompatibleTableFeatures(
              inputContext.compatFeatureName, Collections.singleton(GENERATED_COLUMNS_W_FEATURE));
        }
      };

  @Override
  abstract String compatFeatureName();

  @Override
  abstract TableConfig<Boolean> requiredDeltaTableProperty();

  @Override
  abstract List<IcebergCompatRequiredTablePropertyEnforcer> requiredDeltaTableProperties();

  @Override
  abstract List<TableFeature> requiredDependencyTableFeatures();

  @Override
  abstract List<IcebergCompatCheck> icebergCompatChecks();

  abstract Set<TableFeature> getAllowedTableFeatures();
}
```
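For orientation, here is a minimal, hypothetical sketch of how the shared transition check above behaves. The property key and the `TableConfig` constant name are assumptions for illustration, and the property is assumed to default to false when absent:

```java
package io.delta.kernel.internal.icebergcompat;

import io.delta.kernel.internal.TableConfig;
import java.util.Collections;
import java.util.Map;

// Same-package context, since the validator class is package-private.
class WriterCompatTransitionExample {
  static void run() {
    Map<String, String> oldConfig = Collections.emptyMap(); // compat previously disabled
    Map<String, String> newConfig =
        Collections.singletonMap("delta.enableIcebergWriterCompatV1", "true"); // key assumed

    // On an existing table (isNewTable == false), turning the property on (or turning it
    // off where it was on) throws; on a brand-new table the method returns without checks.
    IcebergWriterCompatMetadataValidatorAndUpdater.validateIcebergWriterCompatChange(
        oldConfig, newConfig, /* isNewTable= */ false,
        TableConfig.ICEBERG_WRITER_COMPAT_V1_ENABLED); // assumed constant name; throws here
  }
}
```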
