
Commit 99ff4fd

Reject reads on table with unsupported type changes (delta-io#2787)
## Description

Adds a guardrail for the type widening table feature to reject reads when an unsupported type change was applied to the table. This should never happen unless an implementation doesn't respect the type widening feature specification, which explicitly lists the type changes that are allowed.

## How was this patch tested?

- Added a test manually committing an invalid type change.
1 parent 79a8558 commit 99ff4fd
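
To make the new behavior concrete, here is a minimal, self-contained sketch of the guardrail, not the Delta implementation itself (the real check is `TypeWidening.assertTableReadable`, shown in the diff below). `TypeChangeRecord` and the `allowed` set are hypothetical stand-ins: the record mirrors the `delta.typeChanges` metadata keys exercised in the new test, and the allow-list assumes the byte -> short -> int widening chain from the feature specification.

```scala
// Illustrative sketch only; TypeChangeRecord and `allowed` are hypothetical
// stand-ins for TypeWidening.assertTableReadable shown in the diffs below.
object TypeWideningGuardrailSketch {

  // One recorded type change, mirroring the delta.typeChanges metadata keys
  // used in the new test (fieldPath, fromType, toType, tableVersion).
  final case class TypeChangeRecord(
      fieldPath: Seq[String],
      fromType: String,
      toType: String,
      tableVersion: Long)

  // Assumed allow-list: the byte -> short -> int widening chain from the
  // initial feature specification; the spec is the authoritative source.
  private val allowed: Set[(String, String)] =
    Set("byte" -> "short", "byte" -> "int", "short" -> "int")

  // Reject the read if any recorded change is outside the allow-list,
  // matching the shape of the new DELTA_UNSUPPORTED_TYPE_CHANGE_IN_SCHEMA error.
  def assertReadable(changes: Seq[TypeChangeRecord]): Unit =
    changes.find(c => !allowed(c.fromType -> c.toType)).foreach { c =>
      throw new IllegalStateException(
        "Unable to operate on this table because an unsupported type change was applied. " +
          s"Field ${c.fieldPath.mkString(".")} was changed from ${c.fromType} to ${c.toType}.")
    }

  def main(args: Array[String]): Unit = {
    // byte -> int is a supported widening: passes silently.
    assertReadable(Seq(TypeChangeRecord(Seq("id"), "byte", "int", 1L)))
    // int -> string is not a widening: the read is rejected.
    assertReadable(Seq(TypeChangeRecord(Seq("a", "element"), "int", "string", 2L)))
  }
}
```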

File tree

7 files changed: +110 -0 lines changed


spark/src/main/resources/error/delta-error-classes.json

Lines changed: 6 additions & 0 deletions

```diff
@@ -2655,6 +2655,12 @@
     ],
     "sqlState" : "0AKDC"
   },
+  "DELTA_UNSUPPORTED_TYPE_CHANGE_IN_SCHEMA" : {
+    "message" : [
+      "Unable to operate on this table because an unsupported type change was applied. Field <fieldName> was changed from <fromType> to <toType>."
+    ],
+    "sqlState" : "0AKDC"
+  },
   "DELTA_UNSUPPORTED_VACUUM_SPECIFIC_PARTITION" : {
     "message" : [
       "Please provide the base path (<baseDeltaPath>) when Vacuuming Delta tables. Vacuuming specific partitions is currently not supported."
```

spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala

Lines changed: 11 additions & 0 deletions

```diff
@@ -659,6 +659,17 @@ trait DeltaErrorsBase
     )
   }
 
+  def unsupportedTypeChangeInSchema(
+      fieldPath: Seq[String],
+      fromType: DataType,
+      toType: DataType)
+    : Throwable = {
+    new DeltaIllegalStateException(
+      errorClass = "DELTA_UNSUPPORTED_TYPE_CHANGE_IN_SCHEMA",
+      messageParameters = Array(SchemaUtils.prettyFieldName(fieldPath), fromType.sql, toType.sql)
+    )
+  }
+
   def cannotWriteIntoView(table: TableIdentifier): Throwable = {
     new DeltaAnalysisException(
       errorClass = "DELTA_CANNOT_WRITE_INTO_VIEW",
```

spark/src/main/scala/org/apache/spark/sql/delta/DeltaParquetFileFormat.scala

Lines changed: 2 additions & 0 deletions

```diff
@@ -68,6 +68,8 @@ case class DeltaParquetFileFormat(
       "Wrong arguments for Delta table scan with deletion vectors")
   }
 
+  TypeWidening.assertTableReadable(protocol, metadata)
+
   val columnMappingMode: DeltaColumnMappingMode = metadata.columnMappingMode
   val referenceSchema: StructType = metadata.schema
```

spark/src/main/scala/org/apache/spark/sql/delta/TypeWidening.scala

Lines changed: 23 additions & 0 deletions

```diff
@@ -89,6 +89,29 @@ object TypeWidening {
       case _ => false
     }
 
+  /**
+   * Asserts that the given table doesn't contain any unsupported type changes. This should never
+   * happen unless a non-compliant writer applied a type change that is not part of the feature
+   * specification.
+   */
+  def assertTableReadable(protocol: Protocol, metadata: Metadata): Unit = {
+    if (!isSupported(protocol) ||
+        !TypeWideningMetadata.containsTypeWideningMetadata(metadata.schema)) {
+      return
+    }
+
+    TypeWideningMetadata.getAllTypeChanges(metadata.schema).foreach {
+      case (_, TypeChange(_, from: AtomicType, to: AtomicType, _))
+        if isTypeChangeSupported(from, to) =>
+      case (fieldPath, invalidChange) =>
+        throw DeltaErrors.unsupportedTypeChangeInSchema(
+          fieldPath ++ invalidChange.fieldPath,
+          invalidChange.fromType,
+          invalidChange.toType
+        )
+    }
+  }
+
   /**
    * Filter the given list of files to only keep files that were written before the latest type
    * change, if any. These older files contain a column or field with a type that is different than
```
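
A subtlety in `assertTableReadable` above: the first `case` has a guard and an empty body, so supported changes match and are deliberately ignored, while everything else falls through to the throwing case. A standalone illustration of that Scala idiom (the tuples are made up; byte -> int is assumed supported per the spec):

```scala
// Guarded case with an empty body: matching values are a no-op,
// anything else falls through to the failing case.
Seq(("byte", "int"), ("int", "string")).foreach {
  case (from, to) if from == "byte" && to == "int" => // supported: do nothing
  case (from, to) =>
    throw new IllegalStateException(s"unsupported type change: $from -> $to")
}
```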

spark/src/main/scala/org/apache/spark/sql/delta/TypeWideningMetadata.scala

Lines changed: 22 additions & 0 deletions

```diff
@@ -18,6 +18,7 @@ package org.apache.spark.sql.delta
 
 import scala.collection.mutable
 
+import org.apache.spark.sql.delta.actions.Protocol
 import org.apache.spark.sql.delta.schema.{SchemaMergingUtils, SchemaUtils}
 import org.apache.spark.sql.util.ScalaExtensions._
 
@@ -225,6 +226,27 @@ private[delta] object TypeWideningMetadata {
     case _ => false
   }
 
+  /**
+   * Return all type changes recorded in the table schema.
+   * @return A list of tuples (field path, type change).
+   */
+  def getAllTypeChanges(schema: StructType): Seq[(Seq[String], TypeChange)] = {
+    if (!containsTypeWideningMetadata(schema)) return Seq.empty
+
+    val allStructFields = SchemaUtils.filterRecursively(schema, checkComplexTypes = true) {
+      _ => true
+    }
+
+    def getTypeChanges(field: StructField): Seq[TypeChange] =
+      fromField(field)
+        .map(_.typeChanges)
+        .getOrElse(Seq.empty)
+
+    allStructFields.flatMap { case (fieldPath, field) =>
+      getTypeChanges(field).map((fieldPath :+ field.name, _))
+    }
+  }
+
   /** Return the version of the latest type change recorded in the schema metadata */
   def getLatestTypeChangeVersion(schema: StructType): Option[Long] = {
     val allStructFields = SchemaUtils.filterRecursively(schema, checkComplexTypes = true) {
```
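
A hedged usage sketch of the new helper, assuming an active `SparkSession` named `spark` and a Delta table at `path`; note that `TypeWideningMetadata` is `private[delta]`, so code like this only compiles inside that package:

```scala
import org.apache.spark.sql.delta.{DeltaLog, TypeWideningMetadata}

// List every type change recorded in the latest snapshot's schema.
val snapshot = DeltaLog.forTable(spark, path).update()
TypeWideningMetadata.getAllTypeChanges(snapshot.metadata.schema).foreach {
  case (fieldPath, change) =>
    println(s"${fieldPath.mkString(".")}: ${change.fromType.sql} -> ${change.toType.sql}")
}
```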

spark/src/test/scala/org/apache/spark/sql/delta/DeltaErrorsSuite.scala

Lines changed: 9 additions & 0 deletions

```diff
@@ -1068,6 +1068,15 @@ trait DeltaErrorsSuiteBase
         |""".stripMargin
       ))
     }
+    {
+      val e = intercept[DeltaIllegalStateException] {
+        throw DeltaErrors.unsupportedTypeChangeInSchema(Seq("s", "a"), IntegerType, StringType)
+      }
+      checkErrorMessage(e, Some("DELTA_UNSUPPORTED_TYPE_CHANGE_IN_SCHEMA"), Some("0AKDC"),
+        Some("Unable to operate on this table because an unsupported type change was applied. " +
+          "Field s.a was changed from INT to STRING."
+      ))
+    }
     {
       val e = intercept[DeltaAnalysisException] {
         val classConf = Seq(("classKey", "classVal"))
```

spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningSuite.scala

Lines changed: 37 additions & 0 deletions

```diff
@@ -18,6 +18,7 @@ package org.apache.spark.sql.delta
 
 import java.util.concurrent.TimeUnit
 
+import org.apache.spark.sql.delta.DeltaOperations.ManualUpdate
 import org.apache.spark.sql.delta.actions.AddFile
 import org.apache.spark.sql.delta.catalog.DeltaTableV2
 import org.apache.spark.sql.delta.commands.AlterTableDropFeatureDeltaCommand
@@ -979,4 +980,40 @@ trait DeltaTypeWideningTableFeatureTests extends BeforeAndAfterEach {
       }
     }
   }
+
+  test("unsupported type changes applied to the table") {
+    sql(s"CREATE TABLE delta.`$tempDir` (a array<int>) USING DELTA")
+    val metadata = new MetadataBuilder()
+      .putMetadataArray("delta.typeChanges", Array(
+        new MetadataBuilder()
+          .putString("toType", "string")
+          .putString("fromType", "int")
+          .putLong("tableVersion", 2)
+          .putString("fieldPath", "element")
+          .build()
+      )).build()
+
+    // Add an unsupported type change to the table schema. Only an implementation that isn't
+    // compliant with the feature specification would allow this.
+    deltaLog.withNewTransaction { txn =>
+      txn.commit(
+        Seq(txn.snapshot.metadata.copy(
+          schemaString = new StructType()
+            .add("a", StringType, nullable = true, metadata).json
+        )),
+        ManualUpdate)
+    }
+
+    checkError(
+      exception = intercept[DeltaIllegalStateException] {
+        readDeltaTable(tempPath).collect()
+      },
+      errorClass = "DELTA_UNSUPPORTED_TYPE_CHANGE_IN_SCHEMA",
+      parameters = Map(
+        "fieldName" -> "a.element",
+        "fromType" -> "INT",
+        "toType" -> "STRING"
+      )
+    )
+  }
 }
```
