
Commit a00774a

viirya authored and HyukjinKwon committed
[SPARK-28054][SQL] Fix error when insert Hive partitioned table dynamically where partition name is upper case
## What changes were proposed in this pull request?

When we use an upper case partition name in a Hive table, like:

```
CREATE TABLE src (KEY STRING, VALUE STRING) PARTITIONED BY (DS STRING)
```

then an `INSERT INTO TABLE` query doesn't work:

```
INSERT INTO TABLE src PARTITION(ds) SELECT 'k' key, 'v' value, '1' ds
-- or
INSERT INTO TABLE src PARTITION(DS) SELECT 'k' KEY, 'v' VALUE, '1' DS
```

```
[info] org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.Table.ValidationFailureSemanticException: Partition spec {ds=, DS=1} contains non-partition columns;
```

Because the Hive metastore is not case preserving and stores partition columns with lower cased names, we lowercase the column names in the partition spec before passing it to the Hive client, but we still write upper case column names into the partition paths. When `loadDynamicPartitions` is called for a dynamic-partition `INSERT INTO TABLE`, Hive computes the full partition spec from those paths, so in the case above it derives `{ds=, DS=1}` and fails partition column validation.

This patch fixes the issue by lowercasing the column names in the written partition paths for Hive partitioned tables. The fix touches the `saveAsHiveFile` method, which is used by the `InsertIntoHiveDirCommand` and `InsertIntoHiveTable` commands. Of these, only `InsertIntoHiveTable` passes the `partitionAttributes` parameter, so this change should only affect the `InsertIntoHiveTable` command.

## How was this patch tested?

Added a test.

Closes apache#24886 from viirya/SPARK-28054.

Authored-by: Liang-Chi Hsieh <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
1 parent 1a915bf commit a00774a

File tree

2 files changed: +29 −1 lines changed

sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala

Lines changed: 11 additions & 1 deletion
```diff
@@ -83,6 +83,16 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand {
       jobId = java.util.UUID.randomUUID().toString,
       outputPath = outputLocation)
 
+    // SPARK-28054: Hive metastore is not case preserving and keeps partition columns
+    // with lower cased names, Hive will validate the column names in partition spec and
+    // the partition paths. Besides lowercasing the column names in the partition spec,
+    // we also need to lowercase the column names in written partition paths.
+    // scalastyle:off caselocale
+    val hiveCompatiblePartitionColumns = partitionAttributes.map { attr =>
+      attr.withName(attr.name.toLowerCase)
+    }
+    // scalastyle:on caselocale
+
     FileFormatWriter.write(
       sparkSession = sparkSession,
       plan = plan,
@@ -91,7 +101,7 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand {
       outputSpec =
         FileFormatWriter.OutputSpec(outputLocation, customPartitionLocations, outputColumns),
       hadoopConf = hadoopConf,
-      partitionColumns = partitionAttributes,
+      partitionColumns = hiveCompatiblePartitionColumns,
       bucketSpec = None,
       statsTrackers = Seq(basicWriteJobStatsTracker(hadoopConf)),
       options = Map.empty)
```
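The effect of the new `hiveCompatiblePartitionColumns` mapping can be sketched standalone. This is a minimal sketch with no Spark dependency: the `Attr` case class and `partitionPath` helper below are hypothetical stand-ins for Spark's `Attribute` and the `col=value` path segments that `FileFormatWriter` produces, used only to show why lowercasing the attribute names makes the written paths agree with the lowercased partition spec.

```scala
// Hypothetical stand-in for Spark's Attribute: only the name matters here.
case class Attr(name: String) {
  def withName(newName: String): Attr = copy(name = newName)
}

object PartitionCaseDemo {
  // Analogue of the fix: lowercase every partition attribute's name
  // before the writer renders partition directories.
  def hiveCompatible(partitionAttributes: Seq[Attr]): Seq[Attr] =
    partitionAttributes.map(attr => attr.withName(attr.name.toLowerCase))

  // Simplified partition path as a writer would render it: "col=value/...".
  def partitionPath(attrs: Seq[Attr], values: Seq[String]): String =
    attrs.zip(values).map { case (a, v) => s"${a.name}=$v" }.mkString("/")

  def main(args: Array[String]): Unit = {
    val upper = Seq(Attr("DS"))
    // Pre-fix: path uses the table's upper case name, so Hive's full-path
    // spec disagrees with the lowercased metastore spec ({ds=, DS=1}).
    println(partitionPath(upper, Seq("1")))                 // prints DS=1
    // Post-fix: path matches the lowercased spec Hive expects.
    println(partitionPath(hiveCompatible(upper), Seq("1"))) // prints ds=1
  }
}
```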

sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala

Lines changed: 18 additions & 0 deletions
```diff
@@ -1188,6 +1188,24 @@ class HiveQuerySuite extends HiveComparisonTest with SQLTestUtils with BeforeAnd
       }
     }
   }
+
+  test("SPARK-28054: Unable to insert partitioned table when partition name is upper case") {
+    withTable("spark_28054_test") {
+      sql("set hive.exec.dynamic.partition.mode=nonstrict")
+      sql("CREATE TABLE spark_28054_test (KEY STRING, VALUE STRING) PARTITIONED BY (DS STRING)")
+
+      sql("INSERT INTO TABLE spark_28054_test PARTITION(DS) SELECT 'k' KEY, 'v' VALUE, '1' DS")
+
+      assertResult(Array(Row("k", "v", "1"))) {
+        sql("SELECT * from spark_28054_test").collect()
+      }
+
+      sql("INSERT INTO TABLE spark_28054_test PARTITION(ds) SELECT 'k' key, 'v' value, '2' ds")
+      assertResult(Array(Row("k", "v", "1"), Row("k", "v", "2"))) {
+        sql("SELECT * from spark_28054_test").collect()
+      }
+    }
+  }
 }
 
 // for SPARK-2180 test
```
