Skip to content

Commit fedf696

Browse files
committed
[SPARK-22094][SS] processAllAvailable should check the query state
## What changes were proposed in this pull request? `processAllAvailable` should also check the query state and if the query is stopped, it should return. ## How was this patch tested? The new unit test. Author: Shixiong Zhu <[email protected]> Closes apache#19314 from zsxwing/SPARK-22094.
1 parent f32a842 commit fedf696

File tree

2 files changed

+13
-1
lines changed

2 files changed

+13
-1
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -840,7 +840,7 @@ class StreamExecution(
840840
if (streamDeathCause != null) {
841841
throw streamDeathCause
842842
}
843-
if (noNewData) {
843+
if (noNewData || !isActive) {
844844
return
845845
}
846846
}

sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -640,6 +640,18 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
640640
}
641641
}
642642

643+
test("processAllAvailable should not block forever when a query is stopped") {
644+
val input = MemoryStream[Int]
645+
input.addData(1)
646+
val query = input.toDF().writeStream
647+
.trigger(Trigger.Once())
648+
.format("console")
649+
.start()
650+
failAfter(streamingTimeout) {
651+
query.processAllAvailable()
652+
}
653+
}
654+
643655
/** Create a streaming DF that only execute one batch in which it returns the given static DF */
644656
private def createSingleTriggerStreamingDF(triggerDF: DataFrame): DataFrame = {
645657
require(!triggerDF.isStreaming)

0 commit comments

Comments
 (0)