Skip to content

Commit 591624a

Browse files
[Kernel] Update maxFieldId when updating the schema (delta-io#4365)
<!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md 2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP] Your PR title ...'. 3. Be sure to keep the PR description updated to reflect all changes. 4. Please write your PR title to summarize what this PR proposes. 5. If possible, provide a concise example to reproduce the issue for a faster review. 6. If applicable, include the corresponding issue number in the PR title and link it in the body. --> #### Which Delta project/connector is this regarding? <!-- Please add the component selected below to the beginning of the pull request title For example: [Spark] Title of my pull request --> - [ ] Spark - [ ] Standalone - [ ] Flink - [ ] Kernel - [ ] Other (fill in here) ## Description <!-- - Describe what this PR changes. - Describe why we need the change. If this PR resolves an issue be sure to include "Resolves #XXX" to correctly link and close the issue upon merge. --> ## How was this patch tested? <!-- If tests were added, say they were added here. Please make sure to test the changes thoroughly including negative and positive cases if possible. If the changes were tested in any way other than unit tests, please clarify how you tested step by step (ideally copy and paste-able, so that other reviewers can test and check, and descendants can verify in the future). If the changes were not tested, please explain why. --> ## Does this PR introduce _any_ user-facing changes? <!-- If yes, please clarify the previous behavior and the change this PR proposes - provide the console output, description and/or an example to show the behavior difference if possible. If possible, please also clarify if this is a user-facing change compared to the released Delta Lake versions or within the unreleased branches such as master. If no, write 'No'. 
--> --------- Co-authored-by: Micah Kornfield <[email protected]>
1 parent ad44fa7 commit 591624a

File tree

4 files changed

+51
-12
lines changed

4 files changed

+51
-12
lines changed

kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/ColumnMapping.java

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -326,6 +326,10 @@ private static Optional<Metadata> assignColumnIdAndPhysicalName(
326326
Metadata metadata, boolean isNewTable) {
327327
StructType oldSchema = metadata.getSchema();
328328

329+
// This is the maxColumnId to use when assigning any new field-ids; we update this as we
330+
// traverse the schema and after traversal this is the value that should be stored in the
331+
// metadata. Note - this could be greater than the current value stored in the metadata if
332+
// the connector has added new fields with field-ids
329333
AtomicInteger maxColumnId =
330334
new AtomicInteger(
331335
Math.max(
@@ -335,8 +339,6 @@ private static Optional<Metadata> assignColumnIdAndPhysicalName(
335339
.getOrDefault(COLUMN_MAPPING_MAX_COLUMN_ID_KEY, "0")),
336340
findMaxColumnId(oldSchema)));
337341

338-
int oldMaxColumnId = maxColumnId.get();
339-
340342
StructType newSchema = new StructType();
341343
for (StructField field : oldSchema.fields()) {
342344
newSchema =
@@ -354,17 +356,16 @@ private static Optional<Metadata> assignColumnIdAndPhysicalName(
354356
newSchema = rewriteFieldIdsForIceberg(newSchema, maxColumnId);
355357
}
356358

359+
// The maxColumnId in the metadata may be out-of-date either due to field-id assignment
360+
// performed in this function, or due to the connector adding new fields
361+
boolean shouldUpdateMaxId =
362+
TableConfig.COLUMN_MAPPING_MAX_COLUMN_ID.fromMetadata(metadata) != maxColumnId.get();
363+
357364
// We are comparing the old schema with the new schema to determine if the schema has changed.
358365
// If this becomes a hotspot, we can consider updating the methods to pass around AtomicBoolean
359366
// to track if the schema has changed. It is a bit convoluted to pass around and update the
360367
// AtomicBoolean through the multiple recursive methods.
361-
if (oldSchema.equals(newSchema)) {
362-
checkArgument(
363-
oldMaxColumnId == maxColumnId.get(),
364-
"The schema hasn't changed but the max column id has changed from %s to %s",
365-
oldMaxColumnId,
366-
maxColumnId.get());
367-
368+
if (oldSchema.equals(newSchema) && !shouldUpdateMaxId) {
368369
return Optional.empty();
369370
}
370371

kernel/kernel-api/src/test/scala/io/delta/kernel/internal/util/ColumnMappingSuite.scala

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ package io.delta.kernel.internal.util
1717

1818
import java.util
1919

20+
import scala.collection.JavaConverters.mapAsJavaMapConverter
21+
2022
import io.delta.kernel.exceptions.KernelException
2123
import io.delta.kernel.expressions.Column
2224
import io.delta.kernel.internal.actions.Metadata
@@ -607,6 +609,12 @@ class ColumnMappingSuite extends AnyFunSuite with ColumnMappingSuiteBase {
607609
if (enableIcebergCompatV2) {
608610
metadata = metadata.withIcebergCompatV2Enabled
609611
}
612+
if (!metadata.getConfiguration.containsKey(COLUMN_MAPPING_MAX_COLUMN_ID_KEY)) {
613+
// Workaround: when the metadata has no max column ID entry, updated metadata is always
614+
// returned, so set one explicitly here to exercise the "no update needed" path.
615+
metadata =
616+
metadata.withMergedConfiguration(Map(COLUMN_MAPPING_MAX_COLUMN_ID_KEY -> "100").asJava)
617+
}
610618

611619
assertThat(updateColumnMappingMetadataIfNeeded(metadata, isNewTable)).isEmpty
612620
}

kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaColumnMappingSuite.scala

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@ import scala.collection.immutable.Seq
2121
import io.delta.kernel.Table
2222
import io.delta.kernel.exceptions.InvalidConfigurationValueException
2323
import io.delta.kernel.internal.TableConfig
24-
import io.delta.kernel.internal.util.ColumnMappingSuiteBase
25-
import io.delta.kernel.types.{IntegerType, StringType, StructType}
24+
import io.delta.kernel.internal.util.{ColumnMapping, ColumnMappingSuiteBase}
25+
import io.delta.kernel.types.{FieldMetadata, IntegerType, StringType, StructField, StructType}
2626

2727
class DeltaColumnMappingSuite extends DeltaTableWriteSuiteBase with ColumnMappingSuiteBase {
2828

@@ -86,11 +86,33 @@ class DeltaColumnMappingSuite extends DeltaTableWriteSuiteBase with ColumnMappin
8686
assertColumnMapping(structType.get("a"), 1)
8787
assertColumnMapping(structType.get("b"), 2)
8888

89+
assert(TableConfig.COLUMN_MAPPING_MAX_COLUMN_ID.fromMetadata(getMetadata(
90+
engine,
91+
tablePath)) == 2)
92+
8993
val protocol = getProtocol(engine, tablePath)
9094
assert(protocol.getMinReaderVersion == 2 && protocol.getMinWriterVersion == 7)
9195
}
9296
}
9397

98+
test("new table with existing column mappings in schema writes COLUMN_MAPPING_MAX_COLUMN_ID") {
99+
withTempDirAndEngine { (tablePath, engine) =>
100+
val props = Map(TableConfig.COLUMN_MAPPING_MODE.getKey -> "id")
101+
val fieldMetadata = FieldMetadata.builder()
102+
.putLong(ColumnMapping.COLUMN_MAPPING_ID_KEY, 1)
103+
.putString(ColumnMapping.COLUMN_MAPPING_PHYSICAL_NAME_KEY, "col-0").build()
104+
val structField = new StructField("col_name", IntegerType.INTEGER, false, fieldMetadata)
105+
val schema = new StructType(Seq(structField).asJava)
106+
createEmptyTable(engine, tablePath, schema, tableProperties = props)
107+
108+
val structtype = getMetadata(engine, tablePath).getSchema
109+
assertColumnMapping(structtype.get("col_name"), 1)
110+
assert(TableConfig.COLUMN_MAPPING_MAX_COLUMN_ID.fromMetadata(getMetadata(
111+
engine,
112+
tablePath)) == 1)
113+
}
114+
}
115+
94116
test("can update existing table to column mapping mode = name") {
95117
withTempDirAndEngine { (tablePath, engine) =>
96118
createEmptyTable(engine, tablePath, simpleTestSchema)

kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableSchemaEvolutionSuite.scala

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ import io.delta.kernel.utils.CloseableIterable.emptyIterable
3232
*/
3333
class DeltaTableSchemaEvolutionSuite extends DeltaTableWriteSuiteBase with ColumnMappingSuiteBase {
3434

35-
test("Add nullable column succeeds") {
35+
test("Add nullable column succeeds and correctly updates maxFieldId") {
3636
withTempDirAndEngine { (tablePath, engine) =>
3737
val table = Table.forPath(engine, tablePath)
3838
val initialSchema = new StructType()
@@ -70,6 +70,7 @@ class DeltaTableSchemaEvolutionSuite extends DeltaTableWriteSuiteBase with Colum
7070
assertColumnMapping(innerStruct.get("d"), 4, "d")
7171
assertColumnMapping(innerStruct.get("e"), 5, "e")
7272
assertColumnMapping(structType.get("c"), 2)
73+
assert(getMaxFieldId(engine, tablePath) == 5)
7374
}
7475
}
7576

@@ -87,6 +88,7 @@ class DeltaTableSchemaEvolutionSuite extends DeltaTableWriteSuiteBase with Colum
8788
tableProperties = Map(
8889
TableConfig.COLUMN_MAPPING_MODE.getKey -> "id",
8990
TableConfig.ICEBERG_COMPAT_V2_ENABLED.getKey -> "true"))
91+
assertColumnMapping(table.getLatestSnapshot(engine).getSchema.get("c"), 2)
9092

9193
val currentSchema = table.getLatestSnapshot(engine).getSchema()
9294
val newSchema = new StructType()
@@ -98,6 +100,7 @@ class DeltaTableSchemaEvolutionSuite extends DeltaTableWriteSuiteBase with Colum
98100

99101
val structType = table.getLatestSnapshot(engine).getSchema
100102
assertColumnMapping(structType.get("a"), 1)
103+
assert(getMaxFieldId(engine, tablePath) == 2)
101104
}
102105
}
103106

@@ -1209,4 +1212,9 @@ class DeltaTableSchemaEvolutionSuite extends DeltaTableWriteSuiteBase with Colum
12091212
assert(e.isInstanceOf[T])
12101213
assert(e.getMessage.contains(expectedMessageContained))
12111214
}
1215+
1216+
private def getMaxFieldId(engine: Engine, tablePath: String): Long = {
1217+
TableConfig.COLUMN_MAPPING_MAX_COLUMN_ID
1218+
.fromMetadata(getMetadata(engine, tablePath))
1219+
}
12121220
}

0 commit comments

Comments
 (0)