Skip to content

Commit ea222a3

Browse files
dtenedorcloud-fan
authored andcommitted
[SPARK-49566][SQL] Add SQL pipe syntax for the EXTEND operator
### What changes were proposed in this pull request? This PR adds SQL pipe syntax support for the EXTEND operator. This operator preserves the existing input table and adds one or more new computed columns whose values are equal to evaluating the specified expressions. This is equivalent to `SELECT *, <newExpressions>` in the SQL compiler. It is provided as a convenience feature and some functionality overlap exists with lateral column aliases. For example: ``` CREATE TABLE t(x INT, y STRING) USING CSV; INSERT INTO t VALUES (0, 'abc'), (1, 'def'); TABLE t |> EXTEND x + LENGTH(y) AS z; +----+-----+-----+ | x | y | z | +----+-----+-----+ | 0 | abc | 3 | | 1 | def | 4 | +----+-----+-----+ ``` Like the `|> SELECT` operator, aggregate functions are not allowed in these expressions. During the course of developing reasonable error messages for this, I found that the SQL pipe syntax research paper also specified that the `|> AGGREGATE` operator should require that each non-grouping expression contains at least one aggregate function; I added a check and reasonable error message for this case as well. ### Why are the changes needed? The SQL pipe operator syntax will let users compose queries in a more flexible fashion. ### Does this PR introduce _any_ user-facing change? Yes, see above. ### How was this patch tested? This PR adds a few unit test cases, but mostly relies on golden file test coverage. I did this to make sure the answers are correct as this feature is implemented and also so we can look at the analyzer output plans to ensure they look right as well. ### Was this patch authored or co-authored using generative AI tooling? No Closes #48854 from dtenedor/pipe-syntax-projections. Authored-by: Daniel Tenedorio <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
1 parent 0fbd34c commit ea222a3

File tree

15 files changed

+618
-123
lines changed

15 files changed

+618
-123
lines changed

