Skip to content

Commit 8915932

Browse files
authored
[Kernel][Type Widening] 10/ Add iceberg compat check for type changes (delta-io#4603)
## 🥞 Stacked PR Use this [link](https://github.com/delta-io/delta/pull/4603/files) to review incremental changes. - [**stack/tw_upgrade_downgrade**](delta-io#4603) [[Files changed](https://github.com/delta-io/delta/pull/4603/files)] --------- #### Which Delta project/connector is this regarding? - [ ] Spark - [ ] Standalone - [ ] Flink - [x] Kernel - [ ] Other (fill in here) ## Description This PR adds a validator to ensure only iceberg compatible type changes are present before enabling IcebergCompatV2. Resolves delta-io#4491 ## How was this patch tested? Added direct unit tests, and integration tests to check that the feature can be turned on with valid type changes, and throws an exception when invalid type changes are present. ## Does this PR introduce _any_ user-facing changes? This adds validation for type-widening so if there are tables that do not follow the protocol, it could start rejecting some writes.
1 parent 0f8c501 commit 8915932

File tree

7 files changed

+277
-6
lines changed

7 files changed

+277
-6
lines changed

kernel/kernel-api/src/main/java/io/delta/kernel/internal/DeltaErrors.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import io.delta.kernel.internal.tablefeatures.TableFeature;
2424
import io.delta.kernel.types.DataType;
2525
import io.delta.kernel.types.StructType;
26+
import io.delta.kernel.types.TypeChange;
2627
import io.delta.kernel.utils.DataFileStatus;
2728
import java.io.IOException;
2829
import java.sql.Timestamp;
@@ -297,6 +298,13 @@ public static KernelException icebergCompatUnsupportedTypeColumns(
297298
format("%s does not support the data types: %s.", compatVersion, dataTypes));
298299
}
299300

301+
public static KernelException icebergCompatUnsupportedTypeWidening(
302+
String compatVersion, TypeChange typeChange) {
303+
throw new KernelException(
304+
format(
305+
"%s does not support type widening present in table: %s.", compatVersion, typeChange));
306+
}
307+
300308
public static KernelException icebergCompatUnsupportedTypePartitionColumn(
301309
String compatVersion, DataType dataType) {
302310
throw new KernelException(

kernel/kernel-api/src/main/java/io/delta/kernel/internal/icebergcompat/IcebergCompatV2MetadataValidatorAndUpdater.java

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,9 @@
2525
import io.delta.kernel.internal.actions.Metadata;
2626
import io.delta.kernel.internal.actions.Protocol;
2727
import io.delta.kernel.internal.tablefeatures.TableFeature;
28+
import io.delta.kernel.internal.types.TypeWideningChecker;
2829
import io.delta.kernel.internal.util.ColumnMapping.ColumnMappingMode;
30+
import io.delta.kernel.internal.util.SchemaIterable;
2931
import io.delta.kernel.internal.util.SchemaUtils;
3032
import io.delta.kernel.internal.util.Tuple2;
3133
import io.delta.kernel.types.*;
@@ -175,6 +177,25 @@ public static void validateDataFileStatus(DataFileStatus dataFileStatus) {
175177
}
176178
};
177179

180+
private static final IcebergCompatCheck ICEBERG_COMPAT_V2_CHECK_HAS_SUPPORTED_TYPE_WIDENING =
181+
(inputContext) -> {
182+
Protocol protocol = inputContext.newProtocol;
183+
if (!protocol.supportsFeature(TYPE_WIDENING_RW_FEATURE)
184+
&& !protocol.supportsFeature(TYPE_WIDENING_RW_PREVIEW_FEATURE)) {
185+
return;
186+
}
187+
for (SchemaIterable.SchemaElement element :
188+
new SchemaIterable(inputContext.newMetadata.getSchema())) {
189+
for (TypeChange typeChange : element.getField().getTypeChanges()) {
190+
if (!TypeWideningChecker.isIcebergV2Compatible(
191+
typeChange.getFrom(), typeChange.getTo())) {
192+
throw DeltaErrors.icebergCompatUnsupportedTypeWidening(
193+
INSTANCE.compatFeatureName(), typeChange);
194+
}
195+
}
196+
}
197+
};
198+
178199
@Override
179200
String compatFeatureName() {
180201
return "icebergCompatV2";
@@ -202,7 +223,8 @@ List<IcebergCompatCheck> icebergCompatChecks() {
202223
ICEBERG_COMPAT_V2_CHECK_HAS_SUPPORTED_TYPES,
203224
ICEBERG_COMPAT_V2_CHECK_HAS_ALLOWED_PARTITION_TYPES,
204225
ICEBERG_COMPAT_V2_CHECK_HAS_NO_PARTITION_EVOLUTION,
205-
ICEBERG_COMPAT_V2_CHECK_HAS_NO_DELETION_VECTORS)
226+
ICEBERG_COMPAT_V2_CHECK_HAS_NO_DELETION_VECTORS,
227+
ICEBERG_COMPAT_V2_CHECK_HAS_SUPPORTED_TYPE_WIDENING)
206228
.collect(toList());
207229
}
208230
}

