Skip to content

Commit d6f8129

Browse files
acceleratedmfontanini
authored andcommitted
Fix for ref count on queue handles (#92)
* Fix for ref count on queue handles * added check for rdkafka version * changed to runtime version checking
1 parent 3238c94 commit d6f8129

File tree

2 files changed

+18
-7
lines changed

2 files changed

+18
-7
lines changed

include/cppkafka/consumer.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -403,7 +403,7 @@ class CPPKAFKA_API Consumer : public KafkaHandleBase {
403403
* \return A Queue object
404404
*
405405
* \remark Note that this call will disable forwarding to the consumer_queue.
406-
* To restore forwarding (if desired) call Queue::forward_to_queue(consumer_queue)
406+
* To restore forwarding if desired, call Queue::forward_to_queue(consumer_queue)
407407
*/
408408
Queue get_main_queue() const;
409409

@@ -424,7 +424,7 @@ class CPPKAFKA_API Consumer : public KafkaHandleBase {
424424
* \return A Queue object
425425
*
426426
* \remark Note that this call will disable forwarding to the consumer_queue.
427-
* To restore forwarding (if desired) call Queue::forward_to_queue(consumer_queue)
427+
* To restore forwarding if desired, call Queue::forward_to_queue(consumer_queue)
428428
*/
429429
Queue get_partition_queue(const TopicPartition& partition) const;
430430
private:

src/consumer.cpp

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,17 @@ using std::equal;
4747

4848
namespace cppkafka {
4949

50+
// See: https://github.com/edenhill/librdkafka/issues/1792
51+
const int rd_kafka_queue_refcount_bug_version = 0x000b0500;
52+
Queue get_queue(rd_kafka_queue_t* handle) {
53+
if (rd_kafka_version() <= rd_kafka_queue_refcount_bug_version) {
54+
return Queue::make_non_owning(handle);
55+
}
56+
else {
57+
return Queue(handle);
58+
}
59+
}
60+
5061
void Consumer::rebalance_proxy(rd_kafka_t*, rd_kafka_resp_err_t error,
5162
rd_kafka_topic_partition_list_t *partitions, void *opaque) {
5263
TopicPartitionList list = convert(partitions);
@@ -262,19 +273,19 @@ MessageList Consumer::poll_batch(size_t max_batch_size, milliseconds timeout) {
262273
}
263274

264275
Queue Consumer::get_main_queue() const {
265-
Queue queue(Queue::make_non_owning(rd_kafka_queue_get_main(get_handle())));
276+
Queue queue(get_queue(rd_kafka_queue_get_main(get_handle())));
266277
queue.disable_queue_forwarding();
267278
return queue;
268279
}
269280

270281
Queue Consumer::get_consumer_queue() const {
271-
return Queue::make_non_owning(rd_kafka_queue_get_consumer(get_handle()));
282+
return get_queue(rd_kafka_queue_get_consumer(get_handle()));
272283
}
273284

274285
Queue Consumer::get_partition_queue(const TopicPartition& partition) const {
275-
Queue queue(Queue::make_non_owning(rd_kafka_queue_get_partition(get_handle(),
276-
partition.get_topic().c_str(),
277-
partition.get_partition())));
286+
Queue queue(get_queue(rd_kafka_queue_get_partition(get_handle(),
287+
partition.get_topic().c_str(),
288+
partition.get_partition())));
278289
queue.disable_queue_forwarding();
279290
return queue;
280291
}

0 commit comments

Comments
 (0)