common/utils/src/main/resources/error/error-conditions.json

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3995,9 +3995,15 @@
39953995
],
39963996
"sqlState" : "42K03"
39973997
},
3998-
"PIPE_OPERATOR_SELECT_CONTAINS_AGGREGATE_FUNCTION" : {
3998+
"PIPE_OPERATOR_AGGREGATE_EXPRESSION_CONTAINS_NO_AGGREGATE_FUNCTION" : {
39993999
"message" : [
4000-
"Aggregate function <expr> is not allowed when using the pipe operator |> SELECT clause; please use the pipe operator |> AGGREGATE clause instead"
4000+
"Non-grouping expression <expr> is provided as an argument to the |> AGGREGATE pipe operator but does not contain any aggregate function; please update it to include an aggregate function and then retry the query again."
4001+
],
4002+
"sqlState" : "0A000"
4003+
},
4004+
"PIPE_OPERATOR_CONTAINS_AGGREGATE_FUNCTION" : {
4005+
"message" : [
4006+
"Aggregate function <expr> is not allowed when using the pipe operator |> <clause> clause; please use the pipe operator |> AGGREGATE clause instead."
40014007
],
40024008
"sqlState" : "0A000"
40034009
},

docs/sql-ref-ansi-compliance.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -514,6 +514,7 @@ Below is a list of all the keywords in Spark SQL.
514514
|EXISTS|non-reserved|non-reserved|reserved|
515515
|EXPLAIN|non-reserved|non-reserved|non-reserved|
516516
|EXPORT|non-reserved|non-reserved|non-reserved|
517+
|EXTEND|non-reserved|non-reserved|non-reserved|
517518
|EXTENDED|non-reserved|non-reserved|non-reserved|
518519
|EXTERNAL|non-reserved|non-reserved|reserved|
519520
|EXTRACT|non-reserved|non-reserved|reserved|

sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,7 @@ EXCLUDE: 'EXCLUDE';
228228
EXISTS: 'EXISTS';
229229
EXPLAIN: 'EXPLAIN';
230230
EXPORT: 'EXPORT';
231+
EXTEND: 'EXTEND';
231232
EXTENDED: 'EXTENDED';
232233
EXTERNAL: 'EXTERNAL';
233234
EXTRACT: 'EXTRACT';

sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1503,6 +1503,7 @@ version
15031503

15041504
operatorPipeRightSide
15051505
: selectClause windowClause?
1506+
| EXTEND extendList=namedExpressionSeq
15061507
// Note that the WINDOW clause is not allowed in the WHERE pipe operator, but we add it here in
15071508
// the grammar simply for purposes of catching this invalid syntax and throwing a specific
15081509
// dedicated error message.
@@ -1617,6 +1618,7 @@ ansiNonReserved
16171618
| EXISTS
16181619
| EXPLAIN
16191620
| EXPORT
1621+
| EXTEND
16201622
| EXTENDED
16211623
| EXTERNAL
16221624
| EXTRACT
@@ -1963,6 +1965,7 @@ nonReserved
19631965
| EXISTS
19641966
| EXPLAIN
19651967
| EXPORT
1968+
| EXTEND
19661969
| EXTENDED
19671970
| EXTERNAL
19681971
| EXTRACT

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/pipeOperators.scala

Lines changed: 31 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -18,41 +18,55 @@
1818
package org.apache.spark.sql.catalyst.expressions
1919

2020
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction
21-
import org.apache.spark.sql.catalyst.trees.TreePattern.{PIPE_OPERATOR_SELECT, RUNTIME_REPLACEABLE, TreePattern}
2221
import org.apache.spark.sql.errors.QueryCompilationErrors
2322

2423
/**
25-
* Represents a SELECT clause when used with the |> SQL pipe operator.
26-
* We use this to make sure that no aggregate functions exist in the SELECT expressions.
24+
* Represents an expression when used with a SQL pipe operator.
25+
* We use this to check invariants about whether aggregate functions may exist in these expressions.
26+
* @param child The child expression.
27+
* @param isAggregate Whether the pipe operator is |> AGGREGATE.
28+
* If true, the child expression must contain at least one aggregate function.
29+
* If false, the child expression must not contain any aggregate functions.
30+
* @param clause The clause of the pipe operator. This is used to generate error messages.
2731
*/
28-
case class PipeSelect(child: Expression)
32+
case class PipeExpression(child: Expression, isAggregate: Boolean, clause: String)
2933
extends UnaryExpression with RuntimeReplaceable {
30-
final override val nodePatterns: Seq[TreePattern] = Seq(PIPE_OPERATOR_SELECT, RUNTIME_REPLACEABLE)
31-
override def withNewChildInternal(newChild: Expression): Expression = PipeSelect(newChild)
34+
override def withNewChildInternal(newChild: Expression): Expression =
35+
PipeExpression(newChild, isAggregate, clause)
3236
override lazy val replacement: Expression = {
33-
def visit(e: Expression): Unit = e match {
34-
case a: AggregateFunction =>
35-
// If we used the pipe operator |> SELECT clause to specify an aggregate function, this is
36-
// invalid; return an error message instructing the user to use the pipe operator
37-
// |> AGGREGATE clause for this purpose instead.
38-
throw QueryCompilationErrors.pipeOperatorSelectContainsAggregateFunction(a)
39-
case _: WindowExpression =>
40-
// Window functions are allowed in pipe SELECT operators, so do not traverse into children.
41-
case _ =>
42-
e.children.foreach(visit)
37+
val firstAggregateFunction: Option[AggregateFunction] = findFirstAggregate(child)
38+
if (isAggregate && firstAggregateFunction.isEmpty) {
39+
throw QueryCompilationErrors.pipeOperatorAggregateExpressionContainsNoAggregateFunction(child)
40+
} else if (!isAggregate) {
41+
firstAggregateFunction.foreach { a =>
42+
throw QueryCompilationErrors.pipeOperatorContainsAggregateFunction(a, clause)
43+
}
4344
}
44-
visit(child)
4545
child
4646
}
47+
48+
/** Returns the first aggregate function in the given expression, or None if not found. */
49+
private def findFirstAggregate(e: Expression): Option[AggregateFunction] = e match {
50+
case a: AggregateFunction =>
51+
Some(a)
52+
case _: WindowExpression =>
53+
// Window functions are allowed in these pipe operators, so do not traverse into children.
54+
None
55+
case _ =>
56+
e.children.flatMap(findFirstAggregate).headOption
57+
}
4758
}
4859

4960
object PipeOperators {
5061
// These are definitions of query result clauses that can be used with the pipe operator.
62+
val aggregateClause = "AGGREGATE"
5163
val clusterByClause = "CLUSTER BY"
5264
val distributeByClause = "DISTRIBUTE BY"
65+
val extendClause = "EXTEND"
5366
val limitClause = "LIMIT"
5467
val offsetClause = "OFFSET"
5568
val orderByClause = "ORDER BY"
69+
val selectClause = "SELECT"
5670
val sortByClause = "SORT BY"
5771
val sortByDistributeByClause = "SORT BY ... DISTRIBUTE BY ..."
5872
val windowClause = "WINDOW"

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala

Lines changed: 32 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1236,7 +1236,7 @@ class AstBuilder extends DataTypeAstBuilder
12361236
* Add a regular (SELECT) query specification to a logical plan. The query specification
12371237
* is the core of the logical plan, this is where sourcing (FROM clause), projection (SELECT),
12381238
* aggregation (GROUP BY ... HAVING ...) and filtering (WHERE) takes place.
1239-
* If 'isPipeOperatorSelect' is true, wraps each projected expression with a [[PipeSelect]]
1239+
* If 'isPipeOperatorSelect' is true, wraps each projected expression with a [[PipeExpression]]
12401240
* expression for future validation of the expressions during analysis.
12411241
*
12421242
* Note that query hints are ignored (both by the parser and the builder).
@@ -1293,11 +1293,12 @@ class AstBuilder extends DataTypeAstBuilder
12931293

12941294
def createProject() = if (namedExpressions.nonEmpty) {
12951295
val newProjectList: Seq[NamedExpression] = if (isPipeOperatorSelect) {
1296-
// If this is a pipe operator |> SELECT clause, add a [[PipeSelect]] expression wrapping
1296+
// If this is a pipe operator |> SELECT clause, add a [[PipeExpression]] wrapping
12971297
// each alias in the project list, so the analyzer can check invariants later.
12981298
namedExpressions.map {
12991299
case a: Alias =>
1300-
a.withNewChildren(Seq(PipeSelect(a.child)))
1300+
a.withNewChildren(Seq(
1301+
PipeExpression(a.child, isAggregate = false, PipeOperators.selectClause)))
13011302
.asInstanceOf[NamedExpression]
13021303
case other =>
13031304
other
@@ -5933,6 +5934,24 @@ class AstBuilder extends DataTypeAstBuilder
59335934
windowClause = ctx.windowClause,
59345935
relation = left,
59355936
isPipeOperatorSelect = true)
5937+
}.getOrElse(Option(ctx.EXTEND).map { _ =>
5938+
// Visit each expression in the EXTEND operator, and add a PipeExpression expression on top of
5939+
// it to generate clear error messages if the expression contains any aggregate functions
5940+
// (this is not allowed in the EXTEND operator).
5941+
val extendExpressions: Seq[NamedExpression] =
5942+
Option(ctx.extendList).map { n: NamedExpressionSeqContext =>
5943+
visitNamedExpressionSeq(n).map {
5944+
case (a: Alias, _) =>
5945+
a.copy(
5946+
child = PipeExpression(a.child, isAggregate = false, PipeOperators.extendClause))(
5947+
a.exprId, a.qualifier, a.explicitMetadata, a.nonInheritableMetadataKeys)
5948+
case (e: Expression, aliasFunc) =>
5949+
UnresolvedAlias(
5950+
PipeExpression(e, isAggregate = false, PipeOperators.extendClause), aliasFunc)
5951+
}
5952+
}.get
5953+
val projectList: Seq[NamedExpression] = Seq(UnresolvedStar(None)) ++ extendExpressions
5954+
Project(projectList, left)
59365955
}.getOrElse(Option(ctx.whereClause).map { c =>
59375956
if (ctx.windowClause() != null) {
59385957
throw QueryParsingErrors.windowClauseInPipeOperatorWhereClauseNotAllowedError(ctx)
@@ -5959,7 +5978,7 @@ class AstBuilder extends DataTypeAstBuilder
59595978
withQueryResultClauses(c, withSubqueryAlias(), forPipeOperators = true)
59605979
}.getOrElse(
59615980
visitOperatorPipeAggregate(ctx, left)
5962-
))))))))
5981+
)))))))))
59635982
}
59645983

