Commit 8bb0df2

viirya authored and jerryshao committed
[SPARK-24014][PYSPARK] Add onStreamingStarted method to StreamingListener
## What changes were proposed in this pull request?

The `StreamingListener` on the PySpark side lacks an `onStreamingStarted` method. This patch adds it, along with a test. It also includes a trivial doc improvement for `createDirectStream`.

The original PR is apache#21057.

## How was this patch tested?

Added a test.

Author: Liang-Chi Hsieh <[email protected]>

Closes apache#21098 from viirya/SPARK-24014.
1 parent 0c94e48 commit 8bb0df2
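
As a rough illustration of what the new hook enables, the sketch below subclasses `StreamingListener` and overrides `onStreamingStarted`. The `StartRecorder` class and the simulated event are hypothetical names used only for illustration; the fallback base class is a stand-in so the sketch runs where PySpark is not installed.

```python
try:
    # Real base class, if PySpark is installed.
    from pyspark.streaming.listener import StreamingListener
except ImportError:
    # Stand-in (assumption) so the sketch runs without PySpark.
    class StreamingListener(object):
        def onStreamingStarted(self, streamingStarted):
            pass


class StartRecorder(StreamingListener):
    """Hypothetical listener that records each streaming-started event."""

    def __init__(self):
        super(StartRecorder, self).__init__()
        self.started_events = []

    def onStreamingStarted(self, streamingStarted):
        # Keep the event object; a real application might log it instead.
        self.started_events.append(streamingStarted)


recorder = StartRecorder()
recorder.onStreamingStarted(object())  # simulated event, not a real Spark event
print(len(recorder.started_events))
```

In a real application the listener would be registered with `ssc.addStreamingListener(recorder)` before `ssc.start()`, and Spark would invoke the callback itself.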

3 files changed: 15 additions and 1 deletion

python/pyspark/streaming/kafka.py

Lines changed: 2 additions & 1 deletion
@@ -104,7 +104,8 @@ def createDirectStream(ssc, topics, kafkaParams, fromOffsets=None,
         :param topics: list of topic_name to consume.
         :param kafkaParams: Additional params for Kafka.
         :param fromOffsets: Per-topic/partition Kafka offsets defining the (inclusive) starting
-                            point of the stream.
+                            point of the stream (a dictionary mapping `TopicAndPartition` to
+                            integers).
         :param keyDecoder: A function used to decode key (default is utf8_decoder).
         :param valueDecoder: A function used to decode value (default is utf8_decoder).
         :param messageHandler: A function used to convert KafkaMessageAndMetadata. You can assess
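
The clarified `fromOffsets` description can be illustrated with a minimal sketch of such a dictionary. The topic name and offset values are made up, and the `namedtuple` fallback is a stand-in (an assumption) for environments where `pyspark.streaming.kafka` is not available.

```python
try:
    # Available in PySpark releases that ship the Kafka 0.8 DStream API.
    from pyspark.streaming.kafka import TopicAndPartition
except ImportError:
    # Stand-in (assumption) with the same (topic, partition) constructor shape.
    from collections import namedtuple
    TopicAndPartition = namedtuple("TopicAndPartition", ["topic", "partition"])

# Hypothetical topic and offsets: start partition 0 at offset 0
# and partition 1 at offset 42 (both inclusive).
from_offsets = {
    TopicAndPartition("events", 0): 0,
    TopicAndPartition("events", 1): 42,
}

print(len(from_offsets))
```

A dictionary of this shape is what would be passed as the `fromOffsets` argument of `createDirectStream`.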

python/pyspark/streaming/listener.py

Lines changed: 6 additions & 0 deletions
@@ -23,6 +23,12 @@ class StreamingListener(object):
     def __init__(self):
         pass

+    def onStreamingStarted(self, streamingStarted):
+        """
+        Called when the streaming has been started.
+        """
+        pass
+
     def onReceiverStarted(self, receiverStarted):
         """
         Called when a receiver has been started

python/pyspark/streaming/tests.py

Lines changed: 7 additions & 0 deletions
@@ -507,6 +507,10 @@ def __init__(self):
             self.batchInfosCompleted = []
             self.batchInfosStarted = []
             self.batchInfosSubmitted = []
+            self.streamingStartedTime = []
+
+        def onStreamingStarted(self, streamingStarted):
+            self.streamingStartedTime.append(streamingStarted.time)

         def onBatchSubmitted(self, batchSubmitted):
             self.batchInfosSubmitted.append(batchSubmitted.batchInfo())
@@ -530,9 +534,12 @@ def func(dstream):
         batchInfosSubmitted = batch_collector.batchInfosSubmitted
         batchInfosStarted = batch_collector.batchInfosStarted
         batchInfosCompleted = batch_collector.batchInfosCompleted
+        streamingStartedTime = batch_collector.streamingStartedTime

         self.wait_for(batchInfosCompleted, 4)

+        self.assertEqual(len(streamingStartedTime), 1)
+
         self.assertGreaterEqual(len(batchInfosSubmitted), 4)
         for info in batchInfosSubmitted:
             self.assertGreaterEqual(info.batchTime().milliseconds(), 0)
