Skip to content

Commit 16e7195

Browse files
fuwhu authored and dongjoon-hyun committed
[SPARK-29834][SQL] DESC DATABASE should look up catalog like v2 commands
### What changes were proposed in this pull request?
Add DescribeNamespaceStatement, DescribeNamespace and DescribeNamespaceExec to make "DESC DATABASE" look up the catalog like v2 commands.

### Why are the changes needed?
It's important to make all the commands have the same catalog/namespace resolution behavior, to avoid confusing end-users.

### Does this PR introduce any user-facing change?
Yes, it adds "DESC NAMESPACE", whose function is the same as that of "DESC DATABASE" and "DESC SCHEMA".

### How was this patch tested?
New unit tests.

Closes apache#26513 from fuwhu/SPARK-29834.

Authored-by: fuwhu <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent 7720781 commit 16e7195

File tree

12 files changed

+146
-35
lines changed

12 files changed

+146
-35
lines changed

sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,8 @@ statement
199199
| SHOW CREATE TABLE multipartIdentifier #showCreateTable
200200
| SHOW CURRENT NAMESPACE #showCurrentNamespace
201201
| (DESC | DESCRIBE) FUNCTION EXTENDED? describeFuncName #describeFunction
202-
| (DESC | DESCRIBE) database EXTENDED? db=errorCapturingIdentifier #describeDatabase
202+
| (DESC | DESCRIBE) (database | NAMESPACE) EXTENDED?
203+
multipartIdentifier #describeNamespace
203204
| (DESC | DESCRIBE) TABLE? option=(EXTENDED | FORMATTED)?
204205
multipartIdentifier partitionSpec? describeColName? #describeTable
205206
| (DESC | DESCRIBE) QUERY? query #describeQuery

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,9 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
172172
case DropNamespaceStatement(NonSessionCatalog(catalog, nameParts), ifExists, cascade) =>
173173
DropNamespace(catalog, nameParts, ifExists, cascade)
174174

175+
case DescribeNamespaceStatement(NonSessionCatalog(catalog, nameParts), extended) =>
176+
DescribeNamespace(catalog, nameParts, extended)
177+
175178
case ShowNamespacesStatement(Some(CatalogAndNamespace(catalog, namespace)), pattern) =>
176179
ShowNamespaces(catalog.asNamespaceCatalog, namespace, pattern)
177180

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2541,6 +2541,21 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
25412541
Option(ctx.pattern).map(string))
25422542
}
25432543

2544+
/**
 * Build a [[DescribeNamespaceStatement]] from a parsed DESCRIBE NAMESPACE command.
 *
 * Accepted syntax:
 * {{{
 *   DESCRIBE (DATABASE|SCHEMA|NAMESPACE) [EXTENDED] database;
 * }}}
 *
 * The name is read as a multi-part identifier, and the statement is marked as extended
 * when the EXTENDED keyword is present in the parse context.
 */
override def visitDescribeNamespace(ctx: DescribeNamespaceContext): LogicalPlan =
  withOrigin(ctx) {
    val nameParts = visitMultipartIdentifier(ctx.multipartIdentifier())
    val isExtended = ctx.EXTENDED != null
    DescribeNamespaceStatement(nameParts, isExtended)
  }
2558+
25442559
/**
25452560
* Create a table, returning a [[CreateTableStatement]] logical plan.
25462561
*

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -269,6 +269,13 @@ case class DescribeTableStatement(
269269
partitionSpec: TablePartitionSpec,
270270
isExtended: Boolean) extends ParsedStatement
271271

272+
/**
 * The parsed form of a DESCRIBE NAMESPACE statement, as produced from SQL.
 *
 * @param namespace the parts of the multi-part namespace name
 * @param extended whether the EXTENDED keyword was given
 */
case class DescribeNamespaceStatement(namespace: Seq[String], extended: Boolean)
  extends ParsedStatement
278+
272279
/**
273280
* A DESCRIBE TABLE tbl_name col_name statement, as parsed from SQL.
274281
*/

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.plans.DescribeTableSchema
2323
import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, Identifier, SupportsNamespaces, TableCatalog, TableChange}
2424
import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, ColumnChange}
2525
import org.apache.spark.sql.connector.expressions.Transform
26-
import org.apache.spark.sql.types.{DataType, StringType, StructType}
26+
import org.apache.spark.sql.types.{DataType, MetadataBuilder, StringType, StructType}
2727

