Skip to content

Commit 4304663

Browse files
committed
[SPARK-46062][SQL] Sync the isStreaming flag between CTE definition and reference
### What changes were proposed in this pull request? This PR proposes to sync the `isStreaming` flag from the CTE definition to the CTE reference. The essential issue is that a CTE reference node cannot determine the `isStreaming` flag by itself: because it has no corresponding constructor parameter, it can never hold a proper value and always takes the default. The other flag, `resolved`, is already handled this way, and we need to do the same for `isStreaming`. Once we add the parameter to the constructor, we also need to make sure the flag stays in sync with the CTE definition. The rule `ResolveWithCTE` already performs this kind of sync, so we add the logic to sync the `isStreaming` flag there as well. ### Why are the changes needed? The bug may impact rules that behave differently depending on the `isStreaming` flag. It is no longer a problem once the CTE reference is replaced with the CTE definition at some point in the optimization phase, but all analyzer and optimizer rules that are triggered before that replacement takes effect may misbehave because of the incorrect `isStreaming` flag. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? New UT. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #43966 from HeartSaVioR/SPARK-46062. Authored-by: Jungtaek Lim <[email protected]> Signed-off-by: Jungtaek Lim <[email protected]>
1 parent 9c21238 commit 4304663

File tree

24 files changed

+239
-176
lines changed

24 files changed

