@@ -340,6 +340,30 @@ class CPPKAFKA_API KafkaHandleBase {
340340 */
341341 int get_out_queue_length () const ;
342342
343+ #if RD_KAFKA_VERSION >= RD_KAFKA_DESTROY_FLAGS_SUPPORT_VERSION
344+ /* *
345+ * \brief Sets flags for rd_kafka_destroy_flags()
346+ *
347+ * 0 (default) - calls consumer_close() during handle destruction
348+ * to leave group and commit final offsets.
349+ *
350+ * RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE - don't call consumer_close()
351+ *
352+ * Details: see https://github.com/edenhill/librdkafka/blob/8fc678518738c6b80380326dad86ef62228f87f0/src/rdkafka.h#L2612-L2651
353+ *
354+ * With default value some termination sequences can lead to hang
355+ * during destruction, see: https://github.com/edenhill/librdkafka/issues/2077
356+ *
357+ */
358+ void set_destroy_flags (int destroy_flags);
359+
360+ /* *
361+ * \brief Returns destroy_flags
362+ *
363+ */
364+ int get_destroy_flags () const ;
365+
366+ #endif
343367 /* *
344368 * \brief Cancels the current callback dispatcher
345369 *
@@ -357,7 +381,20 @@ class CPPKAFKA_API KafkaHandleBase {
357381private:
358382 static const std::chrono::milliseconds DEFAULT_TIMEOUT;
359383
360- using HandlePtr = std::unique_ptr<rd_kafka_t , decltype (&rd_kafka_destroy)>;
384+ // It seems that destroy_flags maybe RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE by default.
385+ //
386+ // All the consumer objects created by cppkafka call rd_kafka_consumer_close during destruction.
387+ // So we don't want it to be called once again during handle destruction.
388+ int destroy_flags_ = 0 ;
389+
390+ struct handle_deleter {
391+ handle_deleter (const KafkaHandleBase * handle_base_ptr) : handle_base_ptr_{handle_base_ptr} {}
392+ void operator ()(rd_kafka_t * handle);
393+ private:
394+ const KafkaHandleBase * handle_base_ptr_;
395+ };
396+
397+ using HandlePtr = std::unique_ptr<rd_kafka_t , handle_deleter>;
361398 using TopicConfigurationMap = std::unordered_map<std::string, TopicConfiguration>;
362399
363400 Topic get_topic (const std::string& name, rd_kafka_topic_conf_t * conf);
0 commit comments