Skip to content

Commit c8d93d0

Browse files
authored
[Kernel][#3][test-only] IcebergCompatV3 - refactor IcebergCompatMetadataValidatorAndUpdaterSuites (delta-io#4739)
<!--
Thanks for sending a pull request! Here are some tips for you:
  1. If this is your first time, please read our contributor guidelines: https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md
  2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP] Your PR title ...'.
  3. Be sure to keep the PR description updated to reflect all changes.
  4. Please write your PR title to summarize what this PR proposes.
  5. If possible, provide a concise example to reproduce the issue for a faster review.
  6. If applicable, include the corresponding issue number in the PR title and link it in the body.
-->

#### Which Delta project/connector is this regarding?
<!--
Please add the component selected below to the beginning of the pull request title
For example: [Spark] Title of my pull request
-->

- [ ] Spark
- [ ] Standalone
- [ ] Flink
- [x] Kernel
- [ ] Other (fill in here)

## Description
<!--
- Describe what this PR changes.
- Describe why we need the change.

If this PR resolves an issue be sure to include "Resolves #XXX" to correctly link and close the issue upon merge.
-->
This PR is a refactor-only change that restructures the test suites related to IcebergCompatMetadataValidatorAndUpdater.

## How was this patch tested?
<!--
If tests were added, say they were added here. Please make sure to test the changes thoroughly including negative and positive cases if possible.
If the changes were tested in any way other than unit tests, please clarify how you tested step by step (ideally copy and paste-able, so that other reviewers can test and check, and descendants can verify in the future).
If the changes were not tested, please explain why.
-->
Existing test suites.

## Does this PR introduce _any_ user-facing changes?
<!--
If yes, please clarify the previous behavior and the change this PR proposes - provide the console output, description and/or an example to show the behavior difference if possible.
If possible, please also clarify if this is a user-facing change compared to the released Delta Lake versions or within the unreleased branches such as master.
If no, write 'No'.
-->
1 parent 6cd345a commit c8d93d0

File tree

5 files changed

+313
-219
lines changed

5 files changed

+313
-219
lines changed
Lines changed: 266 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,266 @@
1+
/*
2+
* Copyright (2025) The Delta Lake Project Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.delta.kernel.internal.icebergcompat
17+
18+
import java.util.Optional
19+
20+
import scala.collection.JavaConverters._
21+
22+
import io.delta.kernel.exceptions.KernelException
23+
import io.delta.kernel.internal.actions.{Metadata, Protocol}
24+
import io.delta.kernel.internal.tablefeatures.TableFeature
25+
import io.delta.kernel.internal.tablefeatures.TableFeatures.{COLUMN_MAPPING_RW_FEATURE, DELETION_VECTORS_RW_FEATURE, ICEBERG_COMPAT_V2_W_FEATURE, TYPE_WIDENING_RW_FEATURE, TYPE_WIDENING_RW_PREVIEW_FEATURE}
26+
import io.delta.kernel.internal.util.ColumnMappingSuiteBase
27+
import io.delta.kernel.test.VectorTestUtils
28+
import io.delta.kernel.types._
29+
30+
import org.scalatest.funsuite.AnyFunSuite
31+
32+
/**
 * Common scaffolding for suites that test Iceberg compatibility metadata validation and
 * updates. It is meant to be mixed into both the writer and the compat test suites.
 *
 * A concrete suite supplies the version under test, the metadata/protocol builders and the
 * sets of supported/unsupported column types. The test cases registered in this trait's body
 * are generated from those members.
 *
 * NOTE(review): tests are registered while this trait is being initialized, so the abstract
 * members read below (e.g. `supportedDataColumnTypes`) are evaluated before a subclass body
 * has run. Presumably implementors define them with `def` rather than a plain `val` --
 * confirm against the concrete suites.
 */
