Commit a172276

Automatic type widening in INSERT (delta-io#2785)
#### Which Delta project/connector is this regarding?

- [X] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

This change is part of the type widening table feature.

Type widening feature request: delta-io#2622
Type Widening protocol RFC: delta-io#2624

It adds automatic type widening as part of schema evolution in INSERT. During resolution, when schema evolution and type widening are enabled, type differences between the input query and the target table are handled as follows:

- If the type difference qualifies for automatic type evolution: the input type is left as is, the data is inserted with the new type, and the table schema is updated in `ImplicitMetadataOperation` (already implemented as part of MERGE support).
- If the type difference doesn't qualify for automatic type evolution: the current behavior is preserved and a cast is added from the input type to the existing target type.

## How was this patch tested?

- Tests are added to `DeltaTypeWideningAutomaticSuite` to cover type evolution in INSERT.

## This PR introduces the following *user-facing* changes

The table feature is available in testing only; there are no user-facing changes as of now.

When automatic schema evolution is enabled in INSERT and the source schema contains a type that is wider than the target schema:

- With type widening disabled: the type in the target schema is not changed. A cast is added to the input to match the expected target type.
- With type widening enabled: the type in the target schema is updated to the wider source type.

```sql
-- target: key int, value short
-- source: key int, value int
INSERT INTO target SELECT * FROM source
```

After the INSERT operation, the target schema is `key int, value int`.
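The widening path above can be sketched end to end from a Spark shell. This is a minimal illustration only: the feature is test-only at this commit, and the `delta.enableTypeWidening` table property and `spark.databricks.delta.schema.autoMerge.enabled` flag used to toggle it here are assumptions about configuration, not part of this diff.

```scala
// Minimal sketch, assuming a Spark session with the Delta Lake extensions configured.
// The config/property names used to enable schema evolution and type widening are
// assumptions for illustration; the table feature is only available in testing here.
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")

spark.sql("""
  CREATE TABLE target (key INT, value SMALLINT) USING delta
  TBLPROPERTIES ('delta.enableTypeWidening' = 'true')
""")
spark.sql("CREATE TABLE source (key INT, value INT) USING delta")

// With type widening enabled, `value` is widened from SMALLINT to INT instead of the
// input being cast down to SMALLINT.
spark.sql("INSERT INTO target SELECT * FROM source")
spark.table("target").printSchema() // value: integer
```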
1 parent 36f95dd commit a172276

File tree: 3 files changed (+577, −27 lines)


spark/src/main/scala/org/apache/spark/sql/delta/DeltaAnalysis.scala

Lines changed: 59 additions & 21 deletions
```diff
@@ -66,7 +66,7 @@ import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.execution.streaming.StreamingRelation
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.{ArrayType, DataType, IntegerType, MapType, StructField, StructType}
+import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.CaseInsensitiveStringMap

 /**
```
```diff
@@ -81,8 +81,8 @@ class DeltaAnalysis(session: SparkSession)
   override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsDown {
     // INSERT INTO by ordinal and df.insertInto()
     case a @ AppendDelta(r, d) if !a.isByName &&
-        needsSchemaAdjustmentByOrdinal(d.name(), a.query, r.schema) =>
-      val projection = resolveQueryColumnsByOrdinal(a.query, r.output, d.name())
+        needsSchemaAdjustmentByOrdinal(d, a.query, r.schema) =>
+      val projection = resolveQueryColumnsByOrdinal(a.query, r.output, d)
       if (projection != a.query) {
         a.copy(query = projection)
       } else {
```
```diff
@@ -208,8 +208,8 @@ class DeltaAnalysis(session: SparkSession)

     // INSERT OVERWRITE by ordinal and df.insertInto()
     case o @ OverwriteDelta(r, d) if !o.isByName &&
-        needsSchemaAdjustmentByOrdinal(d.name(), o.query, r.schema) =>
-      val projection = resolveQueryColumnsByOrdinal(o.query, r.output, d.name())
+        needsSchemaAdjustmentByOrdinal(d, o.query, r.schema) =>
+      val projection = resolveQueryColumnsByOrdinal(o.query, r.output, d)
       if (projection != o.query) {
         val aliases = AttributeMap(o.query.output.zip(projection.output).collect {
           case (l: AttributeReference, r: AttributeReference) if !l.sameRef(r) => (l, r)
```
```diff
@@ -245,9 +245,9 @@ class DeltaAnalysis(session: SparkSession)
     case o @ DynamicPartitionOverwriteDelta(r, d) if o.resolved
         =>
       val adjustedQuery = if (!o.isByName &&
-          needsSchemaAdjustmentByOrdinal(d.name(), o.query, r.schema)) {
+          needsSchemaAdjustmentByOrdinal(d, o.query, r.schema)) {
         // INSERT OVERWRITE by ordinal and df.insertInto()
-        resolveQueryColumnsByOrdinal(o.query, r.output, d.name())
+        resolveQueryColumnsByOrdinal(o.query, r.output, d)
       } else if (o.isByName && o.origin.sqlText.nonEmpty &&
           needsSchemaAdjustmentByName(o.query, r.output, d)) {
         // INSERT OVERWRITE by name
```
```diff
@@ -850,12 +850,14 @@ class DeltaAnalysis(session: SparkSession)
    * type column/field.
    */
   private def resolveQueryColumnsByOrdinal(
-      query: LogicalPlan, targetAttrs: Seq[Attribute], tblName: String): LogicalPlan = {
+      query: LogicalPlan, targetAttrs: Seq[Attribute], deltaTable: DeltaTableV2): LogicalPlan = {
     // always add a Cast. it will be removed in the optimizer if it is unnecessary.
     val project = query.output.zipWithIndex.map { case (attr, i) =>
       if (i < targetAttrs.length) {
         val targetAttr = targetAttrs(i)
-        addCastToColumn(attr, targetAttr, tblName)
+        addCastToColumn(attr, targetAttr, deltaTable.name(),
+          allowTypeWidening = allowTypeWidening(deltaTable)
+        )
       } else {
         attr
       }
```
```diff
@@ -890,47 +892,69 @@ class DeltaAnalysis(session: SparkSession)
         .getOrElse {
           throw DeltaErrors.missingColumn(attr, targetAttrs)
         }
-      addCastToColumn(attr, targetAttr, deltaTable.name())
+      addCastToColumn(attr, targetAttr, deltaTable.name(),
+        allowTypeWidening = allowTypeWidening(deltaTable)
+      )
     }
     Project(project, query)
   }

   private def addCastToColumn(
       attr: Attribute,
       targetAttr: Attribute,
-      tblName: String): NamedExpression = {
+      tblName: String,
+      allowTypeWidening: Boolean): NamedExpression = {
     val expr = (attr.dataType, targetAttr.dataType) match {
       case (s, t) if s == t =>
         attr
       case (s: StructType, t: StructType) if s != t =>
-        addCastsToStructs(tblName, attr, s, t)
+        addCastsToStructs(tblName, attr, s, t, allowTypeWidening)
       case (ArrayType(s: StructType, sNull: Boolean), ArrayType(t: StructType, tNull: Boolean))
           if s != t && sNull == tNull =>
-        addCastsToArrayStructs(tblName, attr, s, t, sNull)
+        addCastsToArrayStructs(tblName, attr, s, t, sNull, allowTypeWidening)
+      case (s: AtomicType, t: AtomicType)
+          if allowTypeWidening && TypeWidening.isTypeChangeSupportedForSchemaEvolution(t, s) =>
+        // Keep the type from the query, the target schema will be updated to widen the existing
+        // type to match it.
+        attr
       case _ =>
         getCastFunction(attr, targetAttr.dataType, targetAttr.name)
     }
     Alias(expr, targetAttr.name)(explicitMetadata = Option(targetAttr.metadata))
   }

+  /**
+   * Whether inserting values that have a wider type than the table has is allowed. In that case,
+   * values are not downcasted to the current table type and the table schema is updated instead to
+   * use the wider type.
+   */
+  private def allowTypeWidening(deltaTable: DeltaTableV2): Boolean = {
+    val options = new DeltaOptions(Map.empty[String, String], conf)
+    options.canMergeSchema && TypeWidening.isEnabled(
+      deltaTable.initialSnapshot.protocol,
+      deltaTable.initialSnapshot.metadata
+    )
+  }
+
   /**
    * With Delta, we ACCEPT_ANY_SCHEMA, meaning that Spark doesn't automatically adjust the schema
    * of INSERT INTO. This allows us to perform better schema enforcement/evolution. Since Spark
    * skips this step, we see if we need to perform any schema adjustment here.
    */
   private def needsSchemaAdjustmentByOrdinal(
-      tableName: String,
+      deltaTable: DeltaTableV2,
       query: LogicalPlan,
       schema: StructType): Boolean = {
     val output = query.output
     if (output.length < schema.length) {
-      throw DeltaErrors.notEnoughColumnsInInsert(tableName, output.length, schema.length)
+      throw DeltaErrors.notEnoughColumnsInInsert(deltaTable.name(), output.length, schema.length)
     }
     // Now we should try our best to match everything that already exists, and leave the rest
     // for schema evolution to WriteIntoDelta
     val existingSchemaOutput = output.take(schema.length)
     existingSchemaOutput.map(_.name) != schema.map(_.name) ||
-      !SchemaUtils.isReadCompatible(schema.asNullable, existingSchemaOutput.toStructType)
+      !SchemaUtils.isReadCompatible(schema.asNullable, existingSchemaOutput.toStructType,
+        allowTypeWidening = allowTypeWidening(deltaTable))
   }

   /**
```
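The new `AtomicType` branch in `addCastToColumn` above is the core of the change: when widening is allowed and the type change qualifies, the query attribute keeps its wider type instead of being cast down to the existing target type, and the table schema is widened later by `ImplicitMetadataOperation`. Below is a minimal standalone sketch of that decision using plain Spark SQL types; the byte → short → int matrix is only an illustrative assumption, and the real set of supported changes is defined by Delta's `TypeWidening`, not reproduced here.

```scala
import org.apache.spark.sql.types._

// Illustrative stand-in for TypeWidening.isTypeChangeSupportedForSchemaEvolution.
// The byte -> short -> int matrix is an assumption for this sketch only.
def isWideningSupported(from: DataType, to: DataType): Boolean = (from, to) match {
  case (ByteType, ShortType | IntegerType) => true
  case (ShortType, IntegerType) => true
  case _ => false
}

// Simplified version of the cast decision: keep the wider source type when widening
// applies, otherwise fall back to the existing target type (i.e. add a cast).
def resolvedInsertType(source: DataType, target: DataType, allowTypeWidening: Boolean): DataType =
  if (allowTypeWidening && isWideningSupported(target, source)) source else target

assert(resolvedInsertType(IntegerType, ShortType, allowTypeWidening = true) == IntegerType)
assert(resolvedInsertType(IntegerType, ShortType, allowTypeWidening = false) == ShortType)
```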
```diff
@@ -984,7 +1008,10 @@ class DeltaAnalysis(session: SparkSession)
     }
     val specifiedTargetAttrs = targetAttrs.filter(col => userSpecifiedNames.contains(col.name))
     !SchemaUtils.isReadCompatible(
-      specifiedTargetAttrs.toStructType.asNullable, query.output.toStructType)
+      specifiedTargetAttrs.toStructType.asNullable,
+      query.output.toStructType,
+      allowTypeWidening = allowTypeWidening(deltaTable)
+    )
   }

   // Get cast operation for the level of strictness in the schema a user asked for
```
```diff
@@ -1014,7 +1041,8 @@ class DeltaAnalysis(session: SparkSession)
       tableName: String,
       parent: NamedExpression,
       source: StructType,
-      target: StructType): NamedExpression = {
+      target: StructType,
+      allowTypeWidening: Boolean): NamedExpression = {
     if (source.length < target.length) {
       throw DeltaErrors.notEnoughColumnsInInsert(
         tableName, source.length, target.length, Some(parent.qualifiedName))
```
```diff
@@ -1025,12 +1053,20 @@ class DeltaAnalysis(session: SparkSession)
          case t: StructType =>
            val subField = Alias(GetStructField(parent, i, Option(name)), target(i).name)(
              explicitMetadata = Option(metadata))
-            addCastsToStructs(tableName, subField, nested, t)
+            addCastsToStructs(tableName, subField, nested, t, allowTypeWidening)
          case o =>
            val field = parent.qualifiedName + "." + name
            val targetName = parent.qualifiedName + "." + target(i).name
            throw DeltaErrors.cannotInsertIntoColumn(tableName, field, targetName, o.simpleString)
        }
+
+      case (StructField(name, dt: AtomicType, _, _), i) if i < target.length && allowTypeWidening &&
+        TypeWidening.isTypeChangeSupportedForSchemaEvolution(
+          target(i).dataType.asInstanceOf[AtomicType], dt) =>
+        val targetAttr = target(i)
+        Alias(
+          GetStructField(parent, i, Option(name)),
+          targetAttr.name)(explicitMetadata = Option(targetAttr.metadata))
       case (other, i) if i < target.length =>
         val targetAttr = target(i)
         Alias(
```
```diff
@@ -1054,9 +1090,11 @@ class DeltaAnalysis(session: SparkSession)
       parent: NamedExpression,
       source: StructType,
       target: StructType,
-      sourceNullable: Boolean): Expression = {
+      sourceNullable: Boolean,
+      allowTypeWidening: Boolean): Expression = {
     val structConverter: (Expression, Expression) => Expression = (_, i) =>
-      addCastsToStructs(tableName, Alias(GetArrayItem(parent, i), i.toString)(), source, target)
+      addCastsToStructs(
+        tableName, Alias(GetArrayItem(parent, i), i.toString)(), source, target, allowTypeWidening)
     val transformLambdaFunc = {
       val elementVar = NamedLambdaVariable("elementVar", source, sourceNullable)
       val indexVar = NamedLambdaVariable("indexVar", IntegerType, false)
```

spark/src/main/scala/org/apache/spark/sql/delta/schema/SchemaUtils.scala

Lines changed: 6 additions & 2 deletions
```diff
@@ -352,7 +352,8 @@ def normalizeColumnNamesInDataType(
    * new schema of a Delta table can be used with a previously analyzed LogicalPlan. Our
    * rules are to return false if:
    *   - Dropping any column that was present in the existing schema, if not allowMissingColumns
-   *   - Any change of datatype
+   *   - Any change of datatype, if not allowTypeWidening. Any non-widening change of datatype
+   *     otherwise.
    *   - Change of partition columns. Although analyzed LogicalPlan is not changed,
    *     physical structure of data is changed and thus is considered not read compatible.
    *   - If `forbidTightenNullability` = true:
```
```diff
@@ -373,6 +374,7 @@ def normalizeColumnNamesInDataType(
       readSchema: StructType,
       forbidTightenNullability: Boolean = false,
       allowMissingColumns: Boolean = false,
+      allowTypeWidening: Boolean = false,
       newPartitionColumns: Seq[String] = Seq.empty,
       oldPartitionColumns: Seq[String] = Seq.empty): Boolean = {

```
```diff
@@ -387,7 +389,7 @@ def normalizeColumnNamesInDataType(
     def isDatatypeReadCompatible(existing: DataType, newtype: DataType): Boolean = {
       (existing, newtype) match {
         case (e: StructType, n: StructType) =>
-          isReadCompatible(e, n, forbidTightenNullability)
+          isReadCompatible(e, n, forbidTightenNullability, allowTypeWidening = allowTypeWidening)
         case (e: ArrayType, n: ArrayType) =>
           // if existing elements are non-nullable, so should be the new element
           isNullabilityCompatible(e.containsNull, n.containsNull) &&
```
```diff
@@ -397,6 +399,8 @@ def normalizeColumnNamesInDataType(
           isNullabilityCompatible(e.valueContainsNull, n.valueContainsNull) &&
             isDatatypeReadCompatible(e.keyType, n.keyType) &&
             isDatatypeReadCompatible(e.valueType, n.valueType)
+        case (e: AtomicType, n: AtomicType) if allowTypeWidening =>
+          TypeWidening.isTypeChangeSupportedForSchemaEvolution(e, n)
         case (a, b) => a == b
       }
     }
```
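For the read-compatibility side, here is a compact standalone sketch of how the recursion above behaves once `allowTypeWidening` is threaded through: structs, arrays and maps recurse, and leaf types compare by equality unless the change is a supported widening. It mirrors `isDatatypeReadCompatible` in shape only (nullability, missing-column and partition-column rules are omitted) and reuses the same illustrative widening matrix rather than Delta's `TypeWidening`.

```scala
import org.apache.spark.sql.types._

// Illustrative widening matrix (assumption for this sketch; not Delta's actual matrix).
def isWideningSupported(from: DataType, to: DataType): Boolean = (from, to) match {
  case (ByteType, ShortType | IntegerType) => true
  case (ShortType, IntegerType) => true
  case _ => false
}

// Simplified recursion over nested types: leaf types are read-compatible if equal,
// or if the change is a supported widening and allowTypeWidening is set.
def isDatatypeReadCompatible(existing: DataType, newType: DataType,
    allowTypeWidening: Boolean): Boolean = (existing, newType) match {
  case (e: StructType, n: StructType) =>
    e.length == n.length && e.fields.zip(n.fields).forall { case (ef, nf) =>
      ef.name == nf.name &&
        isDatatypeReadCompatible(ef.dataType, nf.dataType, allowTypeWidening)
    }
  case (e: ArrayType, n: ArrayType) =>
    isDatatypeReadCompatible(e.elementType, n.elementType, allowTypeWidening)
  case (e: MapType, n: MapType) =>
    isDatatypeReadCompatible(e.keyType, n.keyType, allowTypeWidening) &&
      isDatatypeReadCompatible(e.valueType, n.valueType, allowTypeWidening)
  case (e, n) if allowTypeWidening => e == n || isWideningSupported(e, n)
  case (e, n) => e == n
}

val existing = new StructType().add("key", IntegerType).add("value", ShortType)
val incoming = new StructType().add("key", IntegerType).add("value", IntegerType)
assert(isDatatypeReadCompatible(existing, incoming, allowTypeWidening = true))
assert(!isDatatypeReadCompatible(existing, incoming, allowTypeWidening = false))
```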
