Skip to content

Commit 0d1600e

Browse files
authored
[Kernel][#2] IcebergCompatV3 - move IcebergCompatCheck to base class (delta-io#4732)
<!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md 2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP] Your PR title ...'. 3. Be sure to keep the PR description updated to reflect all changes. 4. Please write your PR title to summarize what this PR proposes. 5. If possible, provide a concise example to reproduce the issue for a faster review. 6. If applicable, include the corresponding issue number in the PR title and link it in the body. --> #### Which Delta project/connector is this regarding? <!-- Please add the component selected below to the beginning of the pull request title For example: [Spark] Title of my pull request --> - [ ] Spark - [ ] Standalone - [ ] Flink - [x] Kernel - [ ] Other (fill in here) ## Description <!-- - Describe what this PR changes. - Describe why we need the change. If this PR resolves an issue be sure to include "Resolves #XXX" to correctly link and close the issue upon merge. --> This PR is a refactoring only: it moves all the IcebergCompatChecks from `IcebergCompatV2MetadataValidatorAndUpdater.java` to its base class `IcebergCompatMetadataValidatorAndUpdater.java` so that the `IcebergCompatV3MetadataValidatorAndUpdater.java` class, which will be added later, can reuse them. ## How was this patch tested? <!-- If tests were added, say they were added here. Please make sure to test the changes thoroughly including negative and positive cases if possible. If the changes were tested in any way other than unit tests, please clarify how you tested step by step (ideally copy and paste-able, so that other reviewers can test and check, and descendants can verify in the future). If the changes were not tested, please explain why. --> Covered by the existing unit tests, since this change is a refactoring only. ## Does this PR introduce _any_ user-facing changes? 
<!-- If yes, please clarify the previous behavior and the change this PR proposes - provide the console output, description and/or an example to show the behavior difference if possible. If possible, please also clarify if this is a user-facing change compared to the released Delta Lake versions or within the unreleased branches such as master. If no, write 'No'. -->
1 parent 8e6d9ec commit 0d1600e

File tree

3 files changed

+188
-142
lines changed

3 files changed

+188
-142
lines changed

kernel/kernel-api/src/main/java/io/delta/kernel/internal/icebergcompat/IcebergCompatMetadataValidatorAndUpdater.java

Lines changed: 177 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,17 +15,27 @@
1515
*/
1616
package io.delta.kernel.internal.icebergcompat;
1717

18+
import static io.delta.kernel.internal.tablefeatures.TableFeatures.*;
19+
import static io.delta.kernel.internal.util.Preconditions.checkArgument;
1820
import static java.util.Collections.singletonMap;
21+
import static java.util.stream.Collectors.toList;
1922

2023
import io.delta.kernel.exceptions.KernelException;
2124
import io.delta.kernel.internal.DeltaErrors;
2225
import io.delta.kernel.internal.TableConfig;
2326
import io.delta.kernel.internal.actions.Metadata;
2427
import io.delta.kernel.internal.actions.Protocol;
2528
import io.delta.kernel.internal.tablefeatures.TableFeature;
26-
import java.util.List;
27-
import java.util.Optional;
29+
import io.delta.kernel.internal.types.TypeWideningChecker;
30+
import io.delta.kernel.internal.util.ColumnMapping;
31+
import io.delta.kernel.internal.util.SchemaIterable;
32+
import io.delta.kernel.internal.util.SchemaUtils;
33+
import io.delta.kernel.internal.util.Tuple2;
34+
import io.delta.kernel.types.*;
35+
import java.util.*;
2836
import java.util.function.Predicate;
37+
import java.util.stream.Collectors;
38+
import java.util.stream.Stream;
2939

3040
/**
3141
* Contains interfaces and common utility classes for defining the iceberg conversion compatibility
@@ -53,19 +63,25 @@ public abstract class IcebergCompatMetadataValidatorAndUpdater {
5363
/////////////////////////////////////////////////////////////////////////////////
5464
/** Defines the input context for the metadata validator and updater. */
5565
public static class IcebergCompatInputContext {
66+
final String compatFeatureName;
5667
final boolean isCreatingNewTable;
5768
final Metadata newMetadata;
5869
final Protocol newProtocol;
5970

6071
public IcebergCompatInputContext(
61-
boolean isCreatingNewTable, Metadata newMetadata, Protocol newProtocol) {
72+
String compatFeatureName,
73+
boolean isCreatingNewTable,
74+
Metadata newMetadata,
75+
Protocol newProtocol) {
76+
this.compatFeatureName = compatFeatureName;
6277
this.isCreatingNewTable = isCreatingNewTable;
6378
this.newMetadata = newMetadata;
6479
this.newProtocol = newProtocol;
6580
}
6681

6782
public IcebergCompatInputContext withUpdatedMetadata(Metadata newMetadata) {
68-
return new IcebergCompatInputContext(isCreatingNewTable, newMetadata, newProtocol);
83+
return new IcebergCompatInputContext(
84+
compatFeatureName, isCreatingNewTable, newMetadata, newProtocol);
6985
}
7086
}
7187