trait IcebergCompatMetadataValidatorAndUpdaterSuiteBase extends AnyFunSuite
    with VectorTestUtils with ColumnMappingSuiteBase {

  /** Iceberg compatibility version under test, e.g. "V2" or "V3". */
  def icebergCompatVersion: String

  /** Simple column types that the "allowed column types" tests must leave out. */
  def simpleTypesToSkip: Set[DataType]

  /** Builds a metadata for `schema` and `partCols` that has the desired icebergCompat enabled. */
  def getCompatEnabledMetadata(
      schema: StructType,
      partCols: Seq[String] = Seq.empty): Metadata

  /** Builds a protocol with the features the desired icebergCompat needs plus `tableFeatures`. */
  def getCompatEnabledProtocol(tableFeatures: TableFeature*): Protocol

  /** Invokes the validate-and-update entry point that triggers the icebergCompat checks. */
  def validateAndUpdateIcebergCompatMetadata(
      isNewTable: Boolean,
      metadata: Metadata,
      protocol: Protocol): Optional[Metadata]

  /** Builds a [[Metadata]] with the IcebergCompat feature and column mapping mode enabled. */
  def withIcebergCompatAndCMEnabled(schema: StructType, partCols: Seq[String]): Metadata

  /** Data column types the version under test accepts. */
  def supportedDataColumnTypes: Set[DataType]

  /** Data column types the version under test rejects. */
  def unsupportedDataColumnTypes: Set[DataType]

  /** Partition column types the version under test rejects. */
  def unsupportedPartitionColumnTypes: Set[DataType]

  /** Whether the version under test accepts tables with deletion vectors. */
  def isDeletionVectorsSupported: Boolean

  /** Table features the version under test requires. */
  def requiredTableFeatures: Set[TableFeature]

  // ---------------------------------------------------------------------------------------
  // Test cases shared by the writer and compat versions
  // ---------------------------------------------------------------------------------------

  for {
    columnType <- supportedDataColumnTypes.diff(simpleTypesToSkip)
    isNewTable <- Seq(true, false)
  } {
    test(s"allowed data column types: $columnType, new table = $isNewTable") {
      val tableSchema = new StructType().add("col", columnType)
      val compatMetadata = getCompatEnabledMetadata(tableSchema)
      val compatProtocol = getCompatEnabledProtocol()
      // Passing means the call completes without throwing
      validateAndUpdateIcebergCompatMetadata(isNewTable, compatMetadata, compatProtocol)
    }
  }

  for {
    columnType <-
      IcebergCompatMetadataValidatorAndUpdaterSuiteBase.SIMPLE_TYPES.diff(simpleTypesToSkip)
    isNewTable <- Seq(true, false)
  } {
    test(s"allowed partition column types: $columnType, new table = $isNewTable") {
      val tableSchema = new StructType().add("col", columnType)
      val compatMetadata = getCompatEnabledMetadata(tableSchema, Seq("col"))
      val compatProtocol = getCompatEnabledProtocol()
      // Passing means the call completes without throwing
      validateAndUpdateIcebergCompatMetadata(isNewTable, compatMetadata, compatProtocol)
    }
  }

  for {
    columnType <- unsupportedDataColumnTypes
    isNewTable <- Seq(true, false)
  } {
    test(s"disallowed data column types: $columnType, new table = $isNewTable") {
      val tableSchema = new StructType().add("col", columnType)
      val compatMetadata = getCompatEnabledMetadata(tableSchema)
      val compatProtocol = getCompatEnabledProtocol()
      val error = intercept[KernelException] {
        validateAndUpdateIcebergCompatMetadata(isNewTable, compatMetadata, compatProtocol)
      }
      val expectedFragment =
        s"icebergCompat$icebergCompatVersion does not support the data types: "
      assert(error.getMessage.contains(expectedFragment))
    }
  }

  for {
    columnType <- unsupportedPartitionColumnTypes
    isNewTable <- Seq(true, false)
  } {
    test(s"disallowed partition column types: $columnType, new table = $isNewTable") {
      val tableSchema = new StructType().add("col", columnType)
      val compatMetadata = getCompatEnabledMetadata(tableSchema, Seq("col"))
      val compatProtocol = getCompatEnabledProtocol()
      val error = intercept[KernelException] {
        validateAndUpdateIcebergCompatMetadata(isNewTable, compatMetadata, compatProtocol)
      }
      // `String.matches` only succeeds when the whole message fits the pattern
      val expectedPattern =
        s"icebergCompat$icebergCompatVersion does not support" +
          s" the data type .* for a partition column."
      assert(error.getMessage.matches(expectedPattern))
    }
  }

  for (isNewTable <- Seq(true, false)) {
    test(s"deletion vectors support behavior, isNewTable $isNewTable") {
      val tableSchema = new StructType().add("col", BooleanType.BOOLEAN)
      val compatMetadata = getCompatEnabledMetadata(tableSchema)
      val dvProtocol = getCompatEnabledProtocol(DELETION_VECTORS_RW_FEATURE)

      if (!isDeletionVectorsSupported) {
        val error = intercept[KernelException] {
          validateAndUpdateIcebergCompatMetadata(isNewTable, compatMetadata, dvProtocol)
        }
        val expectedFragment =
          s"Table features [deletionVectors] are incompatible " +
            s"with icebergCompat$icebergCompatVersion"
        assert(error.getMessage.contains(expectedFragment))
      } else {
        // Supported: the call must complete without throwing
        validateAndUpdateIcebergCompatMetadata(isNewTable, compatMetadata, dvProtocol)
      }
    }
  }

  // ---------------------------------------------------------------------------------------
  // Compat-specific test cases
  // ---------------------------------------------------------------------------------------

  test("compatible type widening is allowed") {
    val intToLongField =
      new StructField("intToLong", IntegerType.INTEGER, true, FieldMetadata.empty())
        .withTypeChanges(Seq(new TypeChange(IntegerType.INTEGER, LongType.LONG)).asJava)
    val decimalToDecimalField =
      new StructField("decimalToDecimal", new DecimalType(10, 2), true, FieldMetadata.empty())
        .withTypeChanges(
          Seq(new TypeChange(new DecimalType(5, 2), new DecimalType(10, 2))).asJava)
    val widenedSchema = new StructType().add(intToLongField).add(decimalToDecimalField)

    val compatMetadata = getCompatEnabledMetadata(widenedSchema)
    val wideningProtocol = getCompatEnabledProtocol(TYPE_WIDENING_RW_FEATURE)

    // Passing means the call completes without throwing
    validateAndUpdateIcebergCompatMetadata(false, compatMetadata, wideningProtocol)
  }

  test("incompatible type widening throws exception") {
    val dateToTimestampField =
      new StructField(
        "dateToTimestamp",
        TimestampNTZType.TIMESTAMP_NTZ,
        true,
        FieldMetadata.empty())
        .withTypeChanges(
          Seq(new TypeChange(DateType.DATE, TimestampNTZType.TIMESTAMP_NTZ)).asJava)
    val widenedSchema = new StructType().add(dateToTimestampField)

    val compatMetadata = getCompatEnabledMetadata(widenedSchema)
    val wideningProtocol = getCompatEnabledProtocol(TYPE_WIDENING_RW_FEATURE)

    val error = intercept[KernelException] {
      validateAndUpdateIcebergCompatMetadata(false, compatMetadata, wideningProtocol)
    }

    val expectedFragment =
      s"icebergCompat$icebergCompatVersion does not support type widening present in table"
    assert(error.getMessage.contains(expectedFragment))
  }

  for {
    existingCMMode <- Seq("id", "name")
    isNewTable <- Seq(true, false)
  } {
    test(s"existing column mapping mode `$existingCMMode` is preserved " +
      s"when icebergCompat is enabled, isNewTable = $isNewTable") {
      val baseMetadata = testMetadata(cmTestSchema())
      val tableConfig = Map(
        s"delta.enableIcebergCompat$icebergCompatVersion" -> "true",
        "delta.columnMapping.mode" -> existingCMMode)
      val compatMetadata = baseMetadata.withMergedConfiguration(tableConfig.asJava)
      val compatProtocol = getCompatEnabledProtocol()

      // Sanity check: the mode is already configured before validation runs
      assert(compatMetadata.getConfiguration.get("delta.columnMapping.mode") === existingCMMode)

      val updatedMetadata =
        validateAndUpdateIcebergCompatMetadata(isNewTable, compatMetadata, compatProtocol)
      // The column mapping mode is already compatible, so no metadata update is expected
      assert(!updatedMetadata.isPresent)
    }
  }

  for (isNewTable <- Seq(true, false)) {
    test(
      s"can't enable icebergCompat$icebergCompatVersion on a table with icebergCompatv1 enabled, " +
        s"isNewTable = $isNewTable") {
      val tableSchema = new StructType().add("col", BooleanType.BOOLEAN)
      val v1EnabledConfig = Map("delta.enableIcebergCompatV1" -> "true")
      val conflictingMetadata =
        getCompatEnabledMetadata(tableSchema).withMergedConfiguration(v1EnabledConfig.asJava)
      val compatProtocol = getCompatEnabledProtocol()

      val error = intercept[KernelException] {
        validateAndUpdateIcebergCompatMetadata(isNewTable, conflictingMetadata, compatProtocol)
      }
      val expectedFragment =
        s"icebergCompat$icebergCompatVersion: Only one IcebergCompat version can be enabled. " +
          "Incompatible version enabled: delta.enableIcebergCompatV1"
      assert(error.getMessage.contains(expectedFragment))
    }
  }
}
243+
244+
/** Column-type fixtures shared by the IcebergCompat metadata validator/updater suites. */
object IcebergCompatMetadataValidatorAndUpdaterSuiteBase {