2828
/**
2929
* Base trait for DataSourceV2 write commands
@@ -255,6 +255,21 @@ case class DropNamespace(
255255
ifExists: Boolean,
256256
cascade: Boolean) extends Command
257257

258+
/**
 * Logical plan for the DESCRIBE NAMESPACE command when it targets a v2 catalog.
 *
 * @param catalog the catalog the namespace belongs to
 * @param namespace the parts of the namespace name
 * @param extended whether the EXTENDED form of the command was requested
 */
case class DescribeNamespace(
    catalog: CatalogPlugin,
    namespace: Seq[String],
    extended: Boolean) extends Command {

  /** Two string columns: a non-nullable `name` and a nullable `value`. */
  override def output: Seq[Attribute] = {
    // Each column carries its description under the "comment" metadata key.
    def commented(text: String) = new MetadataBuilder().putString("comment", text).build()

    val nameColumn =
      AttributeReference("name", StringType, nullable = false, commented("name of the column"))()
    val valueColumn =
      AttributeReference("value", StringType, nullable = true, commented("value of the column"))()
    Seq(nameColumn, valueColumn)
  }
}
272+
258273
/**
259274
* The logical plan of the SHOW NAMESPACES command that works for v2 catalogs.
260275
*/

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -653,6 +653,13 @@ class DDLParserSuite extends AnalysisTest {
653653
"DESC TABLE COLUMN for a specific partition is not supported"))
654654
}
655655

656+
test("describe database") {
  // The grammar accepts both DESC and DESCRIBE, and both the database keyword and NAMESPACE,
  // for the same statement, so every combination must yield the same parsed plan.
  for {
    verb <- Seq("DESCRIBE", "DESC")
    keyword <- Seq("DATABASE", "NAMESPACE")
  } {
    comparePlans(
      parsePlan(s"$verb $keyword EXTENDED a.b"),
      DescribeNamespaceStatement(Seq("a", "b"), extended = true))
    comparePlans(
      parsePlan(s"$verb $keyword a.b"),
      DescribeNamespaceStatement(Seq("a", "b"), extended = false))
  }
}
662+
656663
test("SPARK-17328 Fix NPE with EXPLAIN DESCRIBE TABLE") {
657664
comparePlans(parsePlan("describe t"),
658665
DescribeTableStatement(Seq("t"), Map.empty, isExtended = false))

sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,13 @@ class ResolveSessionCatalog(
158158
case AlterViewUnsetPropertiesStatement(SessionCatalog(catalog, tableName), keys, ifExists) =>
159159
AlterTableUnsetPropertiesCommand(tableName.asTableIdentifier, keys, ifExists, isView = true)
160160

161+
case d @ DescribeNamespaceStatement(SessionCatalog(_, nameParts), _) =>
162+
if (nameParts.length != 1) {
163+
throw new AnalysisException(
164+
s"The database name is not valid: ${nameParts.quoted}")
165+
}
166+
DescribeDatabaseCommand(nameParts.head, d.extended)
167+
161168
case DescribeTableStatement(
162169
nameParts @ SessionCatalog(catalog, tableName), partitionSpec, isExtended) =>
163170
loadTable(catalog, tableName.asIdentifier).collect {

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -258,18 +258,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
258258
)
259259
}
260260

261-
/**
262-
* Create a [[DescribeDatabaseCommand]] command.
263-
*
264-
* For example:
265-
* {{{
266-
* DESCRIBE DATABASE [EXTENDED] database;
267-
* }}}
268-
*/
269-
override def visitDescribeDatabase(ctx: DescribeDatabaseContext): LogicalPlan = withOrigin(ctx) {
270-
DescribeDatabaseCommand(ctx.db.getText, ctx.EXTENDED != null)
271-
}
272-
273261
/**
274262
* Create a plan for a DESCRIBE FUNCTION command.
275263
*/

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import scala.collection.JavaConverters._
2222
import org.apache.spark.sql.{AnalysisException, Strategy}
2323
import org.apache.spark.sql.catalyst.expressions.{And, PredicateHelper, SubqueryExpression}
2424
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
25-
import org.apache.spark.sql.catalyst.plans.logical.{AlterTable, AppendData, CreateNamespace, CreateTableAsSelect, CreateV2Table, DeleteFromTable, DescribeTable, DropNamespace, DropTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, RefreshTable, Repartition, ReplaceTable, ReplaceTableAsSelect, SetCatalogAndNamespace, ShowCurrentNamespace, ShowNamespaces, ShowTableProperties, ShowTables}
25+
import org.apache.spark.sql.catalyst.plans.logical.{AlterTable, AppendData, CreateNamespace, CreateTableAsSelect, CreateV2Table, DeleteFromTable, DescribeNamespace, DescribeTable, DropNamespace, DropTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, RefreshTable, Repartition, ReplaceTable, ReplaceTableAsSelect, SetCatalogAndNamespace, ShowCurrentNamespace, ShowNamespaces, ShowTableProperties, ShowTables}
2626
import org.apache.spark.sql.connector.catalog.{StagingTableCatalog, TableCapability}
2727
import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, MicroBatchStream}
2828
import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan}
@@ -192,6 +192,9 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper {
192192
Nil
193193
}
194194