@@ -79,7 +95,7 @@ interface PostMetadataProcessor {
7995
* is not set, we will set it to a default value. It will also update the metadata to make it
8096
* compatible with Iceberg compat version targeted.
8197
*/
82-
static class IcebergCompatRequiredTablePropertyEnforcer<T> {
98+
protected static class IcebergCompatRequiredTablePropertyEnforcer<T> {
8399
public final TableConfig<T> property;
84100
public final Predicate<T> validator;
85101
public final String autoSetValue;
@@ -147,14 +163,169 @@ Optional<Metadata> validateAndUpdate(
147163
}
148164
}
149165

166+
/////////////////////////////////////////////////////////////////////////////////
167+
/// Implementation of {@link IcebergCompatRequiredTablePropertyEnforcer} ///
168+
/////////////////////////////////////////////////////////////////////////////////
169+
protected static final IcebergCompatRequiredTablePropertyEnforcer COLUMN_MAPPING_REQUIREMENT =
170+
new IcebergCompatRequiredTablePropertyEnforcer<>(
171+
TableConfig.COLUMN_MAPPING_MODE,
172+
(value) ->
173+
ColumnMapping.ColumnMappingMode.NAME == value
174+
|| ColumnMapping.ColumnMappingMode.ID == value,
175+
ColumnMapping.ColumnMappingMode.NAME.value);
176+
150177
/**
151178
* Defines checks for compatibility with the targeted iceberg features (icebergCompatV1 or
152179
* icebergCompatV2 etc.)
153180
*/
154-
interface IcebergCompatCheck {
181+
protected interface IcebergCompatCheck {
155182
void check(IcebergCompatInputContext inputContext);
156183
}
157184

185+
///////////////////////////////////////////////////////////
186+
/// Implementation of {@link IcebergCompatCheck} ///
187+
///////////////////////////////////////////////////////////
188+
protected static IcebergCompatCheck disallowOtherCompatVersions(List<String> incompatibleProps) {
189+
return (inputContext) -> {
190+
for (String prop : incompatibleProps) {
191+
if (Boolean.parseBoolean(
192+
inputContext.newMetadata.getConfiguration().getOrDefault(prop, "false"))) {
193+
throw DeltaErrors.icebergCompatIncompatibleVersionEnabled(
194+
inputContext.compatFeatureName, prop);
195+
}
196+
}
197+
};
198+
}
199+
200+
protected static final IcebergCompatCheck CHECK_ONLY_ICEBERG_COMPAT_V2_ENABLED =
201+
disallowOtherCompatVersions(
202+
Arrays.asList("delta.enableIcebergCompatV1", "delta.enableIcebergCompatV3"));
203+
204+
protected static final IcebergCompatCheck CHECK_ONLY_ICEBERG_COMPAT_V3_ENABLED =
205+
disallowOtherCompatVersions(
206+
Arrays.asList("delta.enableIcebergCompatV1", "delta.enableIcebergCompatV2"));
207+
208+
protected static IcebergCompatCheck hasOnlySupportedTypes(
209+
Set<Class<? extends DataType>> supportedTypes) {
210+
return (inputContext) -> {
211+
List<Tuple2<List<String>, StructField>> matches =
212+
SchemaUtils.filterRecursively(
213+
inputContext.newMetadata.getSchema(),
214+
/* recurseIntoMapAndArrayTypes= */ true,
215+
/* stopOnFirstMatch = */ false,
216+
field -> {
217+
DataType dataType = field.getDataType();
218+
for (Class<? extends DataType> clazz : supportedTypes) {
219+
if (clazz.isInstance(dataType)) return false;
220+
}
221+
return true;
222+
});
223+
224+
if (!matches.isEmpty()) {
225+
throw DeltaErrors.icebergCompatUnsupportedTypeColumns(
226+
inputContext.compatFeatureName,
227+
matches.stream().map(tuple -> tuple._2.getDataType()).collect(toList()));
228+
}
229+
};
230+
}
231+
232+
private static final Set<Class<? extends DataType>> V2_SUPPORTED_TYPES =
233+
new HashSet<>(
234+
Arrays.asList(
235+
ByteType.class,
236+
ShortType.class,
237+
IntegerType.class,
238+
LongType.class,
239+
FloatType.class,
240+
DoubleType.class,
241+
DecimalType.class,
242+
StringType.class,
243+
BinaryType.class,
244+
BooleanType.class,
245+
DateType.class,
246+
TimestampType.class,
247+
TimestampNTZType.class,
248+
ArrayType.class,
249+
MapType.class,
250+
StructType.class));
251+
252+
protected static final IcebergCompatCheck V2_CHECK_HAS_SUPPORTED_TYPES =
253+
hasOnlySupportedTypes(V2_SUPPORTED_TYPES);
254+
255+
protected static final IcebergCompatCheck V3_CHECK_HAS_SUPPORTED_TYPES =
256+
hasOnlySupportedTypes(
257+
Stream.concat(V2_SUPPORTED_TYPES.stream(), Stream.of(VariantType.class))
258+
.collect(Collectors.toSet()));
259+
260+
// These are the common supported partition types for both Iceberg compat V2 and V3
261+
protected static final IcebergCompatCheck CHECK_HAS_ALLOWED_PARTITION_TYPES =
262+
(inputContext) ->
263+
inputContext
264+
.newMetadata
265+
.getPartitionColNames()
266+
.forEach(
267+
partitionCol -> {
268+
int partitionFieldIndex =
269+
inputContext.newMetadata.getSchema().indexOf(partitionCol);
270+
checkArgument(
271+
partitionFieldIndex != -1,
272+
"Partition column %s not found in the schema",
273+
partitionCol);
274+
DataType dataType =
275+
inputContext.newMetadata.getSchema().at(partitionFieldIndex).getDataType();
276+
boolean validType =
277+
dataType instanceof ByteType
278+
|| dataType instanceof ShortType
279+
|| dataType instanceof IntegerType
280+
|| dataType instanceof LongType
281+
|| dataType instanceof FloatType
282+
|| dataType instanceof DoubleType
283+
|| dataType instanceof DecimalType
284+
|| dataType instanceof StringType
285+
|| dataType instanceof BinaryType
286+
|| dataType instanceof BooleanType
287+
|| dataType instanceof DateType
288+
|| dataType instanceof TimestampType
289+
|| dataType instanceof TimestampNTZType;
290+
if (!validType) {
291+
throw DeltaErrors.icebergCompatUnsupportedTypePartitionColumn(
292+
inputContext.compatFeatureName, dataType);
293+
}
294+
});
295+
296+
protected static final IcebergCompatCheck CHECK_HAS_NO_PARTITION_EVOLUTION =
297+
(inputContext) -> {
298+
// TODO: Kernel doesn't support replace table yet. When it is supported, extend
299+
// this to allow checking the partition columns aren't changed
300+
};
301+
302+
protected static final IcebergCompatCheck CHECK_HAS_NO_DELETION_VECTORS =
303+
(inputContext) -> {
304+
if (inputContext.newProtocol.supportsFeature(DELETION_VECTORS_RW_FEATURE)) {
305+
throw DeltaErrors.icebergCompatIncompatibleTableFeatures(
306+
inputContext.compatFeatureName, Collections.singleton(DELETION_VECTORS_RW_FEATURE));
307+
}
308+
};
309+
310+
protected static final IcebergCompatCheck CHECK_HAS_SUPPORTED_TYPE_WIDENING =
311+
(inputContext) -> {
312+
Protocol protocol = inputContext.newProtocol;
313+
if (!protocol.supportsFeature(TYPE_WIDENING_RW_FEATURE)
314+
&& !protocol.supportsFeature(TYPE_WIDENING_RW_PREVIEW_FEATURE)) {
315+
return;
316+
}
317+
for (SchemaIterable.SchemaElement element :
318+
new SchemaIterable(inputContext.newMetadata.getSchema())) {
319+
for (TypeChange typeChange : element.getField().getTypeChanges()) {
320+
if (!TypeWideningChecker.isIcebergV2Compatible(
321+
typeChange.getFrom(), typeChange.getTo())) {
322+
throw DeltaErrors.icebergCompatUnsupportedTypeWidening(
323+
inputContext.compatFeatureName, typeChange);
324+
}
325+
}
326+
}
327+
};
328+
158329
/////////////////////////////////////////////////////////////////////////////////
159330
/// Implementation of {@link IcebergCompatMetadataValidatorAndUpdater} ///
160331
/////////////////////////////////////////////////////////////////////////////////

0 commit comments

Comments
 (0)