
Commit bc0498d

maryannxue authored and gatorsmile committed
[SPARK-24583][SQL] Wrong schema type in InsertIntoDataSourceCommand
## What changes were proposed in this pull request?

Change the insert input schema type from "insertRelationType" to "insertRelationType.asNullable", in order to avoid nullability being overridden.

## How was this patch tested?

Added one test in InsertSuite.

Author: Maryann Xue <[email protected]>

Closes apache#21585 from maryannxue/spark-24583.
1 parent 2cb9763 commit bc0498d
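
As context for the nullability point in the description, here is a minimal sketch (not part of the patch) of what relaxing a relation schema amounts to. `asNullable` is a `private[spark]` helper on `DataType`, so the sketch reproduces its effect on a `StructType` by copying the fields; the schema mirrors the one used in the new test.

```scala
import org.apache.spark.sql.types._

// The target relation declares both columns non-nullable, as in the test below.
val relationSchema = new StructType()
  .add("i", LongType, nullable = false)
  .add("s", StringType, nullable = false)

// asNullable relaxes every field to nullable; outside Spark's own packages the
// same effect can be approximated by copying the fields.
val relaxed = StructType(relationSchema.fields.map(_.copy(nullable = true)))

// relaxed now reports both `i` and `s` as nullable = true, so a query that
// inserts a null is no longer claimed to be non-nullable.
```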

File tree

2 files changed: +52, -4 lines


sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala

Lines changed: 2 additions & 3 deletions

@@ -38,9 +38,8 @@ case class InsertIntoDataSourceCommand(
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val relation = logicalRelation.relation.asInstanceOf[InsertableRelation]
     val data = Dataset.ofRows(sparkSession, query)
-    // Apply the schema of the existing table to the new data.
-    val df = sparkSession.internalCreateDataFrame(data.queryExecution.toRdd, logicalRelation.schema)
-    relation.insert(df, overwrite)
+    // Data has been casted to the target relation's schema by the PreprocessTableInsertion rule.
+    relation.insert(data, overwrite)
 
     // Re-cache all cached plans(including this relation itself, if it's cached) that refer to this
     // data source relation.
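
A rough illustration of the failure mode behind the removed lines (a sketch only: it uses the public `createDataFrame` API as a stand-in for the internal call and assumes a running `SparkSession` named `spark`; none of this is code from the patch). Re-labelling the query output with the relation's schema changes only the claimed schema, not the data, so nullability information from the query is lost.

```scala
import org.apache.spark.sql.types._

// Catalog schema of the target table: both fields declared non-nullable.
val catalogSchema = new StructType()
  .add("i", LongType, nullable = false)
  .add("s", StringType, nullable = false)

// Query output for "SELECT 2, null": the analyzer marks `s` as nullable.
val data = spark.sql("SELECT CAST(2 AS BIGINT) AS i, CAST(null AS STRING) AS s")

// Public-API analogue of the removed internalCreateDataFrame call: the rows are
// kept, but the resulting schema now wrongly promises that `s` holds no nulls.
val relabelled = spark.createDataFrame(data.rdd, catalogSchema)
// relabelled.schema("s").nullable == false, although the single row holds a null
```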

sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala

Lines changed: 50 additions & 1 deletion

@@ -20,12 +20,36 @@ package org.apache.spark.sql.sources
 import java.io.File
 
 import org.apache.spark.SparkException
-import org.apache.spark.sql.{AnalysisException, Row}
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode
 import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
+class SimpleInsertSource extends SchemaRelationProvider {
+  override def createRelation(
+      sqlContext: SQLContext,
+      parameters: Map[String, String],
+      schema: StructType): BaseRelation = {
+    SimpleInsert(schema)(sqlContext.sparkSession)
+  }
+}
+
+case class SimpleInsert(userSpecifiedSchema: StructType)(@transient val sparkSession: SparkSession)
+  extends BaseRelation with InsertableRelation {
+
+  override def sqlContext: SQLContext = sparkSession.sqlContext
+
+  override def schema: StructType = userSpecifiedSchema
+
+  override def insert(input: DataFrame, overwrite: Boolean): Unit = {
+    input.collect
+  }
+}
+
 class InsertSuite extends DataSourceTest with SharedSQLContext {
   import testImplicits._
 

@@ -520,4 +544,29 @@ class InsertSuite extends DataSourceTest with SharedSQLContext {
       }
     }
   }
+
+  test("SPARK-24583 Wrong schema type in InsertIntoDataSourceCommand") {
+    withTable("test_table") {
+      val schema = new StructType()
+        .add("i", LongType, false)
+        .add("s", StringType, false)
+      val newTable = CatalogTable(
+        identifier = TableIdentifier("test_table", None),
+        tableType = CatalogTableType.EXTERNAL,
+        storage = CatalogStorageFormat(
+          locationUri = None,
+          inputFormat = None,
+          outputFormat = None,
+          serde = None,
+          compressed = false,
+          properties = Map.empty),
+        schema = schema,
+        provider = Some(classOf[SimpleInsertSource].getName))
+
+      spark.sessionState.catalog.createTable(newTable, false)
+
+      sql("INSERT INTO TABLE test_table SELECT 1, 'a'")
+      sql("INSERT INTO TABLE test_table SELECT 2, null")
+    }
+  }
 }
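
One way to observe the fix from the test's stub relation (an illustrative tweak, not part of the patch) is to print the schema that reaches `insert`. With this change, the frame produced by `INSERT INTO TABLE test_table SELECT 2, null` keeps `s` nullable; before it, the frame was re-wrapped with the table's non-nullable schema.

```scala
// Hypothetical variant of SimpleInsert.insert, for illustration only.
override def insert(input: DataFrame, overwrite: Boolean): Unit = {
  // For "INSERT INTO TABLE test_table SELECT 2, null" this prints
  // " |-- s: string (nullable = true)" after the fix.
  println(input.schema.treeString)
  input.collect
}
```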
