Skip to content

Commit ca3a132

Browse files
authored
Merge pull request #247 from filimonov/kafka_destroy_flags3
Add support for rd_kafka_destroy_flags.
2 parents 006642c + 244726c commit ca3a132

File tree

3 files changed

+45
-3
lines changed

3 files changed

+45
-3
lines changed

include/cppkafka/kafka_handle_base.h

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -340,6 +340,18 @@ class CPPKAFKA_API KafkaHandleBase {
340340
*/
341341
int get_out_queue_length() const;
342342

343+
#if RD_KAFKA_VERSION >= RD_KAFKA_DESTROY_FLAGS_SUPPORT_VERSION
344+
/**
345+
* \brief Sets flags for rd_kafka_destroy_flags()
346+
*/
347+
void set_destroy_flags(int destroy_flags);
348+
349+
/**
350+
* \brief Returns flags for rd_kafka_destroy_flags()
351+
*/
352+
int get_destroy_flags() const;
353+
354+
#endif
343355
/**
344356
* \brief Cancels the current callback dispatcher
345357
*
@@ -357,7 +369,14 @@ class CPPKAFKA_API KafkaHandleBase {
357369
private:
358370
static const std::chrono::milliseconds DEFAULT_TIMEOUT;
359371

360-
using HandlePtr = std::unique_ptr<rd_kafka_t, decltype(&rd_kafka_destroy)>;
372+
struct HandleDeleter {
373+
explicit HandleDeleter(const KafkaHandleBase* handle_base_ptr) : handle_base_ptr_{handle_base_ptr} {}
374+
void operator()(rd_kafka_t* handle);
375+
private:
376+
const KafkaHandleBase * handle_base_ptr_;
377+
};
378+
379+
using HandlePtr = std::unique_ptr<rd_kafka_t, HandleDeleter>;
361380
using TopicConfigurationMap = std::unordered_map<std::string, TopicConfiguration>;
362381

363382
Topic get_topic(const std::string& name, rd_kafka_topic_conf_t* conf);
@@ -373,6 +392,7 @@ class CPPKAFKA_API KafkaHandleBase {
373392
TopicConfigurationMap topic_configurations_;
374393
std::mutex topic_configurations_mutex_;
375394
HandlePtr handle_;
395+
int destroy_flags_;
376396
};
377397

378398
} // cppkafka

include/cppkafka/macros.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,5 +51,6 @@
5151
#define RD_KAFKA_EVENT_STATS_SUPPORT_VERSION 0x000b0000 //v0.11.0.00
5252
#define RD_KAFKA_MESSAGE_STATUS_SUPPORT_VERSION 0x01000002 //v1.0.0.02
5353
#define RD_KAFKA_STORE_OFFSETS_SUPPORT_VERSION 0x00090501 //v0.9.5.01
54+
#define RD_KAFKA_DESTROY_FLAGS_SUPPORT_VERSION 0x000b0600 //v0.11.6
5455

5556
#endif // CPPKAFKA_MACROS_H

src/kafka_handle_base.cpp

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ namespace cppkafka {
4848
const milliseconds KafkaHandleBase::DEFAULT_TIMEOUT{1000};
4949

5050
KafkaHandleBase::KafkaHandleBase(Configuration config)
51-
: timeout_ms_(DEFAULT_TIMEOUT), config_(move(config)), handle_(nullptr, nullptr) {
51+
: timeout_ms_(DEFAULT_TIMEOUT), config_(move(config)), handle_(nullptr, HandleDeleter(this)), destroy_flags_(0) {
5252
auto& maybe_config = config_.get_default_topic_configuration();
5353
if (maybe_config) {
5454
maybe_config->set_as_opaque();
@@ -213,7 +213,7 @@ void KafkaHandleBase::yield() const {
213213
}
214214

215215
void KafkaHandleBase::set_handle(rd_kafka_t* handle) {
216-
handle_ = HandlePtr(handle, &rd_kafka_destroy);
216+
handle_ = HandlePtr(handle, HandleDeleter(this));
217217
}
218218

219219
Topic KafkaHandleBase::get_topic(const string& name, rd_kafka_topic_conf_t* conf) {
@@ -285,4 +285,25 @@ rd_kafka_conf_t* KafkaHandleBase::get_configuration_handle() {
285285
return config_.get_handle();
286286
}
287287

288+
#if RD_KAFKA_VERSION >= RD_KAFKA_DESTROY_FLAGS_SUPPORT_VERSION
289+
290+
void KafkaHandleBase::set_destroy_flags(int destroy_flags) {
291+
destroy_flags_ = destroy_flags;
292+
};
293+
294+
int KafkaHandleBase::get_destroy_flags() const {
295+
return destroy_flags_;
296+
};
297+
298+
#endif
299+
300+
301+
void KafkaHandleBase::HandleDeleter::operator()(rd_kafka_t* handle) {
302+
#if RD_KAFKA_VERSION >= RD_KAFKA_DESTROY_FLAGS_SUPPORT_VERSION
303+
rd_kafka_destroy_flags(handle, handle_base_ptr_->get_destroy_flags());
304+
#else
305+
rd_kafka_destroy(handle);
306+
#endif
307+
}
308+
288309
} // cppkafka

0 commit comments

Comments
 (0)