Skip to content

Commit ce8bee4

Browse files
[Kernel] Expose withSchema API to enable schema evolution (delta-io#4361)
#### Which Delta project/connector is this regarding? <!-- Please add the component selected below to the beginning of the pull request title For example: [Spark] Title of my pull request --> - [ ] Spark - [ ] Standalone - [ ] Flink - [x] Kernel - [ ] Other (fill in here) ## Description This PR exposes withSchema API which will enable users to do non intent-based schema evolution with the documented validation . ## How was this patch tested? Added the same E2E tests as in delta-io#4196 except for nested ID consistency, since that is being reworked as part of delta-io#4360 and that validation is not there. ## Does this PR introduce _any_ user-facing changes? Yeah the transaction API exposes a withSchema API which allows users to pass in their own schema for evolution.
1 parent c514066 commit ce8bee4

File tree

7 files changed

+1278
-27
lines changed

7 files changed

+1278
-27
lines changed

kernel/kernel-api/src/main/java/io/delta/kernel/TransactionBuilder.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,29 @@
3434
@Evolving
3535
public interface TransactionBuilder {
3636
/**
37-
* Set the schema of the table when creating a new table.
37+
* Set the schema of the table. If setting the schema on an existing table for a schema evolution,
38+
* then column mapping must be enabled. This API will preserve field metadata for fields such as
39+
* field IDs and physical names. If field metadata is not specified for a field, it is considered
40+
* as a new column and new IDs/physical names will be specified. The possible schema evolutions
41+
* supported include column additions, removals, renames, and moves. If a schema evolution is
42+
* performed, implementations must perform the following validations:
43+
*
44+
* <ul>
45+
* <li>No duplicate columns are allowed
46+
* <li>Column names contain only valid characters
47+
* <li>Data types are supported
48+
* <li>No new non-nullable fields are added
49+
* <li>Physical column name consistency is preserved in the new schema
50+
* <li>No type changes
51+
* <li>ToDo: Nested IDs for array/map types are preserved in the new schema
52+
* <li>ToDo: Validate invalid field reorderings
53+
* </ul>
3854
*
3955
* @param engine {@link Engine} instance to use.
4056
* @param schema The new schema of the table.
4157
* @return updated {@link TransactionBuilder} instance.
58+
* @throws io.delta.kernel.exceptions.KernelException in case column mapping is not enabled
59+
* @throws IllegalArgumentException in case of any validation failure
4260
*/
4361
TransactionBuilder withSchema(Engine engine, StructType schema);
4462

kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionBuilderImpl.java

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929

3030
import io.delta.kernel.*;
3131
import io.delta.kernel.engine.Engine;
32+
import io.delta.kernel.exceptions.KernelException;
3233
import io.delta.kernel.exceptions.TableNotFoundException;
3334
import io.delta.kernel.internal.actions.*;
3435
import io.delta.kernel.internal.fs.Path;
@@ -162,7 +163,9 @@ public Transaction build(Engine engine) {
162163
newMetadata = Optional.of(snapshotMetadata.withMergedConfiguration(newProperties));
163164
}
164165

165-
// TODO In the future update metadata with new schema if provided
166+
if (schema.isPresent() && !isNewTable) {
167+
newMetadata = Optional.of(newMetadata.orElse(snapshotMetadata).withNewSchema(schema.get()));
168+
}
166169

167170
/* ----- 2: Update the PROTOCOL based on the table properties or schema ----- */
168171
// This is the only place we update the protocol action; takes care of any dependent features
@@ -269,12 +272,6 @@ private void validateTransactionInputs(Engine engine, SnapshotImpl snapshot, boo
269272
snapshot.getProtocol(), snapshot.getMetadata(), tablePath);
270273

271274
if (!isNewTable) {
272-
if (schema.isPresent()) {
273-
throw tableAlreadyExists(
274-
tablePath,
275-
"Table already exists, but provided a new schema. "
276-
+ "Schema can only be set on a new table.");
277-
}
278275
if (partitionColumns.isPresent()) {
279276
throw tableAlreadyExists(
280277
tablePath,
@@ -319,7 +316,26 @@ private void validateMetadataChange(
319316
IcebergWriterCompatV1MetadataValidatorAndUpdater.validateIcebergWriterCompatV1Change(
320317
oldMetadata.getConfiguration(), newMetadata.getConfiguration(), isNewTable);
321318

322-
// TODO In the future validate any schema change
319+
// Validate the conditions for schema evolution and the updated schema if applicable
320+
if (schema.isPresent() && !isNewTable) {
321+
ColumnMappingMode updatedMappingMode =
322+
ColumnMapping.getColumnMappingMode(newMetadata.getConfiguration());
323+
ColumnMappingMode currentMappingMode =
324+
ColumnMapping.getColumnMappingMode(oldMetadata.getConfiguration());
325+
if (currentMappingMode != updatedMappingMode) {
326+
throw new KernelException("Cannot update mapping mode and perform schema evolution");
327+
}
328+
329+
if (!isColumnMappingModeEnabled(updatedMappingMode)) {
330+
throw new KernelException("Cannot update schema for table when column mapping is disabled");
331+
}
332+
333+
SchemaUtils.validateUpdatedSchema(
334+
oldMetadata.getSchema(),
335+
newMetadata.getSchema(),
336+
oldMetadata.getPartitionColNames(),
337+
newMetadata);
338+
}
323339
}
324340

325341
private class InitialSnapshot extends SnapshotImpl {

kernel/kernel-api/src/main/java/io/delta/kernel/internal/icebergcompat/IcebergCompatV2MetadataValidatorAndUpdater.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package io.delta.kernel.internal.icebergcompat;
1717

1818
import static io.delta.kernel.internal.tablefeatures.TableFeatures.*;
19+
import static io.delta.kernel.internal.util.Preconditions.checkArgument;
1920
import static java.util.Collections.singletonList;
2021
import static java.util.stream.Collectors.toList;
2122

@@ -131,9 +132,15 @@ public static void validateDataFileStatus(DataFileStatus dataFileStatus) {
131132
.newMetadata
132133
.getPartitionColNames()
133134
.forEach(
134-
partitonCol -> {
135+
partitionCol -> {
136+
int partitionFieldIndex =
137+
inputContext.newMetadata.getSchema().indexOf(partitionCol);
138+
checkArgument(
139+
partitionFieldIndex != -1,
140+
"Partition column %s not found in the schema",
141+
partitionCol);
135142
DataType dataType =
136-
inputContext.newMetadata.getSchema().get(partitonCol).getDataType();
143+
inputContext.newMetadata.getSchema().at(partitionFieldIndex).getDataType();
137144
boolean validType =
138145
dataType instanceof ByteType
139146
|| dataType instanceof ShortType

kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/SchemaUtils.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -99,12 +99,17 @@ public static void validateSchema(StructType schema, boolean isColumnMappingEnab
9999
* </ul>
100100
*/
101101
public static void validateUpdatedSchema(
102-
StructType currentSchema, StructType newSchema, Metadata metadata) {
102+
StructType currentSchema,
103+
StructType newSchema,
104+
Set<String> currentPartitionColumns,
105+
Metadata newMetadata) {
103106
checkArgument(
104-
isColumnMappingModeEnabled(ColumnMapping.getColumnMappingMode(metadata.getConfiguration())),
107+
isColumnMappingModeEnabled(
108+
ColumnMapping.getColumnMappingMode(newMetadata.getConfiguration())),
105109
"Cannot validate updated schema when column mapping is disabled");
106110
validateSchema(newSchema, true /*columnMappingEnabled*/);
107-
validateSchemaEvolution(currentSchema, newSchema, metadata);
111+
validatePartitionColumns(newSchema, new ArrayList<>(currentPartitionColumns));
112+
validateSchemaEvolution(currentSchema, newSchema, newMetadata);
108113
}
109114

110115
/**

kernel/kernel-api/src/test/scala/io/delta/kernel/internal/util/SchemaUtilsSuite.scala

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -490,6 +490,7 @@ class SchemaUtilsSuite extends AnyFunSuite {
490490
validateUpdatedSchema(
491491
current,
492492
updated,
493+
Set.empty.asJava,
493494
metadata(current, properties = tblProperties))
494495
}
495496

@@ -682,7 +683,7 @@ class SchemaUtilsSuite extends AnyFunSuite {
682683
test("validateUpdatedSchema fails with schema with duplicate column ID") {
683684
forAll(updatedSchemaHasDuplicateColumnId) { (schemaBefore, schemaAfter) =>
684685
val e = intercept[IllegalArgumentException] {
685-
validateUpdatedSchema(schemaBefore, schemaAfter, metadata(schemaBefore))
686+
validateUpdatedSchema(schemaBefore, schemaAfter, Set.empty.asJava, metadata(schemaBefore))
686687
}
687688

688689
assert(e.getMessage.matches("Field duplicate_id with id .* already exists"))
@@ -731,7 +732,7 @@ class SchemaUtilsSuite extends AnyFunSuite {
731732

732733
test("validateUpdatedSchema succeeds with valid ID and physical name") {
733734
forAll(validUpdatedSchemas) { (schemaBefore, schemaAfter) =>
734-
validateUpdatedSchema(schemaBefore, schemaAfter, metadata(schemaBefore))
735+
validateUpdatedSchema(schemaBefore, schemaAfter, Set.empty.asJava, metadata(schemaBefore))
735736
}
736737
}
737738

@@ -917,7 +918,7 @@ class SchemaUtilsSuite extends AnyFunSuite {
917918

918919
test("validateUpdatedSchema succeeds when adding field") {
919920
forAll(validateAddedFields) { (schemaBefore, schemaAfter) =>
920-
validateUpdatedSchema(schemaBefore, schemaAfter, metadata(schemaBefore))
921+
validateUpdatedSchema(schemaBefore, schemaAfter, Set.empty.asJava, metadata(schemaBefore))
921922
}
922923
}
923924

@@ -951,7 +952,7 @@ class SchemaUtilsSuite extends AnyFunSuite {
951952

952953
test("validateUpdatedSchema succeeds when updating field metadata") {
953954
forAll(validateMetadataChange) { (schemaBefore, schemaAfter) =>
954-
validateUpdatedSchema(schemaBefore, schemaAfter, metadata(schemaBefore))
955+
validateUpdatedSchema(schemaBefore, schemaAfter, Set.empty.asJava, metadata(schemaBefore))
955956
}
956957
}
957958

@@ -1001,6 +1002,7 @@ class SchemaUtilsSuite extends AnyFunSuite {
10011002
validateUpdatedSchema(
10021003
schemaBefore,
10031004
schemaAfter,
1005+
Set.empty.asJava,
10041006
metadata(schemaBefore, tableProperties))
10051007
}
10061008

0 commit comments

Comments
 (0)