
Commit c4de90f
[SPARK-18852][SS] StreamingQuery.lastProgress should be null when recentProgress is empty
## What changes were proposed in this pull request?

Right now `StreamingQuery.lastProgress` throws NoSuchElementException, which makes it hard to use from Python: a Python user just sees a Py4JError. This PR makes it return null instead.

## How was this patch tested?

`test("lastProgress should be null when recentProgress is empty")`

Author: Shixiong Zhu <[email protected]>

Closes apache#16273 from zsxwing/SPARK-18852.

(cherry picked from commit 1ac6567)
Signed-off-by: Shixiong Zhu <[email protected]>
1 parent e8866f9 commit c4de90f
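For illustration, here is a minimal PySpark sketch of the new user-visible contract: `lastProgress` now returns `None` until the first progress update is recorded, instead of surfacing a `Py4JError`. The `rate` source and `memory` sink are illustrative choices, not part of this change (`rate` assumes a Spark version that ships it), and the first print is timing-dependent: it is very likely, but not guaranteed, to run before the first micro-batch completes.

import time
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[2]").appName("lastProgress-demo").getOrCreate()

# Any streaming source works here; a rate source just needs no external setup.
df = spark.readStream.format("rate").load()
q = df.writeStream.format("memory").queryName("demo").start()

# Immediately after start() no micro-batch has completed yet, so with this
# patch lastProgress is None rather than raising through Py4J.
print(q.lastProgress)   # most likely None

time.sleep(5)           # give the query time to finish at least one trigger
print(q.lastProgress)   # now a dict parsed from the progress JSON
q.stop()
spark.stop()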

6 files changed (+57, −14 lines)


python/pyspark/sql/streaming.py

Lines changed: 7 additions & 2 deletions
@@ -125,10 +125,15 @@ def recentProgress(self):
     @since(2.1)
     def lastProgress(self):
         """
-        Returns the most recent :class:`StreamingQueryProgress` update of this streaming query.
+        Returns the most recent :class:`StreamingQueryProgress` update of this streaming query or
+        None if there were no progress updates
         :return: a map
         """
-        return json.loads(self._jsq.lastProgress().json())
+        lastProgress = self._jsq.lastProgress()
+        if lastProgress:
+            return json.loads(lastProgress.json())
+        else:
+            return None
 
     @since(2.0)
     def processAllAvailable(self):
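One consequence for callers: Python code that previously assumed `lastProgress` always returns a dict should now guard against `None`. A small sketch, assuming `q` is a started StreamingQuery as in the example above (`processedRowsPerSecond` is one of the fields in the progress JSON):

progress = q.lastProgress
# Guard against the no-progress-yet case introduced by this change.
rate = progress["processedRowsPerSecond"] if progress else None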

python/pyspark/sql/tests.py

Lines changed: 17 additions & 1 deletion
@@ -1119,9 +1119,25 @@ def test_stream_status_and_progress(self):
         self.assertTrue(df.isStreaming)
         out = os.path.join(tmpPath, 'out')
         chk = os.path.join(tmpPath, 'chk')
-        q = df.writeStream \
+
+        def func(x):
+            time.sleep(1)
+            return x
+
+        from pyspark.sql.functions import col, udf
+        sleep_udf = udf(func)
+
+        # Use "sleep_udf" to delay the progress update so that we can test `lastProgress` when there
+        # were no updates.
+        q = df.select(sleep_udf(col("value")).alias('value')).writeStream \
             .start(path=out, format='parquet', queryName='this_query', checkpointLocation=chk)
         try:
+            # "lastProgress" will return None in most cases. However, as it may be flaky when
+            # Jenkins is very slow, we don't assert it. If there is something wrong, "lastProgress"
+            # may throw error with a high chance and make this test flaky, so we should still be
+            # able to detect broken codes.
+            q.lastProgress
+
             q.processAllAvailable()
             lastProgress = q.lastProgress
             recentProgress = q.recentProgress

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala

Lines changed: 2 additions & 2 deletions
@@ -100,9 +100,9 @@ trait ProgressReporter extends Logging {
     progressBuffer.toArray
   }
 
-  /** Returns the most recent query progress update. */
+  /** Returns the most recent query progress update or null if there were no progress updates. */
   def lastProgress: StreamingQueryProgress = progressBuffer.synchronized {
-    progressBuffer.last
+    progressBuffer.lastOption.orNull
   }
 
   /** Begins recording statistics about query progress for a given trigger. */
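The Scala side avoids the old `NoSuchElementException` by going through `lastOption`. The same "last element or nothing" idea expressed in Python, as a hypothetical standalone helper that is not part of the codebase:

def last_or_none(buffer):
    # Python analogue of progressBuffer.lastOption.orNull: buffer[-1] would
    # raise IndexError on an empty list, just as .last raised
    # NoSuchElementException on an empty buffer.
    return buffer[-1] if buffer else None

assert last_or_none([]) is None
assert last_or_none([1, 2, 3]) == 3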

sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala

Lines changed: 3 additions & 6 deletions
@@ -32,6 +32,7 @@ import org.scalatest.time.SpanSugar._
 import org.apache.spark.SparkException
 import org.apache.spark.sql.Dataset
 import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.streaming.util.BlockingSource
 import org.apache.spark.util.Utils
 
 class StreamingQueryManagerSuite extends StreamTest with BeforeAndAfter {
@@ -217,7 +218,7 @@ class StreamingQueryManagerSuite extends StreamTest with BeforeAndAfter {
 
   test("SPARK-18811: Source resolution should not block main thread") {
     failAfter(streamingTimeout) {
-      StreamingQueryManagerSuite.latch = new CountDownLatch(1)
+      BlockingSource.latch = new CountDownLatch(1)
       withTempDir { tempDir =>
         // if source resolution was happening on the main thread, it would block the start call,
         // now it should only be blocking the stream execution thread
@@ -231,7 +232,7 @@ class StreamingQueryManagerSuite extends StreamTest with BeforeAndAfter {
         eventually(Timeout(streamingTimeout)) {
           assert(sq.status.message.contains("Initializing sources"))
         }
-        BlockingSource.latch.countDown()
+        BlockingSource.latch.countDown()
         sq.stop()
      }
    }
@@ -321,7 +322,3 @@ class StreamingQueryManagerSuite extends StreamTest with BeforeAndAfter {
     (inputData, mapped)
   }
 }
-
-object StreamingQueryManagerSuite {
-  var latch: CountDownLatch = null
-}

sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala

Lines changed: 20 additions & 1 deletion
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.streaming
 
-import scala.collection.JavaConverters._
+import java.util.concurrent.CountDownLatch
 
 import org.apache.commons.lang3.RandomStringUtils
 import org.scalactic.TolerantNumerics
@@ -32,6 +32,7 @@ import org.apache.spark.SparkException
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.streaming.util.BlockingSource
 import org.apache.spark.util.ManualClock
 
 
@@ -312,6 +313,24 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
     )
   }
 
+  test("lastProgress should be null when recentProgress is empty") {
+    BlockingSource.latch = new CountDownLatch(1)
+    withTempDir { tempDir =>
+      val sq = spark.readStream
+        .format("org.apache.spark.sql.streaming.util.BlockingSource")
+        .load()
+        .writeStream
+        .format("org.apache.spark.sql.streaming.util.BlockingSource")
+        .option("checkpointLocation", tempDir.toString)
+        .start()
+      // Creating source is blocked so recentProgress is empty and lastProgress should be null
+      assert(sq.lastProgress === null)
+      // Release the latch and stop the query
+      BlockingSource.latch.countDown()
+      sq.stop()
+    }
+  }
+
   test("codahale metrics") {
     val inputData = MemoryStream[Int]
 

sql/core/src/test/scala/org/apache/spark/sql/streaming/util/DefaultSource.scala renamed to sql/core/src/test/scala/org/apache/spark/sql/streaming/util/BlockingSource.scala

Lines changed: 8 additions & 2 deletions
@@ -17,10 +17,12 @@
 
 package org.apache.spark.sql.streaming.util
 
+import java.util.concurrent.CountDownLatch
+
 import org.apache.spark.sql.{SQLContext, _}
 import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, Sink, Source}
 import org.apache.spark.sql.sources.{StreamSinkProvider, StreamSourceProvider}
-import org.apache.spark.sql.streaming.{OutputMode, StreamingQueryManagerSuite}
+import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
 
 /** Dummy provider: returns a SourceProvider with a blocking `createSource` call. */
@@ -42,7 +44,7 @@ class BlockingSource extends StreamSourceProvider with StreamSinkProvider {
       schema: Option[StructType],
       providerName: String,
       parameters: Map[String, String]): Source = {
-    StreamingQueryManagerSuite.latch.await()
+    BlockingSource.latch.await()
     new Source {
       override def schema: StructType = fakeSchema
       override def getOffset: Option[Offset] = Some(new LongOffset(0))
@@ -64,3 +66,7 @@ class BlockingSource extends StreamSourceProvider with StreamSinkProvider {
     }
   }
 }
+
+object BlockingSource {
+  var latch: CountDownLatch = null
+}
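The fixture works because `createSource` parks on the shared `CountDownLatch` until the test calls `countDown()`, so the query can never report progress while the latch is closed. A rough Python analogue of that gating pattern, purely illustrative and using `threading.Event` in place of the one-shot latch:

import threading

latch = threading.Event()          # plays the role of BlockingSource.latch

def create_source():
    latch.wait()                   # like latch.await(): park until released
    print("source created")

worker = threading.Thread(target=create_source)
worker.start()
# While the worker is parked, nothing downstream of source creation can run,
# which is exactly the window in which lastProgress must be null/None.
latch.set()                        # like latch.countDown(): release the worker
worker.join()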
