I create headers in a local function. Their values are dynamically allocated on the heap, and I store the resulting pointers in the headers (I never free them):
long n1;
long n2;
kafka::Headers headers;
auto pN1Bytes = (decltype(n1)*)malloc(sizeof(n1));
*pN1Bytes = n1;
auto pN2Bytes = (decltype(n2)*)malloc(sizeof(n2));
*pN2Bytes = n2;
headers.push_back(kafka::Header(kafka::Header::Key("n1"), kafka::Header::Value(pN1Bytes, sizeof(n1))));
headers.push_back(kafka::Header(kafka::Header::Key("n2"), kafka::Header::Value(pN2Bytes, sizeof(n2))));
But it occasionally crashes in rd_kafka_headers_destroy. Here is the stack:

I send messages using the following method, which is called sequentially:
bool KafkaProducerService::Produce(const kafka::Key& key, const std::string& msg, const std::string& topic, const kafka::Headers& headers)
{
    auto ret = true;
    try {
        auto record = kafka::clients::producer::ProducerRecord(topic, mPartition, key, kafka::Value(msg.c_str(), msg.size()));
        record.headers() = headers;
        mpProducer->send(
            record,
            mProduceCallback,
            kafka::clients::producer::KafkaProducer::SendOption::ToCopyRecordValue,
            kafka::clients::producer::KafkaProducer::ActionWhileQueueIsFull::NoBlock
        );
        mpProducer->flush(std::chrono::milliseconds(50));
    }
    catch (const kafka::KafkaException& e) {
        std::cerr << "% Unexpected exception caught: " << e.what() << std::endl;
        ret = false;
    }
    return ret;
}
- version:
-- modern-cpp-kafka: v2024.07.03
-- librdkafka: 2.4.0
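
To help rule out the lifetime of the header values as a factor, the hand-written malloc calls could be replaced by a container that owns the bytes. The following is only a minimal standalone sketch and does not use modern-cpp-kafka: `BufferView`, `OwnedHeaders`, and `readLong` are hypothetical names, and `BufferView` merely assumes that `kafka::Header::Value` behaves like a non-owning pointer-and-size pair. It is not a diagnosis of the crash in rd_kafka_headers_destroy.

```cpp
#include <cstddef>
#include <cstring>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a non-owning (pointer, size) header value.
// Assumption: kafka::Header::Value is a similar non-owning view.
struct BufferView {
    const void* data;
    std::size_t size;
};

// Owns the bytes of every header value, so no malloc/free is needed
// and the storage is released when the object goes out of scope.
class OwnedHeaders {
public:
    // Copy the raw bytes of `value` into storage owned by this object.
    void add(std::string key, long value) {
        std::vector<unsigned char> bytes(sizeof(value));
        std::memcpy(bytes.data(), &value, sizeof(value));
        mEntries.emplace_back(std::move(key), std::move(bytes));
    }

    // Build non-owning views over the stored bytes. The views should only
    // be used while this object is alive and not modified.
    std::vector<std::pair<std::string, BufferView>> views() const {
        std::vector<std::pair<std::string, BufferView>> result;
        result.reserve(mEntries.size());
        for (const auto& entry : mEntries) {
            result.emplace_back(entry.first,
                                BufferView{entry.second.data(), entry.second.size()});
        }
        return result;
    }

private:
    std::vector<std::pair<std::string, std::vector<unsigned char>>> mEntries;
};

// Read a long back out of a view; returns 0 if the size does not match.
inline long readLong(const BufferView& view) {
    long out = 0;
    if (view.size == sizeof(out)) {
        std::memcpy(&out, view.data, sizeof(out));
    }
    return out;
}
```

With this shape, an `OwnedHeaders` object would have to outlive any call that reads the views, and each view would be passed wherever the pointer and size are currently passed.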