Commit 5fccdae

jerryshao authored and koeninger committed
[SPARK-22968][DSTREAM] Throw an exception on partition revoking issue
## What changes were proposed in this pull request?

Kafka partitions can be revoked when new consumers join the consumer group and trigger a rebalance. The current Spark Kafka connector code assumes there are no partition-revoking scenarios, so trying to get the latest offset from a revoked partition throws an exception, as the JIRA mentions.

Partition revoking happens when a new consumer joins the consumer group, which means different streaming apps are trying to use the same group id. This is fundamentally incorrect: different apps should use different consumer groups. So instead of surfacing a confusing exception from Kafka, this change improves the error handling by identifying the revoked partitions and throwing a meaningful exception directly.

Besides, this PR also fixes bugs in `DirectKafkaWordCount`; the example simply cannot work without the fix.

```
8/01/05 09:48:27 INFO internals.ConsumerCoordinator: Revoking previously assigned partitions [kssh-7, kssh-4, kssh-3, kssh-6, kssh-5, kssh-0, kssh-2, kssh-1] for group use_a_separate_group_id_for_each_stream
18/01/05 09:48:27 INFO internals.AbstractCoordinator: (Re-)joining group use_a_separate_group_id_for_each_stream
18/01/05 09:48:27 INFO internals.AbstractCoordinator: Successfully joined group use_a_separate_group_id_for_each_stream with generation 4
18/01/05 09:48:27 INFO internals.ConsumerCoordinator: Setting newly assigned partitions [kssh-7, kssh-4, kssh-6, kssh-5] for group use_a_separate_group_id_for_each_stream
```

## How was this patch tested?

Manually verified in a local cluster. Unfortunately, I'm not sure how to simulate this in a unit test, so the PR is proposed without a UT added.

Author: jerryshao <[email protected]>

Closes apache#21038 from jerryshao/SPARK-22968.
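The core of the fix is a simple set difference: any partition tracked in `currentOffsets` that is missing from the consumer's current assignment has been revoked by a rebalance. A minimal standalone Java sketch of that check (not the Spark code itself; class, method, and partition names are illustrative):

```java
import java.util.HashSet;
import java.util.Set;

public class RevokedPartitionCheck {
    // tracked \ assigned: partitions we held before that Kafka took away in a rebalance
    static Set<String> revokedPartitions(Set<String> tracked, Set<String> assigned) {
        Set<String> revoked = new HashSet<>(tracked);
        revoked.removeAll(assigned);
        return revoked;
    }

    // Fail fast with a clear message instead of a confusing offset-fetch error later
    static void ensureNoneRevoked(Set<String> tracked, Set<String> assigned) {
        Set<String> revoked = revokedPartitions(tracked, assigned);
        if (!revoked.isEmpty()) {
            throw new IllegalStateException("Previously tracked partitions " + revoked
                + " were revoked; check whether another stream uses the same group id");
        }
    }

    public static void main(String[] args) {
        Set<String> tracked = Set.of("kssh-0", "kssh-1", "kssh-2");
        ensureNoneRevoked(tracked, tracked); // same assignment: no exception
        try {
            ensureNoneRevoked(tracked, Set.of("kssh-0")); // kssh-1, kssh-2 revoked
        } catch (IllegalStateException e) {
            System.out.println("revoked partitions detected");
        }
    }
}
```

The design choice is to fail fast at offset-fetch time: a revoked partition always indicates a misconfiguration (two streams sharing one group id), so a descriptive exception beats the opaque Kafka error it replaces.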
1 parent 1ca3c50 commit 5fccdae

File tree

2 files changed: +25 −4 lines


examples/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala

Lines changed: 13 additions & 4 deletions
```diff
@@ -18,6 +18,9 @@
 // scalastyle:off println
 package org.apache.spark.examples.streaming
 
+import org.apache.kafka.clients.consumer.ConsumerConfig
+import org.apache.kafka.common.serialization.StringDeserializer
+
 import org.apache.spark.SparkConf
 import org.apache.spark.streaming._
 import org.apache.spark.streaming.kafka010._
@@ -26,18 +29,20 @@ import org.apache.spark.streaming.kafka010._
  * Consumes messages from one or more topics in Kafka and does wordcount.
  * Usage: DirectKafkaWordCount <brokers> <topics>
  *   <brokers> is a list of one or more Kafka brokers
+ *   <groupId> is a consumer group name to consume from topics
  *   <topics> is a list of one or more kafka topics to consume from
  *
  * Example:
  *    $ bin/run-example streaming.DirectKafkaWordCount broker1-host:port,broker2-host:port \
- *    topic1,topic2
+ *    consumer-group topic1,topic2
  */
 object DirectKafkaWordCount {
   def main(args: Array[String]) {
-    if (args.length < 2) {
+    if (args.length < 3) {
       System.err.println(s"""
         |Usage: DirectKafkaWordCount <brokers> <topics>
         |  <brokers> is a list of one or more Kafka brokers
+        |  <groupId> is a consumer group name to consume from topics
         |  <topics> is a list of one or more kafka topics to consume from
         |
         """.stripMargin)
@@ -46,15 +51,19 @@ object DirectKafkaWordCount {
 
     StreamingExamples.setStreamingLogLevels()
 
-    val Array(brokers, topics) = args
+    val Array(brokers, groupId, topics) = args
 
     // Create context with 2 second batch interval
     val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")
     val ssc = new StreamingContext(sparkConf, Seconds(2))
 
     // Create direct kafka stream with brokers and topics
     val topicsSet = topics.split(",").toSet
-    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
+    val kafkaParams = Map[String, Object](
+      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
+      ConsumerConfig.GROUP_ID_CONFIG -> groupId,
+      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
+      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer])
     val messages = KafkaUtils.createDirectStream[String, String](
       ssc,
       LocationStrategies.PreferConsistent,
```
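The example's new `kafkaParams` map is built from `ConsumerConfig` constants, which resolve to the standard Kafka consumer config keys. A standalone Java sketch of the equivalent settings using those string keys directly (the helper class and the broker/group values are illustrative, not part of the commit):

```java
import java.util.Properties;

public class KafkaParamsSketch {
    // Same settings the updated example passes via ConsumerConfig constants;
    // the string keys below are the values those constants resolve to.
    static Properties kafkaParams(String brokers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", brokers); // ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG
        props.put("group.id", groupId);          // ConsumerConfig.GROUP_ID_CONFIG
        props.put("key.deserializer",            // ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",          // ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG
            "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        Properties p = kafkaParams("broker1:9092,broker2:9092", "wordcount-group");
        System.out.println(p.getProperty("group.id")); // prints "wordcount-group"
    }
}
```

Note that the old example's `metadata.broker.list` key belongs to the legacy (pre-0.10) consumer; the new 0.10 consumer requires `bootstrap.servers`, a group id, and explicit deserializers, which is why the example could not work before this fix.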

external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala

Lines changed: 12 additions & 0 deletions
```diff
@@ -190,8 +190,20 @@ private[spark] class DirectKafkaInputDStream[K, V](
 
     // make sure new partitions are reflected in currentOffsets
     val newPartitions = parts.diff(currentOffsets.keySet)
+
+    // Check if there's any partition been revoked because of consumer rebalance.
+    val revokedPartitions = currentOffsets.keySet.diff(parts)
+    if (revokedPartitions.nonEmpty) {
+      throw new IllegalStateException(s"Previously tracked partitions " +
+        s"${revokedPartitions.mkString("[", ",", "]")} been revoked by Kafka because of consumer " +
+        s"rebalance. This is mostly due to another stream with same group id joined, " +
+        s"please check if there're different streaming application misconfigure to use same " +
+        s"group id. Fundamentally different stream should use different group id")
+    }
+
     // position for new partitions determined by auto.offset.reset if no commit
     currentOffsets = currentOffsets ++ newPartitions.map(tp => tp -> c.position(tp)).toMap
+
     // don't want to consume messages, so pause
     c.pause(newPartitions.asJava)
     // find latest available offsets
```
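The unchanged line that follows the new check seeds `currentOffsets` for newly assigned partitions from the consumer's position (`currentOffsets ++ newPartitions.map(tp => tp -> c.position(tp))`). A standalone Java sketch of that map merge (partition names, offsets, and the helper are illustrative only):

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class NewPartitionOffsets {
    // Mirrors: currentOffsets ++ newPartitions.map(tp => tp -> c.position(tp))
    static Map<String, Long> seedNewPartitions(
            Map<String, Long> currentOffsets, Set<String> assigned, Map<String, Long> positions) {
        Set<String> newPartitions = new HashSet<>(assigned);
        newPartitions.removeAll(currentOffsets.keySet()); // assigned \ tracked = new partitions
        Map<String, Long> merged = new HashMap<>(currentOffsets);
        for (String tp : newPartitions) {
            // position comes from the consumer; governed by auto.offset.reset if no commit
            merged.put(tp, positions.get(tp));
        }
        return merged;
    }

    public static void main(String[] args) {
        Map<String, Long> merged = seedNewPartitions(
            Map.of("kssh-0", 42L), Set.of("kssh-0", "kssh-1"), Map.of("kssh-1", 0L));
        System.out.println(merged.get("kssh-1")); // newly assigned partition starts at its position
    }
}
```

Because the revoked-partition check throws before this merge runs, the merge only ever has to handle growth of the assignment, never shrinkage.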
