
Commit ec821b4

rdblue authored and cloud-fan committed
[SPARK-27919][SQL] Add v2 session catalog
## What changes were proposed in this pull request?

This fixes a problem where it is possible to create a v2 table using the default catalog that cannot be loaded with the session catalog. A session catalog should be used when the v1 catalog is responsible for tables with no catalog in the table identifier.

* Adds a v2 catalog implementation that delegates to the analyzer's SessionCatalog
* Uses the v2 session catalog for CTAS and CreateTable when the provider is a v2 provider and no v2 catalog is in the table identifier
* Updates catalog lookup to always provide the default if it is set, for consistent behavior

## How was this patch tested?

* Adds a new test suite for the v2 session catalog that validates the TableCatalog API
* Adds test cases in PlanResolutionSuite to validate that the v2 session catalog is used
* Adds a test suite for LookupCatalog with a default catalog

Closes apache#24768 from rdblue/SPARK-27919-add-v2-session-catalog.

Authored-by: Ryan Blue <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
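To make the reported failure concrete, here is a minimal sketch of the scenario, assuming a running SparkSession `spark` with a default v2 catalog configured; the source name `exampleV2Source` is hypothetical and stands in for any v2 TableProvider implementation not on the v1 fallback list:

// Hypothetical v2 source; any TableProvider not listed in
// spark.sql.sources.write.useV1SourceList behaves the same way.
spark.sql("CREATE TABLE t USING exampleV2Source AS SELECT 1 AS id")

// Before this change, the CTAS could go through the default v2 catalog while a
// lookup with no catalog in the identifier went through the session catalog and
// failed to find the table. With the v2 session catalog, both paths agree.
spark.table("t").show()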
1 parent d26642d commit ec821b4

File tree

13 files changed: +1353 -87 lines

sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/LookupCatalog.scala

Lines changed: 88 additions & 15 deletions
@@ -17,54 +17,127 @@
 
 package org.apache.spark.sql.catalog.v2
 
+import scala.util.control.NonFatal
+
 import org.apache.spark.annotation.Experimental
+import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.TableIdentifier
 
 /**
  * A trait to encapsulate catalog lookup function and helpful extractors.
  */
 @Experimental
-trait LookupCatalog {
+trait LookupCatalog extends Logging {
+
+  import LookupCatalog._
 
+  protected def defaultCatalogName: Option[String] = None
   protected def lookupCatalog(name: String): CatalogPlugin
 
-  type CatalogObjectIdentifier = (Option[CatalogPlugin], Identifier)
+  /**
+   * Returns the default catalog. When set, this catalog is used for all identifiers that do not
+   * set a specific catalog. When this is None, the session catalog is responsible for the
+   * identifier.
+   *
+   * If this is None and a table's provider (source) is a v2 provider, the v2 session catalog will
+   * be used.
+   */
+  def defaultCatalog: Option[CatalogPlugin] = {
+    try {
+      defaultCatalogName.map(lookupCatalog)
+    } catch {
+      case NonFatal(e) =>
+        logError(s"Cannot load default v2 catalog: ${defaultCatalogName.get}", e)
+        None
+    }
+  }
 
   /**
-   * Extract catalog plugin and identifier from a multi-part identifier.
+   * This catalog is a v2 catalog that delegates to the v1 session catalog. it is used when the
+   * session catalog is responsible for an identifier, but the source requires the v2 catalog API.
+   * This happens when the source implementation extends the v2 TableProvider API and is not listed
+   * in the fallback configuration, spark.sql.sources.write.useV1SourceList
    */
-  object CatalogObjectIdentifier {
-    def unapply(parts: Seq[String]): Some[CatalogObjectIdentifier] = parts match {
-      case Seq(name) =>
-        Some((None, Identifier.of(Array.empty, name)))
+  def sessionCatalog: Option[CatalogPlugin] = {
+    try {
+      Some(lookupCatalog(SESSION_CATALOG_NAME))
+    } catch {
+      case NonFatal(e) =>
+        logError("Cannot load v2 session catalog", e)
+        None
+    }
+  }
+
+  /**
+   * Extract catalog plugin and remaining identifier names.
+   *
+   * This does not substitute the default catalog if no catalog is set in the identifier.
+   */
+  private object CatalogAndIdentifier {
+    def unapply(parts: Seq[String]): Some[(Option[CatalogPlugin], Seq[String])] = parts match {
+      case Seq(_) =>
+        Some((None, parts))
       case Seq(catalogName, tail @ _*) =>
         try {
-          Some((Some(lookupCatalog(catalogName)), Identifier.of(tail.init.toArray, tail.last)))
+          Some((Some(lookupCatalog(catalogName)), tail))
         } catch {
           case _: CatalogNotFoundException =>
-            Some((None, Identifier.of(parts.init.toArray, parts.last)))
+            Some((None, parts))
         }
     }
   }
 
+  type CatalogObjectIdentifier = (Option[CatalogPlugin], Identifier)
+
+  /**
+   * Extract catalog and identifier from a multi-part identifier with the default catalog if needed.
+   */
+  object CatalogObjectIdentifier {
+    def unapply(parts: Seq[String]): Some[CatalogObjectIdentifier] = parts match {
+      case CatalogAndIdentifier(maybeCatalog, nameParts) =>
+        Some((
+          maybeCatalog.orElse(defaultCatalog),
+          Identifier.of(nameParts.init.toArray, nameParts.last)
+        ))
+    }
+  }
+
   /**
    * Extract legacy table identifier from a multi-part identifier.
    *
    * For legacy support only. Please use [[CatalogObjectIdentifier]] instead on DSv2 code paths.
    */
   object AsTableIdentifier {
     def unapply(parts: Seq[String]): Option[TableIdentifier] = parts match {
-      case CatalogObjectIdentifier(None, ident) =>
-        ident.namespace match {
-          case Array() =>
-            Some(TableIdentifier(ident.name))
-          case Array(database) =>
-            Some(TableIdentifier(ident.name, Some(database)))
+      case CatalogAndIdentifier(None, names) if defaultCatalog.isEmpty =>
+        names match {
+          case Seq(name) =>
+            Some(TableIdentifier(name))
+          case Seq(database, name) =>
+            Some(TableIdentifier(name, Some(database)))
           case _ =>
             None
         }
       case _ =>
         None
     }
   }
+
+  /**
+   * For temp views, extract a table identifier from a multi-part identifier if it has no catalog.
+   */
+  object AsTemporaryViewIdentifier {
+    def unapply(parts: Seq[String]): Option[TableIdentifier] = parts match {
+      case CatalogAndIdentifier(None, Seq(table)) =>
+        Some(TableIdentifier(table))
+      case CatalogAndIdentifier(None, Seq(database, table)) =>
+        Some(TableIdentifier(table, Some(database)))
+      case _ =>
+        None
+    }
+  }
+}
+
+object LookupCatalog {
+  val SESSION_CATALOG_NAME: String = "session"
 }
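To see the extractors in action outside the analyzer, here is a minimal, self-contained sketch (not part of the patch); `LookupDemo` and its no-op `prod` plugin are hypothetical stand-ins for a configured catalog, playing the same role as `TestCatalogPlugin` in the test suites below:

import org.apache.spark.sql.catalog.v2.{CatalogNotFoundException, CatalogPlugin, LookupCatalog}
import org.apache.spark.sql.util.CaseInsensitiveStringMap

object LookupDemo extends LookupCatalog {
  // Hypothetical no-op plugin standing in for a configured catalog named "prod".
  private val prod: CatalogPlugin = new CatalogPlugin {
    override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = ()
    override def name(): String = "prod"
  }
  private val catalogs = Map("prod" -> prod)

  override protected def defaultCatalogName: Option[String] = Some("prod")
  override protected def lookupCatalog(name: String): CatalogPlugin =
    catalogs.getOrElse(name, throw new CatalogNotFoundException(s"$name not found"))

  def main(args: Array[String]): Unit = {
    // "db" is not a known catalog, so CatalogObjectIdentifier falls back to the
    // default catalog and treats "db" as the namespace.
    Seq("db", "tbl") match {
      case CatalogObjectIdentifier(catalog, ident) =>
        assert(catalog.contains(prod))
        assert(ident.namespace.sameElements(Array("db")) && ident.name == "tbl")
    }
    // AsTemporaryViewIdentifier matches because no catalog is named and the
    // namespace has at most one part; the default catalog is not substituted.
    Seq("db", "tbl") match {
      case AsTemporaryViewIdentifier(ti) =>
        assert(ti.table == "tbl" && ti.database.contains("db"))
      case _ => sys.error("expected a temporary view identifier")
    }
  }
}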

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 10 additions & 0 deletions
@@ -104,6 +104,8 @@ class Analyzer(
     this(catalog, conf, conf.optimizerMaxIterations)
   }
 
+  override protected def defaultCatalogName: Option[String] = conf.defaultV2Catalog
+
   override protected def lookupCatalog(name: String): CatalogPlugin =
     throw new CatalogNotFoundException("No catalog lookup function")
 
@@ -667,6 +669,10 @@ class Analyzer(
     import org.apache.spark.sql.catalog.v2.utils.CatalogV2Util._
 
     def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
+      case u @ UnresolvedRelation(AsTemporaryViewIdentifier(ident))
+          if catalog.isTemporaryTable(ident) =>
+        u // temporary views take precedence over catalog table names
+
       case u @ UnresolvedRelation(CatalogObjectIdentifier(Some(catalogPlugin), ident)) =>
         loadTable(catalogPlugin, ident).map(DataSourceV2Relation.create).getOrElse(u)
     }
@@ -704,6 +710,10 @@ class Analyzer(
     // Note this is compatible with the views defined by older versions of Spark(before 2.2), which
     // have empty defaultDatabase and all the relations in viewText have database part defined.
     def resolveRelation(plan: LogicalPlan): LogicalPlan = plan match {
+      case u @ UnresolvedRelation(AsTemporaryViewIdentifier(ident))
+          if catalog.isTemporaryTable(ident) =>
+        resolveRelation(lookupTableFromCatalog(ident, u, AnalysisContext.get.defaultDatabase))
+
       case u @ UnresolvedRelation(AsTableIdentifier(ident)) if !isRunningDirectlyOnFiles(ident) =>
         val defaultDatabase = AnalysisContext.get.defaultDatabase
         val foundRelation = lookupTableFromCatalog(ident, u, defaultDatabase)
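A short usage sketch of the precedence rule added above, assuming a running SparkSession `spark`:

// A temporary view named "t" takes precedence over any table "t" reachable
// through the default v2 catalog or the session catalog.
spark.range(5).createOrReplaceTempView("t")
spark.sql("SELECT id FROM t").show()  // resolves to the temp view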

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UpdateAttributeNullability.scala

Lines changed: 1 addition & 1 deletion
@@ -37,7 +37,7 @@ object UpdateAttributeNullability extends Rule[LogicalPlan] {
     case p if !p.resolved => p
     // Skip leaf node, as it has no child and no need to update nullability.
     case p: LeafNode => p
-    case p: LogicalPlan =>
+    case p: LogicalPlan if p.childrenResolved =>
       val nullabilities = p.children.flatMap(c => c.output).groupBy(_.exprId).map {
         // If there are multiple Attributes having the same ExprId, we need to resolve
         // the conflict of nullable field. We do not really expect this to happen.

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala

Lines changed: 1 addition & 1 deletion
@@ -432,7 +432,7 @@ case class CreateTableAsSelect(
 
   override def children: Seq[LogicalPlan] = Seq(query)
 
-  override lazy val resolved: Boolean = {
+  override lazy val resolved: Boolean = childrenResolved && {
     // the table schema is created from the query schema, so the only resolution needed is to check
     // that the columns referenced by the table's partitioning exist in the query schema
     val references = partitioning.flatMap(_.references).toSet

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 5 additions & 0 deletions
@@ -1833,6 +1833,11 @@ object SQLConf {
     .stringConf
     .createOptional
 
+  val V2_SESSION_CATALOG = buildConf("spark.sql.catalog.session")
+    .doc("Name of the default v2 catalog, used when a catalog is not identified in queries")
+    .stringConf
+    .createWithDefault("org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog")
+
   val LEGACY_LOOSE_UPCAST = buildConf("spark.sql.legacy.looseUpcast")
    .doc("When true, the upcast will be loose and allows string to atomic types.")
    .booleanConf
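For reference, a hedged sketch of wiring this conf when building a session; it simply restates the built-in default value, since substituting a custom TableCatalog class here is deployment-specific:

import org.apache.spark.sql.SparkSession

// Set the v2 session catalog implementation explicitly; this value is the
// default, shown only to illustrate the new conf key.
val spark = SparkSession.builder()
  .master("local[*]")
  .config("spark.sql.catalog.session",
    "org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog")
  .getOrCreate()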

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/v2/LookupCatalogSuite.scala

Lines changed: 107 additions & 0 deletions
@@ -85,4 +85,111 @@ class LookupCatalogSuite extends SparkFunSuite with LookupCatalog with Inside {
       }
     }
   }
+
+  test("temporary table identifier") {
+    Seq(
+      ("tbl", TableIdentifier("tbl")),
+      ("db.tbl", TableIdentifier("tbl", Some("db"))),
+      ("`db.tbl`", TableIdentifier("db.tbl")),
+      ("parquet.`file:/tmp/db.tbl`", TableIdentifier("file:/tmp/db.tbl", Some("parquet"))),
+      ("`org.apache.spark.sql.json`.`s3://buck/tmp/abc.json`",
+          TableIdentifier("s3://buck/tmp/abc.json", Some("org.apache.spark.sql.json")))).foreach {
+      case (sqlIdent: String, expectedTableIdent: TableIdentifier) =>
+        // when there is no catalog and the namespace has one part, the rule should match
+        inside(parseMultipartIdentifier(sqlIdent)) {
+          case AsTemporaryViewIdentifier(ident) =>
+            ident shouldEqual expectedTableIdent
+        }
+    }
+
+    Seq("prod.func", "prod.db.tbl", "test.db.tbl", "ns1.ns2.tbl", "test.ns1.ns2.ns3.tbl")
+        .foreach { sqlIdent =>
+      inside(parseMultipartIdentifier(sqlIdent)) {
+        case AsTemporaryViewIdentifier(_) =>
+          fail("AsTemporaryViewIdentifier should not match when " +
+              "the catalog is set or the namespace has multiple parts")
+        case _ =>
+          // expected
+      }
+    }
+  }
+}
+
+class LookupCatalogWithDefaultSuite extends SparkFunSuite with LookupCatalog with Inside {
+  import CatalystSqlParser._
+
+  private val catalogs = Seq("prod", "test").map(x => x -> new TestCatalogPlugin(x)).toMap
+
+  override def defaultCatalogName: Option[String] = Some("prod")
+
+  override def lookupCatalog(name: String): CatalogPlugin =
+    catalogs.getOrElse(name, throw new CatalogNotFoundException(s"$name not found"))
+
+  test("catalog object identifier") {
+    Seq(
+      ("tbl", catalogs.get("prod"), Seq.empty, "tbl"),
+      ("db.tbl", catalogs.get("prod"), Seq("db"), "tbl"),
+      ("prod.func", catalogs.get("prod"), Seq.empty, "func"),
+      ("ns1.ns2.tbl", catalogs.get("prod"), Seq("ns1", "ns2"), "tbl"),
+      ("prod.db.tbl", catalogs.get("prod"), Seq("db"), "tbl"),
+      ("test.db.tbl", catalogs.get("test"), Seq("db"), "tbl"),
+      ("test.ns1.ns2.ns3.tbl", catalogs.get("test"), Seq("ns1", "ns2", "ns3"), "tbl"),
+      ("`db.tbl`", catalogs.get("prod"), Seq.empty, "db.tbl"),
+      ("parquet.`file:/tmp/db.tbl`", catalogs.get("prod"), Seq("parquet"), "file:/tmp/db.tbl"),
+      ("`org.apache.spark.sql.json`.`s3://buck/tmp/abc.json`", catalogs.get("prod"),
+          Seq("org.apache.spark.sql.json"), "s3://buck/tmp/abc.json")).foreach {
+      case (sql, expectedCatalog, namespace, name) =>
+        inside(parseMultipartIdentifier(sql)) {
+          case CatalogObjectIdentifier(catalog, ident) =>
+            catalog shouldEqual expectedCatalog
+            ident shouldEqual Identifier.of(namespace.toArray, name)
+        }
+    }
+  }
+
+  test("table identifier") {
+    Seq(
+      "tbl",
+      "db.tbl",
+      "`db.tbl`",
+      "parquet.`file:/tmp/db.tbl`",
+      "`org.apache.spark.sql.json`.`s3://buck/tmp/abc.json`",
+      "prod.func",
+      "prod.db.tbl",
+      "ns1.ns2.tbl").foreach { sql =>
+      parseMultipartIdentifier(sql) match {
+        case AsTableIdentifier(_) =>
+          fail(s"$sql should not be resolved as TableIdentifier")
+        case _ =>
+      }
+    }
+  }
+
+  test("temporary table identifier") {
+    Seq(
+      ("tbl", TableIdentifier("tbl")),
+      ("db.tbl", TableIdentifier("tbl", Some("db"))),
+      ("`db.tbl`", TableIdentifier("db.tbl")),
+      ("parquet.`file:/tmp/db.tbl`", TableIdentifier("file:/tmp/db.tbl", Some("parquet"))),
+      ("`org.apache.spark.sql.json`.`s3://buck/tmp/abc.json`",
+          TableIdentifier("s3://buck/tmp/abc.json", Some("org.apache.spark.sql.json")))).foreach {
+      case (sqlIdent: String, expectedTableIdent: TableIdentifier) =>
+        // when there is no catalog and the namespace has one part, the rule should match
+        inside(parseMultipartIdentifier(sqlIdent)) {
+          case AsTemporaryViewIdentifier(ident) =>
+            ident shouldEqual expectedTableIdent
+        }
+    }
+
+    Seq("prod.func", "prod.db.tbl", "test.db.tbl", "ns1.ns2.tbl", "test.ns1.ns2.ns3.tbl")
+        .foreach { sqlIdent =>
+      inside(parseMultipartIdentifier(sqlIdent)) {
+        case AsTemporaryViewIdentifier(_) =>
+          fail("AsTemporaryViewIdentifier should not match when " +
+              "the catalog is set or the namespace has multiple parts")
+        case _ =>
+          // expected
+      }
+    }
+  }
 }
