Skip to content

Commit a10b328

Browse files
skambhahvanhovell
authored andcommitted
[SPARK-22431][SQL] Ensure that the datatype in the schema for the table/view metadata is parseable by Spark before persisting it
## What changes were proposed in this pull request? * JIRA: [SPARK-22431](https://issues.apache.org/jira/browse/SPARK-22431) : Creating Permanent view with illegal type **Description:** - It is possible in Spark SQL to create a permanent view that uses an nested field with an illegal name. - For example if we create the following view: ```create view x as select struct('a' as `$q`, 1 as b) q``` - A simple select fails with the following exception: ``` select * from x; org.apache.spark.SparkException: Cannot recognize hive type string: struct<$q:string,b:int> at org.apache.spark.sql.hive.client.HiveClientImpl$.fromHiveColumn(HiveClientImpl.scala:812) at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$getTableOption$1$$anonfun$apply$11$$anonfun$7.apply(HiveClientImpl.scala:378) at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$getTableOption$1$$anonfun$apply$11$$anonfun$7.apply(HiveClientImpl.scala:378) ... ``` **Issue/Analysis**: Right now, we can create a view with a schema that cannot be read back by Spark from the Hive metastore. For more details, please see the discussion about the analysis and proposed fix options in comment 1 and comment 2 in the [SPARK-22431](https://issues.apache.org/jira/browse/SPARK-22431) **Proposed changes**: - Fix the hive table/view codepath to check whether the schema datatype is parseable by Spark before persisting it in the metastore. This change is localized to HiveClientImpl to do the check similar to the check in FromHiveColumn. This is fail-fast and we will avoid the scenario where we write something to the metastore that we are unable to read it back. - Added new unit tests - Ran the sql related unit test suites ( hive/test, sql/test, catalyst/test) OK With the fix: ``` create view x as select struct('a' as `$q`, 1 as b) q; 17/11/28 10:44:55 ERROR SparkSQLDriver: Failed in [create view x as select struct('a' as `$q`, 1 as b) q] org.apache.spark.SparkException: Cannot recognize hive type string: struct<$q:string,b:int> at org.apache.spark.sql.hive.client.HiveClientImpl$.org$apache$spark$sql$hive$client$HiveClientImpl$$getSparkSQLDataType(HiveClientImpl.scala:884) at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$org$apache$spark$sql$hive$client$HiveClientImpl$$verifyColumnDataType$1.apply(HiveClientImpl.scala:906) at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$org$apache$spark$sql$hive$client$HiveClientImpl$$verifyColumnDataType$1.apply(HiveClientImpl.scala:906) at scala.collection.Iterator$class.foreach(Iterator.scala:893) ... ``` ## How was this patch tested? - New unit tests have been added. hvanhovell, Please review and share your thoughts/comments. Thank you so much. Author: Sunitha Kambhampati <[email protected]> Closes #19747 from skambha/spark22431.
1 parent da35574 commit a10b328

File tree

3 files changed

+111
-3
lines changed

3 files changed

+111
-3
lines changed

sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,21 @@ class InMemoryCatalogedDDLSuite extends DDLSuite with SharedSQLContext with Befo
117117
}
118118
}
119119

120+
test("SPARK-22431: table with nested type col with special char") {
121+
withTable("t") {
122+
spark.sql("CREATE TABLE t(q STRUCT<`$a`:INT, col2:STRING>, i1 INT) USING PARQUET")
123+
checkAnswer(spark.table("t"), Nil)
124+
}
125+
}
126+
127+
test("SPARK-22431: view with nested type") {
128+
withView("t", "v") {
129+
spark.sql("CREATE VIEW t AS SELECT STRUCT('a' AS `$a`, 1 AS b) q")
130+
checkAnswer(spark.table("t"), Row(Row("a", 1)) :: Nil)
131+
spark.sql("CREATE VIEW v AS SELECT STRUCT('a' AS `a`, 1 AS b) q")
132+
checkAnswer(spark.table("t"), Row(Row("a", 1)) :: Nil)
133+
}
134+
}
120135
}
121136

