
Commit 14291b0

jerryshao authored and tdas committed

[SPARK-23748][SS] Fix SS continuous process doesn't support SubqueryAlias issue
## What changes were proposed in this pull request?

Currently, SS continuous processing doesn't support querying a temp table or `df.as("xxx")`; SS throws an exception saying the LogicalPlan is not supported, as described in [SPARK-23748](https://issues.apache.org/jira/browse/SPARK-23748). This patch adds that support.

## How was this patch tested?

New UT.

Author: jerryshao <[email protected]>

Closes apache#21017 from jerryshao/SPARK-23748.
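The failure mode can be sketched without Spark: `df.as("xxx")` (and temp-view lookups resolved by the analyzer) wrap the child plan in a `SubqueryAlias` node, so a strict type allowlist that omits it falls through to the error case. Below is a minimal, hypothetical model of the plan tree and the checker — none of these types are Spark's actual API:

```scala
// Hypothetical, simplified stand-ins for Catalyst plan nodes (not Spark's API).
sealed trait Plan { def children: Seq[Plan] }
case class Relation() extends Plan { val children = Seq.empty }
case class Filter(child: Plan) extends Plan { val children = Seq(child) }
case class Project(child: Plan) extends Plan { val children = Seq(child) }
case class SubqueryAlias(name: String, child: Plan) extends Plan { val children = Seq(child) }

// Bottom-up traversal, in the spirit of LogicalPlan.foreachUp.
def foreachUp(p: Plan)(f: Plan => Unit): Unit = {
  p.children.foreach(foreachUp(_)(f))
  f(p)
}

// Allowlist-style check, mirroring the shape of UnsupportedOperationChecker.
def check(plan: Plan, allowAlias: Boolean): Option[String] = {
  var err: Option[String] = None
  foreachUp(plan) {
    case _: Project | _: Filter | _: Relation => ()
    case _: SubqueryAlias if allowAlias      => ()
    case node if err.isEmpty => err = Some(s"unsupported: ${node.getClass.getSimpleName}")
    case _ => ()
  }
  err
}

// df.as("t") effectively wraps the plan in SubqueryAlias("t", ...).
val aliased = Project(SubqueryAlias("t", Filter(Relation())))
assert(check(aliased, allowAlias = false).isDefined) // pre-fix behavior: rejected
assert(check(aliased, allowAlias = true).isEmpty)    // post-fix behavior: accepted
```

The real fix is exactly the `allowAlias = true` branch: `SubqueryAlias` carries no computation of its own, so admitting it in the allowlist is safe.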
1 parent 682002b commit 14291b0

File tree

2 files changed: +20 −1 lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala (1 addition, 1 deletion)

```diff
@@ -345,7 +345,7 @@ object UnsupportedOperationChecker {
     plan.foreachUp { implicit subPlan =>
       subPlan match {
         case (_: Project | _: Filter | _: MapElements | _: MapPartitions |
-          _: DeserializeToObject | _: SerializeFromObject) =>
+          _: DeserializeToObject | _: SerializeFromObject | _: SubqueryAlias) =>
         case node if node.nodeName == "StreamingRelationV2" =>
         case node =>
           throwError(s"Continuous processing does not support ${node.nodeName} operations.")
```
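The one-line fix extends a type-alternative pattern: in Scala, `case _: A | _: B =>` matches when the scrutinee is any of the listed types, so adding `| _: SubqueryAlias` is all it takes to admit the node. A tiny standalone illustration (the node names here are invented for the example):

```scala
sealed trait Node
case class Alias(name: String) extends Node // stands in for SubqueryAlias
case class Scan() extends Node              // on the allowlist
case class Sort() extends Node              // not on the allowlist

def allowed(n: Node): Boolean = n match {
  case _: Scan | _: Alias => true // type alternatives in a single case
  case _                  => false
}

assert(allowed(Alias("t")))
assert(allowed(Scan()))
assert(!allowed(Sort()))
```

Note that type alternatives cannot bind the matched value to a name, which is why the allowlist cases in the checker have empty bodies.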

sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala (19 additions, 0 deletions)

```diff
@@ -171,6 +171,25 @@ class ContinuousSuite extends ContinuousSuiteBase {
       "Continuous processing does not support current time operations."))
   }
 
+  test("subquery alias") {
+    val df = spark.readStream
+      .format("rate")
+      .option("numPartitions", "5")
+      .option("rowsPerSecond", "5")
+      .load()
+      .createOrReplaceTempView("rate")
+    val test = spark.sql("select value from rate where value > 5")
+
+    testStream(test, useV2Sink = true)(
+      StartStream(longContinuousTrigger),
+      AwaitEpoch(0),
+      Execute(waitForRateSourceTriggers(_, 2)),
+      IncrementEpoch(),
+      Execute(waitForRateSourceTriggers(_, 4)),
+      IncrementEpoch(),
+      CheckAnswerRowsContains(scala.Range(6, 20).map(Row(_))))
+  }
+
   test("repeatedly restart") {
     val df = spark.readStream
       .format("rate")
```
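A note on the expected rows: the test only asserts containment, and `scala.Range(6, 20)` is exclusive of its upper bound, so it covers values 6 through 19 — the first rows that survive the `value > 5` filter:

```scala
val expected = scala.Range(6, 20)
// Range's upper bound is exclusive: 6, 7, ..., 19
assert(expected.head == 6)
assert(expected.last == 19)
assert(expected.length == 14)
```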

0 commit comments
