Skip to content

Commit 662f4ac

Browse files
authored
[Spark] Allow overriding StructField metadata in mergeDataTypes (delta-io#4890)
<!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md 2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP] Your PR title ...'. 3. Be sure to keep the PR description updated to reflect all changes. 4. Please write your PR title to summarize what this PR proposes. 5. If possible, provide a concise example to reproduce the issue for a faster review. 6. If applicable, include the corresponding issue number in the PR title and link it in the body. --> #### Which Delta project/connector is this regarding? <!-- Please add the component selected below to the beginning of the pull request title For example: [Spark] Title of my pull request --> - [x] Spark - [ ] Standalone - [ ] Flink - [ ] Kernel - [ ] Other (fill in here) ## Description <!-- - Describe what this PR changes. - Describe why we need the change. If this PR resolves an issue be sure to include "Resolves #XXX" to correctly link and close the issue upon merge. --> Currently, `SchemaMergingUtils.mergeDataTypes` will always use the existing metadata when merging a new `StructField` with the same name as an existing field. This change provides a new flag `overrideMetadata` that allows the update schema to override the metadata in the current schema. ## How was this patch tested? <!-- If tests were added, say they were added here. Please make sure to test the changes thoroughly including negative and positive cases if possible. If the changes were tested in any way other than unit tests, please clarify how you tested step by step (ideally copy and paste-able, so that other reviewers can test and check, and descendants can verify in the future). If the changes were not tested, please explain why. --> New unit test in `SchemaUtilsSuite`.scala ## Does this PR introduce _any_ user-facing changes? <!-- If yes, please clarify the previous behavior and the change this PR proposes - provide the console output, description and/or an example to show the behavior difference if possible. If possible, please also clarify if this is a user-facing change compared to the released Delta Lake versions or within the unreleased branches such as master. If no, write 'No'. --> No
1 parent 5f8d25a commit 662f4ac

File tree

2 files changed

+72
-7
lines changed

2 files changed

+72
-7
lines changed

spark/src/main/scala/org/apache/spark/sql/delta/schema/SchemaMergingUtils.scala

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,8 @@ object SchemaMergingUtils {
163163
keepExistingType,
164164
typeWideningMode,
165165
caseSensitive,
166-
allowOverride = false
166+
allowOverride = false,
167+
overrideMetadata = false
167168
).asInstanceOf[StructType]
168169
}
169170

@@ -184,6 +185,8 @@ object SchemaMergingUtils {
184185
* @param caseSensitive Whether we should keep field mapping case-sensitively.
185186
* This should default to false for Delta, which is case insensitive.
186187
* @param allowOverride Whether to let incoming type override the existing type if unmatched.
188+
* @param overrideMetadata Whether to let metadata of new fields override the existing
189+
* metadata of matching fields
187190
*/
188191
def mergeDataTypes(
189192
current: DataType,
@@ -192,7 +195,8 @@ object SchemaMergingUtils {
192195
keepExistingType: Boolean,
193196
typeWideningMode: TypeWideningMode,
194197
caseSensitive: Boolean,
195-
allowOverride: Boolean): DataType = {
198+
allowOverride: Boolean,
199+
overrideMetadata: Boolean): DataType = {
196200
def merge(current: DataType, update: DataType): DataType = {
197201
(current, update) match {
198202
case (StructType(currentFields), StructType(updateFields)) =>
@@ -202,11 +206,14 @@ object SchemaMergingUtils {
202206
updateFieldMap.get(currentField.name) match {
203207
case Some(updateField) =>
204208
try {
209+
val updatedCurrentFieldMetadata =
210+
if (overrideMetadata) updateField.metadata
211+
else currentField.metadata
205212
StructField(
206213
currentField.name,
207214
merge(currentField.dataType, updateField.dataType),
208215
currentField.nullable,
209-
currentField.metadata)
216+
updatedCurrentFieldMetadata)
210217
} catch {
211218
case NonFatal(e) =>
212219
throw new DeltaAnalysisException(

spark/src/test/scala/org/apache/spark/sql/delta/schema/SchemaUtilsSuite.scala

Lines changed: 62 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2486,7 +2486,8 @@ class SchemaUtilsSuite extends QueryTest
24862486
keepExistingType = false,
24872487
typeWideningMode = TypeWideningMode.NoTypeWidening,
24882488
caseSensitive = false,
2489-
allowOverride = false)
2489+
allowOverride = false,
2490+
overrideMetadata = false)
24902491
assert(mergedType1 === ArrayType(new StructType().add("a", IntegerType).add("b", IntegerType)))
24912492

24922493
// Map root type
@@ -2506,7 +2507,8 @@ class SchemaUtilsSuite extends QueryTest
25062507
keepExistingType = false,
25072508
typeWideningMode = TypeWideningMode.NoTypeWidening,
25082509
caseSensitive = false,
2509-
allowOverride = false)
2510+
allowOverride = false,
2511+
overrideMetadata = false)
25102512
assert(mergedType2 ===
25112513
MapType(
25122514
new StructType().add("a", IntegerType).add("b", IntegerType),
@@ -2526,7 +2528,8 @@ class SchemaUtilsSuite extends QueryTest
25262528
keepExistingType = false,
25272529
typeWideningMode = TypeWideningMode.NoTypeWidening,
25282530
caseSensitive = false,
2529-
allowOverride = true)
2531+
allowOverride = true,
2532+
overrideMetadata = false)
25302533
assert(mergedSchema1 === ArrayType(LongType))
25312534

25322535
// override nested type
@@ -2540,7 +2543,8 @@ class SchemaUtilsSuite extends QueryTest
25402543
keepExistingType = false,
25412544
typeWideningMode = TypeWideningMode.NoTypeWidening,
25422545
caseSensitive = false,
2543-
allowOverride = true)
2546+
allowOverride = true,
2547+
overrideMetadata = false)
25442548
assert(mergedSchema2 ===
25452549
ArrayType(new StructType().add("a", MapType(StringType, StringType)).add("b", StringType)))
25462550
}
@@ -2824,6 +2828,60 @@ class SchemaUtilsSuite extends QueryTest
28242828
}
28252829
}
28262830

