
Commit 14bdb25

cloud-fan authored and gatorsmile committed
[SPARK-18464][SQL][FOLLOWUP] support old table which doesn't store schema in table properties
## What changes were proposed in this pull request?

This is a follow-up of apache#15900, fixing one more bug: when a table schema is empty and needs to be inferred at runtime, we should not resolve parent plans before the schema has been inferred; otherwise the parent plans are resolved against an empty schema and may produce wrong results for queries like `select *`.

The fix logic is: introduce `UnresolvedCatalogRelation` as a placeholder, then replace it with `LogicalRelation` or `HiveTableRelation` during analysis, so it is guaranteed that parent plans are not resolved until the schema has been inferred.

## How was this patch tested?

Regression test.

Author: Wenchen Fan <[email protected]>

Closes apache#18907 from cloud-fan/bug.
1 parent bc99025 commit 14bdb25
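To make the resolution-ordering argument above concrete, here is a minimal sketch in plain Scala — toy classes, not the real Catalyst types — of the invariant the fix relies on: a leaf node that reports resolved = false keeps every parent operator unresolved, so the analyzer cannot expand `select *` against an empty schema.

// A toy model (plain Scala, not Catalyst) of why an unresolved placeholder leaf
// keeps every parent operator unresolved until it is replaced.
sealed trait Plan {
  def children: Seq[Plan]
  // Mirrors Catalyst's convention: a node is resolved only if it is resolved itself
  // and all of its children are resolved.
  def resolved: Boolean = selfResolved && children.forall(_.resolved)
  protected def selfResolved: Boolean = true
}

// Stand-in for the new placeholder: no output attributes, never resolved.
case class UnresolvedCatalogRelation(table: String) extends Plan {
  override def children: Seq[Plan] = Nil
  override protected def selfResolved: Boolean = false
}

// Stand-in for LogicalRelation / HiveTableRelation with a known (possibly inferred) schema.
case class ResolvedRelation(table: String, schema: Seq[String]) extends Plan {
  override def children: Seq[Plan] = Nil
}

// Stand-in for a parent operator such as the projection produced by `select *`.
case class Project(projectList: Seq[String], child: Plan) extends Plan {
  override def children: Seq[Plan] = Seq(child)
}

object PlaceholderDemo extends App {
  val before = Project(Seq("*"), UnresolvedCatalogRelation("t"))
  println(before.resolved) // false: parent plans stay unresolved until the schema is known

  val after = Project(Seq("*"), ResolvedRelation("t", Seq("a", "b")))
  println(after.resolved)  // true (in this toy model): resolution can proceed after the swap
}

In the actual patch the same effect comes from `UnresolvedCatalogRelation.resolved` being hard-coded to `false` (see the interface.scala diff below), and from `FindDataSourceTable` swapping the placeholder for a `LogicalRelation` or `HiveTableRelation` once the schema is known.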

File tree: 17 files changed (+90, -83 lines)


sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala

Lines changed: 1 addition & 6 deletions
@@ -678,12 +678,7 @@ class SessionCatalog(
           child = parser.parsePlan(viewText))
         SubqueryAlias(table, child)
       } else {
-        val tableRelation = CatalogRelation(
-          metadata,
-          // we assume all the columns are nullable.
-          metadata.dataSchema.asNullable.toAttributes,
-          metadata.partitionSchema.asNullable.toAttributes)
-        SubqueryAlias(table, tableRelation)
+        SubqueryAlias(table, UnresolvedCatalogRelation(metadata))
       }
     } else {
       SubqueryAlias(table, tempTables(table))

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala

Lines changed: 16 additions & 9 deletions
@@ -28,7 +28,6 @@ import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference, Cast, ExprId, Literal}
-import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
 import org.apache.spark.sql.catalyst.util.quoteIdentifier

@@ -405,11 +404,22 @@ object CatalogTypes {
   type TablePartitionSpec = Map[String, String]
 }
 
+/**
+ * A placeholder for a table relation, which will be replaced by concrete relation like
+ * `LogicalRelation` or `HiveTableRelation`, during analysis.
+ */
+case class UnresolvedCatalogRelation(tableMeta: CatalogTable) extends LeafNode {
+  assert(tableMeta.identifier.database.isDefined)
+  override lazy val resolved: Boolean = false
+  override def output: Seq[Attribute] = Nil
+}
 
 /**
- * A [[LogicalPlan]] that represents a table.
+ * A `LogicalPlan` that represents a hive table.
+ *
+ * TODO: remove this after we completely make hive as a data source.
  */
-case class CatalogRelation(
+case class HiveTableRelation(
     tableMeta: CatalogTable,
     dataCols: Seq[AttributeReference],
     partitionCols: Seq[AttributeReference]) extends LeafNode with MultiInstanceRelation {

@@ -423,15 +433,15 @@ case class CatalogRelation(
   def isPartitioned: Boolean = partitionCols.nonEmpty
 
   override def equals(relation: Any): Boolean = relation match {
-    case other: CatalogRelation => tableMeta == other.tableMeta && output == other.output
+    case other: HiveTableRelation => tableMeta == other.tableMeta && output == other.output
     case _ => false
   }
 
   override def hashCode(): Int = {
     Objects.hashCode(tableMeta.identifier, output)
   }
 
-  override lazy val canonicalized: LogicalPlan = copy(
+  override lazy val canonicalized: HiveTableRelation = copy(
     tableMeta = tableMeta.copy(
       storage = CatalogStorageFormat.empty,
       createTime = -1

@@ -444,15 +454,12 @@ case class CatalogRelation(
     })
 
   override def computeStats(): Statistics = {
-    // For data source tables, we will create a `LogicalRelation` and won't call this method, for
-    // hive serde tables, we will always generate a statistics.
-    // TODO: unify the table stats generation.
     tableMeta.stats.map(_.toPlanStats(output)).getOrElse {
       throw new IllegalStateException("table stats must be specified.")
     }
   }
 
-  override def newInstance(): LogicalPlan = copy(
+  override def newInstance(): HiveTableRelation = copy(
     dataCols = dataCols.map(_.newInstance()),
     partitionCols = partitionCols.map(_.newInstance()))
 }
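As a hedged illustration of the new placeholder's behavior (not part of the diff), the sketch below constructs an UnresolvedCatalogRelation directly with the Catalyst types added above. It assumes a Spark build that contains this patch on the classpath; these are internal APIs, so the exact constructors may differ in other versions.

import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType, UnresolvedCatalogRelation}
import org.apache.spark.sql.types.StructType

object PlaceholderBehavior extends App {
  // Table metadata with an intentionally empty schema, mimicking an old table that
  // did not store its schema in table properties.
  val meta = CatalogTable(
    identifier = TableIdentifier("t", Some("default")),
    tableType = CatalogTableType.MANAGED,
    storage = CatalogStorageFormat.empty,
    schema = new StructType())

  val placeholder = UnresolvedCatalogRelation(meta)
  // No output attributes and always unresolved, so anything built on top of this leaf
  // stays unresolved until FindDataSourceTable replaces it during analysis.
  println(placeholder.output)   // List()
  println(placeholder.resolved) // false
}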

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala

Lines changed: 2 additions & 3 deletions
@@ -22,7 +22,6 @@ import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
-import org.apache.spark.sql.catalyst.plans.PlanTest
 import org.apache.spark.sql.catalyst.plans.logical.{Range, SubqueryAlias, View}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._

@@ -518,14 +517,14 @@ abstract class SessionCatalogSuite extends AnalysisTest {
     catalog.setCurrentDatabase("db2")
     // If we explicitly specify the database, we'll look up the relation in that database
     assert(catalog.lookupRelation(TableIdentifier("tbl1", Some("db2"))).children.head
-      .asInstanceOf[CatalogRelation].tableMeta == metastoreTable1)
+      .asInstanceOf[UnresolvedCatalogRelation].tableMeta == metastoreTable1)
     // Otherwise, we'll first look up a temporary table with the same name
     assert(catalog.lookupRelation(TableIdentifier("tbl1"))
       == SubqueryAlias("tbl1", tempTable1))
     // Then, if that does not exist, look up the relation in the current database
     catalog.dropTable(TableIdentifier("tbl1"), ignoreIfNotExists = false, purge = false)
     assert(catalog.lookupRelation(TableIdentifier("tbl1")).children.head
-      .asInstanceOf[CatalogRelation].tableMeta == metastoreTable1)
+      .asInstanceOf[UnresolvedCatalogRelation].tableMeta == metastoreTable1)
   }
 }

sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala

Lines changed: 5 additions & 6 deletions
@@ -24,11 +24,11 @@ import scala.collection.JavaConverters._
 import org.apache.spark.annotation.InterfaceStability
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, UnresolvedRelation}
-import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogRelation, CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan}
 import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.execution.command.DDLUtils
-import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, LogicalRelation, SaveIntoDataSourceCommand}
+import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, LogicalRelation}
 import org.apache.spark.sql.sources.BaseRelation
 import org.apache.spark.sql.types.StructType
 
@@ -372,8 +372,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
       // Get all input data source or hive relations of the query.
       val srcRelations = df.logicalPlan.collect {
         case LogicalRelation(src: BaseRelation, _, _) => src
-        case relation: CatalogRelation if DDLUtils.isHiveTable(relation.tableMeta) =>
-          relation.tableMeta.identifier
+        case relation: HiveTableRelation => relation.tableMeta.identifier
       }
 
       val tableRelation = df.sparkSession.table(tableIdentWithDB).queryExecution.analyzed

@@ -383,8 +382,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
           throw new AnalysisException(
             s"Cannot overwrite table $tableName that is also being read from")
         // check hive table relation when overwrite mode
-        case relation: CatalogRelation if DDLUtils.isHiveTable(relation.tableMeta)
-            && srcRelations.contains(relation.tableMeta.identifier) =>
+        case relation: HiveTableRelation
+            if srcRelations.contains(relation.tableMeta.identifier) =>
           throw new AnalysisException(
             s"Cannot overwrite table $tableName that is also being read from")
         case _ => // OK

sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala

Lines changed: 2 additions & 2 deletions
@@ -36,7 +36,7 @@ import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst._
 import org.apache.spark.sql.catalyst.analysis._
-import org.apache.spark.sql.catalyst.catalog.CatalogRelation
+import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
 import org.apache.spark.sql.catalyst.encoders._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.json.{JacksonGenerator, JSONOptions}

@@ -2965,7 +2965,7 @@ class Dataset[T] private[sql](
         fsBasedRelation.inputFiles
       case fr: FileRelation =>
         fr.inputFiles
-      case r: CatalogRelation if DDLUtils.isHiveTable(r.tableMeta) =>
+      case r: HiveTableRelation =>
         r.tableMeta.storage.locationUri.map(_.toString).toArray
     }.flatten
     files.toSet.toArray

sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala

Lines changed: 3 additions & 3 deletions
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.execution
 
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, SessionCatalog}
+import org.apache.spark.sql.catalyst.catalog.{HiveTableRelation, SessionCatalog}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.plans.logical._

@@ -99,7 +99,7 @@ case class OptimizeMetadataOnlyQuery(catalog: SessionCatalog) extends Rule[Logic
         val partitionData = fsRelation.location.listFiles(Nil, Nil)
         LocalRelation(partAttrs, partitionData.map(_.values))
 
-      case relation: CatalogRelation =>
+      case relation: HiveTableRelation =>
         val partAttrs = getPartitionAttrs(relation.tableMeta.partitionColumnNames, relation)
         val caseInsensitiveProperties =
           CaseInsensitiveMap(relation.tableMeta.storage.properties)

@@ -135,7 +135,7 @@ case class OptimizeMetadataOnlyQuery(catalog: SessionCatalog) extends Rule[Logic
         val partAttrs = getPartitionAttrs(fsRelation.partitionSchema.map(_.name), l)
         Some((AttributeSet(partAttrs), l))
 
-      case relation: CatalogRelation if relation.tableMeta.partitionColumnNames.nonEmpty =>
+      case relation: HiveTableRelation if relation.tableMeta.partitionColumnNames.nonEmpty =>
         val partAttrs = getPartitionAttrs(relation.tableMeta.partitionColumnNames, relation)
         Some((AttributeSet(partAttrs), relation))

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala

Lines changed: 28 additions & 21 deletions
@@ -25,12 +25,12 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, QualifiedTableName}
 import org.apache.spark.sql.catalyst.CatalystTypeConverters.convertToScala
 import org.apache.spark.sql.catalyst.analysis._
-import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogUtils}
+import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, Project}
-import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, UnknownPartitioning}
+import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.{RowDataSourceScanExec, SparkPlan}
 import org.apache.spark.sql.execution.command._

@@ -207,15 +207,16 @@ case class DataSourceAnalysis(conf: SQLConf) extends Rule[LogicalPlan] with Cast
 
 
 /**
- * Replaces [[CatalogRelation]] with data source table if its table provider is not hive.
+ * Replaces [[UnresolvedCatalogRelation]] with concrete relation logical plans.
+ *
+ * TODO: we should remove the special handling for hive tables after completely making hive as a
+ * data source.
  */
 class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan] {
-  private def readDataSourceTable(r: CatalogRelation): LogicalPlan = {
-    val table = r.tableMeta
+  private def readDataSourceTable(table: CatalogTable): LogicalPlan = {
     val qualifiedTableName = QualifiedTableName(table.database, table.identifier.table)
-    val catalogProxy = sparkSession.sessionState.catalog
-
-    val plan = catalogProxy.getCachedPlan(qualifiedTableName, new Callable[LogicalPlan]() {
+    val catalog = sparkSession.sessionState.catalog
+    catalog.getCachedPlan(qualifiedTableName, new Callable[LogicalPlan]() {
       override def call(): LogicalPlan = {
         val pathOption = table.storage.locationUri.map("path" -> CatalogUtils.URIToString(_))
         val dataSource =

@@ -232,24 +233,30 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
 
         LogicalRelation(dataSource.resolveRelation(checkFilesExist = false), table)
       }
-    }).asInstanceOf[LogicalRelation]
+    })
+  }
 
-    if (r.output.isEmpty) {
-      // It's possible that the table schema is empty and need to be inferred at runtime. For this
-      // case, we don't need to change the output of the cached plan.
-      plan
-    } else {
-      plan.copy(output = r.output)
-    }
+  private def readHiveTable(table: CatalogTable): LogicalPlan = {
+    HiveTableRelation(
+      table,
+      // Hive table columns are always nullable.
+      table.dataSchema.asNullable.toAttributes,
+      table.partitionSchema.asNullable.toAttributes)
   }
 
   override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
-    case i @ InsertIntoTable(r: CatalogRelation, _, _, _, _)
-        if DDLUtils.isDatasourceTable(r.tableMeta) =>
-      i.copy(table = readDataSourceTable(r))
+    case i @ InsertIntoTable(UnresolvedCatalogRelation(tableMeta), _, _, _, _)
+        if DDLUtils.isDatasourceTable(tableMeta) =>
+      i.copy(table = readDataSourceTable(tableMeta))
+
+    case i @ InsertIntoTable(UnresolvedCatalogRelation(tableMeta), _, _, _, _) =>
+      i.copy(table = readHiveTable(tableMeta))
+
+    case UnresolvedCatalogRelation(tableMeta) if DDLUtils.isDatasourceTable(tableMeta) =>
+      readDataSourceTable(tableMeta)
 
-    case r: CatalogRelation if DDLUtils.isDatasourceTable(r.tableMeta) =>
-      readDataSourceTable(r)
+    case UnresolvedCatalogRelation(tableMeta) =>
+      readHiveTable(tableMeta)
   }
 }

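A side note on the rewritten `apply` above: `transform` takes a Scala partial function, which tries cases top to bottom, so the cases guarded by `DDLUtils.isDatasourceTable` must come before the unguarded Hive fallbacks. A small self-contained sketch of that ordering behavior, using toy types rather than the real plans:

// Toy types standing in for the real plans; only the partial-function ordering matters here.
sealed trait Relation
case class Unresolved(provider: String) extends Relation
case class DataSourceRel(provider: String) extends Relation
case object HiveRel extends Relation

object RuleOrderingDemo extends App {
  // Stand-in for DDLUtils.isDatasourceTable: anything that is not "hive" is a data source.
  def isDatasourceTable(provider: String): Boolean = provider != "hive"

  val resolveRelation: PartialFunction[Relation, Relation] = {
    case Unresolved(p) if isDatasourceTable(p) => DataSourceRel(p) // guarded case first
    case Unresolved(_)                         => HiveRel          // unguarded Hive fallback
  }

  println(resolveRelation(Unresolved("parquet"))) // DataSourceRel(parquet)
  println(resolveRelation(Unresolved("hive")))    // HiveRel
}

With the order reversed, every placeholder would match the unguarded case and be treated as a Hive table.
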
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala

Lines changed: 2 additions & 2 deletions
@@ -382,7 +382,7 @@ case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] wit
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
     case i @ InsertIntoTable(table, _, query, _, _) if table.resolved && query.resolved =>
       table match {
-        case relation: CatalogRelation =>
+        case relation: HiveTableRelation =>
           val metadata = relation.tableMeta
           preprocess(i, metadata.identifier.quotedString, metadata.partitionColumnNames)
         case LogicalRelation(h: HadoopFsRelation, _, catalogTable) =>

@@ -427,7 +427,7 @@ object PreReadCheck extends (LogicalPlan => Unit) {
 
   private def checkNumInputFileBlockSources(e: Expression, operator: LogicalPlan): Int = {
     operator match {
-      case _: CatalogRelation => 1
+      case _: HiveTableRelation => 1
       case _ @ LogicalRelation(_: HadoopFsRelation, _, _) => 1
       case _: LeafNode => 0
       // UNION ALL has multiple children, but these children do not concurrently use InputFileBlock.

sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala

Lines changed: 2 additions & 2 deletions
@@ -24,7 +24,7 @@ import scala.collection.mutable
 import scala.util.Random
 
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogStatistics, CatalogTable}
+import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, HiveTableRelation}
 import org.apache.spark.sql.catalyst.plans.logical.ColumnStat
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.datasources.LogicalRelation

@@ -171,7 +171,7 @@ abstract class StatisticsCollectionTestBase extends QueryTest with SQLTestUtils
     // Analyze only one column.
     sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS FOR COLUMNS c1")
     val (relation, catalogTable) = spark.table(tableName).queryExecution.analyzed.collect {
-      case catalogRel: CatalogRelation => (catalogRel, catalogRel.tableMeta)
+      case catalogRel: HiveTableRelation => (catalogRel, catalogRel.tableMeta)
      case logicalRel: LogicalRelation => (logicalRel, logicalRel.catalogTable.get)
     }.head
     val emptyColStat = ColumnStat(0, None, None, 0, 4, 4)

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala

Lines changed: 3 additions & 3 deletions
@@ -112,7 +112,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
   }
 
   def convertToLogicalRelation(
-      relation: CatalogRelation,
+      relation: HiveTableRelation,
       options: Map[String, String],
       fileFormatClass: Class[_ <: FileFormat],
       fileType: String): LogicalRelation = {

@@ -210,7 +210,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
         logicalRelation
       })
     }
-    // The inferred schema may have different filed names as the table schema, we should respect
+    // The inferred schema may have different field names as the table schema, we should respect
     // it, but also respect the exprId in table relation output.
     assert(result.output.length == relation.output.length &&
       result.output.zip(relation.output).forall { case (a1, a2) => a1.dataType == a2.dataType })

@@ -221,7 +221,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
   }
 
   private def inferIfNeeded(
-      relation: CatalogRelation,
+      relation: HiveTableRelation,
       options: Map[String, String],
       fileFormat: FileFormat,
       fileIndexOpt: Option[FileIndex] = None): (StructType, CatalogTable) = {
