
Commit b86d4bb

imback82 authored and cloud-fan committed
[SPARK-30001][SQL] ResolveRelations should handle both V1 and V2 tables
### What changes were proposed in this pull request?

This PR makes `Analyzer.ResolveRelations` responsible for looking up both v1 and v2 tables from the session catalog and creating an appropriate relation.

### Why are the changes needed?

Currently there are two issues:

1. As described in [SPARK-29966](https://issues.apache.org/jira/browse/SPARK-29966), the logic for resolving a relation can load a table twice, which is a perf regression (e.g., the Hive metastore can be accessed twice).
2. As described in [SPARK-30001](https://issues.apache.org/jira/browse/SPARK-30001), if a catalog name is specified for v1 tables, the query fails:

```
scala> sql("create table t using csv as select 1 as i")
res2: org.apache.spark.sql.DataFrame = []

scala> sql("select * from t").show
+---+
|  i|
+---+
|  1|
+---+

scala> sql("select * from spark_catalog.t").show
org.apache.spark.sql.AnalysisException: Table or view not found: spark_catalog.t; line 1 pos 14;
'Project [*]
+- 'UnresolvedRelation [spark_catalog, t]
```

### Does this PR introduce any user-facing change?

Yes. Now the catalog name is resolved correctly:

```
scala> sql("create table t using csv as select 1 as i")
res0: org.apache.spark.sql.DataFrame = []

scala> sql("select * from t").show
+---+
|  i|
+---+
|  1|
+---+

scala> sql("select * from spark_catalog.t").show
+---+
|  i|
+---+
|  1|
+---+
```

### How was this patch tested?

Added new tests.

Closes apache#26684 from imback82/resolve_relation.

Authored-by: Terry Kim <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
1 parent a5ccbce · commit b86d4bb

File tree: 5 files changed (+143, -97 lines)


sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 85 additions & 82 deletions
```diff
@@ -199,6 +199,7 @@ class Analyzer(
       new ResolveCatalogs(catalogManager) ::
       ResolveInsertInto ::
       ResolveRelations ::
+      ResolveTables ::
       ResolveReferences ::
       ResolveCreateNamedStruct ::
       ResolveDeserializer ::
@@ -725,21 +726,29 @@ class Analyzer(
   }

   /**
-   * Resolve relations to temp views. This is not an actual rule, and is only called by
-   * [[ResolveTables]].
+   * Resolve relations to temp views. This is not an actual rule, and is called by
+   * [[ResolveTables]] and [[ResolveRelations]].
    */
   object ResolveTempViews extends Rule[LogicalPlan] {
     def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
-      case u @ UnresolvedRelation(Seq(part1)) =>
-        v1SessionCatalog.lookupTempView(part1).getOrElse(u)
-      case u @ UnresolvedRelation(Seq(part1, part2)) =>
-        v1SessionCatalog.lookupGlobalTempView(part1, part2).getOrElse(u)
+      case u @ UnresolvedRelation(ident) =>
+        lookupTempView(ident).getOrElse(u)
+      case i @ InsertIntoStatement(UnresolvedRelation(ident), _, _, _, _) =>
+        lookupTempView(ident)
+          .map(view => i.copy(table = view))
+          .getOrElse(i)
     }
+
+    def lookupTempView(identifier: Seq[String]): Option[LogicalPlan] =
+      identifier match {
+        case Seq(part1) => v1SessionCatalog.lookupTempView(part1)
+        case Seq(part1, part2) => v1SessionCatalog.lookupGlobalTempView(part1, part2)
+        case _ => None
+      }
   }

   /**
-   * Resolve table relations with concrete relations from v2 catalog. This is not an actual rule,
-   * and is only called by [[ResolveRelations]].
+   * Resolve table relations with concrete relations from v2 catalog.
    *
    * [[ResolveRelations]] still resolves v1 tables.
    */
@@ -772,16 +781,29 @@ class Analyzer(
       case u: UnresolvedV2Relation =>
         CatalogV2Util.loadRelation(u.catalog, u.tableName).getOrElse(u)
     }
+
+    /**
+     * Performs the lookup of DataSourceV2 Tables from v2 catalog.
+     */
+    private def lookupV2Relation(identifier: Seq[String]): Option[DataSourceV2Relation] =
+      identifier match {
+        case NonSessionCatalogAndIdentifier(catalog, ident) =>
+          CatalogV2Util.loadTable(catalog, ident) match {
+            case Some(table) => Some(DataSourceV2Relation.create(table))
+            case None => None
+          }
+        case _ => None
+      }
   }

   /**
    * Replaces [[UnresolvedRelation]]s with concrete relations from the catalog.
    */
   object ResolveRelations extends Rule[LogicalPlan] {

-    // If the unresolved relation is running directly on files, we just return the original
-    // UnresolvedRelation, the plan will get resolved later. Else we look up the table from catalog
-    // and change the default database name(in AnalysisContext) if it is a view.
+    // If an unresolved relation is given, it is looked up from the session catalog and either v1
+    // or v2 relation is returned. Otherwise, we look up the table from catalog
+    // and change the default database name (in AnalysisContext) if it is a view.
     // We usually look up a table from the default database if the table identifier has an empty
     // database part, for a view the default database should be the currentDb when the view was
     // created. When the case comes to resolving a nested view, the view may have different default
@@ -806,14 +828,8 @@ class Analyzer(
     // Note this is compatible with the views defined by older versions of Spark(before 2.2), which
     // have empty defaultDatabase and all the relations in viewText have database part defined.
     def resolveRelation(plan: LogicalPlan): LogicalPlan = plan match {
-      case u @ UnresolvedRelation(AsTableIdentifier(ident)) if !isRunningDirectlyOnFiles(ident) =>
-        val defaultDatabase = AnalysisContext.get.defaultDatabase
-        val foundRelation = lookupTableFromCatalog(ident, u, defaultDatabase)
-        if (foundRelation != u) {
-          resolveRelation(foundRelation)
-        } else {
-          u
-        }
+      case u @ UnresolvedRelation(SessionCatalogAndIdentifier(catalog, ident)) =>
+        lookupRelation(catalog, ident, recurse = true).getOrElse(u)

       // The view's child should be a logical plan parsed from the `desc.viewText`, the variable
       // `viewText` should be defined, or else we throw an error on the generation of the View
@@ -836,47 +852,65 @@ class Analyzer(
       case _ => plan
     }

-    def apply(plan: LogicalPlan): LogicalPlan = ResolveTables(plan).resolveOperatorsUp {
-      case i @ InsertIntoStatement(u @ UnresolvedRelation(AsTableIdentifier(ident)), _, child, _, _)
-          if child.resolved =>
-        EliminateSubqueryAliases(lookupTableFromCatalog(ident, u)) match {
+    def apply(plan: LogicalPlan): LogicalPlan = ResolveTempViews(plan).resolveOperatorsUp {
+      case i @ InsertIntoStatement(table, _, _, _, _) if i.query.resolved =>
+        val relation = table match {
+          case u @ UnresolvedRelation(SessionCatalogAndIdentifier(catalog, ident)) =>
+            lookupRelation(catalog, ident, recurse = false).getOrElse(u)
+          case other => other
+        }
+
+        EliminateSubqueryAliases(relation) match {
           case v: View =>
-            u.failAnalysis(s"Inserting into a view is not allowed. View: ${v.desc.identifier}.")
+            table.failAnalysis(s"Inserting into a view is not allowed. View: ${v.desc.identifier}.")
           case other => i.copy(table = other)
         }
+
       case u: UnresolvedRelation => resolveRelation(u)
     }

-    // Look up the table with the given name from catalog. The database we used is decided by the
-    // precedence:
-    // 1. Use the database part of the table identifier, if it is defined;
-    // 2. Use defaultDatabase, if it is defined(In this case, no temporary objects can be used,
-    //    and the default database is only used to look up a view);
-    // 3. Use the currentDb of the SessionCatalog.
-    private def lookupTableFromCatalog(
-        tableIdentifier: TableIdentifier,
-        u: UnresolvedRelation,
-        defaultDatabase: Option[String] = None): LogicalPlan = {
-      val tableIdentWithDb = tableIdentifier.copy(
-        database = tableIdentifier.database.orElse(defaultDatabase))
-      try {
-        v1SessionCatalog.lookupRelation(tableIdentWithDb)
-      } catch {
-        case _: NoSuchTableException | _: NoSuchDatabaseException =>
-          u
+    // Look up a relation from the given session catalog with the following logic:
+    // 1) If a relation is not found in the catalog, return None.
+    // 2) If a v1 table is found, create a v1 relation. Otherwise, create a v2 relation.
+    // If recurse is set to true, it will call `resolveRelation` recursively to resolve
+    // relations with the correct database scope.
+    private def lookupRelation(
+        catalog: CatalogPlugin,
+        ident: Identifier,
+        recurse: Boolean): Option[LogicalPlan] = {
+      val newIdent = withNewNamespace(ident)
+      assert(newIdent.namespace.size == 1)
+
+      CatalogV2Util.loadTable(catalog, newIdent) match {
+        case Some(v1Table: V1Table) =>
+          val tableIdent = TableIdentifier(newIdent.name, newIdent.namespace.headOption)
+          val relation = v1SessionCatalog.getRelation(v1Table.v1Table)
+          if (recurse) {
+            Some(resolveRelation(relation))
+          } else {
+            Some(relation)
+          }
+        case Some(table) =>
+          Some(DataSourceV2Relation.create(table))
+        case None => None
       }
     }

-    // If the database part is specified, and we support running SQL directly on files, and
-    // it's not a temporary view, and the table does not exist, then let's just return the
-    // original UnresolvedRelation. It is possible we are matching a query like "select *
-    // from parquet.`/path/to/query`". The plan will get resolved in the rule `ResolveDataSource`.
-    // Note that we are testing (!db_exists || !table_exists) because the catalog throws
-    // an exception from tableExists if the database does not exist.
-    private def isRunningDirectlyOnFiles(table: TableIdentifier): Boolean = {
-      table.database.isDefined && conf.runSQLonFile && !v1SessionCatalog.isTemporaryTable(table) &&
-        (!v1SessionCatalog.databaseExists(table.database.get)
-          || !v1SessionCatalog.tableExists(table))
+    // The namespace used for lookup is decided by the following precedence:
+    // 1. Use the existing namespace if it is defined.
+    // 2. Use defaultDatabase from AnalysisContext, if it is defined. In this case, no temporary
+    //    objects can be used, and the default database is only used to look up a view.
+    // 3. Use the current namespace of the session catalog.
+    private def withNewNamespace(ident: Identifier): Identifier = {
+      if (ident.namespace.nonEmpty) {
+        ident
+      } else {
+        val defaultNamespace = AnalysisContext.get.defaultDatabase match {
+          case Some(db) => Array(db)
+          case None => Array(v1SessionCatalog.getCurrentDatabase)
+        }
+        Identifier.of(defaultNamespace, ident.name)
+      }
     }
   }

@@ -2898,37 +2932,6 @@ class Analyzer(
       }
     }
   }
-
-  /**
-   * Performs the lookup of DataSourceV2 Tables. The order of resolution is:
-   * 1. Check if this relation is a temporary table.
-   * 2. Check if it has a catalog identifier. Here we try to load the table.
-   *    If we find the table, return the v2 relation and catalog.
-   * 3. Try resolving the relation using the V2SessionCatalog if that is defined.
-   *    If the V2SessionCatalog returns a V1 table definition,
-   *    return `None` so that we can fallback to the V1 code paths.
-   *    If the V2SessionCatalog returns a V2 table, return the v2 relation and V2SessionCatalog.
-   */
-  private def lookupV2RelationAndCatalog(
-      identifier: Seq[String]): Option[(DataSourceV2Relation, CatalogPlugin, Identifier)] =
-    identifier match {
-      case CatalogObjectIdentifier(catalog, ident) if !CatalogV2Util.isSessionCatalog(catalog) =>
-        CatalogV2Util.loadTable(catalog, ident) match {
-          case Some(table) => Some((DataSourceV2Relation.create(table), catalog, ident))
-          case None => None
-        }
-      case CatalogObjectIdentifier(catalog, ident) if CatalogV2Util.isSessionCatalog(catalog) =>
-        CatalogV2Util.loadTable(catalog, ident) match {
-          case Some(_: V1Table) => None
-          case Some(table) =>
-            Some((DataSourceV2Relation.create(table), catalog, ident))
-          case None => None
-        }
-      case _ => None
-    }
-
-  private def lookupV2Relation(identifier: Seq[String]): Option[DataSourceV2Relation] =
-    lookupV2RelationAndCatalog(identifier).map(_._1)
 }

 /**
```
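The new `lookupRelation` above first normalizes the identifier with `withNewNamespace` and then loads the table once. As a reading aid, here is a minimal, standalone sketch of the namespace-defaulting precedence that `withNewNamespace` implements; `SimpleIdent`, `qualify`, `viewDefaultDatabase` and `currentDatabase` are illustrative stand-ins, not Spark's actual `Identifier`, `AnalysisContext`, or `SessionCatalog` APIs.

```scala
// Standalone sketch of the precedence used by withNewNamespace in the diff above.
// All names here are illustrative stand-ins, not the real Spark classes.
object NamespaceDefaultingSketch extends App {
  case class SimpleIdent(namespace: Seq[String], name: String)

  def qualify(
      ident: SimpleIdent,
      viewDefaultDatabase: Option[String], // plays the role of AnalysisContext.defaultDatabase
      currentDatabase: String): SimpleIdent = {
    if (ident.namespace.nonEmpty) {
      ident // 1. an explicit namespace is kept as-is
    } else {
      // 2. fall back to the view's default database, 3. else the current database
      val ns = viewDefaultDatabase.getOrElse(currentDatabase)
      ident.copy(namespace = Seq(ns))
    }
  }

  println(qualify(SimpleIdent(Nil, "t"), None, "default"))        // SimpleIdent(List(default),t)
  println(qualify(SimpleIdent(Seq("db1"), "t"), None, "default")) // SimpleIdent(List(db1),t)
}
```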

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala

Lines changed: 22 additions & 14 deletions
```diff
@@ -747,26 +747,34 @@ class SessionCatalog(
         }.getOrElse(throw new NoSuchTableException(db, table))
       } else if (name.database.isDefined || !tempViews.contains(table)) {
         val metadata = externalCatalog.getTable(db, table)
-        if (metadata.tableType == CatalogTableType.VIEW) {
-          val viewText = metadata.viewText.getOrElse(sys.error("Invalid view without text."))
-          logDebug(s"'$viewText' will be used for the view($table).")
-          // The relation is a view, so we wrap the relation by:
-          // 1. Add a [[View]] operator over the relation to keep track of the view desc;
-          // 2. Wrap the logical plan in a [[SubqueryAlias]] which tracks the name of the view.
-          val child = View(
-            desc = metadata,
-            output = metadata.schema.toAttributes,
-            child = parser.parsePlan(viewText))
-          SubqueryAlias(table, db, child)
-        } else {
-          SubqueryAlias(table, db, UnresolvedCatalogRelation(metadata))
-        }
+        getRelation(metadata)
       } else {
         SubqueryAlias(table, tempViews(table))
       }
     }
   }

+  def getRelation(metadata: CatalogTable): LogicalPlan = {
+    val name = metadata.identifier
+    val db = formatDatabaseName(name.database.getOrElse(currentDb))
+    val table = formatTableName(name.table)
+
+    if (metadata.tableType == CatalogTableType.VIEW) {
+      val viewText = metadata.viewText.getOrElse(sys.error("Invalid view without text."))
+      logDebug(s"'$viewText' will be used for the view($table).")
+      // The relation is a view, so we wrap the relation by:
+      // 1. Add a [[View]] operator over the relation to keep track of the view desc;
+      // 2. Wrap the logical plan in a [[SubqueryAlias]] which tracks the name of the view.
+      val child = View(
+        desc = metadata,
+        output = metadata.schema.toAttributes,
+        child = parser.parsePlan(viewText))
+      SubqueryAlias(table, db, child)
+    } else {
+      SubqueryAlias(table, db, UnresolvedCatalogRelation(metadata))
+    }
+  }
+
   def lookupTempView(table: String): Option[SubqueryAlias] = {
     val formattedTable = formatTableName(table)
     getTempView(formattedTable).map { view =>
```
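The refactoring above exposes the view/table wrapping as `getRelation(metadata)`, so the analyzer's new `lookupRelation` can build a v1 relation from the `CatalogTable` it already obtained via `CatalogV2Util.loadTable`, rather than asking the catalog a second time (the double-lookup regression from SPARK-29966 mentioned in the commit message). A rough sketch of that single-lookup shape, using hypothetical simplified types rather than the real Spark classes:

```scala
// Hypothetical, simplified types; only the shape of the single-lookup flow is the point here.
object SingleLookupSketch extends App {
  sealed trait Loaded
  case class V1Meta(db: String, table: String) extends Loaded // stands in for a V1Table's CatalogTable
  case class V2Table(name: String) extends Loaded             // stands in for a v2 Table

  def buildRelation(loaded: Loaded): String = loaded match {
    case V1Meta(db, t) => s"v1 relation for $db.$t (built from metadata already in hand)"
    case V2Table(n)    => s"v2 relation for $n"
  }

  // One catalog lookup, then reuse whatever it returned: no second metastore call.
  def lookupOnce(load: String => Option[Loaded], name: String): Option[String] =
    load(name).map(buildRelation)

  val catalogEntries: Map[String, Loaded] = Map("default.t" -> V1Meta("default", "t"))
  println(lookupOnce(catalogEntries.get, "default.t"))
}
```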

sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala

Lines changed: 22 additions & 0 deletions
```diff
@@ -65,6 +65,28 @@ private[sql] trait LookupCatalog extends Logging {
     }
   }

+  /**
+   * Extract session catalog and identifier from a multi-part identifier.
+   */
+  object SessionCatalogAndIdentifier {
+    def unapply(parts: Seq[String]): Option[(CatalogPlugin, Identifier)] = parts match {
+      case CatalogObjectIdentifier(catalog, ident) if CatalogV2Util.isSessionCatalog(catalog) =>
+        Some(catalog, ident)
+      case _ => None
+    }
+  }
+
+  /**
+   * Extract non-session catalog and identifier from a multi-part identifier.
+   */
+  object NonSessionCatalogAndIdentifier {
+    def unapply(parts: Seq[String]): Option[(CatalogPlugin, Identifier)] = parts match {
+      case CatalogObjectIdentifier(catalog, ident) if !CatalogV2Util.isSessionCatalog(catalog) =>
+        Some(catalog, ident)
+      case _ => None
+    }
+  }
+
   /**
    * Extract catalog and namespace from a multi-part identifier with the current catalog if needed.
    * Catalog name takes precedence over namespaces.
```
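Both new helpers use Scala's extractor-object pattern: an `unapply` over the multi-part name decides which catalog owns it and hands back the catalog plus the remaining identifier. A self-contained sketch of that pattern, with the catalog machinery reduced to a hard-coded `spark_catalog` name purely for illustration (the real extractors consult `CatalogObjectIdentifier` and `CatalogV2Util`):

```scala
// Self-contained sketch of the extractor-object pattern used by the new helpers above.
object ExtractorSketch extends App {
  object SessionCatalogAndName {
    def unapply(parts: Seq[String]): Option[(String, Seq[String])] = parts match {
      case Seq("spark_catalog", rest @ _*) if rest.nonEmpty => Some(("spark_catalog", rest))
      case _ if parts.nonEmpty => Some(("spark_catalog", parts)) // no catalog part given
      case _ => None
    }
  }

  def describe(parts: Seq[String]): String = parts match {
    case SessionCatalogAndName(catalog, ident) => s"$catalog -> ${ident.mkString(".")}"
    case _ => "empty identifier"
  }

  println(describe(Seq("spark_catalog", "default", "t"))) // spark_catalog -> default.t
  println(describe(Seq("t")))                             // spark_catalog -> t
}
```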

sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala

Lines changed: 12 additions & 0 deletions
```diff
@@ -1813,6 +1813,18 @@ class DataSourceV2SQLSuite
     }
   }

+  test("SPARK-30001: session catalog name can be specified in SQL statements") {
+    // unset this config to use the default v2 session catalog.
+    spark.conf.unset(V2_SESSION_CATALOG_IMPLEMENTATION.key)
+
+    withTable("t") {
+      sql("CREATE TABLE t USING json AS SELECT 1 AS i")
+      checkAnswer(sql("select * from t"), Row(1))
+      checkAnswer(sql("select * from spark_catalog.t"), Row(1))
+      checkAnswer(sql("select * from spark_catalog.default.t"), Row(1))
+    }
+  }
+
   private def testV1Command(sqlCommand: String, sqlParams: String): Unit = {
     val e = intercept[AnalysisException] {
       sql(s"$sqlCommand $sqlParams")
```
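The new test also exercises the fully qualified three-part form, which the commit message examples stop short of. Interactively, that corresponds to something like the following spark-shell check (assuming the table `t` from the description above; output inferred from the test's `Row(1)` expectation):

```
scala> sql("select * from spark_catalog.default.t").show
+---+
|  i|
+---+
|  1|
+---+
```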

sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala

Lines changed: 2 additions & 1 deletion
```diff
@@ -141,7 +141,8 @@ class PlanResolutionSuite extends AnalysisTest {
       CTESubstitution,
       new ResolveCatalogs(catalogManager),
       new ResolveSessionCatalog(catalogManager, conf, _ == Seq("v")),
-      analyzer.ResolveTables)
+      analyzer.ResolveTables,
+      analyzer.ResolveRelations)
     rules.foldLeft(parsePlan(query)) {
       case (plan, rule) => rule.apply(plan)
     }
```