122137
abstract class DDLSuite extends QueryTest with SQLTestUtils {

sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -488,6 +488,7 @@ private[hive] class HiveClientImpl(
488488
}
489489

490490
override def createTable(table: CatalogTable, ignoreIfExists: Boolean): Unit = withHiveState {
491+
verifyColumnDataType(table.dataSchema)
491492
client.createTable(toHiveTable(table, Some(userName)), ignoreIfExists)
492493
}
493494

@@ -507,6 +508,7 @@ private[hive] class HiveClientImpl(
507508
// these properties are still available to the others that share the same Hive metastore.
508509
// If users explicitly alter these Hive-specific properties through ALTER TABLE DDL, we respect
509510
// these user-specified values.
511+
verifyColumnDataType(table.dataSchema)
510512
val hiveTable = toHiveTable(
511513
table.copy(properties = table.ignoredProperties ++ table.properties), Some(userName))
512514
// Do not use `table.qualifiedName` here because this may be a rename
@@ -520,6 +522,7 @@ private[hive] class HiveClientImpl(
520522
newDataSchema: StructType,
521523
schemaProps: Map[String, String]): Unit = withHiveState {
522524
val oldTable = client.getTable(dbName, tableName)
525+
verifyColumnDataType(newDataSchema)
523526
val hiveCols = newDataSchema.map(toHiveColumn)
524527
oldTable.setFields(hiveCols.asJava)
525528

@@ -872,15 +875,19 @@ private[hive] object HiveClientImpl {
872875
new FieldSchema(c.name, typeString, c.getComment().orNull)
873876
}
874877

875-
/** Builds the native StructField from Hive's FieldSchema. */
876-
def fromHiveColumn(hc: FieldSchema): StructField = {
877-
val columnType = try {
878+
/** Get the Spark SQL native DataType from Hive's FieldSchema. */
879+
private def getSparkSQLDataType(hc: FieldSchema): DataType = {
880+
try {
878881
CatalystSqlParser.parseDataType(hc.getType)
879882
} catch {
880883
case e: ParseException =>
881884
throw new SparkException("Cannot recognize hive type string: " + hc.getType, e)
882885
}
886+
}
883887

888+
/** Builds the native StructField from Hive's FieldSchema. */
889+
def fromHiveColumn(hc: FieldSchema): StructField = {
890+
val columnType = getSparkSQLDataType(hc)
884891
val metadata = if (hc.getType != columnType.catalogString) {
885892
new MetadataBuilder().putString(HIVE_TYPE_STRING, hc.getType).build()
886893
} else {
@@ -895,6 +902,10 @@ private[hive] object HiveClientImpl {
895902
Option(hc.getComment).map(field.withComment).getOrElse(field)
896903
}
897904

905+
private def verifyColumnDataType(schema: StructType): Unit = {
906+
schema.foreach(col => getSparkSQLDataType(toHiveColumn(col)))
907+
}
908+
898909
private def toInputFormat(name: String) =
899910
Utils.classForName(name).asInstanceOf[Class[_ <: org.apache.hadoop.mapred.InputFormat[_, _]]]
900911

sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,88 @@ class HiveCatalogedDDLSuite extends DDLSuite with TestHiveSingleton with BeforeA
174174
test("alter datasource table add columns - partitioned - orc") {
175175
testAddColumnPartitioned("orc")
176176
}
177+
178+
test("SPARK-22431: illegal nested type") {
179+
val queries = Seq(
180+
"CREATE TABLE t AS SELECT STRUCT('a' AS `$a`, 1 AS b) q",
181+
"CREATE TABLE t(q STRUCT<`$a`:INT, col2:STRING>, i1 INT)",
182+
"CREATE VIEW t AS SELECT STRUCT('a' AS `$a`, 1 AS b) q")
183+
184+
queries.foreach(query => {
185+
val err = intercept[SparkException] {
186+
spark.sql(query)
187+
}.getMessage
188+
assert(err.contains("Cannot recognize hive type string"))
189+
})
190+
191+
withView("v") {
192+
spark.sql("CREATE VIEW v AS SELECT STRUCT('a' AS `a`, 1 AS b) q")
193+
checkAnswer(sql("SELECT q.`a`, q.b FROM v"), Row("a", 1) :: Nil)
194+
195+
val err = intercept[SparkException] {
196+
spark.sql("ALTER VIEW v AS SELECT STRUCT('a' AS `$a`, 1 AS b) q")
197+
}.getMessage
198+
assert(err.contains("Cannot recognize hive type string"))
199+
}
200+
}
201+
202+
test("SPARK-22431: table with nested type") {
203+
withTable("t", "x") {
204+
spark.sql("CREATE TABLE t(q STRUCT<`$a`:INT, col2:STRING>, i1 INT) USING PARQUET")
205+
checkAnswer(spark.table("t"), Nil)
206+
spark.sql("CREATE TABLE x (q STRUCT<col1:INT, col2:STRING>, i1 INT)")
207+
checkAnswer(spark.table("x"), Nil)
208+
}
209+
}
210+
211+
test("SPARK-22431: view with nested type") {
212+
withView("v") {
213+
spark.sql("CREATE VIEW v AS SELECT STRUCT('a' AS `a`, 1 AS b) q")
214+
checkAnswer(spark.table("v"), Row(Row("a", 1)) :: Nil)
215+
216+
spark.sql("ALTER VIEW v AS SELECT STRUCT('a' AS `b`, 1 AS b) q1")
217+
val df = spark.table("v")
218+
assert("q1".equals(df.schema.fields(0).name))
219+
checkAnswer(df, Row(Row("a", 1)) :: Nil)
220+
}
221+
}
222+
223+
test("SPARK-22431: alter table tests with nested types") {
224+
withTable("t1", "t2", "t3") {
225+
spark.sql("CREATE TABLE t1 (q STRUCT<col1:INT, col2:STRING>, i1 INT)")
226+
spark.sql("ALTER TABLE t1 ADD COLUMNS (newcol1 STRUCT<`col1`:STRING, col2:Int>)")
227+
val newcol = spark.sql("SELECT * FROM t1").schema.fields(2).name
228+
assert("newcol1".equals(newcol))
229+
230+
spark.sql("CREATE TABLE t2(q STRUCT<`a`:INT, col2:STRING>, i1 INT) USING PARQUET")
231+
spark.sql("ALTER TABLE t2 ADD COLUMNS (newcol1 STRUCT<`$col1`:STRING, col2:Int>)")
232+
spark.sql("ALTER TABLE t2 ADD COLUMNS (newcol2 STRUCT<`col1`:STRING, col2:Int>)")
233+
234+
val df2 = spark.table("t2")
235+
checkAnswer(df2, Nil)
236+
assert("newcol1".equals(df2.schema.fields(2).name))
237+
assert("newcol2".equals(df2.schema.fields(3).name))
238+
239+
spark.sql("CREATE TABLE t3(q STRUCT<`$a`:INT, col2:STRING>, i1 INT) USING PARQUET")
240+
spark.sql("ALTER TABLE t3 ADD COLUMNS (newcol1 STRUCT<`$col1`:STRING, col2:Int>)")
241+
spark.sql("ALTER TABLE t3 ADD COLUMNS (newcol2 STRUCT<`col1`:STRING, col2:Int>)")
242+
243+
val df3 = spark.table("t3")
244+
checkAnswer(df3, Nil)
245+
assert("newcol1".equals(df3.schema.fields(2).name))
246+
assert("newcol2".equals(df3.schema.fields(3).name))
247+
}
248+
}
249+
250+
test("SPARK-22431: negative alter table tests with nested types") {
251+
withTable("t1") {
252+
spark.sql("CREATE TABLE t1 (q STRUCT<col1:INT, col2:STRING>, i1 INT)")
253+
val err = intercept[SparkException] {
254+
spark.sql("ALTER TABLE t1 ADD COLUMNS (newcol1 STRUCT<`$col1`:STRING, col2:Int>)")
255+
}.getMessage
256+
assert(err.contains("Cannot recognize hive type string:"))
257+
}
258+
}
177259
}
178260

179261
class HiveDDLSuite

0 commit comments

Comments
 (0)