
Commit 34e3cc7

rdblue authored and cloud-fan committed
[SPARK-27108][SQL] Add parsed SQL plans for create, CTAS.
## What changes were proposed in this pull request?

This moves parsing `CREATE TABLE ... USING` statements into catalyst. Catalyst produces logical plans with the parsed information, and those plans are converted to v1 `DataSource` plans in `DataSourceAnalysis`.

This prepares for adding v2 create plans that should receive the information parsed from SQL without being translated to v1 plans first.

This also makes it possible to parse in catalyst instead of splitting the parser across the abstract `AstBuilder` in catalyst and `SparkSqlParser` in core.

For more information, see the [mailing list thread](https://lists.apache.org/thread.html/54f4e1929ceb9a2b0cac7cb058000feb8de5d6c667b2e0950804c613%3Cdev.spark.apache.org%3E).

## How was this patch tested?

This uses existing tests to catch regressions. This introduces no behavior changes.

Closes apache#24029 from rdblue/SPARK-27108-add-parsed-create-logical-plans.

Authored-by: Ryan Blue <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
1 parent 78d546f commit 34e3cc7
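
As a minimal sketch of the new flow (not part of this commit's diff; table and column names are illustrative), a `CREATE TABLE ... USING` statement can now be parsed by catalyst's `CatalystSqlParser` alone, yielding a `CreateTableStatement` rather than a v1 `DataSource` plan:

```scala
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.plans.logical.sql.CreateTableStatement

// Parse a CREATE TABLE ... USING statement with the catalyst parser alone.
val plan = CatalystSqlParser.parsePlan(
  """CREATE TABLE IF NOT EXISTS db.events (id BIGINT, data STRING)
    |USING parquet
    |COMMENT 'event log'""".stripMargin)

plan match {
  case create: CreateTableStatement =>
    // The parsed information is held as-is; no v1 CatalogTable is involved yet.
    assert(create.table.database.contains("db"))
    assert(create.provider == "parquet")
    assert(create.comment.contains("event log"))
    assert(create.ifNotExists)
  case other =>
    sys.error(s"unexpected plan: $other")
}
```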

File tree

11 files changed: +1030 -458 lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala

Lines changed: 191 additions & 1 deletion

```diff
@@ -30,12 +30,13 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis._
-import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat
+import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate.{First, Last}
 import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.plans.logical.sql.{CreateTableAsSelectStatement, CreateTableStatement}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils.{getZoneId, stringToDate, stringToTimestamp}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
@@ -1888,4 +1889,193 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
     val structField = StructField(identifier.getText, typedVisit(dataType), nullable = true)
     if (STRING == null) structField else structField.withComment(string(STRING))
   }
+
+  /**
+   * Create location string.
+   */
+  override def visitLocationSpec(ctx: LocationSpecContext): String = withOrigin(ctx) {
+    string(ctx.STRING)
+  }
+
+  /**
+   * Create a [[BucketSpec]].
+   */
+  override def visitBucketSpec(ctx: BucketSpecContext): BucketSpec = withOrigin(ctx) {
+    BucketSpec(
+      ctx.INTEGER_VALUE.getText.toInt,
+      visitIdentifierList(ctx.identifierList),
+      Option(ctx.orderedIdentifierList)
+        .toSeq
+        .flatMap(_.orderedIdentifier.asScala)
+        .map { orderedIdCtx =>
+          Option(orderedIdCtx.ordering).map(_.getText).foreach { dir =>
+            if (dir.toLowerCase(Locale.ROOT) != "asc") {
+              operationNotAllowed(s"Column ordering must be ASC, was '$dir'", ctx)
+            }
+          }
+
+          orderedIdCtx.identifier.getText
+        })
+  }
+
+  /**
+   * Convert a table property list into a key-value map.
+   * This should be called through [[visitPropertyKeyValues]] or [[visitPropertyKeys]].
+   */
+  override def visitTablePropertyList(
+      ctx: TablePropertyListContext): Map[String, String] = withOrigin(ctx) {
+    val properties = ctx.tableProperty.asScala.map { property =>
+      val key = visitTablePropertyKey(property.key)
+      val value = visitTablePropertyValue(property.value)
+      key -> value
+    }
+    // Check for duplicate property names.
+    checkDuplicateKeys(properties, ctx)
+    properties.toMap
+  }
+
+  /**
+   * Parse a key-value map from a [[TablePropertyListContext]], assuming all values are specified.
+   */
+  def visitPropertyKeyValues(ctx: TablePropertyListContext): Map[String, String] = {
+    val props = visitTablePropertyList(ctx)
+    val badKeys = props.collect { case (key, null) => key }
+    if (badKeys.nonEmpty) {
+      operationNotAllowed(
+        s"Values must be specified for key(s): ${badKeys.mkString("[", ",", "]")}", ctx)
+    }
+    props
+  }
+
+  /**
+   * Parse a list of keys from a [[TablePropertyListContext]], assuming no values are specified.
+   */
+  def visitPropertyKeys(ctx: TablePropertyListContext): Seq[String] = {
+    val props = visitTablePropertyList(ctx)
+    val badKeys = props.filter { case (_, v) => v != null }.keys
+    if (badKeys.nonEmpty) {
+      operationNotAllowed(
+        s"Values should not be specified for key(s): ${badKeys.mkString("[", ",", "]")}", ctx)
+    }
+    props.keys.toSeq
+  }
+
+  /**
+   * A table property key can either be String or a collection of dot separated elements. This
+   * function extracts the property key based on whether its a string literal or a table property
+   * identifier.
+   */
+  override def visitTablePropertyKey(key: TablePropertyKeyContext): String = {
+    if (key.STRING != null) {
+      string(key.STRING)
+    } else {
+      key.getText
+    }
+  }
+
+  /**
+   * A table property value can be String, Integer, Boolean or Decimal. This function extracts
+   * the property value based on whether its a string, integer, boolean or decimal literal.
+   */
+  override def visitTablePropertyValue(value: TablePropertyValueContext): String = {
+    if (value == null) {
+      null
+    } else if (value.STRING != null) {
+      string(value.STRING)
+    } else if (value.booleanValue != null) {
+      value.getText.toLowerCase(Locale.ROOT)
+    } else {
+      value.getText
+    }
+  }
+
+  /**
+   * Type to keep track of a table header: (identifier, isTemporary, ifNotExists, isExternal).
+   */
+  type TableHeader = (TableIdentifier, Boolean, Boolean, Boolean)
+
+  /**
+   * Validate a create table statement and return the [[TableIdentifier]].
+   */
+  override def visitCreateTableHeader(
+      ctx: CreateTableHeaderContext): TableHeader = withOrigin(ctx) {
+    val temporary = ctx.TEMPORARY != null
+    val ifNotExists = ctx.EXISTS != null
+    if (temporary && ifNotExists) {
+      operationNotAllowed("CREATE TEMPORARY TABLE ... IF NOT EXISTS", ctx)
+    }
+    (visitTableIdentifier(ctx.tableIdentifier), temporary, ifNotExists, ctx.EXTERNAL != null)
+  }
+
+  /**
+   * Create a table, returning a [[CreateTableStatement]] logical plan.
+   *
+   * Expected format:
+   * {{{
+   *   CREATE [TEMPORARY] TABLE [IF NOT EXISTS] [db_name.]table_name
+   *   USING table_provider
+   *   create_table_clauses
+   *   [[AS] select_statement];
+   *
+   *   create_table_clauses (order insensitive):
+   *     [OPTIONS table_property_list]
+   *     [PARTITIONED BY (col_name, col_name, ...)]
+   *     [CLUSTERED BY (col_name, col_name, ...)
+   *       [SORTED BY (col_name [ASC|DESC], ...)]
+   *       INTO num_buckets BUCKETS
+   *     ]
+   *     [LOCATION path]
+   *     [COMMENT table_comment]
+   *     [TBLPROPERTIES (property_name=property_value, ...)]
+   * }}}
+   */
+  override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = withOrigin(ctx) {
+    val (table, temp, ifNotExists, external) = visitCreateTableHeader(ctx.createTableHeader)
+    if (external) {
+      operationNotAllowed("CREATE EXTERNAL TABLE ... USING", ctx)
+    }
+
+    checkDuplicateClauses(ctx.TBLPROPERTIES, "TBLPROPERTIES", ctx)
+    checkDuplicateClauses(ctx.OPTIONS, "OPTIONS", ctx)
+    checkDuplicateClauses(ctx.PARTITIONED, "PARTITIONED BY", ctx)
+    checkDuplicateClauses(ctx.COMMENT, "COMMENT", ctx)
+    checkDuplicateClauses(ctx.bucketSpec(), "CLUSTERED BY", ctx)
+    checkDuplicateClauses(ctx.locationSpec, "LOCATION", ctx)
+
+    val schema = Option(ctx.colTypeList()).map(createSchema)
+    val partitionCols: Seq[String] =
+      Option(ctx.partitionColumnNames).map(visitIdentifierList).getOrElse(Nil)
+    val bucketSpec = ctx.bucketSpec().asScala.headOption.map(visitBucketSpec)
+    val properties = Option(ctx.tableProps).map(visitPropertyKeyValues).getOrElse(Map.empty)
+    val options = Option(ctx.options).map(visitPropertyKeyValues).getOrElse(Map.empty)
+
+    val provider = ctx.tableProvider.qualifiedName.getText
+    val location = ctx.locationSpec.asScala.headOption.map(visitLocationSpec)
+    val comment = Option(ctx.comment).map(string)
+
+    Option(ctx.query).map(plan) match {
+      case Some(_) if temp =>
+        operationNotAllowed("CREATE TEMPORARY TABLE ... USING ... AS query", ctx)
+
+      case Some(_) if schema.isDefined =>
+        operationNotAllowed(
+          "Schema may not be specified in a Create Table As Select (CTAS) statement",
+          ctx)
+
+      case Some(query) =>
+        CreateTableAsSelectStatement(
+          table, query, partitionCols, bucketSpec, properties, provider, options, location,
+          comment, ifNotExists = ifNotExists)
+
+      case None if temp =>
+        // CREATE TEMPORARY TABLE ... USING ... is not supported by the catalyst parser.
+        // Use CREATE TEMPORARY VIEW ... USING ... instead.
+        operationNotAllowed("CREATE TEMPORARY TABLE IF NOT EXISTS", ctx)
+
+      case _ =>
+        CreateTableStatement(table, schema.getOrElse(new StructType), partitionCols, bucketSpec,
+          properties, provider, options, location, comment, ifNotExists = ifNotExists)
+    }
+  }
 }
```
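
A hedged sketch of the CTAS branch added above (statements illustrative, not part of this commit): an `AS SELECT` clause produces a `CreateTableAsSelectStatement`, while combining `AS SELECT` with an explicit column list trips the `operationNotAllowed` check, which surfaces as a `ParseException`:

```scala
import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
import org.apache.spark.sql.catalyst.plans.logical.sql.CreateTableAsSelectStatement

// CREATE TABLE ... USING ... AS SELECT parses to CreateTableAsSelectStatement.
val ctas = CatalystSqlParser.parsePlan(
  "CREATE TABLE db.copy USING parquet AS SELECT * FROM db.events")
assert(ctas.isInstanceOf[CreateTableAsSelectStatement])

// An explicit schema plus AS SELECT hits the "Schema may not be specified" branch.
try {
  CatalystSqlParser.parsePlan(
    "CREATE TABLE db.copy (id BIGINT) USING parquet AS SELECT * FROM db.events")
  sys.error("expected a ParseException")
} catch {
  case e: ParseException =>
    assert(e.getMessage.contains("Schema may not be specified"))
}
```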
Lines changed: 66 additions & 0 deletions

```diff
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical.sql
+
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.types.StructType
+
+/**
+ * A CREATE TABLE command, as parsed from SQL.
+ *
+ * This is a metadata-only command and is not used to write data to the created table.
+ */
+case class CreateTableStatement(
+    table: TableIdentifier,
+    tableSchema: StructType,
+    partitioning: Seq[String],
+    bucketSpec: Option[BucketSpec],
+    properties: Map[String, String],
+    provider: String,
+    options: Map[String, String],
+    location: Option[String],
+    comment: Option[String],
+    ifNotExists: Boolean) extends ParsedStatement {
+
+  override def output: Seq[Attribute] = Seq.empty
+
+  override def children: Seq[LogicalPlan] = Seq.empty
+}
+
+/**
+ * A CREATE TABLE AS SELECT command, as parsed from SQL.
+ */
+case class CreateTableAsSelectStatement(
+    table: TableIdentifier,
+    asSelect: LogicalPlan,
+    partitioning: Seq[String],
+    bucketSpec: Option[BucketSpec],
+    properties: Map[String, String],
+    provider: String,
+    options: Map[String, String],
+    location: Option[String],
+    comment: Option[String],
+    ifNotExists: Boolean) extends ParsedStatement {
+
+  override def output: Seq[Attribute] = Seq.empty
+
+  override def children: Seq[LogicalPlan] = Seq(asSelect)
+}
```
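
A construction sketch showing the shape of these nodes, with illustrative values: the plain create statement is a leaf, while the CTAS variant wires its parsed query in as a child so analyzer rules can reach it; both inherit `resolved = false` from `ParsedStatement`:

```scala
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.plans.logical.OneRowRelation
import org.apache.spark.sql.catalyst.plans.logical.sql.{CreateTableAsSelectStatement, CreateTableStatement}
import org.apache.spark.sql.types.{LongType, StructField, StructType}

val create = CreateTableStatement(
  table = TableIdentifier("events", Some("db")),
  tableSchema = StructType(Seq(StructField("id", LongType))),
  partitioning = Nil,
  bucketSpec = None,
  properties = Map("owner" -> "etl"),
  provider = "parquet",
  options = Map.empty,
  location = Some("/tmp/events"),
  comment = Some("event log"),
  ifNotExists = true)

// A plain create is a leaf and is never resolved: analysis must first
// convert it to a concrete plan (e.g. a v1 plan in DataSourceAnalysis).
assert(create.children.isEmpty && !create.resolved)

// CTAS carries the parsed query as its only child. OneRowRelation stands in
// for a real parsed query here.
val ctas = CreateTableAsSelectStatement(
  TableIdentifier("copy", Some("db")), OneRowRelation(), Nil, None,
  Map.empty, "parquet", Map.empty, None, None, ifNotExists = false)
assert(ctas.children == Seq(ctas.asSelect))
```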
Lines changed: 44 additions & 0 deletions

```diff
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical.sql
+
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+
+/**
+ * A logical plan node that contains exactly what was parsed from SQL.
+ *
+ * This is used to hold information parsed from SQL when there are multiple implementations of a
+ * query or command. For example, CREATE TABLE may be implemented by different nodes for v1 and v2.
+ * Instead of parsing directly to a v1 CreateTable that keeps metadata in CatalogTable, and then
+ * converting that v1 metadata to the v2 equivalent, the sql [[CreateTableStatement]] plan is
+ * produced by the parser and converted once into both implementations.
+ *
+ * Parsed logical plans are not resolved because they must be converted to concrete logical plans.
+ *
+ * Parsed logical plans are located in Catalyst so that as much SQL parsing logic as possible is
+ * kept in a [[org.apache.spark.sql.catalyst.parser.AbstractSqlParser]].
+ */
+private[sql] abstract class ParsedStatement extends LogicalPlan {
+  // Redact properties and options when parsed nodes are used by generic methods like toString.
+  override def productIterator: Iterator[Any] = super.productIterator.map {
+    case mapArg: Map[_, _] => conf.redactOptions(mapArg.asInstanceOf[Map[String, String]])
+    case other => other
+  }
+
+  final override lazy val resolved = false
+}
```
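
A sketch of the redaction hook above, assuming the default `spark.sql.redaction.options.regex` (which matches keys containing "url") and an illustrative JDBC option: because `productIterator` routes `Map`-typed fields through `conf.redactOptions`, generic consumers such as `toString` never see the raw value:

```scala
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.plans.logical.sql.CreateTableStatement
import org.apache.spark.sql.types.StructType

val stmt = CreateTableStatement(
  TableIdentifier("t"),
  new StructType(),
  partitioning = Nil,
  bucketSpec = None,
  properties = Map.empty,
  provider = "jdbc",
  options = Map("url" -> "jdbc:postgresql://db.example.com/app?password=hunter2"),
  location = None,
  comment = None,
  ifNotExists = false)

// productIterator redacts Map-valued fields, so the sensitive JDBC URL does
// not appear verbatim in the plan's string form.
assert(!stmt.toString.contains("hunter2"))
```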
