Skip to content

Commit 4fa50ba

Browse files
committed
fix: rd_kafka_event_t is not destroyed
1 parent 3c0e925 commit 4fa50ba

File tree

4 files changed

+70
-18
lines changed

4 files changed

+70
-18
lines changed

create_topic.h

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
#pragma once
1717

18+
#include "rk_event_wrapper.h"
1819
#include <array>
1920
#include <iostream>
2021
#include <librdkafka/rdkafka.h>
@@ -38,12 +39,12 @@ inline void create_topic(rd_kafka_t *rk, rd_kafka_queue_t *rkqu,
3839

3940
rd_kafka_CreateTopics(rk, &rk_topic, 1, nullptr, rkqu);
4041

41-
if (auto event = rd_kafka_queue_poll(rkqu, -1 /* infinite timeout */);
42-
rd_kafka_event_error(event)) {
43-
std::cerr << "CreateTopics failed for " << topic << ": "
44-
<< rd_kafka_event_error_string(event);
45-
} else {
42+
try {
43+
RdKafkaEvent::poll(rk, rkqu);
4644
std::cout << R"(Created topic ")" << topic << R"(" with )" << num_partitions
4745
<< " partition" << (num_partitions == 1 ? "" : "s") << std::endl;
46+
} catch (const std::runtime_error &e) {
47+
std::cerr << "CreateTopics failed for " << topic << ": " << e.what();
48+
return;
4849
}
4950
}

delete_topic.h

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
#pragma once
1717

18+
#include "rk_event_wrapper.h"
1819
#include <iostream>
1920
#include <librdkafka/rdkafka.h>
2021
#include <memory>
@@ -31,12 +32,9 @@ inline void delete_topic(rd_kafka_t *rk, rd_kafka_queue_t *rkqu,
3132

3233
rd_kafka_DeleteTopics(rk, &rk_topic, 1, nullptr, rkqu);
3334

34-
if (auto event = rd_kafka_queue_poll(rkqu, -1 /* infinite timeout */);
35-
rd_kafka_event_error(event)) {
36-
std::cerr << "DeleteTopics failed for " << topic << ": "
37-
<< rd_kafka_event_error_string(event);
38-
} else {
39-
auto result = rd_kafka_event_DeleteTopics_result(event);
35+
try {
36+
auto event = RdKafkaEvent::poll(rk, rkqu);
37+
auto result = rd_kafka_event_DeleteTopics_result(event.handle());
4038
size_t cntp;
4139
auto topics = rd_kafka_DeleteTopics_result_topics(result, &cntp);
4240
if (cntp != 1) {
@@ -50,5 +48,8 @@ inline void delete_topic(rd_kafka_t *rk, rd_kafka_queue_t *rkqu,
5048
std::cerr << R"(Failed to delete topic ")" << topic << R"(": )" << error
5149
<< std::endl;
5250
}
51+
} catch (const std::runtime_error &e) {
52+
std::cerr << "DeleteTopics failed for " << topic << ": " << e.what()
53+
<< std::endl;
5354
}
5455
}

describe_topic.h

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,10 @@
1515
*/
1616
#pragma once
1717

18-
#include <argparse/argparse.hpp>
18+
#include "rk_event_wrapper.h"
1919
#include <iostream>
2020
#include <librdkafka/rdkafka.h>
21+
#include <stdexcept>
2122
#include <string>
2223

2324
inline void describe_topic(rd_kafka_t *rk, rd_kafka_queue_t *rkqu,
@@ -30,12 +31,9 @@ inline void describe_topic(rd_kafka_t *rk, rd_kafka_queue_t *rkqu,
3031

3132
rd_kafka_DescribeTopics(rk, topic_names, nullptr, rkqu);
3233

33-
if (auto event = rd_kafka_queue_poll(rkqu, -1 /* infinite timeout */);
34-
rd_kafka_event_error(event)) {
35-
std::cerr << "DescribeTopics failed for " << topic << ": "
36-
<< rd_kafka_event_error_string(event);
37-
} else {
38-
auto result = rd_kafka_event_DescribeTopics_result(event);
34+
try {
35+
auto event = RdKafkaEvent::poll(rk, rkqu);
36+
auto result = rd_kafka_event_DescribeTopics_result(event.handle());
3937
size_t result_topics_cnt;
4038
auto result_topics =
4139
rd_kafka_DescribeTopics_result_topics(result, &result_topics_cnt);
@@ -66,5 +64,8 @@ inline void describe_topic(rd_kafka_t *rk, rd_kafka_queue_t *rkqu,
6664
}
6765
}
6866
}
67+
} catch (const std::runtime_error &e) {
68+
std::cerr << "DescribeTopics failed for " << topic << ": " << e.what()
69+
<< std::endl;
6970
}
7071
}

rk_event_wrapper.h

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/**
2+
* Copyright 2025 Yunze Xu
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
#pragma once
17+
18+
#include <librdkafka/rdkafka.h>
19+
#include <stdexcept>
20+
21+
class RdKafkaEvent final {
22+
public:
23+
static RdKafkaEvent poll(rd_kafka_t *rk, rd_kafka_queue_t *rkqu) {
24+
auto event = rd_kafka_queue_poll(rkqu, -1 /* infinite timeout */);
25+
if (rd_kafka_event_error(event)) {
26+
throw std::runtime_error(rd_kafka_event_error_string(event));
27+
}
28+
return RdKafkaEvent(event);
29+
}
30+
31+
RdKafkaEvent(const RdKafkaEvent &) = delete;
32+
RdKafkaEvent(RdKafkaEvent &&rhs) noexcept : event_(rhs.event_) {
33+
rhs.event_ = nullptr;
34+
}
35+
36+
~RdKafkaEvent() {
37+
if (event_ != nullptr) {
38+
rd_kafka_event_destroy(event_);
39+
}
40+
}
41+
42+
// Get the underlying C handle for rdkafka's C APIs to use
43+
auto handle() const noexcept { return event_; }
44+
45+
private:
46+
rd_kafka_event_t *event_;
47+
48+
RdKafkaEvent(rd_kafka_event_t *event) : event_(event) {}
49+
};

0 commit comments

Comments
 (0)