kernel/kernel-api/src/test/scala/io/delta/kernel/internal/icebergcompat/IcebergCompatV2MetadataValidatorAndUpdaterSuite.scala

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,50 @@ class IcebergCompatV2MetadataValidatorAndUpdaterSuite
161161
validateAndUpdateIcebergCompatV2Metadata(isNewTable, metadata, protocol)
162162
}
163163

164+
test("compatible type widening is allowed with icebergCompatV2") {
165+
val schema = new StructType()
166+
.add(
167+
new StructField(
168+
"intToLong",
169+
IntegerType.INTEGER,
170+
true,
171+
FieldMetadata.empty()).withTypeChanges(
172+
Seq(new TypeChange(IntegerType.INTEGER, LongType.LONG)).asJava))
173+
.add(
174+
new StructField(
175+
"decimalToDecimal",
176+
new DecimalType(10, 2),
177+
true,
178+
FieldMetadata.empty()).withTypeChanges(
179+
Seq(new TypeChange(new DecimalType(5, 2), new DecimalType(10, 2))).asJava))
180+
181+
val metadata = getCompatEnabledMetadata(schema)
182+
val protocol = getCompatEnabledProtocol(TYPE_WIDENING_RW_FEATURE)
183+
184+
// This should not throw an exception
185+
validateAndUpdateIcebergCompatV2Metadata(false, metadata, protocol)
186+
}
187+
188+
test("incompatible type widening throws exception with icebergCompatV2") {
189+
val schema = new StructType()
190+
.add(
191+
new StructField(
192+
"dateToTimestamp",
193+
TimestampNTZType.TIMESTAMP_NTZ,
194+
true,
195+
FieldMetadata.empty()).withTypeChanges(
196+
Seq(new TypeChange(DateType.DATE, TimestampNTZType.TIMESTAMP_NTZ)).asJava))
197+
198+
val metadata = getCompatEnabledMetadata(schema)
199+
val protocol = getCompatEnabledProtocol(TYPE_WIDENING_RW_FEATURE)
200+
201+
val e = intercept[KernelException] {
202+
validateAndUpdateIcebergCompatV2Metadata(false, metadata, protocol)
203+
}
204+
205+
assert(e.getMessage.contains("icebergCompatV2 does not support type widening present in table"))
206+
}
207+
164208
Seq(true, false).foreach { isNewTable =>
165209
test(s"protocol is missing required column mapping feature, isNewTable $isNewTable") {
166210
val schema = new StructType().add("col", BooleanType.BOOLEAN)

kernel/kernel-api/src/test/scala/io/delta/kernel/internal/icebergcompat/IcebergWriterCompatV1MetadataValidatorAndUpdaterSuite.scala

Lines changed: 48 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,9 @@ import io.delta.kernel.internal.TableConfig
2222
import io.delta.kernel.internal.actions.{Metadata, Protocol}
2323
import io.delta.kernel.internal.icebergcompat.IcebergWriterCompatV1MetadataValidatorAndUpdater.validateAndUpdateIcebergWriterCompatV1Metadata
2424
import io.delta.kernel.internal.tablefeatures.TableFeature
25-
import io.delta.kernel.internal.tablefeatures.TableFeatures.{COLUMN_MAPPING_RW_FEATURE, ICEBERG_COMPAT_V2_W_FEATURE, ICEBERG_WRITER_COMPAT_V1}
25+
import io.delta.kernel.internal.tablefeatures.TableFeatures.{COLUMN_MAPPING_RW_FEATURE, ICEBERG_COMPAT_V2_W_FEATURE, ICEBERG_WRITER_COMPAT_V1, TYPE_WIDENING_RW_FEATURE}
2626
import io.delta.kernel.internal.util.ColumnMapping
27-
import io.delta.kernel.types.{ByteType, DataType, FieldMetadata, IntegerType, ShortType, StructType}
27+
import io.delta.kernel.types.{ByteType, DataType, DecimalType, FieldMetadata, IntegerType, LongType, ShortType, StructField, StructType, TypeChange}
2828

2929
class IcebergWriterCompatV1MetadataValidatorAndUpdaterSuite
3030
extends IcebergCompatV2MetadataValidatorAndUpdaterSuiteBase {
@@ -274,6 +274,52 @@ class IcebergWriterCompatV1MetadataValidatorAndUpdaterSuite
274274
validateAndUpdateIcebergWriterCompatV1Metadata(false, metadata, protocol)
275275
}
276276

277+
test("compatible type widening is allowed with icebergWriterCompatV1") {
278+
val schema = new StructType()
279+
.add(
280+
new StructField(
281+
"intToLong",
282+
IntegerType.INTEGER,
283+
true,
284+
FieldMetadata.builder()
285+
.putLong(ColumnMapping.COLUMN_MAPPING_ID_KEY, 1)
286+
.putString(ColumnMapping.COLUMN_MAPPING_PHYSICAL_NAME_KEY, "col-1")
287+
.build()).withTypeChanges(Seq(new TypeChange(
288+
IntegerType.INTEGER,
289+
LongType.LONG)).asJava))
290+
291+
val metadata = getCompatEnabledMetadata(schema)
292+
.withMergedConfiguration(Map(ColumnMapping.COLUMN_MAPPING_MAX_COLUMN_ID_KEY -> "1").asJava)
293+
val protocol = getCompatEnabledProtocol(TYPE_WIDENING_RW_FEATURE)
294+
295+
// This should not throw an exception
296+
validateAndUpdateIcebergWriterCompatV1Metadata(false, metadata, protocol)
297+
}
298+
299+
test("incompatible type widening throws exception with icebergWriterCompatV1") {
300+
val schema = new StructType()
301+
.add(
302+
new StructField(
303+
"intToLong",
304+
IntegerType.INTEGER,
305+
true,
306+
FieldMetadata.builder()
307+
.putLong(ColumnMapping.COLUMN_MAPPING_ID_KEY, 1)
308+
.putString(ColumnMapping.COLUMN_MAPPING_PHYSICAL_NAME_KEY, "col-1")
309+
.build()).withTypeChanges(
310+
Seq(new TypeChange(ByteType.BYTE, new DecimalType(10, 0))).asJava))
311+
312+
val metadata = getCompatEnabledMetadata(schema)
313+
.withMergedConfiguration(Map(ColumnMapping.COLUMN_MAPPING_MAX_COLUMN_ID_KEY -> "1").asJava)
314+
val protocol = getCompatEnabledProtocol(TYPE_WIDENING_RW_FEATURE)
315+
316+
val e = intercept[KernelException] {
317+
validateAndUpdateIcebergWriterCompatV1Metadata(false, metadata, protocol)
318+
}
319+
320+
assert(e.getMessage.contains("icebergCompatV2 does not support type widening present in table"))
321+
}
322+
277323
private def checkUnsupportedOrIncompatibleFeature(
278324
tableFeature: String,
279325
expectedErrorMessageContains: String): Unit = {

kernel/kernel-api/src/test/scala/io/delta/kernel/internal/types/TypeWideningCheckerSuite.scala

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,22 @@ class TypeWideningCheckerSuite extends AnyFunSuite {
161161
}
162162

163163
test("iceberg V2 unsupported type widening") {
164+
/////////////////////////////////////////////////////////////////////////////////////
165+
// Test generally unsupported widening operations
166+
/////////////////////////////////////////////////////////////////////////////////////
167+
assert(!TypeWideningChecker.isIcebergV2Compatible(StringType.STRING, BinaryType.BINARY))
168+
assert(!TypeWideningChecker.isIcebergV2Compatible(IntegerType.INTEGER, StringType.STRING))
169+
assert(!TypeWideningChecker.isIcebergV2Compatible(DateType.DATE, StringType.STRING))
170+
assert(!TypeWideningChecker.isIcebergV2Compatible(DoubleType.DOUBLE, new DecimalType(10, 2)))
171+
assert(!TypeWideningChecker.isIcebergV2Compatible(DateType.DATE, TimestampType.TIMESTAMP))
172+
assert(!TypeWideningChecker.isIcebergV2Compatible(
173+
TimestampNTZType.TIMESTAMP_NTZ,
174+
DateType.DATE))
175+
176+
////////////////////////////////////////////////////////////////////////////////////
177+
// Test invalid widening that are generally supported by Delta but not by Iceberg V2
178+
////////////////////////////////////////////////////////////////////////////////////
179+
164180
// Integer to Double widening (not supported by Iceberg)
165181
assert(!TypeWideningChecker.isIcebergV2Compatible(ByteType.BYTE, DoubleType.DOUBLE))
166182
assert(!TypeWideningChecker.isIcebergV2Compatible(ShortType.SHORT, DoubleType.DOUBLE))

kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaIcebergCompatV2Suite.scala

Lines changed: 84 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package io.delta.kernel.defaults
1717

18+
import scala.collection.JavaConverters._
1819
import scala.collection.immutable.Seq
1920
import scala.reflect.ClassTag
2021

@@ -23,8 +24,8 @@ import io.delta.kernel.data.Row
2324
import io.delta.kernel.exceptions.KernelException
2425
import io.delta.kernel.internal.TableConfig
2526
import io.delta.kernel.internal.tablefeatures.TableFeatures
26-
import io.delta.kernel.internal.util.{ColumnMappingSuiteBase, VectorUtils}
27-
import io.delta.kernel.types.{DataType, StructType}
27+
import io.delta.kernel.internal.util.{ColumnMapping, ColumnMappingSuiteBase, VectorUtils}
28+
import io.delta.kernel.types.{DataType, DateType, FieldMetadata, IntegerType, LongType, StructField, StructType, TimestampNTZType, TypeChange}
2829

2930
/** This suite tests reading or writing into Delta table that have `icebergCompatV2` enabled. */
3031
class DeltaIcebergCompatV2Suite extends DeltaTableWriteSuiteBase with ColumnMappingSuiteBase {
@@ -222,6 +223,87 @@ class DeltaIcebergCompatV2Suite extends DeltaTableWriteSuiteBase with ColumnMapp
222223
}
223224
}
224225

226+
test("compatible type widening is allowed with icebergCompatV2") {
227+
withTempDirAndEngine { (tablePath, engine) =>
228+
// Create a table with icebergCompatV2 and type widening enabled
229+
val schema = new StructType()
230+
.add(new StructField(
231+
"intToLong",
232+
LongType.LONG,
233+
false).withTypeChanges(Seq(new TypeChange(IntegerType.INTEGER, LongType.LONG)).asJava))
234+
235+
val tblProps = Map(
236+
TableConfig.ICEBERG_COMPAT_V2_ENABLED.getKey -> "true",
237+
TableConfig.TYPE_WIDENING_ENABLED.getKey -> "true")
238+
239+
// This should not throw an exception
240+
createEmptyTable(engine, tablePath, schema, tableProperties = tblProps)
241+
appendData(engine, tablePath, data = Seq.empty)
242+
243+
val protocol = getProtocol(engine, tablePath)
244+
assert(protocol.supportsFeature(TableFeatures.TYPE_WIDENING_RW_FEATURE))
245+
val metadata = getMetadata(engine, tablePath)
246+
assert(metadata.getSchema.get("intToLong").getTypeChanges.asScala == schema.get(
247+
"intToLong").getTypeChanges.asScala)
248+
}
249+
}
250+
251+
test("incompatible type widening throws exception with icebergCompatV2") {
252+
withTempDirAndEngine { (tablePath, engine) =>
253+
// Try to create a table with icebergCompatV2 and incompatible type widening
254+
val schema = new StructType()
255+
.add(
256+
new StructField(
257+
"dateToTimestamp",
258+
TimestampNTZType.TIMESTAMP_NTZ,
259+
false).withTypeChanges(Seq(
260+
new TypeChange(DateType.DATE, TimestampNTZType.TIMESTAMP_NTZ)).asJava))
261+
262+
val tblProps = Map(
263+
TableConfig.ICEBERG_COMPAT_V2_ENABLED.getKey -> "true",
264+
TableConfig.TYPE_WIDENING_ENABLED.getKey -> "true")
265+
266+
val e = intercept[KernelException] {
267+
createEmptyTable(engine, tablePath, schema, tableProperties = tblProps)
268+
}
269+
270+
assert(
271+
e.getMessage.contains("icebergCompatV2 does not support type widening present in table"))
272+
}
273+
}
274+
275+
test(
276+
"incompatible type widening throws exception with icebergCompatV2 enabled on existing table") {
277+
withTempDirAndEngine { (tablePath, engine) =>
278+
val schema = new StructType()
279+
.add(new StructField(
280+
"dateToTimestamp",
281+
TimestampNTZType.TIMESTAMP_NTZ,
282+
false,
283+
FieldMetadata.builder()
284+
.putLong(ColumnMapping.COLUMN_MAPPING_ID_KEY, 1)
285+
.putString(
286+
ColumnMapping.COLUMN_MAPPING_PHYSICAL_NAME_KEY,
287+
"col-1").build()).withTypeChanges(
288+
Seq(new TypeChange(DateType.DATE, TimestampNTZType.TIMESTAMP_NTZ)).asJava))
289+
290+
val tblProps = Map(TableConfig.TYPE_WIDENING_ENABLED.getKey -> "true")
291+
createEmptyTable(engine, tablePath, schema, tableProperties = tblProps)
292+
293+
val e = intercept[KernelException] {
294+
updateTableMetadata(
295+
engine,
296+
tablePath,
297+
tableProperties = Map(
298+
TableConfig.ICEBERG_COMPAT_V2_ENABLED.getKey -> "true",
299+
TableConfig.COLUMN_MAPPING_MODE.getKey -> "ID"))
300+
}
301+
302+
assert(
303+
e.getMessage.contains("icebergCompatV2 does not support type widening present in table"))
304+
}
305+
}
306+
225307
/**
226308
* Utility that checks after executing given fn gets the given exception and error message.
227309
* [[ClassTag]] is used to preserve the type information during the runtime.

kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/IcebergWriterCompatV1Suite.scala

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import io.delta.kernel.internal.icebergcompat.IcebergCompatV2MetadataValidatorAn
2727
import io.delta.kernel.internal.tablefeatures.TableFeatures
2828
import io.delta.kernel.internal.util.{ColumnMapping, ColumnMappingSuiteBase}
2929
import io.delta.kernel.internal.util.ColumnMapping.ColumnMappingMode
30-
import io.delta.kernel.types.{ByteType, DataType, FieldMetadata, IntegerType, ShortType, StructType, TimestampNTZType, VariantType}
30+
import io.delta.kernel.types.{ByteType, DataType, DateType, FieldMetadata, IntegerType, LongType, ShortType, StructField, StructType, TimestampNTZType, TypeChange, VariantType}
3131
import io.delta.kernel.utils.CloseableIterable.emptyIterable
3232

3333
import org.assertj.core.api.Assertions.assertThat
@@ -642,6 +642,59 @@ class IcebergWriterCompatV1Suite extends DeltaTableWriteSuiteBase with ColumnMap
642642
}
643643
}
644644

645+
test("compatible type widening is allowed with icebergWriterCompatV1") {
646+
withTempDirAndEngine { (tablePath, engine) =>
647+
// Create a table with icebergWriterCompatV1 and type widening enabled
648+
val schema = new StructType()
649+
.add(new StructField(
650+
"intToLong",
651+
LongType.LONG,
652+
false,
653+
FieldMetadata.builder()
654+
.putLong(ColumnMapping.COLUMN_MAPPING_ID_KEY, 1)
655+
.putString(
656+
ColumnMapping.COLUMN_MAPPING_PHYSICAL_NAME_KEY,
657+
"col-1").build()).withTypeChanges(
658+
Seq(new TypeChange(IntegerType.INTEGER, LongType.LONG)).asJava))
659+
660+
val tblProps = tblPropertiesIcebergWriterCompatV1Enabled ++
661+
Map(TableConfig.TYPE_WIDENING_ENABLED.getKey -> "true")
662+
663+
// This should not throw an exception
664+
createEmptyTable(engine, tablePath, schema, tableProperties = tblProps)
665+
666+
val protocol = getProtocol(engine, tablePath)
667+
assert(protocol.supportsFeature(TableFeatures.TYPE_WIDENING_RW_FEATURE))
668+
}
669+
}
670+
671+
test("incompatible type widening throws exception with icebergWriterCompatV1 on new Table") {
672+
withTempDirAndEngine { (tablePath, engine) =>
673+
// Try to create a table with icebergWriterCompatV1 and incompatible type widening
674+
val schema = new StructType()
675+
.add(new StructField(
676+
"dateToTimestamp",
677+
TimestampNTZType.TIMESTAMP_NTZ,
678+
false,
679+
FieldMetadata.builder()
680+
.putLong(ColumnMapping.COLUMN_MAPPING_ID_KEY, 1)
681+
.putString(
682+
ColumnMapping.COLUMN_MAPPING_PHYSICAL_NAME_KEY,
683+
"col-1").build()).withTypeChanges(
684+
Seq(new TypeChange(DateType.DATE, TimestampNTZType.TIMESTAMP_NTZ)).asJava))
685+
686+
val tblProps = tblPropertiesIcebergWriterCompatV1Enabled ++
687+
Map(TableConfig.TYPE_WIDENING_ENABLED.getKey -> "true")
688+
689+
val e = intercept[KernelException] {
690+
createEmptyTable(engine, tablePath, schema, tableProperties = tblProps)
691+
}
692+
693+
assert(
694+
e.getMessage.contains("icebergCompatV2 does not support type widening present in table"))
695+
}
696+
}
697+
645698
/* -------------------- Enforcements blocked by icebergCompatV2 -------------------- */
646699
// We test the deletionVector checks above as part of blocked table feature tests
647700

0 commit comments

Comments
 (0)