
Commit ed8065c

[Spark] Fix a bug that prevents altering array/map/struct<varchar> column (delta-io#4499)
#### Which Delta project/connector is this regarding?

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

### Context

Previously, there was a bug where char/varchar to string conversion didn't work correctly because of how these types are represented internally:

- An `array<varchar(10)>` is represented as `(dataType = array<string>, char_varchar_metadata = "array<varchar(10)>")`.
- Converting `array<varchar(10)>` into `array<string>` by setting the `dataType` part to `array<string>` therefore changes nothing: the `dataType` part is already `array<string>`, and the metadata still records `varchar(10)`.

This was fixed in delta-io#3346. The idea is to first convert the representation into one without the metadata part. In the example above, `(dataType = array<string>, char_varchar_metadata = "array<varchar(10)>")` is first converted into `(dataType = array<varchar(10)>, char_varchar_metadata = "")` before the `dataType` part is set to `string`.

### Problem

In the previous fix, the non-metadata representation is left open and converted back to the metadata-based representation where necessary. This converting-back step is sometimes performed incorrectly, which breaks ALTER COLUMN on any column of type `container<varchar>`, where `container` is `array`, `map`, or `struct`. Specifically, `verifyColumnChange` takes a `change` that uses the non-metadata-based representation and an `oldColumnForVerification` that uses the metadata-based representation. The method contains a check that prevents any change to complex data types (`array`/`map`/`struct`), and since that check is a simple equality comparison, it fails when the two representations of the same type are compared.

### New Approach

To avoid having to reason about which representation to use where, this PR takes a more targeted approach: it always defaults to the metadata-based representation of char/varchar and only performs the representation conversion when setting the data type.

## How was this patch tested?

New and existing unit tests.

## Does this PR introduce _any_ user-facing changes?

No.
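To make the Context above concrete, here is a minimal sketch (not part of this PR) using Spark's internal `CharVarcharUtils`, which the code in this PR also relies on; the metadata key shown is Spark's `__CHAR_VARCHAR_TYPE_STRING`:

```scala
import org.apache.spark.sql.catalyst.util.CharVarcharUtils
import org.apache.spark.sql.types._

// The user-facing type: array<varchar(10)>.
val declared = StructType(Seq(StructField("arr_v", ArrayType(VarcharType(10)))))

// Spark stores it as array<string> plus a metadata entry that records the
// original type: "__CHAR_VARCHAR_TYPE_STRING" -> "array<varchar(10)>".
val field = CharVarcharUtils.replaceCharVarcharWithStringInSchema(declared).head

// The bug: setting only the dataType part is a no-op here, because the
// dataType is already array<string>; the varchar(10) metadata survives.
val naive = field.copy(dataType = ArrayType(StringType))

// The fix from delta-io#3346: first recover the raw, metadata-free type ...
val raw = CharVarcharUtils.getRawType(field.metadata) // Some(array<varchar(10)>)
// ... and only then set the new data type on that representation.
```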
1 parent 029829c commit ed8065c

File tree

4 files changed: +169 −52 lines changed

spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaCatalog.scala

Lines changed: 4 additions & 10 deletions
@@ -747,14 +747,7 @@ class DeltaCatalog extends DelegatingCatalogExtension

       case (t, columnChanges) if classOf[ColumnChange].isAssignableFrom(t) =>
         // TODO: Theoretically we should be able to fetch the snapshot from a txn.
-        val snapshotSchema = table.initialSnapshot.schema
-        val schema = if (!spark.conf.get(DeltaSQLConf.DELTA_BYPASS_CHARVARCHAR_TO_STRING_FIX)) {
-          // Convert (StringType, metadata = 'VARCHAR(n)') into (VARCHAR(n), metadata = '')
-          // so that CHAR/VARCHAR to String conversion can be handled correctly.
-          SchemaUtils.getRawSchemaWithoutCharVarcharMetadata(snapshotSchema)
-        } else {
-          snapshotSchema
-        }
+        val schema = table.initialSnapshot.schema
         def getColumn(fieldNames: Seq[String])
           : DeltaChangeColumnSpec = {
           columnUpdates.getOrElseUpdate(fieldNames, {

@@ -800,8 +793,9 @@
           case dataType: UpdateColumnType =>
             val field = dataType.fieldNames()
             val spec = getColumn(field)
-            columnUpdates(field) = spec.copy(
-              newColumn = spec.newColumn.copy(dataType = dataType.newDataType()))
+            val newField = SchemaUtils.setFieldDataTypeCharVarcharSafe(
+              spec.newColumn, dataType.newDataType())
+            columnUpdates(field) = spec.copy(newColumn = newField)

           case position: UpdateColumnPosition =>
             val field = position.fieldNames()

spark/src/main/scala/org/apache/spark/sql/delta/commands/alterDeltaTableCommands.scala

Lines changed: 15 additions & 27 deletions
@@ -765,11 +765,7 @@ case class AlterTableChangeColumnDeltaCommand(
     val metadata = txn.metadata
     val bypassCharVarcharToStringFix =
       sparkSession.conf.get(DeltaSQLConf.DELTA_BYPASS_CHARVARCHAR_TO_STRING_FIX)
-    val oldSchema = if (bypassCharVarcharToStringFix) {
-      metadata.schema
-    } else {
-      SchemaUtils.getRawSchemaWithoutCharVarcharMetadata(metadata.schema)
-    }
+    val oldSchema = metadata.schema
     val resolver = sparkSession.sessionState.conf.resolver

     columnChanges.foreach(change => {

@@ -792,16 +788,7 @@
       transformSchema(prevSchema, Some(columnName)) {
         case (`columnPath`, struct @ StructType(fields), _) =>
           val oldColumn = struct(columnName)
-
-          // Analyzer already validates the char/varchar type change of ALTER COLUMN in
-          // `CheckAnalysis.checkAlterTableCommand`. We should normalize char/varchar type
-          // to string type first, then apply Delta-specific checks.
-          val oldColumnForVerification = if (bypassCharVarcharToStringFix) {
-            oldColumn
-          } else {
-            CharVarcharUtils.replaceCharVarcharWithStringInSchema(StructType(Seq(oldColumn))).head
-          }
-          verifyColumnChange(change, sparkSession, oldColumnForVerification, resolver, txn)
+          verifyColumnChange(change, sparkSession, oldColumn, resolver, txn)

           val newField = {
             if (change.syncIdentity) {

@@ -828,13 +815,10 @@
             case Some(newDefaultValue) => result.withCurrentDefaultValue(newDefaultValue)
             case None => result.clearCurrentDefaultValue()
           }
-
-          result
-            .copy(
-              name = newColumn.name,
-              dataType =
-                SchemaUtils.changeDataType(oldColumn.dataType, newColumn.dataType, resolver),
-              nullable = newColumn.nullable)
+          result = SchemaUtils.changeFieldDataTypeCharVarcharSafe(result, newColumn, resolver)
+          result.copy(
+            name = newColumn.name,
+            nullable = newColumn.nullable)
         }
       }

@@ -854,19 +838,23 @@
        case (`columnPath`, m: MapType, _) if columnName == "key" =>
          val originalField = StructField(columnName, m.keyType, nullable = false)
          verifyMapArrayChange(change, sparkSession, originalField, resolver, txn)
-         m.copy(keyType = SchemaUtils.changeDataType(m.keyType, newColumn.dataType, resolver))
+         val fieldWithNewDataType = SchemaUtils.changeFieldDataTypeCharVarcharSafe(
+           originalField, newColumn, resolver)
+         m.copy(keyType = fieldWithNewDataType.dataType)

        case (`columnPath`, m: MapType, _) if columnName == "value" =>
          val originalField = StructField(columnName, m.valueType, nullable = m.valueContainsNull)
          verifyMapArrayChange(change, sparkSession, originalField, resolver, txn)
-         m.copy(
-           valueType = SchemaUtils.changeDataType(m.valueType, newColumn.dataType, resolver))
+         val fieldWithNewDataType = SchemaUtils.changeFieldDataTypeCharVarcharSafe(
+           originalField, newColumn, resolver)
+         m.copy(valueType = fieldWithNewDataType.dataType)

        case (`columnPath`, a: ArrayType, _) if columnName == "element" =>
          val originalField = StructField(columnName, a.elementType, nullable = a.containsNull)
          verifyMapArrayChange(change, sparkSession, originalField, resolver, txn)
-         a.copy(elementType =
-           SchemaUtils.changeDataType(a.elementType, newColumn.dataType, resolver))
+         val fieldWithNewDataType = SchemaUtils.changeFieldDataTypeCharVarcharSafe(
+           originalField, newColumn, resolver)
+         a.copy(elementType = fieldWithNewDataType.dataType)

        case (_, other @ (_: StructType | _: ArrayType | _: MapType), _) => other
      }
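For reference, the regression fixed here was easy to hit; a minimal repro sketch in the style of the test suite (the table name `t` and the surrounding test session are assumed):

```scala
// Before this fix, the ALTER COLUMN below failed: verifyColumnChange compared
// the non-metadata-based representation of the changed column against the
// metadata-based representation of the old column, and the simple equality
// check on complex types rejected the (semantically unchanged) array type.
sql("CREATE TABLE t (arr_v array<varchar(1)>) USING delta")
sql("ALTER TABLE t CHANGE COLUMN arr_v COMMENT 'test comment'")
```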

spark/src/main/scala/org/apache/spark/sql/delta/schema/SchemaUtils.scala

Lines changed: 97 additions & 15 deletions
@@ -163,23 +163,68 @@ object SchemaUtils extends DeltaLogging {
   }

   /**
-   * Converts StringType to CHAR/VARCHAR if that is the true type as per the metadata
-   * and also strips this metadata from fields.
+   * A char(x)/varchar(x) related type is internally stored as a string type with the constraint
+   * information kept in the metadata. For example:
+   * + char(10) -> (string, char_varchar_metadata = "char(10)")
+   * + array[varchar(10)]
+   *   -> (array[string], char_varchar_metadata = "array[varchar(10)]")
+   * This method converts the string + metadata representation to the actual type.
+   * + (string, metadata = "char(10)") -> (char(10), char_varchar_metadata = "")
+   * + (array[string], metadata = "array[varchar(10)]")
+   *   -> (array[varchar(10)], char_varchar_metadata = "")
    */
-  def getRawSchemaWithoutCharVarcharMetadata(schema: StructType): StructType = {
-    val fields = schema.map { field =>
-      val rawField = CharVarcharUtils.getRawType(field.metadata)
-        .map(dt => field.copy(dataType = dt))
-        .getOrElse(field)
-      val throwAwayAttrRef = AttributeReference(
-        rawField.name,
-        rawField.dataType,
-        nullable = rawField.nullable,
-        rawField.metadata)()
-      val cleanedMetadata = CharVarcharUtils.cleanAttrMetadata(throwAwayAttrRef).metadata
-      rawField.copy(metadata = cleanedMetadata)
+  private def getRawFieldWithoutCharVarcharMetadata(field: StructField): StructField = {
+    val rawField = CharVarcharUtils.getRawType(field.metadata)
+      .map(dt => field.copy(dataType = dt))
+      .getOrElse(field)
+    val throwAwayAttrRef = AttributeReference(
+      rawField.name,
+      rawField.dataType,
+      nullable = rawField.nullable,
+      rawField.metadata)()
+    val cleanedMetadata = CharVarcharUtils.cleanAttrMetadata(throwAwayAttrRef).metadata
+    rawField.copy(metadata = cleanedMetadata)
+  }
+
+  /**
+   * Sets a data type on a field in a char/varchar-safe manner. A char(x)/varchar(x) related type
+   * consists of two parts: a string-based type and the constraint information stored in the
+   * metadata. Simply setting the data type would lead to unexpected results.
+   *
+   * For example, an array[varchar(10)] type is internally represented as
+   * (array[string], char_varchar_metadata = "array[varchar(10)]"). If we convert it into a string
+   * simply by setting the data type part, the metadata part will still be there, and the type
+   * will still stay as varchar(10).
+   *
+   * This method first converts the field into its raw type without the metadata part, then sets
+   * the data type to the new data type, and finally converts it back to the original
+   * representation.
+   *
+   * In the above example, this method will convert the array[varchar(10)] representation into
+   * (array[varchar(10)], char_varchar_metadata = ""), set the data type to string, yielding
+   * (string, char_varchar_metadata = ""), and finally convert it back, which in this case leaves
+   * it unchanged as (string, char_varchar_metadata = "").
+   */
+  def setFieldDataTypeCharVarcharSafe(field: StructField, newDataType: DataType): StructField = {
+    val byPassCharVarcharToStringFix =
+      SparkSession.active.conf.get(DeltaSQLConf.DELTA_BYPASS_CHARVARCHAR_TO_STRING_FIX)
+    // Convert the field into its raw type without the metadata part.
+    val rawField = if (byPassCharVarcharToStringFix) {
+      field
+    } else {
+      getRawFieldWithoutCharVarcharMetadata(field)
+    }
+
+    // Set the new data type.
+    val rawFieldWithNewDataType = rawField.copy(dataType = newDataType)
+
+    // Convert it back to the original representation.
+    if (byPassCharVarcharToStringFix) {
+      rawFieldWithNewDataType
+    } else {
+      val throwAwayStructType = StructType(Seq(rawFieldWithNewDataType))
+      CharVarcharUtils.replaceCharVarcharWithStringInSchema(throwAwayStructType)
+        .head
     }
-    StructType(fields)
   }

@@ -1066,6 +1111,43 @@ def normalizeColumnNamesInDataType(
     }
   }

+  /**
+   * Copy the nested data type between two fields in a char/varchar-safe manner.
+   * See the documentation of [[getRawFieldWithoutCharVarcharMetadata]] and
+   * [[setFieldDataTypeCharVarcharSafe]] for more context.
+   *
+   * This method uses [[getRawFieldWithoutCharVarcharMetadata]] on both the source and
+   * target fields to ensure that the metadata information is included in the data type
+   * before changing the data type. For example, to convert from a varchar(1) to a varchar(10),
+   * we first change both representations:
+   *
+   *   Source: (string, char_varchar_metadata = "varchar(1)")
+   *     -> (varchar(1), char_varchar_metadata = "")
+   *   Target: (string, char_varchar_metadata = "varchar(10)")
+   *     -> (varchar(10), char_varchar_metadata = "")
+   *
+   * Then, we change the data type of the source to that of the target:
+   *   (varchar(1), char_varchar_metadata = "") -> (varchar(10), char_varchar_metadata = "")
+   *
+   * Finally, we move the type information back into the metadata:
+   *   (varchar(10), char_varchar_metadata = "") -> (string, char_varchar_metadata = "varchar(10)")
+   */
+  def changeFieldDataTypeCharVarcharSafe(
+      fromField: StructField,
+      toField: StructField,
+      resolver: Resolver): StructField = {
+    val (safeFromField, safeToField) =
+      if (SparkSession.active.conf.get(DeltaSQLConf.DELTA_BYPASS_CHARVARCHAR_TO_STRING_FIX)) {
+        (fromField, toField)
+      } else {
+        (getRawFieldWithoutCharVarcharMetadata(fromField),
+          getRawFieldWithoutCharVarcharMetadata(toField))
+      }
+    val newDataType = SchemaUtils.changeDataType(
+      safeFromField.dataType, safeToField.dataType, resolver)
+    setFieldDataTypeCharVarcharSafe(fromField, newDataType)
+  }
+
   /**
    * Copy the nested data type between two data types.
    */
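A brief usage sketch of the new helpers (illustrative, not part of the patch; it assumes an active `SparkSession`, since both helpers read `DELTA_BYPASS_CHARVARCHAR_TO_STRING_FIX` via `SparkSession.active`), showing the varchar(1) to varchar(10) widening from the scaladoc:

```scala
import org.apache.spark.sql.catalyst.analysis.caseInsensitiveResolution
import org.apache.spark.sql.catalyst.util.CharVarcharUtils
import org.apache.spark.sql.delta.schema.SchemaUtils
import org.apache.spark.sql.types._

// Build the metadata-based representations the way Spark stores them:
// both fields come out as (string, char_varchar_metadata = "varchar(n)").
val from = CharVarcharUtils.replaceCharVarcharWithStringInSchema(
  StructType(Seq(StructField("v", VarcharType(1))))).head
val to = CharVarcharUtils.replaceCharVarcharWithStringInSchema(
  StructType(Seq(StructField("v", VarcharType(10))))).head

// Strips the metadata on both sides, widens varchar(1) -> varchar(10), and
// restores the metadata-based representation on the result, i.e.
// (string, char_varchar_metadata = "varchar(10)").
val widened = SchemaUtils.changeFieldDataTypeCharVarcharSafe(
  from, to, caseInsensitiveResolution)
assert(widened.dataType == StringType)
```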

spark/src/test/scala/org/apache/spark/sql/delta/DeltaAlterTableTests.scala

Lines changed: 53 additions & 0 deletions
@@ -1447,6 +1447,59 @@ trait DeltaAlterTableTests extends DeltaAlterTableTestBase {
     }
   }

+  ddlTest("CHANGE COLUMN - set comment on an array/map/struct<varchar> column") {
+    val schema = """
+      |arr_v array<varchar(1)>,
+      |map_vv map<varchar(1), varchar(1)>,
+      |map_sv map<string, varchar(1)>,
+      |map_vs map<varchar(1), string>,
+      |struct_v struct<v: varchar(1)>""".stripMargin
+    def testCommentOnVarcharInContainer(
+        colName: String,
+        expectedType: String,
+        goodInsertValue: String,
+        badInsertValue: String): Unit = {
+      withDeltaTable(schema = schema) { tableName =>
+        sql(s"ALTER TABLE $tableName CHANGE COLUMN $colName COMMENT 'test comment'")
+        val expectedResult = Row(colName, expectedType, "test comment") :: Nil
+        checkAnswer(
+          sql(s"DESCRIBE $tableName").filter(s"col_name = '$colName'"),
+          expectedResult)
+        sql(s"INSERT INTO $tableName($colName) VALUES ($goodInsertValue)")
+        val e = intercept[DeltaInvariantViolationException] {
+          sql(s"INSERT INTO $tableName($colName) VALUES ($badInsertValue)")
+        }
+        assert(e.getMessage.contains("exceeds char/varchar type length limitation"))
+      }
+    }
+    testCommentOnVarcharInContainer(
+      colName = "arr_v",
+      expectedType = "array<string>",
+      goodInsertValue = "array('1')",
+      badInsertValue = "array('12')")
+    testCommentOnVarcharInContainer(
+      colName = "map_vv",
+      expectedType = "map<string,string>",
+      goodInsertValue = "map('1', '1')",
+      badInsertValue = "map('12', '12')")
+    testCommentOnVarcharInContainer(
+      colName = "map_sv",
+      expectedType = "map<string,string>",
+      goodInsertValue = "map('123', '1')",
+      badInsertValue = "map('123', '12')")
+    testCommentOnVarcharInContainer(
+      colName = "map_vs",
+      expectedType = "map<string,string>",
+      goodInsertValue = "map('1', '123')",
+      badInsertValue = "map('12', '123')")
+    testCommentOnVarcharInContainer(
+      colName = "struct_v",
+      expectedType = "struct<v:string>",
+      goodInsertValue = "named_struct('v', '1')",
+      badInsertValue = "named_struct('v', '12')")
+  }
+
   ddlTest("CHANGE COLUMN - set a default value for a varchar column") {
     withDeltaTable(schema = "v varchar(1)") { tableName =>
       sql(s"ALTER TABLE $tableName " +
