Skip to content
This repository was archived by the owner on Jan 9, 2020. It is now read-only.

Commit 08e4272

Browse files
brkyvzzsxwing
authored andcommitted
[SPARK-18868][FLAKY-TEST] Deflake StreamingQueryListenerSuite: single listener, check trigger...
## What changes were proposed in this pull request? Use `recentProgress` instead of `lastProgress` and filter out last non-zero value. Also add eventually to the latest assertQuery similar to first `assertQuery` ## How was this patch tested? Ran test 1000 times Author: Burak Yavuz <[email protected]> Closes apache#16287 from brkyvz/SPARK-18868. (cherry picked from commit 9c7f83b) Signed-off-by: Shixiong Zhu <[email protected]>
1 parent a7364a8 commit 08e4272

File tree

1 file changed

+16
-9
lines changed

1 file changed

+16
-9
lines changed

sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,11 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
8484
CheckAnswer(10, 5),
8585
AssertOnQuery { query =>
8686
assert(listener.progressEvents.nonEmpty)
87-
assert(listener.progressEvents.last.json === query.lastProgress.json)
87+
// SPARK-18868: We can't use query.lastProgress, because in progressEvents, we filter
88+
// out non-zero input rows, but the lastProgress may be a zero input row trigger
89+
val lastNonZeroProgress = query.recentProgress.filter(_.numInputRows > 0).lastOption
90+
.getOrElse(fail("No progress updates received in StreamingQuery!"))
91+
assert(listener.progressEvents.last.json === lastNonZeroProgress.json)
8892
assert(listener.terminationEvent === null)
8993
true
9094
},
@@ -109,14 +113,17 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
109113
AdvanceManualClock(100),
110114
ExpectFailure[SparkException],
111115
AssertOnQuery { query =>
112-
assert(listener.terminationEvent !== null)
113-
assert(listener.terminationEvent.id === query.id)
114-
assert(listener.terminationEvent.exception.nonEmpty)
115-
// Make sure that the exception message reported through listener
116-
// contains the actual exception and relevant stack trace
117-
assert(!listener.terminationEvent.exception.get.contains("StreamingQueryException"))
118-
assert(listener.terminationEvent.exception.get.contains("java.lang.ArithmeticException"))
119-
assert(listener.terminationEvent.exception.get.contains("StreamingQueryListenerSuite"))
116+
eventually(Timeout(streamingTimeout)) {
117+
assert(listener.terminationEvent !== null)
118+
assert(listener.terminationEvent.id === query.id)
119+
assert(listener.terminationEvent.exception.nonEmpty)
120+
// Make sure that the exception message reported through listener
121+
// contains the actual exception and relevant stack trace
122+
assert(!listener.terminationEvent.exception.get.contains("StreamingQueryException"))
123+
assert(
124+
listener.terminationEvent.exception.get.contains("java.lang.ArithmeticException"))
125+
assert(listener.terminationEvent.exception.get.contains("StreamingQueryListenerSuite"))
126+
}
120127
listener.checkAsyncErrors()
121128
true
122129
}

0 commit comments

Comments
 (0)