59655984
private def visitOperatorPipeAggregate(
@@ -5970,11 +5989,18 @@ class AstBuilder extends DataTypeAstBuilder
59705989
"The AGGREGATE clause requires a list of aggregate expressions " +
59715990
"or a list of grouping expressions, or both", ctx)
59725991
}
5992+
// Visit each aggregate expression, and add a PipeAggregate expression on top of it to generate
5993+
// clear error messages if the expression does not contain at least one aggregate function.
59735994
val aggregateExpressions: Seq[NamedExpression] =
59745995
Option(ctx.namedExpressionSeq()).map { n: NamedExpressionSeqContext =>
59755996
visitNamedExpressionSeq(n).map {
5976-
case (e: NamedExpression, _) => e
5977-
case (e: Expression, aliasFunc) => UnresolvedAlias(e, aliasFunc)
5997+
case (a: Alias, _) =>
5998+
a.copy(child =
5999+
PipeExpression(a.child, isAggregate = true, PipeOperators.aggregateClause))(
6000+
a.exprId, a.qualifier, a.explicitMetadata, a.nonInheritableMetadataKeys)
6001+
case (e: Expression, aliasFunc) =>
6002+
UnresolvedAlias(
6003+
PipeExpression(e, isAggregate = true, PipeOperators.aggregateClause), aliasFunc)
59786004
}
59796005
}.getOrElse(Seq.empty)
59806006
Option(ctx.aggregationClause()).map { c: AggregationClauseContext =>

sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4135,14 +4135,23 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
41354135
)
41364136
}
41374137

