@@ -343,23 +343,11 @@ class CPPKAFKA_API KafkaHandleBase {
343343#if RD_KAFKA_VERSION >= RD_KAFKA_DESTROY_FLAGS_SUPPORT_VERSION
344344 /* *
345345 * \brief Sets flags for rd_kafka_destroy_flags()
346- *
347- * 0 (default) - calls consumer_close() during handle destruction
348- * to leave group and commit final offsets.
349- *
350- * RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE - don't call consumer_close()
351- *
352- * Details: see https://github.com/edenhill/librdkafka/blob/8fc678518738c6b80380326dad86ef62228f87f0/src/rdkafka.h#L2612-L2651
353- *
354- * With default value some termination sequences can lead to hang
355- * during destruction, see: https://github.com/edenhill/librdkafka/issues/2077
356- *
357346 */
358347 void set_destroy_flags (int destroy_flags);
359348
360349 /* *
361- * \brief Returns destroy_flags
362- *
350+ * \brief Returns flags for rd_kafka_destroy_flags()
363351 */
364352 int get_destroy_flags () const ;
365353
@@ -381,20 +369,14 @@ class CPPKAFKA_API KafkaHandleBase {
381369private:
382370 static const std::chrono::milliseconds DEFAULT_TIMEOUT;
383371
384- // It seems that destroy_flags maybe RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE by default.
385- //
386- // All the consumer objects created by cppkafka call rd_kafka_consumer_close during destruction.
387- // So we don't want it to be called once again during handle destruction.
388- int destroy_flags_ = 0 ;
389-
390- struct handle_deleter {
391- handle_deleter (const KafkaHandleBase * handle_base_ptr) : handle_base_ptr_{handle_base_ptr} {}
372+ struct HandleDeleter {
373+ explicit HandleDeleter (const KafkaHandleBase* handle_base_ptr) : handle_base_ptr_{handle_base_ptr} {}
392374 void operator ()(rd_kafka_t * handle);
393375 private:
394376 const KafkaHandleBase * handle_base_ptr_;
395377 };
396378
397- using HandlePtr = std::unique_ptr<rd_kafka_t , handle_deleter >;
379+ using HandlePtr = std::unique_ptr<rd_kafka_t , HandleDeleter >;
398380 using TopicConfigurationMap = std::unordered_map<std::string, TopicConfiguration>;
399381
400382 Topic get_topic (const std::string& name, rd_kafka_topic_conf_t * conf);
@@ -410,6 +392,7 @@ class CPPKAFKA_API KafkaHandleBase {
410392 TopicConfigurationMap topic_configurations_;
411393 std::mutex topic_configurations_mutex_;
412394 HandlePtr handle_;
395+ int destroy_flags_;
413396};
414397
415398} // cppkafka
0 commit comments