Commit ca118d1

Richard-code-gig and Sola Richard Olorunfemi authored
[Spark] Fix schema evolution issue with nested struct (within a map) and renamed column (delta-io#3886)
This PR fixes an issue with schema evolution in Delta Lake where adding a new field to a struct inside a map while renaming an existing top-level field caused the operation to fail. The fix adds logic to handle these transformations properly, ensuring that new fields are added without conflicts. It also resolves a TODO about casting map types in the [DeltaAnalysis.scala](https://github.com/Richard-code-gig/delta/blob/feature/schema-evolution-with-map-fix/spark/src/main/scala/org/apache/spark/sql/delta/DeltaAnalysis.scala) module.

### Changes:
- Updated the schema evolution logic to support complex map transformations.
- Enabled schema evolution for map keys and for both simple and nested map values.
- Added case statements to handle MapType in the addCastToColumn method of DeltaAnalysis.scala.
- Added a new method (addCastsToMaps) to DeltaAnalysis.scala.
- Changed the argument type of addCastToColumn from Attribute to NamedExpression.
- Modified the nested struct type evolution with field upcast in map test in TypeWideningInsertSchemaEvolutionSuite.scala to support schema evolution of maps.
- Added new test cases for maps to DeltaInsertIntoTableSuite.scala.
- Added [EvolutionWithMap](https://github.com/Richard-code-gig/delta/blob/feature/schema-evolution-with-map-fix/examples/scala/src/main/scala/example/EvolutionWithMap.scala) to the examples module to demonstrate the use case.

### Related Issues:
- Resolves: delta-io#3227

#### Which Delta project/connector is this regarding?
- [✓] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## How was this patch tested?
Tested through:
- Integration tests: validated the changes against Delta Lake and Spark. See [EvolutionWithMap](https://github.com/Richard-code-gig/delta/blob/feature/schema-evolution-with-map-fix/examples/scala/src/main/scala/example/EvolutionWithMap.scala).
- Verified that the existing test suites pass, and modified [TypeWideningInsertSchemaEvolutionSuite](https://github.com/Richard-code-gig/delta/blob/feature/schema-evolution-with-map-fix/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningInsertSchemaEvolutionSuite.scala) to add support for maps.
- Added test cases to [DeltaInsertIntoTableSuite](https://github.com/Richard-code-gig/delta/blob/feature/schema-evolution-with-map-fix/spark/src/test/scala/org/apache/spark/sql/DeltaInsertIntoTableSuite.scala) to cover complex map transformations.
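For a quick sense of the failure mode this patch addresses, here is a minimal sketch of mine (not code from the PR, and table/column names are illustrative); it assumes a SparkSession with the Delta extensions enabled, and mirrors the EvolutionWithMap example included below:

```scala
import org.apache.spark.sql.SparkSession

// Minimal reproduction sketch (hypothetical names; mirrors EvolutionWithMap).
val spark = SparkSession.builder()
  .appName("MapEvolutionRepro")
  .master("local[*]")
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
  .config("spark.sql.catalog.spark_catalog",
    "org.apache.spark.sql.delta.catalog.DeltaCatalog")
  .getOrCreate()

spark.sql(
  "CREATE TABLE repro (key INT, metrics MAP<STRING, STRUCT<id: INT, value: INT>>) USING delta")

// The source rows carry an extra nested field ("comment") in the map's value struct.
val evolved = spark.sql(
  "SELECT 1 AS renamed_key, " +
  "map('event', named_struct('id', 1, 'value', 1, 'comment', 'deprecated')) AS metrics")

// Before this fix, the insert below failed with DATATYPE_MISMATCH.CAST_WITHOUT_SUGGESTION;
// with it, mergeSchema evolves the nested struct inside the map instead.
evolved.write.format("delta")
  .mode("append")
  .option("mergeSchema", "true")
  .insertInto("repro")
```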
## Does this PR introduce _any_ user-facing changes?
No, it doesn't introduce any user-facing changes; it only fixes an issue that also exists in released versions of Delta Lake. The previous behaviour was an error when attempting operations that add extra fields to a struct inside a map:

> [[DATATYPE_MISMATCH.CAST_WITHOUT_SUGGESTION](https://docs.databricks.com/error-messages/error-classes.html#datatype_mismatch.cast_without_suggestion)] Cannot resolve "metrics" due to data type mismatch: cannot cast "MAP<STRING, STRUCT<id: INT, value: INT, comment: STRING>>" to "MAP<STRING, STRUCT<id: INT, value: INT>>".

---------

Co-authored-by: Sola Richard Olorunfemi <[email protected]>
1 parent 634ba15 commit ca118d1

File tree

4 files changed: +407 -26 lines changed
examples/scala/src/main/scala/example/EvolutionWithMap.scala

Lines changed: 98 additions & 0 deletions

@@ -0,0 +1,98 @@ (new file)

```scala
/*
 * Copyright (2021) The Delta Lake Project Authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package example

import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession

object EvolutionWithMap {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("EvolutionWithMap")
      .master("local[*]")
      .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
      .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
      .getOrCreate()

    import spark.implicits._

    val tableName = "insert_map_schema_evolution"

    try {
      // Define the initial schema
      val initialSchema = StructType(Seq(
        StructField("key", IntegerType, nullable = false),
        StructField("metrics", MapType(StringType, StructType(Seq(
          StructField("id", IntegerType, nullable = false),
          StructField("value", IntegerType, nullable = false)
        ))))
      ))

      val data = Seq(
        Row(1, Map("event" -> Row(1, 1)))
      )

      val rdd = spark.sparkContext.parallelize(data)

      val initialDf = spark.createDataFrame(rdd, initialSchema)

      initialDf.write
        .option("overwriteSchema", "true")
        .mode("overwrite")
        .format("delta")
        .saveAsTable(s"$tableName")

      // Define the evolved schema with a simultaneous change to a StructField name
      // and an additional field in a map column
      val evolvedSchema = StructType(Seq(
        StructField("renamed_key", IntegerType, nullable = false),
        StructField("metrics", MapType(StringType, StructType(Seq(
          StructField("id", IntegerType, nullable = false),
          StructField("value", IntegerType, nullable = false),
          StructField("comment", StringType, nullable = true)
        ))))
      ))

      val evolvedData = Seq(
        Row(1, Map("event" -> Row(1, 1, "deprecated")))
      )

      val evolvedRDD = spark.sparkContext.parallelize(evolvedData)

      val modifiedDf = spark.createDataFrame(evolvedRDD, evolvedSchema)

      // The write below would fail without schema evolution for map types
      modifiedDf.write
        .mode("append")
        .option("mergeSchema", "true")
        .format("delta")
        .insertInto(s"$tableName")

      spark.sql(s"SELECT * FROM $tableName").show(false)

    } finally {

      // Cleanup
      spark.sql(s"DROP TABLE IF EXISTS $tableName")

      spark.stop()
    }

  }
}
```
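On a successful run, the final query should show both rows, with the older row's new nested field backfilled as null. Roughly (my expected shape, not captured output):

```scala
// Expected shape of the final output (my expectation, not a captured run; row
// order is not guaranteed). Note that insertInto matches columns by position,
// so the table keeps the original top-level column name "key" even though the
// evolved DataFrame calls it "renamed_key".
spark.sql(s"SELECT * FROM $tableName").show(false)
// +---+-----------------------------+
// |key|metrics                      |
// +---+-----------------------------+
// |1  |{event -> {1, 1, NULL}}      |
// |1  |{event -> {1, 1, deprecated}}|
// +---+-----------------------------+
```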

spark/src/main/scala/org/apache/spark/sql/delta/DeltaAnalysis.scala

Lines changed: 67 additions & 4 deletions
```diff
@@ -69,6 +69,7 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
+
 /**
  * Analysis rules for Delta. Currently, these rules enable schema enforcement / evolution with
  * INSERT INTO.
```
```diff
@@ -913,8 +914,8 @@
   }
 
   private def addCastToColumn(
-      attr: Attribute,
-      targetAttr: Attribute,
+      attr: NamedExpression,
+      targetAttr: NamedExpression,
       tblName: String,
       allowTypeWidening: Boolean): NamedExpression = {
     val expr = (attr.dataType, targetAttr.dataType) match {
```
```diff
@@ -930,6 +931,11 @@
         // Keep the type from the query, the target schema will be updated to widen the existing
         // type to match it.
         attr
+      case (s: MapType, t: MapType)
+          if !DataType.equalsStructurally(s, t, ignoreNullability = true) || allowTypeWidening =>
+        // Only trigger addCastsToMaps if differences exist, such as extra fields or renames,
+        // or if allowTypeWidening is enabled.
+        addCastsToMaps(tblName, attr, s, t, allowTypeWidening)
       case _ =>
         getCastFunction(attr, targetAttr.dataType, targetAttr.name)
     }
```
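Two details in this hunk are worth spelling out. First, addCastToColumn now accepts NamedExpression rather than Attribute because the lambda variables that the new addCastsToMaps method (added below) feeds back into it are NamedLambdaVariables, which are NamedExpressions but not Attributes. Second, the guard relies on DataType.equalsStructurally, which compares type shapes rather than exact equality; a small illustration of mine (not from the patch; note this helper is internal Spark API, reachable here because DeltaAnalysis lives under org.apache.spark.sql):

```scala
import org.apache.spark.sql.types._

// Types from the EvolutionWithMap scenario: the source value struct carries an
// extra "comment" field, so the two map types differ structurally.
val source = MapType(StringType, StructType(Seq(
  StructField("id", IntegerType, nullable = false),
  StructField("value", IntegerType, nullable = false),
  StructField("comment", StringType, nullable = true))))

val target = MapType(StringType, StructType(Seq(
  StructField("id", IntegerType, nullable = false),
  StructField("value", IntegerType, nullable = false))))

// Returns false here (the extra field changes the shape), so the MapType case
// above fires and the expression is rewritten via addCastsToMaps.
DataType.equalsStructurally(source, target, ignoreNullability = true)
```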
```diff
@@ -1047,8 +1053,7 @@
   }
 
   /**
-   * Recursively casts structs in case it contains null types.
-   * TODO: Support other complex types like MapType and ArrayType
+   * Recursively casts struct data types in case the source/target type differs.
    */
   private def addCastsToStructs(
       tableName: String,
```
```diff
@@ -1124,6 +1129,64 @@
     DeltaViewHelper.stripTempViewForMerge(plan, conf)
   }
 
+  /**
+   * Recursively casts map data types in case the key/value type differs.
+   */
+  private def addCastsToMaps(
+      tableName: String,
+      parent: NamedExpression,
+      sourceMapType: MapType,
+      targetMapType: MapType,
+      allowTypeWidening: Boolean): Expression = {
+    val transformedKeys =
+      if (sourceMapType.keyType != targetMapType.keyType) {
+        // Create a transformation for the keys
+        ArrayTransform(MapKeys(parent), {
+          val key = NamedLambdaVariable(
+            "key", sourceMapType.keyType, nullable = false)
+
+          val keyAttr = AttributeReference(
+            "key", targetMapType.keyType, nullable = false)()
+
+          val castedKey =
+            addCastToColumn(
+              key,
+              keyAttr,
+              tableName,
+              allowTypeWidening
+            )
+          LambdaFunction(castedKey, Seq(key))
+        })
+      } else {
+        MapKeys(parent)
+      }
+
+    val transformedValues =
+      if (sourceMapType.valueType != targetMapType.valueType) {
+        // Create a transformation for the values
+        ArrayTransform(MapValues(parent), {
+          val value = NamedLambdaVariable(
+            "value", sourceMapType.valueType, sourceMapType.valueContainsNull)
+
+          val valueAttr = AttributeReference(
+            "value", targetMapType.valueType, sourceMapType.valueContainsNull)()
+
+          val castedValue =
+            addCastToColumn(
+              value,
+              valueAttr,
+              tableName,
+              allowTypeWidening
+            )
+          LambdaFunction(castedValue, Seq(value))
+        })
+      } else {
+        MapValues(parent)
+      }
+    // Create new map from transformed keys and values
+    MapFromArrays(transformedKeys, transformedValues)
+  }
+
   /**
    * Verify the input plan for a SINGLE streaming query with the following:
    * 1. Schema location must be under checkpoint location, if not lifted by flag
```
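For intuition, the expression tree that addCastsToMaps builds (MapKeys/MapValues to take the map apart, an ArrayTransform over each side, then MapFromArrays to reassemble) corresponds to the following DataFrame-level pattern. This is a sketch of mine for illustration, not code from the patch: the patch delegates each key/value back into addCastToColumn rather than using withField, but the decompose-transform-reassemble shape is the same.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

// DataFrame-level analogue of the addCastsToMaps rewrite (illustrative only).
object MapRewriteSketch extends App {
  val spark = SparkSession.builder()
    .appName("MapRewriteSketch")
    .master("local[*]")
    .getOrCreate()

  val df = spark.sql(
    "SELECT map('event', named_struct('id', 1, 'value', 1)) AS metrics")

  val widened = df.select(
    map_from_arrays(
      map_keys(col("metrics")), // keys pass through unchanged, like the else branch
      transform(map_values(col("metrics")),
        // A plain CAST cannot add struct fields (the root cause of the original
        // error), so each value struct is rebuilt with the new nullable field.
        v => v.withField("comment", lit(null).cast("string")))
    ).as("metrics"))

  widened.printSchema() // metrics: map<string, struct<id:int, value:int, comment:string>>
  spark.stop()
}
```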
