From 1707cbddae67407ff7c13116b3fa1995981d88c1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?M=C3=A1t=C3=A9=20Szab=C3=B3?= Date: Fri, 23 May 2025 20:22:14 +0200 Subject: [PATCH] Fix memory leak when fetching metadata for a single topic When requesting metadata for a single topic, Connection::GetMetadata() calls Connection::CreateTopic() to resolve the provided topic name into a Topic, but fails to deallocate it. To reproduce, compile node-rdkafka and librdkafka with ASAN, then run the following: ```js const { KafkaConsumer } = require('.'); const consumer = new KafkaConsumer({ 'group.id': 'kafka', 'metadata.broker.list': 'localhost:9092', }, {}); consumer.connect({ timeout: 2000 }, function (err) { if (err) { console.error('Error connecting to Kafka:', err); return; } consumer.getMetadata({ topic: 'test' }, function (metadataErr, metadata) { if (metadataErr) { console.error('Error fetching metadata:', metadataErr); } else { console.log(`Metadata: ${JSON.stringify(metadata, null, 2)}`); } consumer.disconnect(); }); }) ``` ASAN will report a leak from GetMetadata(): ``` Indirect leak of 1048 byte(s) in 1 object(s) allocated from: #0 0x7f9dd63fa037 in __interceptor_calloc ../../../../src/libsanitizer/asan/asan_malloc_linux.cpp:154 #1 0x7f9dab530394 in rd_calloc /node-rdkafka/deps/librdkafka/src/rd.h:134 #2 0x7f9dab530394 in rd_kafka_topic_new0 /node-rdkafka/deps/librdkafka/src/rdkafka_topic.c:349 #3 0x7f9dab534cbc in rd_kafka_topic_new /node-rdkafka/deps/librdkafka/src/rdkafka_topic.c:533 #4 0x7f9dd1f47891 in RdKafka::Topic::create(RdKafka::Handle*, std::__cxx11::basic_string, std::allocator > const&, RdKafka::Conf const*, std::__cxx11::basic_string, std::allocator >&) /node-rdkafka/deps/librdkafka/src-cpp/TopicImpl.cpp:114 #5 0x7f9dabdc8eb9 in NodeKafka::Connection::CreateTopic(std::__cxx11::basic_string, std::allocator >, RdKafka::Conf*) ../src/connection.cc:115 #6 0x7f9dabdc94db in NodeKafka::Connection::CreateTopic(std::__cxx11::basic_string, std::allocator >) ../src/connection.cc:104 #7 0x7f9dabdca0d9 in NodeKafka::Connection::GetMetadata(bool, std::__cxx11::basic_string, std::allocator >, int) ../src/connection.cc:198 #8 0x7f9dabe63bf2 in NodeKafka::Workers::ConnectionMetadata::Execute() ../src/workers.cc:95 #9 0x7f9dabdd7261 in Nan::AsyncExecute(uv_work_s*) ../node_modules/nan/nan.h:2356 #10 0x18bb06f in worker ../deps/uv/src/threadpool.c:122 #11 0x7f9dd5ffbea6 in start_thread (/lib/x86_64-linux-gnu/libpthread.so.0+0x7ea6) Indirect leak of 128 byte(s) in 1 object(s) allocated from: #0 0x7f9dd63fa1f8 in __interceptor_realloc ../../../../src/libsanitizer/asan/asan_malloc_linux.cpp:164 #1 0x7f9dab6bf9eb in rd_realloc /node-rdkafka/deps/librdkafka/src/rd.h:146 #2 0x7f9dab6bf9eb in rd_list_grow /node-rdkafka/deps/librdkafka/src/rdlist.c:49 #3 0x7f9dab6bfa9f in rd_list_init /node-rdkafka/deps/librdkafka/src/rdlist.c:57 #4 0x7f9dab530dd7 in rd_kafka_topic_new0 /node-rdkafka/deps/librdkafka/src/rdkafka_topic.c:478 #5 0x7f9dab534cbc in rd_kafka_topic_new /node-rdkafka/deps/librdkafka/src/rdkafka_topic.c:533 #6 0x7f9dd1f47891 in RdKafka::Topic::create(RdKafka::Handle*, std::__cxx11::basic_string, std::allocator > const&, RdKafka::Conf const*, std::__cxx11::basic_string, std::allocator >&) /node-rdkafka/deps/librdkafka/src-cpp/TopicImpl.cpp:114 #7 0x7f9dabdc8eb9 in NodeKafka::Connection::CreateTopic(std::__cxx11::basic_string, std::allocator >, RdKafka::Conf*) ../src/connection.cc:115 #8 0x7f9dabdc94db in NodeKafka::Connection::CreateTopic(std::__cxx11::basic_string, std::allocator >) ../src/connection.cc:104 #9 0x7f9dabdca0d9 in NodeKafka::Connection::GetMetadata(bool, std::__cxx11::basic_string, std::allocator >, int) ../src/connection.cc:198 #10 0x7f9dabe63bf2 in NodeKafka::Workers::ConnectionMetadata::Execute() ../src/workers.cc:95 #11 0x7f9dabdd7261 in Nan::AsyncExecute(uv_work_s*) ../node_modules/nan/nan.h:2356 #12 0x18bb06f in worker ../deps/uv/src/threadpool.c:122 #13 0x7f9dd5ffbea6 in start_thread (/lib/x86_64-linux-gnu/libpthread.so.0+0x7ea6) Indirect leak of 32 byte(s) in 1 object(s) allocated from: #0 0x7f9dd63fb647 in operator new(unsigned long) ../../../../src/libsanitizer/asan/asan_new_delete.cpp:99 #1 0x7f9dd1f47743 in RdKafka::Topic::create(RdKafka::Handle*, std::__cxx11::basic_string, std::allocator > const&, RdKafka::Conf const*, std::__cxx11::basic_string, std::allocator >&) /node-rdkafka/deps/librdkafka/src-cpp/TopicImpl.cpp:84 #2 0x7f9dabdc8eb9 in NodeKafka::Connection::CreateTopic(std::__cxx11::basic_string, std::allocator >, RdKafka::Conf*) ../src/connection.cc:115 #3 0x7f9dabdc94db in NodeKafka::Connection::CreateTopic(std::__cxx11::basic_string, std::allocator >) ../src/connection.cc:104 #4 0x7f9dabdca0d9 in NodeKafka::Connection::GetMetadata(bool, std::__cxx11::basic_string, std::allocator >, int) ../src/connection.cc:198 #5 0x7f9dabe63bf2 in NodeKafka::Workers::ConnectionMetadata::Execute() ../src/workers.cc:95 #6 0x7f9dabdd7261 in Nan::AsyncExecute(uv_work_s*) ../node_modules/nan/nan.h:2356 #7 0x18bb06f in worker ../deps/uv/src/threadpool.c:122 #8 0x7f9dd5ffbea6 in start_thread (/lib/x86_64-linux-gnu/libpthread.so.0+0x7ea6) Indirect leak of 23 byte(s) in 1 object(s) allocated from: #0 0x7f9dd63f9e8f in __interceptor_malloc ../../../../src/libsanitizer/asan/asan_malloc_linux.cpp:145 #1 0x7f9dab5303ea in rd_malloc /node-rdkafka/deps/librdkafka/src/rd.h:140 #2 0x7f9dab5303ea in rd_kafkap_str_new /node-rdkafka/deps/librdkafka/src/rdkafka_proto.h:315 #3 0x7f9dab5303ea in rd_kafka_topic_new0 /node-rdkafka/deps/librdkafka/src/rdkafka_topic.c:353 #4 0x7f9dab534cbc in rd_kafka_topic_new /node-rdkafka/deps/librdkafka/src/rdkafka_topic.c:533 #5 0x7f9dd1f47891 in RdKafka::Topic::create(RdKafka::Handle*, std::__cxx11::basic_string, std::allocator > const&, RdKafka::Conf const*, std::__cxx11::basic_string, std::allocator >&) /node-rdkafka/deps/librdkafka/src-cpp/TopicImpl.cpp:114 #6 0x7f9dabdc8eb9 in NodeKafka::Connection::CreateTopic(std::__cxx11::basic_string, std::allocator >, RdKafka::Conf*) ../src/connection.cc:115 #7 0x7f9dabdc94db in NodeKafka::Connection::CreateTopic(std::__cxx11::basic_string, std::allocator >) ../src/connection.cc:104 #8 0x7f9dabdca0d9 in NodeKafka::Connection::GetMetadata(bool, std::__cxx11::basic_string, std::allocator >, int) ../src/connection.cc:198 #9 0x7f9dabe63bf2 in NodeKafka::Workers::ConnectionMetadata::Execute() ../src/workers.cc:95 #10 0x7f9dabdd7261 in Nan::AsyncExecute(uv_work_s*) ../node_modules/nan/nan.h:2356 #11 0x18bb06f in worker ../deps/uv/src/threadpool.c:122 #12 0x7f9dd5ffbea6 in start_thread (/lib/x86_64-linux-gnu/libpthread.so.0+0x7ea6) Indirect leak of 20 byte(s) in 2 object(s) allocated from: #0 0x7f9dd63a7817 in __interceptor_strdup ../../../../src/libsanitizer/asan/asan_interceptors.cpp:452 #1 0x7f9dab537302 in rd_strdup /node-rdkafka/deps/librdkafka/src/rd.h:157 #2 0x7f9dab537302 in rd_kafka_anyconf_set_prop0 /node-rdkafka/deps/librdkafka/src/rdkafka_conf.c:1827 #3 0x7f9dab537c63 in rd_kafka_defaultconf_set /node-rdkafka/deps/librdkafka/src/rdkafka_conf.c:2273 #4 0x7f9dab5394fd in rd_kafka_topic_conf_new /node-rdkafka/deps/librdkafka/src/rdkafka_conf.c:2293 #5 0x7f9dab539e9f in rd_kafka_topic_conf_dup /node-rdkafka/deps/librdkafka/src/rdkafka_conf.c:2725 #6 0x7f9dd1f4794f in RdKafka::Topic::create(RdKafka::Handle*, std::__cxx11::basic_string, std::allocator > const&, RdKafka::Conf const*, std::__cxx11::basic_string, std::allocator >&) /node-rdkafka/deps/librdkafka/src-cpp/TopicImpl.cpp:89 #7 0x7f9dabdc8eb9 in NodeKafka::Connection::CreateTopic(std::__cxx11::basic_string, std::allocator >, RdKafka::Conf*) ../src/connection.cc:115 #8 0x7f9dabdc94db in NodeKafka::Connection::CreateTopic(std::__cxx11::basic_string, std::allocator >) ../src/connection.cc:104 #9 0x7f9dabdca0d9 in NodeKafka::Connection::GetMetadata(bool, std::__cxx11::basic_string, std::allocator >, int) ../src/connection.cc:198 #10 0x7f9dabe63bf2 in NodeKafka::Workers::ConnectionMetadata::Execute() ../src/workers.cc:95 #11 0x7f9dabdd7261 in Nan::AsyncExecute(uv_work_s*) ../node_modules/nan/nan.h:2356 #12 0x18bb06f in worker ../deps/uv/src/threadpool.c:122 #13 0x7f9dd5ffbea6 in start_thread (/lib/x86_64-linux-gnu/libpthread.so.0+0x7ea6) ``` The main issue seems to be that `Baton` does not take ownership of pointers it receives, requiring callers to manually dispose of the data on an ad-hoc basis. So, introduce a new typed RAII wrapper class suitable for wrapping the results of a librdkafka operation, and convert CreateTopic() to return it instead. As a potential followup, other methods that currently return a `Baton` could also be incrementally migrated to the new wrapper to reduce the amount of manual memory management required. --- src/connection.cc | 21 ++++++------ src/connection.h | 5 +-- src/kafka-operation-result.h | 63 ++++++++++++++++++++++++++++++++++++ src/producer.cc | 14 +++----- src/topic.cc | 2 +- src/topic.h | 3 +- 6 files changed, 85 insertions(+), 23 deletions(-) create mode 100644 src/kafka-operation-result.h diff --git a/src/connection.cc b/src/connection.cc index f76e012f..95e2ddff 100644 --- a/src/connection.cc +++ b/src/connection.cc @@ -12,6 +12,7 @@ #include #include "src/connection.h" +#include "kafka-operation-result.h" #include "src/workers.h" using RdKafka::Conf; @@ -100,11 +101,11 @@ RdKafka::Handle* Connection::GetClient() { return m_client; } -Baton Connection::CreateTopic(std::string topic_name) { +KafkaOperationResult Connection::CreateTopic(std::string topic_name) { return CreateTopic(topic_name, NULL); } -Baton Connection::CreateTopic(std::string topic_name, RdKafka::Conf* conf) { +KafkaOperationResult Connection::CreateTopic(std::string topic_name, RdKafka::Conf* conf) { std::string errstr; RdKafka::Topic* topic = NULL; @@ -114,19 +115,19 @@ Baton Connection::CreateTopic(std::string topic_name, RdKafka::Conf* conf) { if (IsConnected()) { topic = RdKafka::Topic::create(m_client, topic_name, conf, errstr); } else { - return Baton(RdKafka::ErrorCode::ERR__STATE); + return KafkaOperationResult(RdKafka::ErrorCode::ERR__STATE); } } else { - return Baton(RdKafka::ErrorCode::ERR__STATE); + return KafkaOperationResult(RdKafka::ErrorCode::ERR__STATE); } if (!errstr.empty()) { - return Baton(RdKafka::ErrorCode::ERR_TOPIC_EXCEPTION, errstr); + return KafkaOperationResult(RdKafka::ErrorCode::ERR_TOPIC_EXCEPTION, errstr); } // Maybe do it this way later? Then we don't need to do static_cast // - return Baton(topic); + return KafkaOperationResult(topic); } Baton Connection::QueryWatermarkOffsets( @@ -189,15 +190,15 @@ Baton Connection::OffsetsForTimes( Baton Connection::GetMetadata( bool all_topics, std::string topic_name, int timeout_ms) { - RdKafka::Topic* topic = NULL; + std::unique_ptr topic{}; RdKafka::ErrorCode err; std::string errstr; if (!topic_name.empty()) { - Baton b = CreateTopic(topic_name); + KafkaOperationResult b = CreateTopic(topic_name); if (b.err() == RdKafka::ErrorCode::ERR_NO_ERROR) { - topic = b.data(); + topic = b.take_ownership(); } } @@ -211,7 +212,7 @@ Baton Connection::GetMetadata( scoped_shared_read_lock lock(m_connection_lock); if (IsConnected()) { // Always send true - we - err = m_client->metadata(all_topics, topic, &metadata, timeout_ms); + err = m_client->metadata(all_topics, topic.get(), &metadata, timeout_ms); } else { err = RdKafka::ERR__STATE; } diff --git a/src/connection.h b/src/connection.h index bba8a7c7..602e8d17 100644 --- a/src/connection.h +++ b/src/connection.h @@ -21,6 +21,7 @@ #include "src/errors.h" #include "src/config.h" #include "src/callbacks.h" +#include "src/kafka-operation-result.h" namespace NodeKafka { @@ -56,8 +57,8 @@ class Connection : public Nan::ObjectWrap { bool IsClosing(); // Baton - Baton CreateTopic(std::string); - Baton CreateTopic(std::string, RdKafka::Conf*); + KafkaOperationResult CreateTopic(std::string); + KafkaOperationResult CreateTopic(std::string, RdKafka::Conf*); Baton GetMetadata(bool, std::string, int); Baton QueryWatermarkOffsets(std::string, int32_t, int64_t*, int64_t*, int); Baton OffsetsForTimes(std::vector &, int); diff --git a/src/kafka-operation-result.h b/src/kafka-operation-result.h new file mode 100644 index 00000000..cc240efa --- /dev/null +++ b/src/kafka-operation-result.h @@ -0,0 +1,63 @@ +#ifndef SRC_KAFKA_OPERATION_RESULT_H_ +#define SRC_KAFKA_OPERATION_RESULT_H_ + +#include +#include +#include + +#include "rdkafkacpp.h" + +namespace NodeKafka { +/** + * Type-safe wrapper for the result of an RDKafka library operation. + */ +template +class KafkaOperationResult { + public: + /** + * Constructor for a successful operation result. + * Takes ownership of the data pointer. + */ + explicit KafkaOperationResult(T* data) + : m_data(data), m_err(RdKafka::ErrorCode::ERR_NO_ERROR) {} + explicit KafkaOperationResult(RdKafka::ErrorCode err) + : m_data(nullptr), m_err(err) {} + explicit KafkaOperationResult(RdKafka::ErrorCode err, std::string errstr) + : m_data(nullptr), m_err(err), m_errstr(errstr) {} + + /** + * Get a non-owning pointer to the result data. + * Only should be called for non-error results. + */ + T* data() const { + assert(m_data != nullptr); + return m_data.get(); + } + + /** + * Transfer ownership of the result data to the caller. + * Only should be called for non-error results. + */ + std::unique_ptr take_ownership() { + assert(m_data != nullptr); + std::unique_ptr data = std::move(m_data); + m_data.reset(); + return data; + } + + RdKafka::ErrorCode err() const { + return m_err; + } + + std::string errstr() const { + return m_errstr.empty() ? RdKafka::err2str(m_err) : m_errstr; + } + + private: + std::unique_ptr m_data; + RdKafka::ErrorCode m_err; + std::string m_errstr; +}; +} // namespace NodeKafka + +#endif // SRC_KAFKA_OPERATION_RESULT_H_ diff --git a/src/producer.cc b/src/producer.cc index 8e20320a..90e214fb 100644 --- a/src/producer.cc +++ b/src/producer.cc @@ -12,6 +12,7 @@ #include "src/producer.h" #include "src/kafka-consumer.h" +#include "src/kafka-operation-result.h" #include "src/workers.h" namespace NodeKafka { @@ -617,22 +618,17 @@ NAN_METHOD(Producer::NodeProduce) { Topic* topic = ObjectWrap::Unwrap(info[0].As()); // Unwrap it and turn it into an RdKafka::Topic* - Baton topic_baton = topic->toRDKafkaTopic(producer); + KafkaOperationResult topic_result = topic->toRDKafkaTopic(producer); - if (topic_baton.err() != RdKafka::ERR_NO_ERROR) { + if (topic_result.err() != RdKafka::ERR_NO_ERROR) { // Let the JS library throw if we need to so the error can be more rich - error_code = static_cast(topic_baton.err()); + error_code = static_cast(topic_result.err()); return info.GetReturnValue().Set(Nan::New(error_code)); } - RdKafka::Topic* rd_topic = topic_baton.data(); - Baton b = producer->Produce(message_buffer_data, message_buffer_length, - rd_topic, partition, key_buffer_data, key_buffer_length, opaque); - - // Delete the topic when we are done. - delete rd_topic; + topic_result.data(), partition, key_buffer_data, key_buffer_length, opaque); error_code = static_cast(b.err()); } diff --git a/src/topic.cc b/src/topic.cc index 4c9d663a..98f6c568 100644 --- a/src/topic.cc +++ b/src/topic.cc @@ -45,7 +45,7 @@ std::string Topic::name() { return m_topic_name; } -Baton Topic::toRDKafkaTopic(Connection* handle) { +KafkaOperationResult Topic::toRDKafkaTopic(Connection* handle) { if (m_config) { return handle->CreateTopic(m_topic_name, m_config); } else { diff --git a/src/topic.h b/src/topic.h index 425bf7fd..4518e8fc 100644 --- a/src/topic.h +++ b/src/topic.h @@ -16,6 +16,7 @@ #include "rdkafkacpp.h" #include "src/config.h" +#include "src/kafka-operation-result.h" namespace NodeKafka { @@ -24,7 +25,7 @@ class Topic : public Nan::ObjectWrap { static void Init(v8::Local); static v8::Local NewInstance(v8::Local arg); - Baton toRDKafkaTopic(Connection *handle); + KafkaOperationResult toRDKafkaTopic(Connection *handle); protected: static Nan::Persistent constructor;