Skip to content

Commit 135ff16

Browse files
Reza Safi
authored and koeninger committed
[SPARK-25233][STREAMING] Give the user the option of specifying a minimum message per partition per batch when using kafka direct API with backpressure
After SPARK-18371, it is guaranteed that there will be at least one message per partition per batch when using the Kafka direct API and new messages exist in the topics. This change gives the user the option of setting that minimum, instead of relying on the hard-coded limit of 1. The related unit test is updated, and some internal tests verified that topic partitions with new messages are progressed by the specified minimum. Author: Reza Safi <[email protected]> Closes apache#22223 from rezasafi/streaminglag.
1 parent 9e0f959 commit 135ff16

File tree

4 files changed

+20
-4
lines changed

4 files changed

+20
-4
lines changed

docs/configuration.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1997,6 +1997,14 @@ showDF(properties, numRows = 200, truncate = FALSE)
19971997
for more details.
19981998
</td>
19991999
</tr>
2000+
<tr>
2001+
<td><code>spark.streaming.kafka.minRatePerPartition</code></td>
2002+
<td>1</td>
2003+
<td>
2004+
Minimum rate (number of records per second) at which data will be read from each Kafka
2005+
partition when using the new Kafka direct stream API.
2006+
</td>
2007+
</tr>
20002008
<tr>
20012009
<td><code>spark.streaming.kafka.maxRetries</code></td>
20022010
<td>1</td>

external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,8 @@ private[spark] class DirectKafkaInputDStream[K, V](
154154
if (effectiveRateLimitPerPartition.values.sum > 0) {
155155
val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000
156156
Some(effectiveRateLimitPerPartition.map {
157-
case (tp, limit) => tp -> Math.max((secsPerBatch * limit).toLong, 1L)
157+
case (tp, limit) => tp -> Math.max((secsPerBatch * limit).toLong,
158+
ppc.minRatePerPartition(tp))
158159
})
159160
} else {
160161
None

external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/PerPartitionConfig.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ abstract class PerPartitionConfig extends Serializable {
3434
* from each Kafka partition.
3535
*/
3636
def maxRatePerPartition(topicPartition: TopicPartition): Long
37+
def minRatePerPartition(topicPartition: TopicPartition): Long = 1
3738
}
3839

3940
/**
@@ -42,6 +43,8 @@ abstract class PerPartitionConfig extends Serializable {
4243
private class DefaultPerPartitionConfig(conf: SparkConf)
4344
extends PerPartitionConfig {
4445
val maxRate = conf.getLong("spark.streaming.kafka.maxRatePerPartition", 0)
46+
val minRate = conf.getLong("spark.streaming.kafka.minRatePerPartition", 1)
4547

4648
def maxRatePerPartition(topicPartition: TopicPartition): Long = maxRate
49+
override def minRatePerPartition(topicPartition: TopicPartition): Long = minRate
4750
}

external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -664,7 +664,8 @@ class DirectKafkaStreamSuite
664664
kafkaStream.stop()
665665
}
666666

667-
test("maxMessagesPerPartition with zero offset and rate equal to one") {
667+
test("maxMessagesPerPartition with zero offset and rate equal to the specified" +
668+
" minimum with default 1") {
668669
val topic = "backpressure"
669670
val kafkaParams = getKafkaParams()
670671
val batchIntervalMilliseconds = 60000
@@ -674,6 +675,8 @@ class DirectKafkaStreamSuite
674675
.setMaster("local[1]")
675676
.setAppName(this.getClass.getSimpleName)
676677
.set("spark.streaming.kafka.maxRatePerPartition", "100")
678+
.set("spark.streaming.kafka.minRatePerPartition", "5")
679+
677680

678681
// Setup the streaming context
679682
ssc = new StreamingContext(sparkConf, Milliseconds(batchIntervalMilliseconds))
@@ -704,12 +707,13 @@ class DirectKafkaStreamSuite
704707
)
705708
val result = kafkaStream.maxMessagesPerPartition(offsets)
706709
val expected = Map(
707-
new TopicPartition(topic, 0) -> 1L,
710+
new TopicPartition(topic, 0) -> 5L,
708711
new TopicPartition(topic, 1) -> 10L,
709712
new TopicPartition(topic, 2) -> 20L,
710713
new TopicPartition(topic, 3) -> 30L
711714
)
712-
assert(result.contains(expected), s"Number of messages per partition must be at least 1")
715+
assert(result.contains(expected), s"Number of messages per partition must be at least equal" +
716+
s" to the specified minimum")
713717
}
714718

715719
/** Get the generated offset ranges from the DirectKafkaStream */

0 commit comments

Comments
 (0)