Skip to content

Commit 6118b66

Browse files
fix: memory leak in incremental assign (#35)
* fix: memory leak in incremental assign * Update src/kafka-consumer.cc Co-authored-by: Milind L <[email protected]> * docs: add comment about delete toppars --------- Co-authored-by: Milind L <[email protected]>
1 parent dccab40 commit 6118b66

File tree

1 file changed

+8
-1
lines changed

1 file changed

+8
-1
lines changed

src/kafka-consumer.cc

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -245,12 +245,15 @@ Baton KafkaConsumer::IncrementalUnassign(std::vector<RdKafka::TopicPartition*> p
245245

246246
RdKafka::Error* error = consumer->incremental_unassign(partitions);
247247

248+
std::vector<RdKafka::TopicPartition*> delete_partitions;
249+
248250
if (error == NULL) {
249251
// For now, use two for loops. Make more efficient if needed at a later point.
250252
for (unsigned int i = 0; i < partitions.size(); i++) {
251253
for (unsigned int j = 0; j < m_partitions.size(); j++) {
252254
if (partitions[i]->partition() == m_partitions[j]->partition() &&
253255
partitions[i]->topic() == m_partitions[j]->topic()) {
256+
delete_partitions.push_back(m_partitions[j]);
254257
m_partitions.erase(m_partitions.begin() + j);
255258
m_partition_cnt--;
256259
break;
@@ -260,7 +263,11 @@ Baton KafkaConsumer::IncrementalUnassign(std::vector<RdKafka::TopicPartition*> p
260263
}
261264

262265
// Destroy the old list of partitions since we are no longer using it
263-
RdKafka::TopicPartition::destroy(m_partitions);
266+
RdKafka::TopicPartition::destroy(delete_partitions);
267+
268+
// Destroy the partition args since those are only used to look up the partitions
269+
// that need to be deleted.
270+
RdKafka::TopicPartition::destroy(partitions);
264271

265272
return rdkafkaErrorToBaton(error);
266273
}

0 commit comments

Comments
 (0)