+239
-176
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -283,7 +283,7 @@ object CTESubstitution extends Rule[LogicalPlan] {
283283
d.child
284284
} else {
285285
// Add a `SubqueryAlias` for hint-resolving rules to match relation names.
286-
SubqueryAlias(table, CTERelationRef(d.id, d.resolved, d.output))
286+
SubqueryAlias(table, CTERelationRef(d.id, d.resolved, d.output, d.isStreaming))
287287
}
288288
}.getOrElse(u)
289289

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveWithCTE.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ object ResolveWithCTE extends Rule[LogicalPlan] {
5151

5252
case ref: CTERelationRef if !ref.resolved =>
5353
cteDefMap.get(ref.cteId).map { cteDef =>
54-
CTERelationRef(cteDef.id, cteDef.resolved, cteDef.output)
54+
CTERelationRef(cteDef.id, cteDef.resolved, cteDef.output, cteDef.isStreaming)
5555
}.getOrElse {
5656
ref
5757
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueries.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -381,7 +381,8 @@ object MergeScalarSubqueries extends Rule[LogicalPlan] {
381381
val subqueryCTE = header.plan.asInstanceOf[CTERelationDef]
382382
GetStructField(
383383
ScalarSubquery(
384-
CTERelationRef(subqueryCTE.id, _resolved = true, subqueryCTE.output),
384+
CTERelationRef(subqueryCTE.id, _resolved = true, subqueryCTE.output,
385+
subqueryCTE.isStreaming),
385386
exprId = ssr.exprId),
386387
ssr.headerIndex)
387388
} else {

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushdownPredicatesAndPruneColumnsForCTEDef.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ object PushdownPredicatesAndPruneColumnsForCTEDef extends Rule[LogicalPlan] {
141141
cteDef
142142
}
143143

144-
case cteRef @ CTERelationRef(cteId, _, output, _) =>
144+
case cteRef @ CTERelationRef(cteId, _, output, _, _) =>
145145
val (cteDef, _, _, newAttrSet) = cteMap(cteId)
146146
if (needsPruning(cteDef.child, newAttrSet)) {
147147
val indices = newAttrSet.toSeq.map(cteDef.output.indexOf)

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -862,6 +862,7 @@ case class CTERelationRef(
862862
cteId: Long,
863863
_resolved: Boolean,
864864
override val output: Seq[Attribute],
865+
override val isStreaming: Boolean,
865866
statsOpt: Option[Statistics] = None) extends LeafNode with MultiInstanceRelation {
866867

867868
final override val nodePatterns: Seq[TreePattern] = Seq(CTE)

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1573,7 +1573,7 @@ class AnalysisSuite extends AnalysisTest with Matchers {
15731573
test("SPARK-43030: deduplicate relations in CTE relation definitions") {
15741574
val join = testRelation.as("left").join(testRelation.as("right"))
15751575
val cteDef = CTERelationDef(join)
1576-
val cteRef = CTERelationRef(cteDef.id, false, Nil)
1576+
val cteRef = CTERelationRef(cteDef.id, false, Nil, false)
15771577

15781578
withClue("flat CTE") {
15791579
val plan = WithCTE(cteRef.select($"left.a"), Seq(cteDef)).analyze
@@ -1586,7 +1586,7 @@ class AnalysisSuite extends AnalysisTest with Matchers {
15861586

15871587
withClue("nested CTE") {
15881588
val cteDef2 = CTERelationDef(WithCTE(cteRef.join(testRelation), Seq(cteDef)))
1589-
val cteRef2 = CTERelationRef(cteDef2.id, false, Nil)
1589+
val cteRef2 = CTERelationRef(cteDef2.id, false, Nil, false)
15901590
val plan = WithCTE(cteRef2, Seq(cteDef2)).analyze
15911591
val relations = plan.collect {
15921592
case r: LocalRelation => r
@@ -1598,7 +1598,7 @@ class AnalysisSuite extends AnalysisTest with Matchers {
15981598

15991599
test("SPARK-43030: deduplicate CTE relation references") {
16001600
val cteDef = CTERelationDef(testRelation.select($"a"))
1601-
val cteRef = CTERelationRef(cteDef.id, false, Nil)
1601+
val cteRef = CTERelationRef(cteDef.id, false, Nil, false)
16021602

16031603
withClue("single reference") {
16041604
val plan = WithCTE(cteRef.where($"a" > 1), Seq(cteDef)).analyze
@@ -1621,7 +1621,7 @@ class AnalysisSuite extends AnalysisTest with Matchers {
16211621

16221622
withClue("CTE relation has duplicated attributes") {
16231623
val cteDef = CTERelationDef(testRelation.select($"a", $"a"))
1624-
val cteRef = CTERelationRef(cteDef.id, false, Nil)
1624+
val cteRef = CTERelationRef(cteDef.id, false, Nil, false)
16251625
val plan = WithCTE(cteRef.join(cteRef.select($"a")), Seq(cteDef)).analyze
16261626
val refs = plan.collect {
16271627
case r: CTERelationRef => r
@@ -1633,14 +1633,14 @@ class AnalysisSuite extends AnalysisTest with Matchers {
16331633
withClue("CTE relation has duplicate aliases") {
16341634
val alias = Alias($"a", "x")()
16351635
val cteDef = CTERelationDef(testRelation.select(alias, alias).where($"x" === 1))
1636-
val cteRef = CTERelationRef(cteDef.id, false, Nil)
1636+
val cteRef = CTERelationRef(cteDef.id, false, Nil, false)
16371637
// Should not fail with the assertion failure: Found duplicate rewrite attributes.
16381638
WithCTE(cteRef.join(cteRef), Seq(cteDef)).analyze
16391639
}
16401640

16411641
withClue("references in both CTE relation definition and main query") {
16421642
val cteDef2 = CTERelationDef(cteRef.where($"a" > 2))
1643-
val cteRef2 = CTERelationRef(cteDef2.id, false, Nil)
1643+
val cteRef2 = CTERelationRef(cteDef2.id, false, Nil, false)
16441644
val plan = WithCTE(cteRef.union(cteRef2), Seq(cteDef, cteDef2)).analyze
16451645
val refs = plan.collect {
16461646
case r: CTERelationRef => r
@@ -1747,4 +1747,19 @@ class AnalysisSuite extends AnalysisTest with Matchers {
17471747
// EventTimeWatermark node is NOT eliminated.
17481748
assert(analyzed.exists(_.isInstanceOf[EventTimeWatermark]))
17491749
}
1750+
1751+
test("SPARK-46062: isStreaming flag is synced from CTE definition to CTE reference") {
1752+
val cteDef = CTERelationDef(streamingRelation.select($"a", $"ts"))
1753+
// Intentionally marking the flag _resolved to false, so that analyzer has a chance to sync
1754+
// the flag isStreaming on syncing the flag _resolved.
1755+
val cteRef = CTERelationRef(cteDef.id, _resolved = false, Nil, isStreaming = false)
1756+
val plan = WithCTE(cteRef, Seq(cteDef)).analyze
1757+
1758+
val refs = plan.collect {
1759+
case r: CTERelationRef => r
1760+
}
1761+
assert(refs.length == 1)
1762+
assert(refs.head.resolved)
1763+
assert(refs.head.isStreaming)
1764+
}
17501765
}

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueriesSuite.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,8 @@ class MergeScalarSubqueriesSuite extends PlanTest {
4242
}
4343

4444
private def extractorExpression(cteIndex: Int, output: Seq[Attribute], fieldIndex: Int) = {
45-
GetStructField(ScalarSubquery(CTERelationRef(cteIndex, _resolved = true, output)), fieldIndex)
45+
GetStructField(ScalarSubquery(
46+
CTERelationRef(cteIndex, _resolved = true, output, isStreaming = false)), fieldIndex)
4647
.as("scalarsubquery()")
4748
}
4849

sql/core/src/test/resources/sql-tests/analyzer-results/ansi/double-quoted-identifiers-enabled.sql.out

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -418,7 +418,7 @@ CreateViewCommand `myview`, [(c1,None)], WITH "v"("a") AS (SELECT 1) SELECT "a"
418418
: +- OneRowRelation
419419
+- Project [a#x]
420420
+- SubqueryAlias v
421-
+- CTERelationRef xxxx, true, [a#x]
421+
+- CTERelationRef xxxx, true, [a#x], false
422422

423423

424424
-- !query
@@ -438,7 +438,7 @@ Project [a1#x AS a2#x]
438438
: +- OneRowRelation
439439
+- Project [a#x]
440440
+- SubqueryAlias v
441-
+- CTERelationRef xxxx, true, [a#x]
441+
+- CTERelationRef xxxx, true, [a#x], false
442442

443443

444444
-- !query

sql/core/src/test/resources/sql-tests/analyzer-results/cte-command.sql.out

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ CreateDataSourceTableAsSelectCommand `spark_catalog`.`default`.`cte_tbl`, ErrorI
1010
: +- OneRowRelation
1111
+- Project [col#x]
1212
+- SubqueryAlias s
13-
+- CTERelationRef xxxx, true, [col#x]
13+
+- CTERelationRef xxxx, true, [col#x], false
1414

1515

1616
-- !query
@@ -32,7 +32,7 @@ CreateViewCommand `cte_view`, WITH s AS (SELECT 42 AS col) SELECT * FROM s, fals
3232
: +- OneRowRelation
3333
+- Project [col#x]
3434
+- SubqueryAlias s
35-
+- CTERelationRef xxxx, true, [col#x]
35+
+- CTERelationRef xxxx, true, [col#x], false
3636

3737

3838
-- !query
@@ -49,7 +49,7 @@ Project [col#x]
4949
: +- OneRowRelation
5050
+- Project [col#x]
5151
+- SubqueryAlias s
52-
+- CTERelationRef xxxx, true, [col#x]
52+
+- CTERelationRef xxxx, true, [col#x], false
5353

5454

5555
-- !query
@@ -64,7 +64,7 @@ InsertIntoHadoopFsRelationCommand file:[not included in comparison]/{warehouse_d
6464
: +- OneRowRelation
6565
+- Project [col#x]
6666
+- SubqueryAlias S
67-
+- CTERelationRef xxxx, true, [col#x]
67+
+- CTERelationRef xxxx, true, [col#x], false
6868

6969

7070
-- !query
@@ -86,7 +86,7 @@ InsertIntoHadoopFsRelationCommand file:[not included in comparison]/{warehouse_d
8686
: +- OneRowRelation
8787
+- Project [col#x]
8888
+- SubqueryAlias s
89-
+- CTERelationRef xxxx, true, [col#x]
89+
+- CTERelationRef xxxx, true, [col#x], false
9090

9191

9292
-- !query

sql/core/src/test/resources/sql-tests/analyzer-results/cte-nested.sql.out

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,10 @@ WithCTE
1515
: +- SubqueryAlias t
1616
: +- Project [1#x]
1717
: +- SubqueryAlias t2
18-
: +- CTERelationRef xxxx, true, [1#x]
18+
: +- CTERelationRef xxxx, true, [1#x], false
1919
+- Project [1#x]
2020
+- SubqueryAlias t
21-
+- CTERelationRef xxxx, true, [1#x]
21+
+- CTERelationRef xxxx, true, [1#x], false
2222

2323

2424
-- !query
@@ -37,7 +37,7 @@ Aggregate [max(c#x) AS max(c)#x]
3737
: +- OneRowRelation
3838
+- Project [c#x]
3939
+- SubqueryAlias t
40-
+- CTERelationRef xxxx, true, [c#x]
40+
+- CTERelationRef xxxx, true, [c#x], false
4141

4242

4343
-- !query
@@ -54,7 +54,7 @@ Project [scalar-subquery#x [] AS scalarsubquery()#x]
5454
: : +- OneRowRelation
5555
: +- Project [1#x]
5656
: +- SubqueryAlias t
57-
: +- CTERelationRef xxxx, true, [1#x]
57+
: +- CTERelationRef xxxx, true, [1#x], false
5858
+- OneRowRelation
5959

6060

@@ -137,11 +137,11 @@ WithCTE
137137
: : : +- OneRowRelation
138138
: : +- Project [c#x]
139139
: : +- SubqueryAlias t
140-
: : +- CTERelationRef xxxx, true, [c#x]
140+
: : +- CTERelationRef xxxx, true, [c#x], false
141141
: +- OneRowRelation
142142
+- Project [scalarsubquery()#x]
143143
+- SubqueryAlias t2
144-
+- CTERelationRef xxxx, true, [scalarsubquery()#x]
144+
+- CTERelationRef xxxx, true, [scalarsubquery()#x], false
145145

146146

147147
-- !query
@@ -191,7 +191,7 @@ WithCTE
191191
+- SubqueryAlias __auto_generated_subquery_name
192192
+- Project [c#x]
193193
+- SubqueryAlias t
194-
+- CTERelationRef xxxx, true, [c#x]
194+
+- CTERelationRef xxxx, true, [c#x], false
195195

196196

197197
-- !query
@@ -220,7 +220,7 @@ WithCTE
220220
+- SubqueryAlias __auto_generated_subquery_name
221221
+- Project [c#x]
222222
+- SubqueryAlias t
223-
+- CTERelationRef xxxx, true, [c#x]
223+
+- CTERelationRef xxxx, true, [c#x], false
224224

225225

226226
-- !query
@@ -255,7 +255,7 @@ WithCTE
255255
+- SubqueryAlias __auto_generated_subquery_name
256256
+- Project [c#x]
257257
+- SubqueryAlias t
258-
+- CTERelationRef xxxx, true, [c#x]
258+
+- CTERelationRef xxxx, true, [c#x], false
259259

260260

261261
-- !query
@@ -358,14 +358,14 @@ WithCTE
358358
: +- SubqueryAlias t
359359
: +- Project [1#x]
360360
: +- SubqueryAlias t2
361-
: +- CTERelationRef xxxx, true, [1#x]
361+
: +- CTERelationRef xxxx, true, [1#x], false
362362
:- CTERelationDef xxxx, false
363363
: +- SubqueryAlias t2
364364
: +- Project [2 AS 2#x]
365365
: +- OneRowRelation
366366
+- Project [1#x]
367367
+- SubqueryAlias t
368-
+- CTERelationRef xxxx, true, [1#x]
368+
+- CTERelationRef xxxx, true, [1#x], false
369369

370370

371371
-- !query
@@ -428,15 +428,15 @@ WithCTE
428428
: +- SubqueryAlias t3
429429
: +- Project [1#x]
430430
: +- SubqueryAlias t1
431-
: +- CTERelationRef xxxx, true, [1#x]
431+
: +- CTERelationRef xxxx, true, [1#x], false
432432
:- CTERelationDef xxxx, false
433433
: +- SubqueryAlias t2
434434
: +- Project [1#x]
435435
: +- SubqueryAlias t3
436-
: +- CTERelationRef xxxx, true, [1#x]
436+
: +- CTERelationRef xxxx, true, [1#x], false
437437
+- Project [1#x]
438438
+- SubqueryAlias t2
439-
+- CTERelationRef xxxx, true, [1#x]
439+
+- CTERelationRef xxxx, true, [1#x], false
440440

441441

442442
-- !query
@@ -459,12 +459,12 @@ WithCTE
459459
: +- SubqueryAlias cte_inner
460460
: +- Project [1#x]
461461
: +- SubqueryAlias cte_outer
462-
: +- CTERelationRef xxxx, true, [1#x]
462+
: +- CTERelationRef xxxx, true, [1#x], false
463463
+- Project [1#x]
464464
+- SubqueryAlias __auto_generated_subquery_name
465465
+- Project [1#x]
466466
+- SubqueryAlias cte_inner
467-
+- CTERelationRef xxxx, true, [1#x]
467+
+- CTERelationRef xxxx, true, [1#x], false
468468

469469

470470
-- !query
@@ -492,19 +492,19 @@ WithCTE
492492
: +- SubqueryAlias cte_inner_inner
493493
: +- Project [1#x]
494494
: +- SubqueryAlias cte_outer
495-
: +- CTERelationRef xxxx, true, [1#x]
495+
: +- CTERelationRef xxxx, true, [1#x], false
496496
:- CTERelationDef xxxx, false
497497
: +- SubqueryAlias cte_inner
498498
: +- Project [1#x]
499499
: +- SubqueryAlias __auto_generated_subquery_name
500500
: +- Project [1#x]
501501
: +- SubqueryAlias cte_inner_inner
502-
: +- CTERelationRef xxxx, true, [1#x]
502+
: +- CTERelationRef xxxx, true, [1#x], false
503503
+- Project [1#x]
504504
+- SubqueryAlias __auto_generated_subquery_name
505505
+- Project [1#x]
506506
+- SubqueryAlias cte_inner
507-
+- CTERelationRef xxxx, true, [1#x]
507+
+- CTERelationRef xxxx, true, [1#x], false
508508

509509

510510
-- !query

0 commit comments

Comments
 (0)