Skip to content

Commit df04b27

Browse files
shashank88mfontanini
authored andcommitted
Fix #104: memory leak in poll_batch (#107)
poll_batch currently leaks memory while initialising the queue returned by rd_kafka_queue_get_consumer. The fix as suggested by @mfontanini as done here is to initialise the queue with a Queue so it's cleaned up when going out of scope.
1 parent d6f8129 commit df04b27

File tree

1 file changed

+3
-2
lines changed

1 file changed

+3
-2
lines changed

src/consumer.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -261,8 +261,9 @@ MessageList Consumer::poll_batch(size_t max_batch_size) {
261261

262262
MessageList Consumer::poll_batch(size_t max_batch_size, milliseconds timeout) {
263263
vector<rd_kafka_message_t*> raw_messages(max_batch_size);
264-
rd_kafka_queue_t* queue = rd_kafka_queue_get_consumer(get_handle());
265-
ssize_t result = rd_kafka_consume_batch_queue(queue, timeout.count(), raw_messages.data(),
264+
// Note that this will leak the queue when using rdkafka < 0.11.5 (see get_queue comment)
265+
Queue queue(get_queue(rd_kafka_queue_get_consumer(get_handle())));
266+
ssize_t result = rd_kafka_consume_batch_queue(queue.get_handle() , timeout.count(), raw_messages.data(),
266267
raw_messages.size());
267268
if (result == -1) {
268269
check_error(rd_kafka_last_error());

0 commit comments

Comments
 (0)