  /** Simple types that are allowed as data or partition columns. */
  val SIMPLE_TYPES: Set[DataType] = Set[DataType](
    BooleanType.BOOLEAN,
    // Integral types
    ByteType.BYTE,
    ShortType.SHORT,
    IntegerType.INTEGER,
    LongType.LONG,
    // Floating-point types
    FloatType.FLOAT,
    DoubleType.DOUBLE,
    // Date/time types
    DateType.DATE,
    TimestampType.TIMESTAMP,
    TimestampNTZType.TIMESTAMP_NTZ,
    // String, binary and decimal types
    StringType.STRING,
    BinaryType.BINARY,
    new DecimalType(10, 5))

  // One representative fixture per kind of nested type
  private val booleanArray: DataType = new ArrayType(BooleanType.BOOLEAN, true)
  private val intToLongMap: DataType = new MapType(IntegerType.INTEGER, LongType.LONG, true)
  private val twoFieldStruct: DataType =
    new StructType().add("s1", BooleanType.BOOLEAN).add("s2", IntegerType.INTEGER)

  /** Complex types that are allowed as data columns. */
  val COMPLEX_TYPES: Set[DataType] = Set[DataType](booleanArray, intToLongMap, twoFieldStruct)
}

0 commit comments

Comments
 (0)