This repository was archived by the owner on Jan 9, 2020. It is now read-only.

Commit 9bdc835

gatorsmile authored and cloud-fan committed
[SPARK-21085][SQL] Failed to read the partitioned table created by Spark 2.1
### What changes were proposed in this pull request?

Before this PR, Spark was unable to read a partitioned table created by Spark 2.1 when the table schema does not put the partitioning columns at the end of the schema:

[assert(partitionFields.map(_.name) == partitionColumnNames)](https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala#L234-L236)

When reading the table metadata from the metastore, we also need to reorder the columns.

### How was this patch tested?

Added test cases to check both Hive-serde and data source tables.

Author: gatorsmile <[email protected]>

Closes apache#18295 from gatorsmile/reorderReadSchema.

(cherry picked from commit 0c88e8d)
Signed-off-by: Wenchen Fan <[email protected]>
1 parent 42cc830 commit 9bdc835
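
For context, the assertion the message links to lives in `CatalogTable.partitionSchema`, which assumes the partition columns are the trailing fields of the table schema. The following is a paraphrased, self-contained sketch, not the verbatim Spark source; the `TableMeta` wrapper is a stand-in for the real `CatalogTable`:

```scala
import org.apache.spark.sql.types.StructType

// Paraphrased sketch of the linked assertion: partitionSchema takes the
// *last* N fields of the schema and asserts that they are exactly the
// declared partition columns, in order. A Spark 2.1 table with a
// non-trailing partition column trips this assert on read.
case class TableMeta(schema: StructType, partitionColumnNames: Seq[String]) {
  def partitionSchema: StructType = {
    val partitionFields = schema.takeRight(partitionColumnNames.length)
    assert(partitionFields.map(_.name) == partitionColumnNames)
    StructType(partitionFields)
  }
}
```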

2 files changed: +52 -5 lines changed

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala

Lines changed: 26 additions & 5 deletions
@@ -717,6 +717,20 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       properties = table.properties.filterNot { case (key, _) => key.startsWith(SPARK_SQL_PREFIX) })
   }
 
+  // Reorder the table schema to put the partition columns at the end. Before Spark 2.2, the
+  // partition columns were not put at the end of the schema. We need to reorder them when
+  // reading the schema from the table properties.
+  private def reorderSchema(schema: StructType, partColumnNames: Seq[String]): StructType = {
+    val partitionFields = partColumnNames.map { partCol =>
+      schema.find(_.name == partCol).getOrElse {
+        throw new AnalysisException("The metadata is corrupted. Unable to find the " +
+          s"partition column names from the schema. schema: ${schema.catalogString}. " +
+          s"Partition columns: ${partColumnNames.mkString("[", ", ", "]")}")
+      }
+    }
+    StructType(schema.filterNot(partitionFields.contains) ++ partitionFields)
+  }
+
   private def restoreHiveSerdeTable(table: CatalogTable): CatalogTable = {
     val hiveTable = table.copy(
       provider = Some(DDLUtils.HIVE_PROVIDER),
@@ -726,10 +740,13 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     // schema from table properties.
     if (table.properties.contains(DATASOURCE_SCHEMA_NUMPARTS)) {
       val schemaFromTableProps = getSchemaFromTableProperties(table)
-      if (DataType.equalsIgnoreCaseAndNullability(schemaFromTableProps, table.schema)) {
+      val partColumnNames = getPartitionColumnsFromTableProperties(table)
+      val reorderedSchema = reorderSchema(schema = schemaFromTableProps, partColumnNames)
+
+      if (DataType.equalsIgnoreCaseAndNullability(reorderedSchema, table.schema)) {
         hiveTable.copy(
-          schema = schemaFromTableProps,
-          partitionColumnNames = getPartitionColumnsFromTableProperties(table),
+          schema = reorderedSchema,
+          partitionColumnNames = partColumnNames,
           bucketSpec = getBucketSpecFromTableProperties(table))
       } else {
         // Hive metastore may change the table schema, e.g. schema inference. If the table
@@ -759,11 +776,15 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     }
     val partitionProvider = table.properties.get(TABLE_PARTITION_PROVIDER)
 
+    val schemaFromTableProps = getSchemaFromTableProperties(table)
+    val partColumnNames = getPartitionColumnsFromTableProperties(table)
+    val reorderedSchema = reorderSchema(schema = schemaFromTableProps, partColumnNames)
+
    table.copy(
      provider = Some(provider),
      storage = storageWithLocation,
-      schema = getSchemaFromTableProperties(table),
-      partitionColumnNames = getPartitionColumnsFromTableProperties(table),
+      schema = reorderedSchema,
+      partitionColumnNames = partColumnNames,
      bucketSpec = getBucketSpecFromTableProperties(table),
      tracksPartitionsInCatalog = partitionProvider == Some(TABLE_PARTITION_PROVIDER_CATALOG))
  }
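
To make the effect of `reorderSchema` concrete, here is a standalone sketch (illustrative column names; no catalog or metastore involved) of how a Spark 2.1-style schema read from the table properties gets reordered:

```scala
import org.apache.spark.sql.types.StructType

// A Spark 2.1-style schema where the partition column is not last.
val fromProps = new StructType()
  .add("col1", "int")
  .add("partCol1", "int")
  .add("col2", "string")
val partColumnNames = Seq("partCol1")

// Same reordering as reorderSchema above: data columns first, partition columns last.
val partitionFields = partColumnNames.map(n => fromProps.find(_.name == n).get)
val reordered = StructType(fromProps.filterNot(partitionFields.contains) ++ partitionFields)

println(reordered.fieldNames.mkString(", "))  // col1, col2, partCol1
```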

sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala

Lines changed: 26 additions & 0 deletions
@@ -63,4 +63,30 @@ class HiveExternalCatalogSuite extends ExternalCatalogSuite {
     assert(!rawTable.properties.contains(HiveExternalCatalog.DATASOURCE_PROVIDER))
     assert(DDLUtils.isHiveTable(externalCatalog.getTable("db1", "hive_tbl")))
   }
+
+  Seq("parquet", "hive").foreach { format =>
+    test(s"Partition columns should be put at the end of table schema for the format $format") {
+      val catalog = newBasicCatalog()
+      val newSchema = new StructType()
+        .add("col1", "int")
+        .add("col2", "string")
+        .add("partCol1", "int")
+        .add("partCol2", "string")
+      val table = CatalogTable(
+        identifier = TableIdentifier("tbl", Some("db1")),
+        tableType = CatalogTableType.MANAGED,
+        storage = CatalogStorageFormat.empty,
+        schema = new StructType()
+          .add("col1", "int")
+          .add("partCol1", "int")
+          .add("partCol2", "string")
+          .add("col2", "string"),
+        provider = Some(format),
+        partitionColumnNames = Seq("partCol1", "partCol2"))
+      catalog.createTable(table, ignoreIfExists = false)
+
+      val restoredTable = externalCatalog.getTable("db1", "tbl")
+      assert(restoredTable.schema == newSchema)
+    }
+  }
 }
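
The other branch `reorderSchema` adds is the corrupted-metadata guard, which the tests above do not exercise. A hypothetical standalone mirror of it (the real patch throws `AnalysisException` from inside Spark's own package; this sketch uses `sys.error` so it stays self-contained):

```scala
import org.apache.spark.sql.types.StructType

// Hypothetical mirror of reorderSchema's error branch: a partition column
// missing from the stored schema fails fast with a descriptive message
// instead of producing a misordered schema later.
def reorder(schema: StructType, partColumnNames: Seq[String]): StructType = {
  val partitionFields = partColumnNames.map { partCol =>
    schema.find(_.name == partCol).getOrElse {
      sys.error(s"The metadata is corrupted. Unable to find partition column " +
        s"$partCol in schema: ${schema.catalogString}")
    }
  }
  StructType(schema.filterNot(partitionFields.contains) ++ partitionFields)
}

// reorder(new StructType().add("col1", "int"), Seq("missingPartCol"))
// => RuntimeException: The metadata is corrupted. Unable to find partition column ...
```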
