Commit e0d7665

vinodkc authored and gatorsmile committed
[SPARK-17920][SPARK-19580][SPARK-19878][SQL] Support writing to Hive table which uses Avro schema url 'avro.schema.url'
## What changes were proposed in this pull request?

- SPARK-19580: Support 'avro.schema.url' while writing to a Hive table
- SPARK-19878: Add the Hive configuration when initializing the Hive serde in InsertIntoHiveTable.scala
- SPARK-17920: HiveWriterContainer passes a null configuration to serde.initialize, causing a NullPointerException in AvroSerDe when 'avro.schema.url' is used

This adds support for writing to a Hive table that uses an Avro schema URL ('avro.schema.url'). For example:

```sql
create external table avro_in (a string) stored as avro location '/avro-in/' tblproperties ('avro.schema.url'='/avro-schema/avro.avsc');

create external table avro_out (a string) stored as avro location '/avro-out/' tblproperties ('avro.schema.url'='/avro-schema/avro.avsc');

insert overwrite table avro_out select * from avro_in; -- fails with java.lang.NullPointerException
```

```
WARN AvroSerDe: Encountered exception determining schema. Returning signal schema to indicate problem
java.lang.NullPointerException
	at org.apache.hadoop.fs.FileSystem.getDefaultUri(FileSystem.java:182)
	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:174)
```

## Changes proposed in this fix

Currently 'null' is passed to the serializer, which causes an NPE during the insert operation; pass the Hadoop configuration object instead.

## How was this patch tested?

Added a new test case in VersionsSuite.

Author: vinodkc <[email protected]>

Closes #19779 from vinodkc/br_Fix_SPARK-17920.
1 parent 881c5c8 commit e0d7665
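For context on where the stack trace above comes from: when the serde is handed a null Configuration, Hadoop's filesystem lookup dereferences it immediately. A minimal sketch of that resolution path, not the actual AvroSerDe code; `readSchemaText` is a hypothetical helper:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Hypothetical helper sketching how a schema referenced by 'avro.schema.url'
// is resolved. Path.getFileSystem(conf) goes through FileSystem.get, which
// calls FileSystem.getDefaultUri(conf) and dereferences conf, so a null
// Configuration fails exactly as in the reported trace.
def readSchemaText(schemaUrl: String, conf: Configuration): String = {
  val path = new Path(schemaUrl)
  val fs = path.getFileSystem(conf) // NPE here when conf is null
  val in = fs.open(path)
  try scala.io.Source.fromInputStream(in).mkString
  finally in.close()
}
```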

File tree

2 files changed: +73 −3 lines

sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala

Lines changed: 2 additions & 2 deletions

```diff
@@ -116,7 +116,7 @@ class HiveOutputWriter(
 
   private val serializer = {
     val serializer = tableDesc.getDeserializerClass.newInstance().asInstanceOf[Serializer]
-    serializer.initialize(null, tableDesc.getProperties)
+    serializer.initialize(jobConf, tableDesc.getProperties)
     serializer
   }
@@ -130,7 +130,7 @@ class HiveOutputWriter(
 
   private val standardOI = ObjectInspectorUtils
     .getStandardObjectInspector(
-      tableDesc.getDeserializer.getObjectInspector,
+      tableDesc.getDeserializer(jobConf).getObjectInspector,
       ObjectInspectorCopyOption.JAVA)
     .asInstanceOf[StructObjectInspector]
```
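The contract this change relies on is `Serializer.initialize(Configuration, Properties)`: Hive serdes such as AvroSerDe may consult the filesystem during initialization (to fetch 'avro.schema.url'), so they need a real configuration. A minimal standalone sketch of initializing the serde the way the fixed code does, assuming hive-serde and the Avro serde are on the classpath and reusing the schema path from the PR example:

```scala
import java.util.Properties

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hive.serde2.avro.AvroSerDe

// Sketch of the serde contract the fix relies on: pass a real Configuration
// so table properties like 'avro.schema.url' can be resolved via a FileSystem.
val conf = new Configuration()
val props = new Properties()
props.setProperty("avro.schema.url", "/avro-schema/avro.avsc") // path from the PR example
val serde = new AvroSerDe()
serde.initialize(conf, props) // passing null here reproduces the SPARK-17920 NPE
```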

sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala

Lines changed: 71 additions & 1 deletion

```diff
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.hive.client
 
-import java.io.{ByteArrayOutputStream, File, PrintStream}
+import java.io.{ByteArrayOutputStream, File, PrintStream, PrintWriter}
 import java.net.URI
 
 import org.apache.hadoop.conf.Configuration
@@ -841,6 +841,76 @@ class VersionsSuite extends SparkFunSuite with Logging {
       }
     }
 
+    test(s"$version: SPARK-17920: Insert into/overwrite avro table") {
+      withTempDir { dir =>
+        val path = dir.getAbsolutePath
+        val schemaPath = s"""$path${File.separator}avroschemadir"""
+
+        new File(schemaPath).mkdir()
+        val avroSchema =
+          """{
+            |  "name": "test_record",
+            |  "type": "record",
+            |  "fields": [ {
+            |    "name": "f0",
+            |    "type": [
+            |      "null",
+            |      {
+            |        "precision": 38,
+            |        "scale": 2,
+            |        "type": "bytes",
+            |        "logicalType": "decimal"
+            |      }
+            |    ]
+            |  } ]
+            |}
+          """.stripMargin
+        val schemaUrl = s"""$schemaPath${File.separator}avroDecimal.avsc"""
+        val schemaFile = new File(schemaPath, "avroDecimal.avsc")
+        val writer = new PrintWriter(schemaFile)
+        writer.write(avroSchema)
+        writer.close()
+
+        val url = Thread.currentThread().getContextClassLoader.getResource("avroDecimal")
+        val srcLocation = new File(url.getFile)
+        val destTableName = "tab1"
+        val srcTableName = "tab2"
+
+        withTable(srcTableName, destTableName) {
+          versionSpark.sql(
+            s"""
+               |CREATE EXTERNAL TABLE $srcTableName
+               |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
+               |WITH SERDEPROPERTIES ('respectSparkSchema' = 'true')
+               |STORED AS
+               |  INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
+               |  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
+               |LOCATION '$srcLocation'
+               |TBLPROPERTIES ('avro.schema.url' = '$schemaUrl')
+             """.stripMargin)
+
+          versionSpark.sql(
+            s"""
+               |CREATE TABLE $destTableName
+               |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
+               |WITH SERDEPROPERTIES ('respectSparkSchema' = 'true')
+               |STORED AS
+               |  INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
+               |  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
+               |TBLPROPERTIES ('avro.schema.url' = '$schemaUrl')
+             """.stripMargin)
+          versionSpark.sql(
+            s"""INSERT OVERWRITE TABLE $destTableName SELECT * FROM $srcTableName""")
+          val result = versionSpark.table(srcTableName).collect()
+          assert(versionSpark.table(destTableName).collect() === result)
+          versionSpark.sql(
+            s"""INSERT INTO TABLE $destTableName SELECT * FROM $srcTableName""")
+          assert(versionSpark.table(destTableName).collect().toSeq === result ++ result)
+        }
+      }
+    }
     // TODO: add more tests.
   }
 }
```
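As a user-facing check of the same behavior outside the test harness, something like the following should now succeed; this is a sketch that assumes the `avro_in`/`avro_out` tables from the example in the commit message already exist in a Hive-enabled session:

```scala
import org.apache.spark.sql.SparkSession

// Hive support is required so the Avro serde and table properties are honored.
val spark = SparkSession.builder()
  .master("local[2]")
  .appName("avro-schema-url-check")
  .enableHiveSupport()
  .getOrCreate()

// Before this fix, the write path handed the serde a null configuration and
// this statement failed with the NullPointerException shown in the commit message.
spark.sql("INSERT OVERWRITE TABLE avro_out SELECT * FROM avro_in")
spark.table("avro_out").show()
```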