4138-
def pipeOperatorSelectContainsAggregateFunction(expr: Expression): Throwable = {
4138+
def pipeOperatorAggregateExpressionContainsNoAggregateFunction(expr: Expression): Throwable = {
41394139
new AnalysisException(
4140-
errorClass = "PIPE_OPERATOR_SELECT_CONTAINS_AGGREGATE_FUNCTION",
4140+
errorClass = "PIPE_OPERATOR_AGGREGATE_EXPRESSION_CONTAINS_NO_AGGREGATE_FUNCTION",
41414141
messageParameters = Map(
41424142
"expr" -> expr.toString),
41434143
origin = expr.origin)
41444144
}
41454145

4146+
def pipeOperatorContainsAggregateFunction(expr: Expression, clause: String): Throwable = {
4147+
new AnalysisException(
4148+
errorClass = "PIPE_OPERATOR_CONTAINS_AGGREGATE_FUNCTION",
4149+
messageParameters = Map(
4150+
"expr" -> expr.toString,
4151+
"clause" -> clause),
4152+
origin = expr.origin)
4153+
}
4154+
41464155
def inlineTableContainsScalarSubquery(inlineTable: LogicalPlan): Throwable = {
41474156
new AnalysisException(
41484157
errorClass = "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY.SCALAR_SUBQUERY_IN_VALUES",

0 commit comments

Comments
 (0)