Skip to content

Commit 80565ce

Browse files
jzhuge authored and cloud-fan committed
[SPARK-26946][SQL] Identifiers for multi-catalog
## What changes were proposed in this pull request? - Support N-part identifier in SQL - N-part identifier extractor in Analyzer ## How was this patch tested? - A new unit test suite ResolveMultipartRelationSuite - CatalogLoadingSuite rblue cloud-fan mccheah Closes apache#23848 from jzhuge/SPARK-26946. Authored-by: John Zhuge <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
1 parent 0f4f816 commit 80565ce

File tree

13 files changed

+340
-6
lines changed

13 files changed

+340
-6
lines changed

sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,10 @@ singleTableIdentifier
6363
: tableIdentifier EOF
6464
;
6565

66+
singleMultipartIdentifier
67+
: multipartIdentifier EOF
68+
;
69+
6670
singleFunctionIdentifier
6771
: functionIdentifier EOF
6872
;
@@ -554,6 +558,10 @@ rowFormat
554558
(NULL DEFINED AS nullDefinedAs=STRING)? #rowFormatDelimited
555559
;
556560

561+
multipartIdentifier
562+
: parts+=identifier ('.' parts+=identifier)*
563+
;
564+
557565
tableIdentifier
558566
: (db=identifier '.')? table=identifier
559567
;

sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/Catalogs.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,12 +44,14 @@ private Catalogs() {
4444
* @param name a String catalog name
4545
* @param conf a SQLConf
4646
* @return an initialized CatalogPlugin
47-
* @throws SparkException If the plugin class cannot be found or instantiated
47+
* @throws CatalogNotFoundException if the plugin class cannot be found
48+
* @throws SparkException if the plugin class cannot be instantiated
4849
*/
49-
public static CatalogPlugin load(String name, SQLConf conf) throws SparkException {
50+
public static CatalogPlugin load(String name, SQLConf conf)
51+
throws CatalogNotFoundException, SparkException {
5052
String pluginClassName = conf.getConfString("spark.sql.catalog." + name, null);
5153
if (pluginClassName == null) {
52-
throw new SparkException(String.format(
54+
throw new CatalogNotFoundException(String.format(
5355
"Catalog '%s' plugin class not found: spark.sql.catalog.%s is not defined", name, name));
5456
}
5557

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.catalog.v2;
19+
20+
import org.apache.spark.annotation.Experimental;
21+
22+
/**
23+
* Identifies an object in a catalog.
24+
*/
25+
@Experimental
26+
public interface Identifier {
27+
28+
static Identifier of(String[] namespace, String name) {
29+
return new IdentifierImpl(namespace, name);
30+
}
31+
32+
/**
33+
* @return the namespace in the catalog
34+
*/
35+
String[] namespace();
36+
37+
/**
38+
* @return the object name
39+
*/
40+
String name();
41+
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.catalog.v2;

import java.util.Arrays;
import java.util.Objects;

import org.apache.spark.annotation.Experimental;

/**
 * An {@link Identifier} implementation.
 *
 * <p>Instances are immutable value objects: two identifiers with the same
 * namespace parts and name compare equal, so they can be used as map keys.
 */
@Experimental
class IdentifierImpl implements Identifier {

  // final: identifier values must never change after construction.
  private final String[] namespace;
  private final String name;

  /**
   * @param namespace the namespace path; must not be null
   * @param name the object name; must not be null
   * @throws NullPointerException if either argument is null
   */
  IdentifierImpl(String[] namespace, String name) {
    // Fail fast here rather than deferring NPEs to namespace()/name() use sites.
    this.namespace = Objects.requireNonNull(namespace, "Identifier namespace cannot be null");
    this.name = Objects.requireNonNull(name, "Identifier name cannot be null");
  }

  @Override
  public String[] namespace() {
    return namespace;
  }

  @Override
  public String name() {
    return name;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    IdentifierImpl that = (IdentifierImpl) o;
    // Array content equality, not reference equality, for the namespace parts.
    return Arrays.equals(namespace, that.namespace) && name.equals(that.name);
  }

  @Override
  public int hashCode() {
    // Consistent with equals: combine content hash of namespace with the name.
    return Arrays.hashCode(namespace) * 31 + name.hashCode();
  }
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.catalog.v2

import org.apache.spark.SparkException
import org.apache.spark.annotation.Experimental

/**
 * Thrown when a requested catalog plugin cannot be resolved by name.
 *
 * @param message a description of the failed lookup
 * @param cause the underlying cause, or null when there is none
 */
@Experimental
class CatalogNotFoundException(message: String, cause: Throwable)
  extends SparkException(message, cause) {

  /** Convenience constructor with no underlying cause. */
  def this(message: String) = this(message, null)
}
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.catalog.v2

import org.apache.spark.annotation.Experimental
import org.apache.spark.sql.catalyst.TableIdentifier

/**
 * A trait to encapsulate catalog lookup function and helpful extractors.
 */
@Experimental
trait LookupCatalog {

  /**
   * Looks up a catalog plugin by name. None (the default) means multi-catalog
   * support is not available and the extractors below never match.
   */
  def lookupCatalog: Option[(String) => CatalogPlugin] = None

  type CatalogObjectIdentifier = (Option[CatalogPlugin], Identifier)

  /**
   * Extract catalog plugin and identifier from a multi-part identifier.
   *
   * A single-part identifier, or a multi-part identifier whose first part is
   * not a known catalog, resolves to the default (session) catalog, signaled
   * by a None catalog in the result.
   */
  object CatalogObjectIdentifier {
    def unapply(parts: Seq[String]): Option[CatalogObjectIdentifier] = lookupCatalog.flatMap { lookup =>
      parts match {
        case Seq() =>
          // An empty identifier names nothing. Fail the match instead of
          // throwing a MatchError from the enclosing pattern match.
          None
        case Seq(name) =>
          (None, Identifier.of(Array.empty, name))
          Some((None, Identifier.of(Array.empty, name)))
        case Seq(catalogName, tail @ _*) =>
          try {
            val catalog = lookup(catalogName)
            Some((Some(catalog), Identifier.of(tail.init.toArray, tail.last)))
          } catch {
            case _: CatalogNotFoundException =>
              // The first part is not a catalog name: treat the whole sequence
              // as namespace + name within the default catalog.
              Some((None, Identifier.of(parts.init.toArray, parts.last)))
          }
      }
    }
  }

  /**
   * Extract legacy table identifier from a multi-part identifier.
   *
   * For legacy support only. Please use
   * [[org.apache.spark.sql.catalog.v2.LookupCatalog.CatalogObjectIdentifier]] in DSv2 code paths.
   */
  object AsTableIdentifier {
    def unapply(parts: Seq[String]): Option[TableIdentifier] = parts match {
      case CatalogObjectIdentifier(None, ident) =>
        // Only identifiers with at most one namespace level map onto the
        // legacy (database, table) form.
        ident.namespace match {
          case Array() =>
            Some(TableIdentifier(ident.name))
          case Array(database) =>
            Some(TableIdentifier(ident.name, Some(database)))
          case _ =>
            None
        }
      case _ =>
        None
    }
  }
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import scala.collection.mutable.ArrayBuffer
2424
import scala.util.Random
2525

2626
import org.apache.spark.sql.AnalysisException
27+
import org.apache.spark.sql.catalog.v2.{CatalogPlugin, LookupCatalog}
2728
import org.apache.spark.sql.catalyst._
2829
import org.apache.spark.sql.catalyst.catalog._
2930
import org.apache.spark.sql.catalyst.encoders.OuterScopes
@@ -95,13 +96,19 @@ object AnalysisContext {
9596
class Analyzer(
9697
catalog: SessionCatalog,
9798
conf: SQLConf,
98-
maxIterations: Int)
99-
extends RuleExecutor[LogicalPlan] with CheckAnalysis {
99+
maxIterations: Int,
100+
override val lookupCatalog: Option[(String) => CatalogPlugin] = None)
101+
extends RuleExecutor[LogicalPlan] with CheckAnalysis with LookupCatalog {
100102

101103
def this(catalog: SessionCatalog, conf: SQLConf) = {
102104
this(catalog, conf, conf.optimizerMaxIterations)
103105
}
104106

107+
def this(lookupCatalog: Option[(String) => CatalogPlugin], catalog: SessionCatalog,
108+
conf: SQLConf) = {
109+
this(catalog, conf, conf.optimizerMaxIterations, lookupCatalog)
110+
}
111+
105112
def executeAndCheck(plan: LogicalPlan, tracker: QueryPlanningTracker): LogicalPlan = {
106113
AnalysisHelper.markInAnalyzer {
107114
val analyzed = executeAndTrack(plan, tracker)

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,11 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
8686
visitFunctionIdentifier(ctx.functionIdentifier)
8787
}
8888

89+
override def visitSingleMultipartIdentifier(
90+
ctx: SingleMultipartIdentifierContext): Seq[String] = withOrigin(ctx) {
91+
visitMultipartIdentifier(ctx.multipartIdentifier)
92+
}
93+
8994
override def visitSingleDataType(ctx: SingleDataTypeContext): DataType = withOrigin(ctx) {
9095
visitSparkDataType(ctx.dataType)
9196
}
@@ -957,6 +962,14 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
957962
FunctionIdentifier(ctx.function.getText, Option(ctx.db).map(_.getText))
958963
}
959964

965+
/**
966+
* Create a multi-part identifier.
967+
*/
968+
override def visitMultipartIdentifier(
969+
ctx: MultipartIdentifierContext): Seq[String] = withOrigin(ctx) {
970+
ctx.parts.asScala.map(_.getText)
971+
}
972+
960973
/* ********************************************************************************************
961974
* Expression parsing
962975
* ******************************************************************************************** */

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParseDriver.scala

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,13 @@ abstract class AbstractSqlParser extends ParserInterface with Logging {
5757
}
5858
}
5959

60+
/** Creates a multi-part identifier for a given SQL string */
61+
override def parseMultipartIdentifier(sqlText: String): Seq[String] = {
62+
parse(sqlText) { parser =>
63+
astBuilder.visitSingleMultipartIdentifier(parser.singleMultipartIdentifier())
64+
}
65+
}
66+
6067
/**
6168
* Creates StructType for a given SQL string, which is a comma separated list of field
6269
* definitions which will preserve the correct Hive metadata.

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParserInterface.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,12 @@ trait ParserInterface {
5252
@throws[ParseException]("Text cannot be parsed to a FunctionIdentifier")
5353
def parseFunctionIdentifier(sqlText: String): FunctionIdentifier
5454

55+
/**
56+
* Parse a string to a multi-part identifier.
57+
*/
58+
@throws[ParseException]("Text cannot be parsed to a multi-part identifier")
59+
def parseMultipartIdentifier(sqlText: String): Seq[String]
60+
5561
/**
5662
* Parse a string to a [[StructType]]. The passed SQL string should be a comma separated list
5763
* of field definitions which will preserve the correct Hive metadata.

0 commit comments

Comments
 (0)