@@ -483,7 +483,9 @@ class CPPKAFKA_API BufferedProducer {
483483
484484private:
485485 enum class SenderType { Sync, Async };
486-
486+ enum class QueueKind { Retry, Regular };
487+ enum class FlushAction { DontFlush, DoFlush };
488+
487489 template <typename T>
488490 struct CounterGuard {
489491 CounterGuard (std::atomic<T>& counter) : counter_(counter) { ++counter_; }
@@ -518,19 +520,21 @@ class CPPKAFKA_API BufferedProducer {
518520 return nullptr ;
519521 }
520522 template <typename BuilderType>
521- void do_add_message (BuilderType&& builder, bool is_retry, bool do_flush );
523+ void do_add_message (BuilderType&& builder, QueueKind queue_kind, FlushAction flush_action );
522524 template <typename BuilderType>
523525 void produce_message (BuilderType&& builder);
524526 Configuration prepare_configuration (Configuration config);
525527 void on_delivery_report (const Message& message);
526528 template <typename BuilderType>
527529 void async_produce (BuilderType&& message, bool throw_on_error);
530+ static void swap_queues (QueueType & queue1, QueueType & queue2, std::mutex & mutex);
528531
529532 // Members
530533 Producer producer_;
531534 QueueType messages_;
532535 QueueType retry_messages_;
533536 mutable std::mutex mutex_;
537+ mutable std::mutex retry_mutex_;
534538 ProduceSuccessCallback produce_success_callback_;
535539 ProduceFailureCallback produce_failure_callback_;
536540 ProduceTerminationCallback produce_termination_callback_;
@@ -581,7 +585,7 @@ void BufferedProducer<BufferType, Allocator>::add_message(const MessageBuilder&
581585template <typename BufferType, typename Allocator>
582586void BufferedProducer<BufferType, Allocator>::add_message(Builder builder) {
583587 add_tracker (SenderType::Async, builder);
584- do_add_message (move (builder), false , true );
588+ do_add_message (move (builder), QueueKind::Regular, FlushAction::DoFlush );
585589}
586590
587591template <typename BufferType, typename Allocator>
@@ -625,40 +629,36 @@ void BufferedProducer<BufferType, Allocator>::produce(const Message& message) {
625629template <typename BufferType, typename Allocator>
626630void BufferedProducer<BufferType, Allocator>::async_flush() {
627631 CounterGuard<size_t > counter_guard (flushes_in_progress_);
628- auto queue_flusher = [this ](QueueType& queue)->void
632+ auto queue_flusher = [this ](QueueType& queue, std::mutex & mutex )->void
629633 {
630634 QueueType flush_queue; // flush from temporary queue
631- {
632- std::lock_guard<std::mutex> lock (mutex_);
633- std::swap (queue, flush_queue);
634- }
635+ swap_queues (queue, flush_queue, mutex);
636+
635637 while (!flush_queue.empty ()) {
636638 async_produce (std::move (flush_queue.front ()), false );
637639 flush_queue.pop_front ();
638640 }
639641 };
640- queue_flusher (retry_messages_);
641- queue_flusher (messages_);
642+ queue_flusher (retry_messages_, retry_mutex_ );
643+ queue_flusher (messages_, mutex_ );
642644}
643645
644646template <typename BufferType, typename Allocator>
645647void BufferedProducer<BufferType, Allocator>::flush(bool preserve_order) {
646648 if (preserve_order) {
647649 CounterGuard<size_t > counter_guard (flushes_in_progress_);
648- auto queue_flusher = [this ](QueueType& queue)->void
650+ auto queue_flusher = [this ](QueueType& queue, std::mutex & mutex )->void
649651 {
650652 QueueType flush_queue; // flush from temporary queue
651- {
652- std::lock_guard<std::mutex> lock (mutex_);
653- std::swap (queue, flush_queue);
654- }
653+ swap_queues (queue, flush_queue, mutex);
654+
655655 while (!flush_queue.empty ()) {
656- sync_produce (flush_queue.front ());
657- flush_queue.pop_front ();
656+ sync_produce (flush_queue.front ());
657+ flush_queue.pop_front ();
658658 }
659659 };
660- queue_flusher (retry_messages_);
661- queue_flusher (messages_);
660+ queue_flusher (retry_messages_, retry_mutex_ );
661+ queue_flusher (messages_, mutex_ );
662662 }
663663 else {
664664 async_flush ();
@@ -672,12 +672,10 @@ bool BufferedProducer<BufferType, Allocator>::flush(std::chrono::milliseconds ti
672672 if (preserve_order) {
673673 CounterGuard<size_t > counter_guard (flushes_in_progress_);
674674 QueueType flush_queue; // flush from temporary queue
675+ swap_queues (messages_, flush_queue, mutex_);
675676 QueueType retry_flush_queue; // flush from temporary retry queue
676- {
677- std::lock_guard<std::mutex> lock (mutex_);
678- std::swap (retry_messages_, retry_flush_queue);
679- std::swap (messages_, flush_queue);
680- }
677+ swap_queues (retry_messages_, retry_flush_queue, retry_mutex_);
678+
681679 auto queue_flusher = [this ](QueueType& queue)->bool
682680 {
683681 if (!queue.empty ()) {
@@ -699,17 +697,17 @@ bool BufferedProducer<BufferType, Allocator>::flush(std::chrono::milliseconds ti
699697 } while (remaining.count () > 0 );
700698
701699 // Re-enqueue remaining messages in original order
702- auto re_enqueuer = [this ](QueueType& src_queue, QueueType& dst_queue)->void
700+ auto re_enqueuer = [this ](QueueType& src_queue, QueueType& dst_queue, std::mutex & mutex )->void
703701 {
704702 if (!src_queue.empty ()) {
705- std::lock_guard<std::mutex> lock (mutex_ );
703+ std::lock_guard<std::mutex> lock (mutex );
706704 dst_queue.insert (dst_queue.begin (),
707705 std::make_move_iterator (src_queue.begin ()),
708706 std::make_move_iterator (src_queue.end ()));
709707 }
710708 };
711- re_enqueuer (retry_flush_queue, retry_messages_);
712- re_enqueuer (flush_queue, messages_);
709+ re_enqueuer (retry_flush_queue, retry_messages_, retry_mutex_ );
710+ re_enqueuer (flush_queue, messages_, mutex_ );
713711 }
714712 else {
715713 async_flush ();
@@ -762,11 +760,10 @@ bool BufferedProducer<BufferType, Allocator>::wait_for_acks(std::chrono::millise
762760
763761template <typename BufferType, typename Allocator>
764762void BufferedProducer<BufferType, Allocator>::clear() {
765- std::lock_guard<std::mutex> lock (mutex_);
766763 QueueType tmp;
767- std::swap ( tmp, messages_ );
764+ swap_queues (messages_, tmp, mutex_ );
768765 QueueType retry_tmp;
769- std::swap ( retry_tmp, retry_messages_ );
766+ swap_queues (retry_messages_, retry_tmp, retry_mutex_ );
770767}
771768
772769template <typename BufferType, typename Allocator>
@@ -801,21 +798,20 @@ BufferedProducer<BufferType, Allocator>::get_flush_method() const {
801798template <typename BufferType, typename Allocator>
802799template <typename BuilderType>
803800void BufferedProducer<BufferType, Allocator>::do_add_message(BuilderType&& builder,
804- bool is_retry,
805- bool do_flush) {
806- {
801+ QueueKind queue_kind,
802+ FlushAction flush_action) {
803+ if (queue_kind == QueueKind::Retry) {
804+ std::lock_guard<std::mutex> lock (retry_mutex_);
805+ retry_messages_.emplace_back (std::forward<BuilderType>(builder));
806+ }
807+ else {
807808 std::lock_guard<std::mutex> lock (mutex_);
808- if (is_retry) {
809- retry_messages_.emplace_back (std::forward<BuilderType>(builder));
810- }
811- else {
812- messages_.emplace_back (std::forward<BuilderType>(builder));
813- }
809+ messages_.emplace_back (std::forward<BuilderType>(builder));
814810 }
815811
816812 // Flush the queues only if a regular message is added. Retry messages may be added
817813 // from rdkafka callbacks, and flush/async_flush is a user-level call
818- if (!is_retry && do_flush && (max_buffer_size_ >= 0 ) && (max_buffer_size_ <= get_buffer_size ())) {
814+ if (queue_kind == QueueKind::Regular && flush_action == FlushAction::DoFlush && (max_buffer_size_ >= 0 ) && (max_buffer_size_ <= get_buffer_size ())) {
819815 if (flush_method_ == FlushMethod::Sync) {
820816 flush ();
821817 }
@@ -963,7 +959,7 @@ void BufferedProducer<BufferType, Allocator>::async_produce(BuilderType&& builde
963959 TrackerPtr tracker = std::static_pointer_cast<Tracker>(builder.internal ());
964960 if (tracker && tracker->num_retries_ > 0 ) {
965961 --tracker->num_retries_ ;
966- do_add_message (std::forward<BuilderType>(builder), true , false );
962+ do_add_message (std::forward<BuilderType>(builder), QueueKind::Retry, FlushAction::DontFlush );
967963 return ;
968964 }
969965 }
@@ -1002,7 +998,7 @@ void BufferedProducer<BufferType, Allocator>::on_delivery_report(const Message&
1002998 --tracker->num_retries_ ;
1003999 if (tracker->sender_ == SenderType::Async) {
10041000 // Re-enqueue for later retransmission with higher priority (i.e. front of the queue)
1005- do_add_message (Builder (message), true , false );
1001+ do_add_message (Builder (message), QueueKind::Retry, FlushAction::DontFlush );
10061002 }
10071003 should_retry = true ;
10081004 }
@@ -1034,6 +1030,13 @@ void BufferedProducer<BufferType, Allocator>::on_delivery_report(const Message&
10341030 }
10351031}
10361032
1033+ template <typename BufferType, typename Allocator>
1034+ void BufferedProducer<BufferType, Allocator>::swap_queues(BufferedProducer<BufferType, Allocator>::QueueType & queue1, BufferedProducer<BufferType, Allocator>::QueueType & queue2, std::mutex & mutex)
1035+ {
1036+ std::lock_guard<std::mutex> lock (mutex);
1037+ std::swap (queue1, queue2);
1038+ }
1039+
10371040} // cppkafka
10381041
10391042#endif // CPPKAFKA_BUFFERED_PRODUCER_H
0 commit comments