
Commit b6749ba

Browse files
gatorsmilecloud-fan
authored andcommitted
[SPARK-21165] [SQL] [2.2] Use executedPlan instead of analyzedPlan in INSERT AS SELECT [WIP]
### What changes were proposed in this pull request?

The input query schema of INSERT AS SELECT can change after optimization. For example, the output schema of the following query is changed by the rules `SimplifyCasts` and `RemoveRedundantAliases`:

```SQL
SELECT word, length, cast(first as string) as first FROM view1
```

This PR fixes the issue in Spark 2.2. Instead of using the analyzed plan of the input query, it uses the executed plan to determine the attributes in `FileFormatWriter`. The related issue in the master branch has been fixed by apache#18064. After this PR is merged, I will submit a separate PR to merge the test case into master.

### How was this patch tested?

Added a test case.

Author: Xiao Li <[email protected]>
Author: gatorsmile <[email protected]>

Closes apache#18386 from gatorsmile/newRC5.
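The gist of the mismatch, as a minimal sketch (not part of this commit; the temp view and session setup are illustrative): the analyzed plan still carries the `Cast`/`Alias` on `first`, while `SimplifyCasts` and `RemoveRedundantAliases` strip them during optimization, so the optimized and analyzed plans expose different output attributes.

```scala
import org.apache.spark.sql.SparkSession

// Illustrative setup; any source with a string column named `first` shows the same effect.
val spark = SparkSession.builder().appName("SPARK-21165-repro").getOrCreate()
import spark.implicits._

Seq(("a", "b", 3)).toDF("word", "first", "length").createOrReplaceTempView("view1")

val df = spark.sql("SELECT word, length, cast(first as string) as first FROM view1")

// `first` is already a string, so the cast is a no-op: the optimizer drops it and the
// now-redundant alias, and the optimized output no longer matches the analyzed output.
println(df.queryExecution.analyzed.output)
println(df.queryExecution.optimizedPlan.output)
```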
1 parent b99c0e9 commit b6749ba

8 files changed: +48 −51 lines changed


sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala

Lines changed: 1 addition & 11 deletions
```diff
@@ -408,16 +408,6 @@ case class DataSource(
     val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
     PartitioningUtils.validatePartitionColumn(data.schema, partitionColumns, caseSensitive)
 
-    // SPARK-17230: Resolve the partition columns so InsertIntoHadoopFsRelationCommand does
-    // not need to have the query as child, to avoid to analyze an optimized query,
-    // because InsertIntoHadoopFsRelationCommand will be optimized first.
-    val partitionAttributes = partitionColumns.map { name =>
-      val plan = data.logicalPlan
-      plan.resolve(name :: Nil, data.sparkSession.sessionState.analyzer.resolver).getOrElse {
-        throw new AnalysisException(
-          s"Unable to resolve $name given [${plan.output.map(_.name).mkString(", ")}]")
-      }.asInstanceOf[Attribute]
-    }
     val fileIndex = catalogTable.map(_.identifier).map { tableIdent =>
       sparkSession.table(tableIdent).queryExecution.analyzed.collect {
         case LogicalRelation(t: HadoopFsRelation, _, _) => t.location
@@ -431,7 +421,7 @@
         outputPath = outputPath,
         staticPartitions = Map.empty,
         ifPartitionNotExists = false,
-        partitionColumns = partitionAttributes,
+        partitionColumns = partitionColumns,
         bucketSpec = bucketSpec,
         fileFormat = format,
         options = options,
```

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala

Lines changed: 1 addition & 3 deletions
```diff
@@ -188,15 +188,13 @@ case class DataSourceAnalysis(conf: SQLConf) extends Rule[LogicalPlan] with Cast
           "Cannot overwrite a path that is also being read from.")
       }
 
-      val partitionSchema = actualQuery.resolve(
-        t.partitionSchema, t.sparkSession.sessionState.analyzer.resolver)
       val staticPartitions = parts.filter(_._2.nonEmpty).map { case (k, v) => k -> v.get }
 
       InsertIntoHadoopFsRelationCommand(
         outputPath,
         staticPartitions,
         i.ifPartitionNotExists,
-        partitionSchema,
+        partitionColumns = t.partitionSchema.map(_.name),
         t.bucketSpec,
         t.fileFormat,
         t.options,
```

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala

Lines changed: 12 additions & 3 deletions
```diff
@@ -101,7 +101,7 @@ object FileFormatWriter extends Logging {
       committer: FileCommitProtocol,
       outputSpec: OutputSpec,
       hadoopConf: Configuration,
-      partitionColumns: Seq[Attribute],
+      partitionColumnNames: Seq[String],
       bucketSpec: Option[BucketSpec],
       refreshFunction: (Seq[TablePartitionSpec]) => Unit,
       options: Map[String, String]): Unit = {
@@ -111,9 +111,18 @@
     job.setOutputValueClass(classOf[InternalRow])
     FileOutputFormat.setOutputPath(job, new Path(outputSpec.outputPath))
 
-    val allColumns = queryExecution.logical.output
+    val allColumns = queryExecution.executedPlan.output
+    // Get the actual partition columns as attributes after matching them by name with
+    // the given columns names.
+    val partitionColumns = partitionColumnNames.map { col =>
+      val nameEquality = sparkSession.sessionState.conf.resolver
+      allColumns.find(f => nameEquality(f.name, col)).getOrElse {
+        throw new RuntimeException(
+          s"Partition column $col not found in schema ${queryExecution.executedPlan.schema}")
+      }
+    }
     val partitionSet = AttributeSet(partitionColumns)
-    val dataColumns = queryExecution.logical.output.filterNot(partitionSet.contains)
+    val dataColumns = allColumns.filterNot(partitionSet.contains)
 
     val bucketIdExpression = bucketSpec.map { spec =>
       val bucketColumns = spec.bucketColumnNames.map(c => dataColumns.find(_.name == c).get)
```
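Since `FileFormatWriter.write` now receives only column names, it re-resolves them against the executed plan's output using the session resolver, as the hunk above shows. A standalone sketch of that matching (illustrative attribute names; the real code uses `sparkSession.sessionState.conf.resolver`, which may or may not be case-sensitive):

```scala
import org.apache.spark.sql.catalyst.analysis.caseInsensitiveResolution
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.types.{IntegerType, StringType}

// Stand-ins for the executed plan's output attributes.
val allColumns = Seq(
  AttributeReference("word", StringType)(),
  AttributeReference("length", IntegerType)(),
  AttributeReference("first", StringType)())

// Partition column names as supplied by the caller; the case may differ from the plan's.
val partitionColumnNames = Seq("FIRST")

val partitionColumns = partitionColumnNames.map { col =>
  allColumns.find(f => caseInsensitiveResolution(f.name, col)).getOrElse(
    throw new RuntimeException(s"Partition column $col not found"))
}
// "FIRST" resolves to the plan's `first` attribute under case-insensitive matching.
```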

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala

Lines changed: 5 additions & 5 deletions
```diff
@@ -44,7 +44,7 @@ case class InsertIntoHadoopFsRelationCommand(
     outputPath: Path,
     staticPartitions: TablePartitionSpec,
     ifPartitionNotExists: Boolean,
-    partitionColumns: Seq[Attribute],
+    partitionColumns: Seq[String],
     bucketSpec: Option[BucketSpec],
     fileFormat: FileFormat,
     options: Map[String, String],
@@ -150,7 +150,7 @@
       outputSpec = FileFormatWriter.OutputSpec(
         qualifiedOutputPath.toString, customPartitionLocations),
       hadoopConf = hadoopConf,
-      partitionColumns = partitionColumns,
+      partitionColumnNames = partitionColumns,
       bucketSpec = bucketSpec,
       refreshFunction = refreshPartitionsCallback,
       options = options)
@@ -176,10 +176,10 @@
       customPartitionLocations: Map[TablePartitionSpec, String],
       committer: FileCommitProtocol): Unit = {
     val staticPartitionPrefix = if (staticPartitions.nonEmpty) {
-      "/" + partitionColumns.flatMap { p =>
-        staticPartitions.get(p.name) match {
+      "/" + partitionColumns.flatMap { col =>
+        staticPartitions.get(col) match {
          case Some(value) =>
-            Some(escapePathName(p.name) + "=" + escapePathName(value))
+            Some(escapePathName(col) + "=" + escapePathName(value))
          case None =>
            None
        }
```

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala

Lines changed: 5 additions & 11 deletions
```diff
@@ -127,11 +127,11 @@ case class PreprocessTableCreation(sparkSession: SparkSession) extends Rule[Logi
     val resolver = sparkSession.sessionState.conf.resolver
     val tableCols = existingTable.schema.map(_.name)
 
-    // As we are inserting into an existing table, we should respect the existing schema and
-    // adjust the column order of the given dataframe according to it, or throw exception
-    // if the column names do not match.
+    // As we are inserting into an existing table, we should respect the existing schema, preserve
+    // the case and adjust the column order of the given DataFrame according to it, or throw
+    // an exception if the column names do not match.
     val adjustedColumns = tableCols.map { col =>
-      query.resolve(Seq(col), resolver).getOrElse {
+      query.resolve(Seq(col), resolver).map(Alias(_, col)()).getOrElse {
         val inputColumns = query.schema.map(_.name).mkString(", ")
         throw new AnalysisException(
           s"cannot resolve '$col' given input columns: [$inputColumns]")
@@ -168,15 +168,9 @@
           """.stripMargin)
       }
 
-    val newQuery = if (adjustedColumns != query.output) {
-      Project(adjustedColumns, query)
-    } else {
-      query
-    }
-
     c.copy(
       tableDesc = existingTable,
-      query = Some(newQuery))
+      query = Some(Project(adjustedColumns, query)))
 
     // Here we normalize partition, bucket and sort column names, w.r.t. the case sensitivity
     // config, and do various checks:
```
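A hedged note on the `Alias(_, col)()` addition above: with a case-insensitive resolver, `query.resolve` may return an attribute whose name differs in case from the table column, and the write path now matches partition columns by name, so each resolved attribute is renamed to the table's casing. A tiny illustration (names assumed):

```scala
import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference}
import org.apache.spark.sql.types.StringType

// The query produced a lower-case attribute, but the existing table declares the column as "FIRST".
val fromQuery = AttributeReference("first", StringType)()
val adjusted = Alias(fromQuery, "FIRST")()
// adjusted.name == "FIRST": the Project built from these aliases exposes the table's column casing.
```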

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala

Lines changed: 1 addition & 10 deletions
```diff
@@ -111,23 +111,14 @@ class FileStreamSink(
       case _ => // Do nothing
     }
 
-    // Get the actual partition columns as attributes after matching them by name with
-    // the given columns names.
-    val partitionColumns: Seq[Attribute] = partitionColumnNames.map { col =>
-      val nameEquality = data.sparkSession.sessionState.conf.resolver
-      data.logicalPlan.output.find(f => nameEquality(f.name, col)).getOrElse {
-        throw new RuntimeException(s"Partition column $col not found in schema ${data.schema}")
-      }
-    }
-
     FileFormatWriter.write(
       sparkSession = sparkSession,
       queryExecution = data.queryExecution,
       fileFormat = fileFormat,
       committer = committer,
       outputSpec = FileFormatWriter.OutputSpec(path, Map.empty),
       hadoopConf = hadoopConf,
-      partitionColumns = partitionColumns,
+      partitionColumnNames = partitionColumnNames,
       bucketSpec = None,
       refreshFunction = _ => (),
       options = options)
```

sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala

Lines changed: 1 addition & 8 deletions
```diff
@@ -314,21 +314,14 @@ case class InsertIntoHiveTable(
       outputPath = tmpLocation.toString,
       isAppend = false)
 
-    val partitionAttributes = partitionColumnNames.takeRight(numDynamicPartitions).map { name =>
-      query.resolve(name :: Nil, sparkSession.sessionState.analyzer.resolver).getOrElse {
-        throw new AnalysisException(
-          s"Unable to resolve $name given [${query.output.map(_.name).mkString(", ")}]")
-      }.asInstanceOf[Attribute]
-    }
-
     FileFormatWriter.write(
       sparkSession = sparkSession,
       queryExecution = Dataset.ofRows(sparkSession, query).queryExecution,
       fileFormat = new HiveFileFormat(fileSinkConf),
       committer = committer,
       outputSpec = FileFormatWriter.OutputSpec(tmpLocation.toString, Map.empty),
       hadoopConf = hadoopConf,
-      partitionColumns = partitionAttributes,
+      partitionColumnNames = partitionColumnNames.takeRight(numDynamicPartitions),
       bucketSpec = None,
       refreshFunction = _ => (),
       options = Map.empty)
```

sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala

Lines changed: 22 additions & 0 deletions
```diff
@@ -468,6 +468,28 @@ class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with Bef
     }
   }
 
+  test("SPARK-21165: the query schema of INSERT is changed after optimization") {
+    withSQLConf(("hive.exec.dynamic.partition.mode", "nonstrict")) {
+      withTable("tab1", "tab2") {
+        Seq(("a", "b", 3)).toDF("word", "first", "length").write.saveAsTable("tab1")
+
+        spark.sql(
+          """
+            |CREATE TABLE tab2 (word string, length int)
+            |PARTITIONED BY (first string)
+          """.stripMargin)
+
+        spark.sql(
+          """
+            |INSERT INTO TABLE tab2 PARTITION(first)
+            |SELECT word, length, cast(first as string) as first FROM tab1
+          """.stripMargin)
+
+        checkAnswer(spark.table("tab2"), Row("a", 3, "b"))
+      }
+    }
+  }
+
   testPartitionedTable("insertInto() should reject extra columns") {
     tableName =>
       sql("CREATE TABLE t (a INT, b INT, c INT, d INT, e INT)")
```
