- 
                Notifications
    You must be signed in to change notification settings 
- Fork 407
Fix memory leak when fetching metadata for a single topic #1130
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
          
     Open
      
      
            mszabo-wikia
  wants to merge
  1
  commit into
  Blizzard:master
  
    
      
        
          
  
    
      Choose a base branch
      
     
    
      
        
      
      
        
          
          
        
        
          
            
              
              
              
  
           
        
        
          
            
              
              
           
        
       
     
  
        
          
            
          
            
          
        
       
    
      
from
mszabo-wikia:fix-gettopic-leak
  
      
      
   
  
    
  
  
  
 
  
      
    base: master
Could not load branches
            
              
  
    Branch not found: {{ refName }}
  
            
                
      Loading
              
            Could not load tags
            
            
              Nothing to show
            
              
  
            
                
      Loading
              
            Are you sure you want to change the base?
            Some commits from the old base branch may be removed from the timeline,
            and old review comments may become outdated.
          
          
      
        
          +85
        
        
          −23
        
        
          
        
      
    
  
Conversation
  
    
      This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
      Learn more about bidirectional Unicode characters
    
  
  
    
    When requesting metadata for a single topic, Connection::GetMetadata()
calls Connection::CreateTopic() to resolve the provided topic name into
a Topic, but fails to deallocate it.
To reproduce, compile node-rdkafka and librdkafka with ASAN, then run
the following:
```js
const { KafkaConsumer } = require('.');
const consumer = new KafkaConsumer({
  'group.id': 'kafka',
  'metadata.broker.list': 'localhost:9092',
}, {});
consumer.connect({ timeout: 2000 }, function (err) {
  if (err) {
    console.error('Error connecting to Kafka:', err);
    return;
  }
  consumer.getMetadata({ topic: 'test' }, function (metadataErr, metadata) {
    if (metadataErr) {
      console.error('Error fetching metadata:', metadataErr);
    } else {
      console.log(`Metadata: ${JSON.stringify(metadata, null, 2)}`);
    }
    consumer.disconnect();
  });
})
```
ASAN will report a leak from GetMetadata():
```
Indirect leak of 1048 byte(s) in 1 object(s) allocated from:
    #0 0x7f9dd63fa037 in __interceptor_calloc ../../../../src/libsanitizer/asan/asan_malloc_linux.cpp:154
    Blizzard#1 0x7f9dab530394 in rd_calloc /node-rdkafka/deps/librdkafka/src/rd.h:134
    Blizzard#2 0x7f9dab530394 in rd_kafka_topic_new0 /node-rdkafka/deps/librdkafka/src/rdkafka_topic.c:349
    Blizzard#3 0x7f9dab534cbc in rd_kafka_topic_new /node-rdkafka/deps/librdkafka/src/rdkafka_topic.c:533
    Blizzard#4 0x7f9dd1f47891 in RdKafka::Topic::create(RdKafka::Handle*, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, RdKafka::Conf const*, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >&) /node-rdkafka/deps/librdkafka/src-cpp/TopicImpl.cpp:114
    Blizzard#5 0x7f9dabdc8eb9 in NodeKafka::Connection::CreateTopic(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, RdKafka::Conf*) ../src/connection.cc:115
    Blizzard#6 0x7f9dabdc94db in NodeKafka::Connection::CreateTopic(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >) ../src/connection.cc:104
    Blizzard#7 0x7f9dabdca0d9 in NodeKafka::Connection::GetMetadata(bool, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, int) ../src/connection.cc:198
    Blizzard#8 0x7f9dabe63bf2 in NodeKafka::Workers::ConnectionMetadata::Execute() ../src/workers.cc:95
    Blizzard#9 0x7f9dabdd7261 in Nan::AsyncExecute(uv_work_s*) ../node_modules/nan/nan.h:2356
    Blizzard#10 0x18bb06f in worker ../deps/uv/src/threadpool.c:122
    Blizzard#11 0x7f9dd5ffbea6 in start_thread (/lib/x86_64-linux-gnu/libpthread.so.0+0x7ea6)
Indirect leak of 128 byte(s) in 1 object(s) allocated from:
    #0 0x7f9dd63fa1f8 in __interceptor_realloc ../../../../src/libsanitizer/asan/asan_malloc_linux.cpp:164
    Blizzard#1 0x7f9dab6bf9eb in rd_realloc /node-rdkafka/deps/librdkafka/src/rd.h:146
    Blizzard#2 0x7f9dab6bf9eb in rd_list_grow /node-rdkafka/deps/librdkafka/src/rdlist.c:49
    Blizzard#3 0x7f9dab6bfa9f in rd_list_init /node-rdkafka/deps/librdkafka/src/rdlist.c:57
    Blizzard#4 0x7f9dab530dd7 in rd_kafka_topic_new0 /node-rdkafka/deps/librdkafka/src/rdkafka_topic.c:478
    Blizzard#5 0x7f9dab534cbc in rd_kafka_topic_new /node-rdkafka/deps/librdkafka/src/rdkafka_topic.c:533
    Blizzard#6 0x7f9dd1f47891 in RdKafka::Topic::create(RdKafka::Handle*, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, RdKafka::Conf const*, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >&) /node-rdkafka/deps/librdkafka/src-cpp/TopicImpl.cpp:114
    Blizzard#7 0x7f9dabdc8eb9 in NodeKafka::Connection::CreateTopic(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, RdKafka::Conf*) ../src/connection.cc:115
    Blizzard#8 0x7f9dabdc94db in NodeKafka::Connection::CreateTopic(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >) ../src/connection.cc:104
    Blizzard#9 0x7f9dabdca0d9 in NodeKafka::Connection::GetMetadata(bool, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, int) ../src/connection.cc:198
    Blizzard#10 0x7f9dabe63bf2 in NodeKafka::Workers::ConnectionMetadata::Execute() ../src/workers.cc:95
    Blizzard#11 0x7f9dabdd7261 in Nan::AsyncExecute(uv_work_s*) ../node_modules/nan/nan.h:2356
    Blizzard#12 0x18bb06f in worker ../deps/uv/src/threadpool.c:122
    Blizzard#13 0x7f9dd5ffbea6 in start_thread (/lib/x86_64-linux-gnu/libpthread.so.0+0x7ea6)
Indirect leak of 32 byte(s) in 1 object(s) allocated from:
    #0 0x7f9dd63fb647 in operator new(unsigned long) ../../../../src/libsanitizer/asan/asan_new_delete.cpp:99
    Blizzard#1 0x7f9dd1f47743 in RdKafka::Topic::create(RdKafka::Handle*, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, RdKafka::Conf const*, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >&) /node-rdkafka/deps/librdkafka/src-cpp/TopicImpl.cpp:84
    Blizzard#2 0x7f9dabdc8eb9 in NodeKafka::Connection::CreateTopic(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, RdKafka::Conf*) ../src/connection.cc:115
    Blizzard#3 0x7f9dabdc94db in NodeKafka::Connection::CreateTopic(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >) ../src/connection.cc:104
    Blizzard#4 0x7f9dabdca0d9 in NodeKafka::Connection::GetMetadata(bool, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, int) ../src/connection.cc:198
    Blizzard#5 0x7f9dabe63bf2 in NodeKafka::Workers::ConnectionMetadata::Execute() ../src/workers.cc:95
    Blizzard#6 0x7f9dabdd7261 in Nan::AsyncExecute(uv_work_s*) ../node_modules/nan/nan.h:2356
    Blizzard#7 0x18bb06f in worker ../deps/uv/src/threadpool.c:122
    Blizzard#8 0x7f9dd5ffbea6 in start_thread (/lib/x86_64-linux-gnu/libpthread.so.0+0x7ea6)
Indirect leak of 23 byte(s) in 1 object(s) allocated from:
    #0 0x7f9dd63f9e8f in __interceptor_malloc ../../../../src/libsanitizer/asan/asan_malloc_linux.cpp:145
    Blizzard#1 0x7f9dab5303ea in rd_malloc /node-rdkafka/deps/librdkafka/src/rd.h:140
    Blizzard#2 0x7f9dab5303ea in rd_kafkap_str_new /node-rdkafka/deps/librdkafka/src/rdkafka_proto.h:315
    Blizzard#3 0x7f9dab5303ea in rd_kafka_topic_new0 /node-rdkafka/deps/librdkafka/src/rdkafka_topic.c:353
    Blizzard#4 0x7f9dab534cbc in rd_kafka_topic_new /node-rdkafka/deps/librdkafka/src/rdkafka_topic.c:533
    Blizzard#5 0x7f9dd1f47891 in RdKafka::Topic::create(RdKafka::Handle*, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, RdKafka::Conf const*, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >&) /node-rdkafka/deps/librdkafka/src-cpp/TopicImpl.cpp:114
    Blizzard#6 0x7f9dabdc8eb9 in NodeKafka::Connection::CreateTopic(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, RdKafka::Conf*) ../src/connection.cc:115
    Blizzard#7 0x7f9dabdc94db in NodeKafka::Connection::CreateTopic(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >) ../src/connection.cc:104
    Blizzard#8 0x7f9dabdca0d9 in NodeKafka::Connection::GetMetadata(bool, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, int) ../src/connection.cc:198
    Blizzard#9 0x7f9dabe63bf2 in NodeKafka::Workers::ConnectionMetadata::Execute() ../src/workers.cc:95
    Blizzard#10 0x7f9dabdd7261 in Nan::AsyncExecute(uv_work_s*) ../node_modules/nan/nan.h:2356
    Blizzard#11 0x18bb06f in worker ../deps/uv/src/threadpool.c:122
    Blizzard#12 0x7f9dd5ffbea6 in start_thread (/lib/x86_64-linux-gnu/libpthread.so.0+0x7ea6)
Indirect leak of 20 byte(s) in 2 object(s) allocated from:
    #0 0x7f9dd63a7817 in __interceptor_strdup ../../../../src/libsanitizer/asan/asan_interceptors.cpp:452
    Blizzard#1 0x7f9dab537302 in rd_strdup /node-rdkafka/deps/librdkafka/src/rd.h:157
    Blizzard#2 0x7f9dab537302 in rd_kafka_anyconf_set_prop0 /node-rdkafka/deps/librdkafka/src/rdkafka_conf.c:1827
    Blizzard#3 0x7f9dab537c63 in rd_kafka_defaultconf_set /node-rdkafka/deps/librdkafka/src/rdkafka_conf.c:2273
    Blizzard#4 0x7f9dab5394fd in rd_kafka_topic_conf_new /node-rdkafka/deps/librdkafka/src/rdkafka_conf.c:2293
    Blizzard#5 0x7f9dab539e9f in rd_kafka_topic_conf_dup /node-rdkafka/deps/librdkafka/src/rdkafka_conf.c:2725
    Blizzard#6 0x7f9dd1f4794f in RdKafka::Topic::create(RdKafka::Handle*, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, RdKafka::Conf const*, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >&) /node-rdkafka/deps/librdkafka/src-cpp/TopicImpl.cpp:89
    Blizzard#7 0x7f9dabdc8eb9 in NodeKafka::Connection::CreateTopic(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, RdKafka::Conf*) ../src/connection.cc:115
    Blizzard#8 0x7f9dabdc94db in NodeKafka::Connection::CreateTopic(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >) ../src/connection.cc:104
    Blizzard#9 0x7f9dabdca0d9 in NodeKafka::Connection::GetMetadata(bool, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, int) ../src/connection.cc:198
    Blizzard#10 0x7f9dabe63bf2 in NodeKafka::Workers::ConnectionMetadata::Execute() ../src/workers.cc:95
    Blizzard#11 0x7f9dabdd7261 in Nan::AsyncExecute(uv_work_s*) ../node_modules/nan/nan.h:2356
    Blizzard#12 0x18bb06f in worker ../deps/uv/src/threadpool.c:122
    Blizzard#13 0x7f9dd5ffbea6 in start_thread (/lib/x86_64-linux-gnu/libpthread.so.0+0x7ea6)
```
The main issue seems to be that `Baton` does not take ownership of
pointers it receives, requiring callers to manually dispose of the data
on an ad-hoc basis. So, introduce a new typed RAII wrapper class
suitable for wrapping the results of a librdkafka operation, and convert
CreateTopic() to return it instead.
As a potential followup, other methods that currently return a `Baton`
could also be incrementally migrated to the new wrapper to reduce the
amount of manual memory management required.
    
  
    Sign up for free
    to join this conversation on GitHub.
    Already have an account?
    Sign in to comment
  
      
  Add this suggestion to a batch that can be applied as a single commit.
  This suggestion is invalid because no changes were made to the code.
  Suggestions cannot be applied while the pull request is closed.
  Suggestions cannot be applied while viewing a subset of changes.
  Only one suggestion per line can be applied in a batch.
  Add this suggestion to a batch that can be applied as a single commit.
  Applying suggestions on deleted lines is not supported.
  You must change the existing code in this line in order to create a valid suggestion.
  Outdated suggestions cannot be applied.
  This suggestion has been applied or marked resolved.
  Suggestions cannot be applied from pending reviews.
  Suggestions cannot be applied on multi-line comments.
  Suggestions cannot be applied while the pull request is queued to merge.
  Suggestion cannot be applied right now. Please check back later.
  
    
  
    
When requesting metadata for a single topic, Connection::GetMetadata() calls Connection::CreateTopic() to resolve the provided topic name into a Topic, but fails to deallocate it.
To reproduce, compile node-rdkafka and librdkafka with ASAN, then run the following:
ASAN will report a leak from GetMetadata():
The main issue seems to be that
Batondoes not take ownership of pointers it receives, requiring callers to manually dispose of the data on an ad-hoc basis. So, introduce a new typed RAII wrapper class suitable for wrapping the results of a librdkafka operation, and convert CreateTopic() to return it instead.As a potential followup, other methods that currently return a
Batoncould also be incrementally migrated to the new wrapper to reduce the amount of manual memory management required.