Skip to content

Commit 448d248

Browse files
liu-zhaokunkoeninger
authored andcommitted
[SPARK-21168] KafkaRDD should always set kafka clientId.
[https://issues.apache.org/jira/browse/SPARK-21168](https://issues.apache.org/jira/browse/SPARK-21168) There are no a number of other places that a client ID should be set,and I think we should use consumer.clientId in the clientId method,because the fetch request will be used by the same consumer behind. Author: liuzhaokun <[email protected]> Closes apache#19887 from liu-zhaokun/master1205.
1 parent 293a0f2 commit 448d248

File tree

1 file changed

+1
-0
lines changed
  • external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka

1 file changed

+1
-0
lines changed

external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,7 @@ class KafkaRDD[
191191

192192
private def fetchBatch: Iterator[MessageAndOffset] = {
193193
val req = new FetchRequestBuilder()
194+
.clientId(consumer.clientId)
194195
.addFetch(part.topic, part.partition, requestOffset, kc.config.fetchMessageMaxBytes)
195196
.build()
196197
val resp = consumer.fetch(req)

0 commit comments

Comments
 (0)