Commit bdd26fe

[Kernel] Update column mapping and schema evolution code to support usage with replace table (delta-io#4520)
#### Which Delta project/connector is this regarding?

- [ ] Spark
- [ ] Standalone
- [ ] Flink
- [X] Kernel
- [ ] Other (fill in here)

## Description

Update SchemaUtils and ColumnMapping, with unit tests, to support REPLACE TABLE with column mapping and fieldId re-use in PR #2. Specifically, this PR makes the following changes (not necessarily related, but combined here):

1) When a connector pre-populates its own column mapping info in the schema, we require that it is complete (i.e. both fieldId AND physicalName must be present).
2) We add an argument `allowNewRequiredFields` to our schema validation checks. This is useful when we can be sure the table state has been completely cleared, so new non-null fields are valid (as with REPLACE).
3) We no longer allow adding a new column with a fieldId less than or equal to the current maxColId. For now we do this proactively for safety; for something like RESTORE we will likely need a config to bypass this check in the future.

## How was this patch tested?

Updated unit tests. In addition, all the changes in this PR are used by delta-io#4520, which adds many more E2E tests covering multiple schema scenarios.

## Does this PR introduce _any_ user-facing changes?

No.
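To make change 2) concrete, here is a minimal sketch, not code from this commit, of how a regular metadata update differs from a hypothetical REPLACE TABLE path. The four-argument `validateUpdatedSchema` signature comes from the SchemaUtils.java diff below; the wrapper methods, variable names, and import paths for the internal `Metadata` class are assumptions.

```java
// Sketch only: the four-argument validateUpdatedSchema signature comes from the
// SchemaUtils.java diff in this commit; the wrapper methods below are hypothetical,
// and the import path for Metadata is assumed.
import java.util.Set;

import io.delta.kernel.internal.actions.Metadata;
import io.delta.kernel.internal.util.SchemaUtils;

final class ReplaceTableValidationSketch {

  // Regular metadata update (mirrors the updated TransactionBuilderImpl call site):
  // existing table data is kept, so newly added fields must stay nullable.
  static void validateMetadataUpdate(
      Metadata oldMetadata, Metadata newMetadata, Set<String> clusteringColPhysicalNames) {
    SchemaUtils.validateUpdatedSchema(
        oldMetadata, newMetadata, clusteringColPhysicalNames, false /* allowNewRequiredFields */);
  }

  // Hypothetical REPLACE TABLE path: the table state is cleared first, so new
  // non-nullable (required) fields can safely be allowed.
  static void validateReplaceTable(
      Metadata replacedMetadata, Metadata newMetadata, Set<String> clusteringColPhysicalNames) {
    SchemaUtils.validateUpdatedSchema(
        replacedMetadata, newMetadata, clusteringColPhysicalNames, true /* allowNewRequiredFields */);
  }
}
```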
1 parent 842a4bf · 6 files changed (+196, -53 lines)


kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionBuilderImpl.java

Lines changed: 3 additions & 4 deletions
```diff
@@ -608,11 +608,10 @@ private void validateMetadataChange(
             .collect(toSet());

       SchemaUtils.validateUpdatedSchema(
-          oldMetadata.getSchema(),
-          newMetadata.getSchema(),
-          oldMetadata.getPartitionColNames(),
+          oldMetadata,
+          newMetadata,
           clusteringColumnPhysicalNames,
-          newMetadata);
+          false /* allowNewRequiredFields*/);
     }
   }

```

kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/ColumnMapping.java

Lines changed: 9 additions & 0 deletions
```diff
@@ -468,6 +468,15 @@ private static StructField assignColumnIdAndPhysicalNameToField(
       AtomicInteger maxColumnId,
       boolean isNewTable,
       boolean useColumnIdForPhysicalName) {
+    if (hasColumnId(field) ^ hasPhysicalName(field)) {
+      // If a connector is providing column mapping metadata in the given schema we require it to be
+      // complete
+      throw new IllegalArgumentException(
+          String.format(
+              "Both columnId and physicalName must be present if one is present. "
+                  + "Found this field with incomplete column mapping metadata: %s",
+              field));
+    }
     if (!hasColumnId(field)) {
       field =
           field.withNewMetadata(
```
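For change 1), the Scala test added in ColumnMappingSuite below exercises the failing case. As a rough Java counterpart (a sketch, not code from this commit), a connector that pre-populates column mapping metadata must supply both keys on every field. Import locations and the visibility of the `ColumnMapping` key constants outside their own package are assumptions; the field construction mirrors the test below.

```java
// Sketch of connector-provided column mapping metadata. Field construction mirrors
// the ColumnMappingSuite test added in this commit; import locations and constant
// visibility are assumptions.
import io.delta.kernel.internal.util.ColumnMapping;
import io.delta.kernel.types.FieldMetadata;
import io.delta.kernel.types.StringType;
import io.delta.kernel.types.StructField;
import io.delta.kernel.types.StructType;

final class ConnectorProvidedColumnMappingSketch {

  // Rejected after this commit: only the column id is present, so
  // assignColumnIdAndPhysicalNameToField throws IllegalArgumentException.
  static StructType incompleteSchema() {
    return new StructType()
        .add(
            new StructField(
                "col1",
                StringType.STRING,
                true /* nullable */,
                FieldMetadata.builder()
                    .putLong(ColumnMapping.COLUMN_MAPPING_ID_KEY, 1L)
                    .build()));
  }

  // Accepted: both the column id and the physical name are present, so the
  // connector-provided metadata is used as-is.
  static StructType completeSchema() {
    return new StructType()
        .add(
            new StructField(
                "col1",
                StringType.STRING,
                true /* nullable */,
                FieldMetadata.builder()
                    .putLong(ColumnMapping.COLUMN_MAPPING_ID_KEY, 1L)
                    .putString(ColumnMapping.COLUMN_MAPPING_PHYSICAL_NAME_KEY, "col-1a2b3c")
                    .build()));
  }
}
```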

kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/SchemaUtils.java

Lines changed: 43 additions & 16 deletions
```diff
@@ -101,18 +101,27 @@ public static void validateSchema(StructType schema, boolean isColumnMappingEnab
    * </ul>
    */
   public static void validateUpdatedSchema(
-      StructType currentSchema,
-      StructType newSchema,
-      Set<String> currentPartitionColumns,
+      Metadata currentMetadata,
+      Metadata newMetadata,
       Set<String> clusteringColumnPhysicalNames,
-      Metadata newMetadata) {
+      boolean allowNewRequiredFields) {
     checkArgument(
         isColumnMappingModeEnabled(
             ColumnMapping.getColumnMappingMode(newMetadata.getConfiguration())),
         "Cannot validate updated schema when column mapping is disabled");
-    validateSchema(newSchema, true /*columnMappingEnabled*/);
-    validatePartitionColumns(newSchema, new ArrayList<>(currentPartitionColumns));
-    validateSchemaEvolution(currentSchema, newSchema, newMetadata, clusteringColumnPhysicalNames);
+    validateSchema(newMetadata.getSchema(), true /*columnMappingEnabled*/);
+    validatePartitionColumns(
+        newMetadata.getSchema(), new ArrayList<>(newMetadata.getPartitionColNames()));
+    int currentMaxFieldId =
+        Integer.parseInt(
+            currentMetadata.getConfiguration().getOrDefault(COLUMN_MAPPING_MAX_COLUMN_ID_KEY, "0"));
+    validateSchemaEvolution(
+        currentMetadata.getSchema(),
+        newMetadata.getSchema(),
+        ColumnMapping.getColumnMappingMode(newMetadata.getConfiguration()),
+        clusteringColumnPhysicalNames,
+        currentMaxFieldId,
+        allowNewRequiredFields);
   }

   /**
@@ -426,14 +435,19 @@ private static void validatePhysicalNameConsistency(
   private static void validateSchemaEvolution(
       StructType currentSchema,
       StructType newSchema,
-      Metadata metadata,
-      Set<String> clusteringColumnPhysicalNames) {
-    ColumnMappingMode columnMappingMode =
-        ColumnMapping.getColumnMappingMode(metadata.getConfiguration());
+      ColumnMappingMode columnMappingMode,
+      Set<String> clusteringColumnPhysicalNames,
+      int currentMaxFieldId,
+      boolean allowNewRequiredFields) {
     switch (columnMappingMode) {
       case ID:
       case NAME:
-        validateSchemaEvolutionById(currentSchema, newSchema, clusteringColumnPhysicalNames);
+        validateSchemaEvolutionById(
+            currentSchema,
+            newSchema,
+            clusteringColumnPhysicalNames,
+            currentMaxFieldId,
+            allowNewRequiredFields);
         return;
       case NONE:
         throw new UnsupportedOperationException(
@@ -449,14 +463,18 @@ private static void validateSchemaEvolution(
    * fields
    */
   private static void validateSchemaEvolutionById(
-      StructType currentSchema, StructType newSchema, Set<String> clusteringColumnPhysicalNames) {
+      StructType currentSchema,
+      StructType newSchema,
+      Set<String> clusteringColumnPhysicalNames,
+      int oldMaxFieldId,
+      boolean allowNewRequiredFields) {
     Map<Integer, StructField> currentFieldsById = fieldsById(currentSchema);
     Map<Integer, StructField> updatedFieldsById = fieldsById(newSchema);
     SchemaChanges schemaChanges = computeSchemaChangesById(currentFieldsById, updatedFieldsById);
     validatePhysicalNameConsistency(schemaChanges.updatedFields());
     // Validates that the updated schema does not contain breaking changes in terms of types and
     // nullability
-    validateUpdatedSchemaCompatibility(schemaChanges);
+    validateUpdatedSchemaCompatibility(schemaChanges, oldMaxFieldId, allowNewRequiredFields);
     validateClusteringColumnsNotDropped(
         schemaChanges.removedFields(), clusteringColumnPhysicalNames);
     // ToDo Potentially validate IcebergCompatV2 nested IDs
@@ -480,12 +498,21 @@ private static void validateClusteringColumnsNotDropped(
    *
    * <p>ToDo: Prevent moving fields outside of their containing struct
    */
-  private static void validateUpdatedSchemaCompatibility(SchemaChanges schemaChanges) {
+  private static void validateUpdatedSchemaCompatibility(
+      SchemaChanges schemaChanges, int oldMaxFieldId, boolean allowNewRequiredFields) {
     for (StructField addedField : schemaChanges.addedFields()) {
-      if (!addedField.isNullable()) {
+      if (!allowNewRequiredFields && !addedField.isNullable()) {
         throw new KernelException(
             String.format("Cannot add non-nullable field %s", addedField.getName()));
       }
+      int colId = getColumnId(addedField);
+      if (colId <= oldMaxFieldId) {
+        throw new IllegalArgumentException(
+            String.format(
+                "Cannot add a new column with a fieldId <= maxFieldId. Found field: %s with"
+                    + "fieldId=%s. Current maxFieldId in the table is: %s",
+                addedField, colId, oldMaxFieldId));
+      }
     }

     for (Tuple2<StructField, StructField> updatedFields : schemaChanges.updatedFields()) {
```
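Change 3) shows up in the last hunk above: an added field whose column mapping id is less than or equal to the table's recorded maxColumnId is rejected, since that id may have belonged to a previous (possibly dropped) column. Below is a rough sketch of the rejected scenario, not code from this commit; the class, the literal key `"delta.columnMapping.id"` (standing in for `ColumnMapping.COLUMN_MAPPING_ID_KEY`), and the concrete numbers are assumptions.

```java
// Sketch of the fieldId re-use scenario rejected by the new check in
// validateUpdatedSchemaCompatibility. The literal metadata key and the numbers
// below are illustrative assumptions.
import io.delta.kernel.types.FieldMetadata;
import io.delta.kernel.types.LongType;
import io.delta.kernel.types.StructField;

final class FieldIdReuseSketch {
  public static void main(String[] args) {
    // Value parsed from the old table's "delta.columnMapping.maxColumnId" property.
    int currentMaxFieldId = 7;

    // A newly added column that re-uses id 3. Because 3 <= 7, the updated
    // validation now throws IllegalArgumentException for this field.
    long reusedId = 3L;
    StructField added =
        new StructField(
            "new_col",
            LongType.LONG,
            true /* nullable */,
            FieldMetadata.builder().putLong("delta.columnMapping.id", reusedId).build());

    System.out.printf("field %s rejected: %b%n", added.getName(), reusedId <= currentMaxFieldId);
  }
}
```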

kernel/kernel-api/src/test/scala/io/delta/kernel/internal/util/ColumnMappingSuite.scala

Lines changed: 29 additions & 0 deletions
```diff
@@ -626,6 +626,35 @@ class ColumnMappingSuite extends AnyFunSuite with ColumnMappingSuiteBase {
     }
   }

+  test("both id and physical name must be provided if one is provided") {
+    val schemaWithoutPhysicalName = new StructType()
+      .add(
+        new StructField(
+          "col1",
+          StringType.STRING,
+          true,
+          FieldMetadata.builder()
+            .putLong(ColumnMapping.COLUMN_MAPPING_ID_KEY, 0)
+            .build()))
+    val schemaWithoutId = new StructType()
+      .add(
+        new StructField(
+          "col1",
+          StringType.STRING,
+          true,
+          FieldMetadata.builder()
+            .putString(ColumnMapping.COLUMN_MAPPING_PHYSICAL_NAME_KEY, "physical-name-col1")
+            .build()))
+
+    Seq(schemaWithoutId, schemaWithoutPhysicalName).foreach { schema =>
+      val e = intercept[IllegalArgumentException] {
+        updateColumnMappingMetadataIfNeeded(testMetadata(schema).withColumnMappingEnabled(), true)
+      }
+      assert(e.getMessage.contains(
+        "Both columnId and physicalName must be present if one is present"))
+    }
+  }
+
   /**
    * A struct type with all necessary CM info won't cause metadata change by
    * [[updateColumnMappingMetadataIfNeeded]]
```