195+
case desc @ DescribeNamespace(catalog, namespace, extended) =>
196+
DescribeNamespaceExec(desc.output, catalog, namespace, extended) :: Nil
197+
195198
case desc @ DescribeTable(DataSourceV2Relation(table, _, _), isExtended) =>
196199
DescribeTableExec(desc.output, table, isExtended) :: Nil
197200

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.execution.datasources.v2
19+
20+
import scala.collection.JavaConverters._
21+
import scala.collection.mutable.ArrayBuffer
22+
23+
import org.apache.spark.sql.catalyst.InternalRow
24+
import org.apache.spark.sql.catalyst.encoders.RowEncoder
25+
import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericRowWithSchema}
26+
import org.apache.spark.sql.connector.catalog.CatalogPlugin
27+
import org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog.COMMENT_TABLE_PROP
28+
import org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog.LOCATION_TABLE_PROP
29+
import org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog.RESERVED_PROPERTIES
30+
import org.apache.spark.sql.types.StructType
31+
32+
/**
 * Physical plan node for describing a namespace.
 *
 * Emits one (name, value) row each for "Namespace Name", "Description" and "Location".
 * When `isExtended` is set and the namespace metadata contains entries that are not in
 * `RESERVED_PROPERTIES`, a further "Properties" row lists them, ordered by key.
 *
 * @param output attributes from which the schema of the produced rows is derived
 * @param catalog catalog holding the namespace; accessed through `asNamespaceCatalog`
 * @param namespace the parts of the namespace name
 * @param isExtended whether to also emit the non-reserved namespace properties
 */
case class DescribeNamespaceExec(
    output: Seq[Attribute],
    catalog: CatalogPlugin,
    namespace: Seq[String],
    isExtended: Boolean) extends V2CommandExec {

  private val encoder = RowEncoder(StructType.fromAttributes(output)).resolveAndBind()

  override protected def run(): Seq[InternalRow] = {
    import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._

    val rows = new ArrayBuffer[InternalRow]()
    val nsCatalog = catalog.asNamespaceCatalog
    val ns = namespace.toArray
    val metadata = nsCatalog.loadNamespaceMetadata(ns)

    // The namespace may have no parts (e.g. when the identifier only names a catalog), in which
    // case `ns.last` would throw NoSuchElementException; fall back to an empty name instead.
    rows += toCatalystRow("Namespace Name", ns.lastOption.getOrElse(""))
    // A key missing from the metadata is passed through as a null value.
    rows += toCatalystRow("Description", metadata.get(COMMENT_TABLE_PROP))
    rows += toCatalystRow("Location", metadata.get(LOCATION_TABLE_PROP))
    if (isExtended) {
      // Sort by key so the rendered string does not depend on the map's iteration order.
      val properties = metadata.asScala.toSeq
        .filter { case (key, _) => !RESERVED_PROPERTIES.contains(key) }
        .sortBy { case (key, _) => key }
      if (properties.nonEmpty) {
        rows += toCatalystRow("Properties", properties.mkString("(", ",", ")"))
      }
    }
    rows
  }

  /** Encodes the given strings, in order, as one row with this node's schema. */
  private def toCatalystRow(strs: String*): InternalRow = {
    // `toRow` result is copied before being retained.
    // NOTE(review): presumably because the encoder reuses its output row - confirm.
    encoder.toRow(new GenericRowWithSchema(strs.toArray, schema)).copy()
  }
}

0 commit comments

Comments
 (0)