
Commit b7fdf8e

Yuval Itzchakov authored and koeninger committed
[SPARK-24987][SS] - Fix Kafka consumer leak when no new offsets for TopicPartition
## What changes were proposed in this pull request?

This small fix adds a `consumer.release()` call to `KafkaSourceRDD` in the case where we've retrieved offsets from Kafka, but the `fromOffset` is equal to the `lastOffset`, meaning there is no new data to read for a particular topic partition. Up until now, we'd just return an empty iterator without closing the consumer, which would cause an FD leak. If accepted, this pull request should be merged into master as well.

## How was this patch tested?

Haven't run any specific tests; would love help on how to test methods running inside `RDD.compute`.

Author: Yuval Itzchakov <[email protected]>

Closes apache#21997 from YuvalItzchakov/master.
1 parent 55e3ae6 commit b7fdf8e

File tree

1 file changed (+1, −2 lines)


external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala

Lines changed: 1 addition & 2 deletions
```diff
@@ -124,8 +124,6 @@ private[kafka010] class KafkaSourceRDD(
       thePart: Partition,
       context: TaskContext): Iterator[ConsumerRecord[Array[Byte], Array[Byte]]] = {
     val sourcePartition = thePart.asInstanceOf[KafkaSourceRDDPartition]
-    val topic = sourcePartition.offsetRange.topic
-    val kafkaPartition = sourcePartition.offsetRange.partition
     val consumer = KafkaDataConsumer.acquire(
       sourcePartition.offsetRange.topicPartition, executorKafkaParams, reuseKafkaConsumer)

@@ -138,6 +136,7 @@ private[kafka010] class KafkaSourceRDD(
     if (range.fromOffset == range.untilOffset) {
       logInfo(s"Beginning offset ${range.fromOffset} is the same as ending offset " +
         s"skipping ${range.topic} ${range.partition}")
+      consumer.release()
       Iterator.empty
     } else {
       val underlying = new NextIterator[ConsumerRecord[Array[Byte], Array[Byte]]]() {
```
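The bug and the fix boil down to a resource-lifecycle rule: an acquired consumer must be released on every return path, including the early-return path for an empty offset range. Below is a minimal, self-contained Scala sketch of that pattern; `FakeConsumer` and `records` are stand-ins invented for illustration, not the real `KafkaDataConsumer` or `KafkaSourceRDD` APIs (the real code releases via a `NextIterator.close()` callback rather than eagerly).

```scala
// Stand-in for an acquired pooled consumer (hypothetical, for illustration).
class FakeConsumer {
  var released = false
  def release(): Unit = { released = true }
}

// Mirrors the shape of KafkaSourceRDD.compute: when fromOffset == untilOffset
// there is nothing to read, and the consumer must still be released before
// returning the empty iterator -- this is the one line the patch adds.
def records(fromOffset: Long, untilOffset: Long,
            consumer: FakeConsumer): Iterator[Long] = {
  if (fromOffset == untilOffset) {
    consumer.release() // without this line, the consumer (and its FD) leaks
    Iterator.empty
  } else {
    // Sketch only: materialize the range, then release. The real code keeps
    // the consumer open while iterating and releases it in close().
    val it = (fromOffset until untilOffset).iterator
    consumer.release()
    it
  }
}
```

The early-return path is easy to miss precisely because it looks like a no-op; any path that exits `compute` after `acquire` has to pair with a `release`.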

0 commit comments