Skip to content

Commit 586c440

Browse files
AveryQi115averyqi-db
authored andcommitted
[SPARK-51884][SQL] Part 1.a Add outer scope attributes for SubqueryExpression
### What changes were proposed in this pull request? - Add OuterScopeReference as a new expression type - Use OuterScopeReference to differentiate between outerAttrs and outerScopeAttrs - Add outerScopeAttrs related methods to SubqueryExpression ### Why are the changes needed? Spark only supports one layer of correlation now and does not support nested correlation. For example, ```sql SELECT col1 FROM VALUES (1, 2) t1 (col1, col2) WHERE EXISTS ( SELECT col1 FROM VALUES (1, 2) t2 (col1, col2) WHERE t2.col2 == MAX(t1.col2) )GROUP BY col1; ``` is supported and ```sql SELECT col1 FROM VALUES (1, 2) t1 (col1, col2) WHERE EXISTS ( SELECT col1 FROM VALUES (1, 2) t2 (col1, col2) WHERE t2.col2 == ( SELECT MAX(t1.col2) ) )GROUP BY col1; ``` is not supported. The reason spark does not support it is because the Analyzer and Optimizer resolves and plans Subquery in a recursive way. The definition change for the SubqueryExpression adds the metadata OuterScopeAttrs which helps later rewrites for the Analyzer and Optimizer. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Current UT and Suite ### Was this patch authored or co-authored using generative AI tooling? No Closes apache#50838 from AveryQi115/part_1_a_add_a_new_expression_type. Lead-authored-by: Avery <[email protected]> Co-authored-by: Avery Qi <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
1 parent a14d73e commit 586c440

File tree

2 files changed

+39
-1
lines changed

2 files changed

+39
-1
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -452,6 +452,29 @@ case class OuterReference(e: NamedExpression)
452452
final override val nodePatterns: Seq[TreePattern] = Seq(OUTER_REFERENCE)
453453
}
454454

455+
/**
456+
* A place holder used to hold a reference that has been resolved to a field outside of the subquery
457+
* plan. We use it only for queries containing nested correlation and only in the
458+
* SubqueryExpression#outerAttrs to indicate that the correlated columns referenced by
459+
* this subquery expression are from the outer scopes, not the subquery plan or
460+
* the immediate outer plan of the subquery plan. For example,
461+
* SubqueryExpression(outerAttrs=[OuterScopeReference(a)] means a cannot be resolved by the
462+
* SubqueryExpression.plan or the plan holding this SubqueryExpression.
463+
*/
464+
case class OuterScopeReference(e: NamedExpression)
465+
extends LeafExpression with NamedExpression with Unevaluable {
466+
override def dataType: DataType = e.dataType
467+
override def nullable: Boolean = e.nullable
468+
override def prettyName: String = "outerScope"
469+
470+
override def sql: String = s"$prettyName(${e.sql})"
471+
override def name: String = e.name
472+
override def qualifier: Seq[String] = e.qualifier
473+
override def exprId: ExprId = e.exprId
474+
override def toAttribute: Attribute = e.toAttribute
475+
override def newInstance(): NamedExpression = OuterScopeReference(e.newInstance())
476+
}
477+
455478
/**
456479
* A placeholder used to hold a [[NamedExpression]] that has been temporarily resolved as the
457480
* reference to a lateral column alias. It will be restored back to [[UnresolvedAttribute]] if

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,14 +79,29 @@ abstract class SubqueryExpression(
7979
exprId: ExprId,
8080
joinCond: Seq[Expression],
8181
hint: Option[HintInfo]) extends PlanExpression[LogicalPlan] {
82+
8283
override lazy val resolved: Boolean = childrenResolved && plan.resolved
84+
85+
lazy val (outerScopeAttrs, nonOuterScopeAttrs) =
86+
outerAttrs.partition(_.exists(_.isInstanceOf[OuterScopeReference]))
87+
8388
override lazy val references: AttributeSet =
84-
AttributeSet.fromAttributeSets(outerAttrs.map(_.references))
89+
AttributeSet.fromAttributeSets(nonOuterScopeAttrs.map(_.references))
90+
8591
override def children: Seq[Expression] = outerAttrs ++ joinCond
92+
8693
override def withNewPlan(plan: LogicalPlan): SubqueryExpression
94+
8795
def withNewOuterAttrs(outerAttrs: Seq[Expression]): SubqueryExpression
96+
97+
def getOuterAttrs: Seq[Expression] = outerAttrs
98+
99+
def getOuterScopeAttrs: Seq[Expression] = outerScopeAttrs
100+
88101
def isCorrelated: Boolean = outerAttrs.nonEmpty
102+
89103
def hint: Option[HintInfo]
104+
90105
def withNewHint(hint: Option[HintInfo]): SubqueryExpression
91106
}
92107

0 commit comments

Comments
 (0)