2831+
test("schema merging override field metadata") {
2832+
val base1 = new StructType()
2833+
.add("a", IntegerType)
2834+
val update1 = new StructType()
2835+
.add("a", IntegerType, nullable = true, new MetadataBuilder().putString("x", "1").build())
2836+
val mergedSchema1 =
2837+
mergeDataTypes(
2838+
current = base1,
2839+
update = update1,
2840+
allowImplicitConversions = false,
2841+
keepExistingType = false,
2842+
typeWideningMode = TypeWideningMode.NoTypeWidening,
2843+
caseSensitive = false,
2844+
allowOverride = false,
2845+
overrideMetadata = true
2846+
)
2847+
assert(mergedSchema1 ===
2848+
new StructType()
2849+
.add("a", IntegerType, nullable = true,
2850+
new MetadataBuilder().putString("x", "1").build()))
2851+
2852+
// override nested metadata
2853+
val base2 = ArrayType(new StructType()
2854+
.add("a", new StructType()
2855+
.add("b", IntegerType)
2856+
.add("c", IntegerType)))
2857+
val update2 = ArrayType(new StructType()
2858+
.add("a", new StructType()
2859+
.add("b", IntegerType)
2860+
.add("c", IntegerType, nullable = true,
2861+
new MetadataBuilder().putString("c_metadata", "2").build()),
2862+
nullable = true,
2863+
new MetadataBuilder().putString("a_metadata", "3").build()))
2864+
val mergedSchema2 =
2865+
mergeDataTypes(
2866+
current = base2,
2867+
update = update2,
2868+
allowImplicitConversions = false,
2869+
keepExistingType = false,
2870+
typeWideningMode = TypeWideningMode.NoTypeWidening,
2871+
caseSensitive = false,
2872+
allowOverride = false,
2873+
overrideMetadata = true
2874+
)
2875+
assert(mergedSchema2 ===
2876+
ArrayType(new StructType()
2877+
.add("a", new StructType()
2878+
.add("b", IntegerType)
2879+
.add("c", IntegerType, nullable = true,
2880+
new MetadataBuilder().putString("c_metadata", "2").build()),
2881+
nullable = true,
2882+
new MetadataBuilder().putString("a_metadata", "3").build())))
2883+
}
2884+
28272885
////////////////////////////
28282886
// transformColumns
28292887
////////////////////////////

0 commit comments

Comments
 (0)