
Commit a703dac

[SPARK-46064][SQL][SS] Move out EliminateEventTimeWatermark to the analyzer and change to only take effect on resolved child
### What changes were proposed in this pull request?

This PR proposes to move EliminateEventTimeWatermark out to the analyzer (as one of the analysis rules), and also changes the rule to eliminate the EventTimeWatermark node only when the child of EventTimeWatermark is resolved.

### Why are the changes needed?

Currently, we apply EliminateEventTimeWatermark immediately when withWatermark is called, which means the rule is applied to the child right away, regardless of whether the child is resolved or not.

This is not an issue for DataFrame API usage initiated by read / readStream, because streaming sources have the isStreaming flag set to true even before they are resolved. But mixing SQL and the DataFrame API exposes the issue: we may not know the exact value of the isStreaming flag on an unresolved node, and it is subject to change upon resolution.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New UTs.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #43971 from HeartSaVioR/SPARK-46064.

Authored-by: Jungtaek Lim <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
1 parent 4221c66 commit a703dac
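
To make the scenario in the description concrete, here is a minimal Catalyst-level sketch modeled on the new unit test in this commit; the table name and surrounding setup are illustrative and not part of the patch:

    import org.apache.spark.sql.catalyst.TableIdentifier
    import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
    import org.apache.spark.sql.catalyst.dsl.expressions._
    import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark
    import org.apache.spark.sql.catalyst.util.IntervalUtils

    // Suppose "streamingTable" is a temp view backed by a streaming relation. Before
    // analysis, the UnresolvedRelation leaf reports resolved = false and isStreaming = false.
    val unresolvedChild = UnresolvedRelation(TableIdentifier("streamingTable"))
    assert(!unresolvedChild.resolved && !unresolvedChild.isStreaming)

    val watermarked = EventTimeWatermark($"ts",
      IntervalUtils.fromIntervalString("10 seconds"), unresolvedChild)

    // Old behavior: the eager call in Dataset.withWatermark matched !child.isStreaming
    // on this unresolved child and dropped the watermark node prematurely.
    // New behavior: the extra child.resolved guard keeps the rule a no-op until the
    // analyzer resolves the child, so a streaming child retains its watermark.

With the patch, the elimination decision is deferred to the new "Remove watermark for batch query" analyzer batch, which runs after resolution.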

6 files changed: +48 −8 lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 4 additions & 2 deletions
@@ -347,7 +347,9 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
     Batch("Cleanup", fixedPoint,
       CleanupAliases),
     Batch("HandleSpecialCommand", Once,
-      HandleSpecialCommand)
+      HandleSpecialCommand),
+    Batch("Remove watermark for batch query", Once,
+      EliminateEventTimeWatermark)
   )

   /**
@@ -3938,7 +3940,7 @@ object CleanupAliases extends Rule[LogicalPlan] with AliasHelper {
 object EliminateEventTimeWatermark extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsWithPruning(
     _.containsPattern(EVENT_TIME_WATERMARK)) {
-    case EventTimeWatermark(_, _, child) if !child.isStreaming => child
+    case EventTimeWatermark(_, _, child) if child.resolved && !child.isStreaming => child
   }
 }

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala

Lines changed: 23 additions & 0 deletions
@@ -1724,4 +1724,27 @@ class AnalysisSuite extends AnalysisTest with Matchers {
       checkAnalysis(ident2.select($"a"), testRelation.select($"a").analyze)
     }
   }
+
+  test("SPARK-46064 Basic functionality of elimination for watermark node in batch query") {
+    val dfWithEventTimeWatermark = EventTimeWatermark($"ts",
+      IntervalUtils.fromIntervalString("10 seconds"), batchRelationWithTs)
+
+    val analyzed = getAnalyzer.executeAndCheck(dfWithEventTimeWatermark, new QueryPlanningTracker)
+
+    // EventTimeWatermark node is eliminated via EliminateEventTimeWatermark.
+    assert(!analyzed.exists(_.isInstanceOf[EventTimeWatermark]))
+  }
+
+  test("SPARK-46064 EliminateEventTimeWatermark properly handles the case where the child of " +
+    "EventTimeWatermark changes the isStreaming flag during resolution") {
+    // UnresolvedRelation which is batch initially and will be resolved as streaming
+    val dfWithTempView = UnresolvedRelation(TableIdentifier("streamingTable"))
+    val dfWithEventTimeWatermark = EventTimeWatermark($"ts",
+      IntervalUtils.fromIntervalString("10 seconds"), dfWithTempView)
+
+    val analyzed = getAnalyzer.executeAndCheck(dfWithEventTimeWatermark, new QueryPlanningTracker)
+
+    // EventTimeWatermark node is NOT eliminated.
+    assert(analyzed.exists(_.isInstanceOf[EventTimeWatermark]))
+  }
 }

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala

Lines changed: 2 additions & 0 deletions
@@ -82,6 +82,8 @@ trait AnalysisTest extends PlanTest {
     createTempView(catalog, "TaBlE3", TestRelations.testRelation3, overrideIfExists = true)
     createGlobalTempView(catalog, "TaBlE4", TestRelations.testRelation4, overrideIfExists = true)
     createGlobalTempView(catalog, "TaBlE5", TestRelations.testRelation5, overrideIfExists = true)
+    createTempView(catalog, "streamingTable", TestRelations.streamingRelation,
+      overrideIfExists = true)
     new Analyzer(catalog) {
       override val extendedResolutionRules = extendedAnalysisRules
     }

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TestRelations.scala

Lines changed: 14 additions & 0 deletions
@@ -68,4 +68,18 @@

   val mapRelation = LocalRelation(
     AttributeReference("map", MapType(IntegerType, IntegerType))())
+
+  val streamingRelation = LocalRelation(
+    Seq(
+      AttributeReference("a", IntegerType)(),
+      AttributeReference("ts", TimestampType)()
+    ),
+    isStreaming = true)
+
+  val batchRelationWithTs = LocalRelation(
+    Seq(
+      AttributeReference("a", IntegerType)(),
+      AttributeReference("ts", TimestampType)()
+    ),
+    isStreaming = false)
 }

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala

Lines changed: 4 additions & 4 deletions
@@ -1190,7 +1190,7 @@ class FilterPushdownSuite extends PlanTest {

   test("watermark pushdown: no pushdown on watermark attribute #1") {
     val interval = new CalendarInterval(2, 2, 2000L)
-    val relation = LocalRelation(attrA, $"b".timestamp, attrC)
+    val relation = LocalRelation(Seq(attrA, $"b".timestamp, attrC), Nil, isStreaming = true)

     // Verify that all conditions except the watermark touching condition are pushed down
     // by the optimizer and others are not.
@@ -1205,7 +1205,7 @@

   test("watermark pushdown: no pushdown for nondeterministic filter") {
     val interval = new CalendarInterval(2, 2, 2000L)
-    val relation = LocalRelation(attrA, attrB, $"c".timestamp)
+    val relation = LocalRelation(Seq(attrA, attrB, $"c".timestamp), Nil, isStreaming = true)

     // Verify that all conditions except the watermark touching condition are pushed down
     // by the optimizer and others are not.
@@ -1221,7 +1221,7 @@

   test("watermark pushdown: full pushdown") {
     val interval = new CalendarInterval(2, 2, 2000L)
-    val relation = LocalRelation(attrA, attrB, $"c".timestamp)
+    val relation = LocalRelation(Seq(attrA, attrB, $"c".timestamp), Nil, isStreaming = true)

     // Verify that all conditions except the watermark touching condition are pushed down
     // by the optimizer and others are not.
@@ -1236,7 +1236,7 @@

   test("watermark pushdown: no pushdown on watermark attribute #2") {
     val interval = new CalendarInterval(2, 2, 2000L)
-    val relation = LocalRelation($"a".timestamp, attrB, attrC)
+    val relation = LocalRelation(Seq($"a".timestamp, attrB, attrC), Nil, isStreaming = true)

     val originalQuery = EventTimeWatermark($"a", interval, relation)
       .where($"a" === new java.sql.Timestamp(0) && $"b" === 10)

sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala

Lines changed: 1 addition & 2 deletions
@@ -778,8 +778,7 @@ class Dataset[T] private[sql](
     val parsedDelay = IntervalUtils.fromIntervalString(delayThreshold)
     require(!IntervalUtils.isNegative(parsedDelay),
       s"delay threshold ($delayThreshold) should not be negative.")
-    EliminateEventTimeWatermark(
-      EventTimeWatermark(UnresolvedAttribute(eventTime), parsedDelay, logicalPlan))
+    EventTimeWatermark(UnresolvedAttribute(eventTime), parsedDelay, logicalPlan)
   }
 }
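
For completeness, a hedged end-to-end sketch of the resulting behavior for a batch DataFrame (assumes a local SparkSession bound to spark; not part of the patch). The watermark node is now kept in the unanalyzed plan built by withWatermark, and the analyzer's new batch strips it during analysis once the child is resolved as non-streaming, so the analyzed plan still contains no EventTimeWatermark node:

    import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark
    import org.apache.spark.sql.functions.current_timestamp

    // Batch query: the watermark is a no-op, and the "Remove watermark for batch query"
    // analyzer batch removes the node after the child is resolved.
    val batchDf = spark.range(10)
      .withColumn("ts", current_timestamp())
      .withWatermark("ts", "10 seconds")

    assert(!batchDf.queryExecution.analyzed.exists(_.isInstanceOf[EventTimeWatermark]))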
