@@ -79,7 +79,7 @@ namespace cppkafka {
7979 * \warning Payload Policy: For payload-owning BufferTypes such as std::string or std::vector<char>
8080 * the default policy is set to Producer::PayloadPolicy::COPY_PAYLOAD. For the specific non-payload owning type
8181 * cppkafka::Buffer the policy is Producer::PayloadPolicy::PASSTHROUGH_PAYLOAD. In this case, librdkafka
82- * shall not make any internal copies of the message and it is the application's responsability to free
82+ * shall not make any internal copies of the message and it is the application's responsibility to free
8383 * the messages *after* the ProduceSuccessCallback has reported a successful delivery to avoid memory
8484 * corruptions.
8585 */
@@ -487,36 +487,92 @@ class CPPKAFKA_API BufferedProducer {
487487 enum class QueueKind { Retry, Regular };
488488 enum class FlushAction { DontFlush, DoFlush };
489489
490+ // Simple RAII type which increments a counter on construction and
491+ // decrements it on destruction, meant to be used for reference counting.
490492 template <typename T>
491493 struct CounterGuard {
492- CounterGuard (std::atomic<T>& counter) : counter_(counter) { ++counter_; }
494+ CounterGuard (std::atomic<T>& counter)
495+ : counter_(counter) {
496+ ++counter_;
497+ }
493498 ~CounterGuard () { --counter_; }
494499 std::atomic<T>& counter_;
495500 };
496501
502+ // If the application enables retry logic, this object is passed
503+ // as internal (opaque) data with each message, so that it can keep
504+ // track of each failed attempt. Only a single tracker will be
505+ // instantiated and its lifetime will be the same as the message it
506+ // belongs to.
497507 struct Tracker : public Internal {
498508 Tracker (SenderType sender, size_t num_retries)
499- : sender_(sender), num_retries_(num_retries)
500- {}
501- std::future<bool > get_new_future () {
502- should_retry_ = std::promise<bool >(); // reset shared data
503- return should_retry_.get_future (); // issue new future
509+ : sender_(sender),
510+ num_retries_ (num_retries) {
511+ }
512+ // Creates a new promise for synchronizing with the
513+ // on_delivery_report() callback. For synchronous producers only.
514+ void prepare_to_retry () {
515+ if (sender_ == SenderType::Sync) {
516+ retry_promise_ = std::promise<bool >();
517+ }
518+ }
519+ // Waits for the on_delivery_report() callback and determines if this message
520+ // should be retried. This call will block until on_delivery_report() executes.
521+ // For synchronous producers only.
522+ bool retry_again () {
523+ if (sender_ == SenderType::Sync) {
524+ return retry_promise_.get_future ().get ();
525+ }
526+ }
527+ // Signal the synchronous producer if the message should be retried or not.
528+ // Called from inside on_delivery_report(). For synchronous producers only.
529+ void should_retry (bool value) const {
530+ if (sender_ == SenderType::Sync) {
531+ try {
532+ retry_promise_.set_value (value);
533+ }
534+ catch (const std::future_error&) {
535+ // Promise has already been set once.
536+ }
537+ }
538+ }
539+ void set_sender_type (SenderType type) {
540+ sender_ = type;
541+ }
542+ SenderType get_sender_type () const {
543+ return sender_;
504544 }
545+ bool has_retries_left () const {
546+ return num_retries_ > 0 ;
547+ }
548+ void decrement_retries () {
549+ if (num_retries_ > 0 ) {
550+ --num_retries_;
551+ }
552+ }
553+ private:
505554 SenderType sender_;
506- std::promise<bool > should_retry_ ;
555+ mutable std::promise<bool > retry_promise_ ;
507556 size_t num_retries_;
508557 };
509558 using TrackerPtr = std::shared_ptr<Tracker>;
510559
511560 // Returns existing tracker or creates new one
512561 template <typename BuilderType>
513562 TrackerPtr add_tracker (SenderType sender, BuilderType& builder) {
514- if (has_internal_data_ ) {
563+ if (enable_message_retries_ ) {
515564 if (!builder.internal ()) {
516565 // Add message tracker only if it hasn't been added before
517566 builder.internal (std::make_shared<Tracker>(sender, max_number_retries_));
567+ return std::static_pointer_cast<Tracker>(builder.internal ());
518568 }
519- return std::static_pointer_cast<Tracker>(builder.internal ());
569+ // Return existing tracker
570+ TrackerPtr tracker = std::static_pointer_cast<Tracker>(builder.internal ());
571+ // Update the sender type. Since a message could have been initially produced
572+ // asynchronously but then flushed synchronously (or vice-versa), the sender
573+ // type should always reflect the latest retry mechanism.
574+ tracker->set_sender_type (sender);
575+ return tracker;
520576 }
521577 return nullptr ;
522578 }
@@ -549,7 +605,7 @@ class CPPKAFKA_API BufferedProducer {
549605 std::atomic<size_t > total_messages_produced_{0 };
550606 std::atomic<size_t > total_messages_dropped_{0 };
551607 int max_number_retries_{0 };
552- bool has_internal_data_ {false };
608+ bool enable_message_retries_ {false };
553609 QueueFullNotification queue_full_notification_{QueueFullNotification::None};
554610#ifdef KAFKA_TEST_INSTANCE
555611 TestParameters* test_params_;
@@ -586,12 +642,16 @@ void BufferedProducer<BufferType, Allocator>::add_message(const MessageBuilder&
586642template <typename BufferType, typename Allocator>
587643void BufferedProducer<BufferType, Allocator>::add_message(Builder builder) {
588644 add_tracker (SenderType::Async, builder);
645+ // post message onto the producer queue
589646 do_add_message (move (builder), QueueKind::Regular, FlushAction::DoFlush);
590647}
591648
592649template <typename BufferType, typename Allocator>
593650void BufferedProducer<BufferType, Allocator>::produce(const MessageBuilder& builder) {
594- if (has_internal_data_) {
651+ if (enable_message_retries_) {
652+ // Adding a retry tracker requires copying the builder since
653+ // we cannot modify the original instance. Cloning is a fast operation
654+ // since the MessageBuilder class holds pointers to data only.
595655 MessageBuilder builder_clone (builder.clone ());
596656 add_tracker (SenderType::Async, builder_clone);
597657 async_produce (builder_clone, true );
@@ -603,17 +663,19 @@ void BufferedProducer<BufferType, Allocator>::produce(const MessageBuilder& buil
603663
604664template <typename BufferType, typename Allocator>
605665void BufferedProducer<BufferType, Allocator>::sync_produce(const MessageBuilder& builder) {
606- if (has_internal_data_) {
666+ if (enable_message_retries_) {
667+ // Adding a retry tracker requires copying the builder since
668+ // we cannot modify the original instance. Cloning is a fast operation
669+ // since the MessageBuilder class holds pointers to data only.
607670 MessageBuilder builder_clone (builder.clone ());
608671 TrackerPtr tracker = add_tracker (SenderType::Sync, builder_clone);
609672 // produce until we succeed or we reach max retry limit
610- std::future<bool > should_retry;
611673 do {
612- should_retry = tracker->get_new_future ();
674+ tracker->prepare_to_retry ();
613675 produce_message (builder_clone);
614676 wait_for_acks ();
615677 }
616- while (should_retry. get ());
678+ while (tracker-> retry_again ());
617679 }
618680 else {
619681 // produce once
@@ -640,9 +702,14 @@ void BufferedProducer<BufferType, Allocator>::async_flush() {
640702 flush_queue.pop_front ();
641703 }
642704 };
705+ // Produce retry queue first since these messages were produced first
643706 queue_flusher (retry_messages_, retry_mutex_);
707+ // Produce recently enqueued messages
644708 queue_flusher (messages_, mutex_);
645- wait_for_acks (std::chrono::milliseconds (0 )); // flush the producer but don't wait
709+ // Flush the producer but don't wait. It is necessary to poll
710+ // the producer at least once during this operation because
711+ // async_produce() will not.
712+ wait_for_acks (std::chrono::milliseconds (0 ));
646713}
647714
648715template <typename BufferType, typename Allocator>
@@ -653,16 +720,19 @@ void BufferedProducer<BufferType, Allocator>::flush(bool preserve_order) {
653720 {
654721 QueueType flush_queue; // flush from temporary queue
655722 swap_queues (queue, flush_queue, mutex);
656-
723+ // Produce one message at a time and wait for acks until queue is empty
657724 while (!flush_queue.empty ()) {
658725 sync_produce (flush_queue.front ());
659726 flush_queue.pop_front ();
660727 }
661728 };
729+ // Produce retry queue first since these messages were produced first
662730 queue_flusher (retry_messages_, retry_mutex_);
731+ // Produce recently enqueued messages
663732 queue_flusher (messages_, mutex_);
664733 }
665734 else {
735+ // Produce all messages at once then wait for acks.
666736 async_flush ();
667737 wait_for_acks ();
668738 }
@@ -680,6 +750,7 @@ bool BufferedProducer<BufferType, Allocator>::flush(std::chrono::milliseconds ti
680750
681751 auto queue_flusher = [this ](QueueType& queue)->bool
682752 {
753+ // Produce one message at a time and wait for acks
683754 if (!queue.empty ()) {
684755 sync_produce (queue.front ());
685756 queue.pop_front ();
@@ -698,19 +769,21 @@ bool BufferedProducer<BufferType, Allocator>::flush(std::chrono::milliseconds ti
698769 (std::chrono::high_resolution_clock::now () - start_time);
699770 } while (remaining.count () > 0 );
700771
701- // Re-enqueue remaining messages in original order
702- auto re_enqueuer = [this ](QueueType& src_queue, QueueType& dst_queue, std::mutex & mutex)->void
772+ // When the timeout has expired, any remaining messages must be re-enqueued in their
773+ // original order so they can be flushed later.
774+ auto re_enqueuer = [this ](QueueType& src_queue, QueueType& dst_queue, std::mutex & mutex)->bool
703775 {
704776 if (!src_queue.empty ()) {
705777 std::lock_guard<std::mutex> lock (mutex);
706778 dst_queue.insert (dst_queue.begin (),
707779 std::make_move_iterator (src_queue.begin ()),
708780 std::make_move_iterator (src_queue.end ()));
781+ return true ;
709782 }
783+ return false ;
710784 };
711- re_enqueuer (retry_flush_queue, retry_messages_, retry_mutex_);
712- re_enqueuer (flush_queue, messages_, mutex_);
713- return true ;
785+ return !re_enqueuer (retry_flush_queue, retry_messages_, retry_mutex_) &&
786+ !re_enqueuer (flush_queue, messages_, mutex_);
714787 }
715788 else {
716789 async_flush ();
@@ -820,10 +893,12 @@ void BufferedProducer<BufferType, Allocator>::do_add_message(BuilderType&& build
820893 std::lock_guard<std::mutex> lock (mutex_);
821894 messages_.emplace_back (std::forward<BuilderType>(builder));
822895 }
823-
824896 // Flush the queues only if a regular message is added. Retry messages may be added
825- // from rdkafka callbacks, and flush/async_flush is a user-level call
826- if (queue_kind == QueueKind::Regular && flush_action == FlushAction::DoFlush && (max_buffer_size_ >= 0 ) && (max_buffer_size_ <= (ssize_t )get_buffer_size ())) {
897+ // from on_delivery_report() during which flush()/async_flush() cannot be called.
898+ if (queue_kind == QueueKind::Regular &&
899+ flush_action == FlushAction::DoFlush &&
900+ (max_buffer_size_ >= 0 ) &&
901+ (max_buffer_size_ <= (ssize_t )get_buffer_size ())) {
827902 if (flush_method_ == FlushMethod::Sync) {
828903 flush ();
829904 }
@@ -865,8 +940,8 @@ size_t BufferedProducer<BufferType, Allocator>::get_flushes_in_progress() const
865940
866941template <typename BufferType, typename Allocator>
867942void BufferedProducer<BufferType, Allocator>::set_max_number_retries(size_t max_number_retries) {
868- if (!has_internal_data_ && (max_number_retries > 0 )) {
869- has_internal_data_ = true ; // enable once
943+ if (!enable_message_retries_ && (max_number_retries > 0 )) {
944+ enable_message_retries_ = true ; // enable once
870945 }
871946 max_number_retries_ = max_number_retries;
872947}
@@ -969,8 +1044,11 @@ void BufferedProducer<BufferType, Allocator>::async_produce(BuilderType&& builde
9691044 CallbackInvoker<FlushFailureCallback> callback (" flush failure" , flush_failure_callback_, &producer_);
9701045 if (!callback || callback (builder, ex.get_error ())) {
9711046 TrackerPtr tracker = std::static_pointer_cast<Tracker>(builder.internal ());
972- if (tracker && tracker->num_retries_ > 0 ) {
973- --tracker->num_retries_ ;
1047+ if (tracker && tracker->has_retries_left ()) {
1048+ tracker->decrement_retries ();
1049+ // Post message onto the retry queue. This queue has higher priority and will be
1050+ // flushed before the producer queue to preserve original message order.
1051+ // We don't flush now since we just had an error while producing.
9741052 do_add_message (std::forward<BuilderType>(builder), QueueKind::Retry, FlushAction::DontFlush);
9751053 return ;
9761054 }
@@ -995,24 +1073,30 @@ Configuration BufferedProducer<BufferType, Allocator>::prepare_configuration(Con
9951073
9961074template <typename BufferType, typename Allocator>
9971075void BufferedProducer<BufferType, Allocator>::on_delivery_report(const Message& message) {
998- // Get tracker data
9991076 TestParameters* test_params = get_test_parameters ();
1000- TrackerPtr tracker = has_internal_data_ ?
1001- std::static_pointer_cast<Tracker>(MessageInternal::load (const_cast <Message&>(message))->get_internal ()) : nullptr ;
1002- bool should_retry = false ;
1077+ // Get tracker if present
1078+ TrackerPtr tracker =
1079+ enable_message_retries_ ?
1080+ std::static_pointer_cast<Tracker>(MessageInternal::load (const_cast <Message&>(message))->get_internal ()) :
1081+ nullptr ;
1082+ bool retry = false ;
10031083 if (message.get_error () || (test_params && test_params->force_delivery_error_ )) {
10041084 // We should produce this message again if we don't have a produce failure callback
1005- // or we have one but it returns true
1085+ // or we have one but it returns true (indicating the error is retryable)
10061086 CallbackInvoker<ProduceFailureCallback> callback (" produce failure" , produce_failure_callback_, &producer_);
10071087 if (!callback || callback (message)) {
10081088 // Check if we have reached the maximum retry limit
1009- if (tracker && tracker->num_retries_ > 0 ) {
1010- --tracker->num_retries_ ;
1011- if (tracker->sender_ == SenderType::Async) {
1012- // Re-enqueue for later retransmission with higher priority (i.e. front of the queue)
1089+ if (tracker && tracker->has_retries_left ()) {
1090+ tracker->decrement_retries ();
1091+ // If the sender is asynchronous, the message is re-enqueued. If the sender is
1092+ // synchronous, we simply notify via Tracker::should_retry() below.
1093+ if (tracker->get_sender_type () == SenderType::Async) {
1094+ // Post message onto the retry queue. This queue has higher priority and will be
1095+ // flushed later by the application (before the producer queue) to preserve original message order.
1096+ // We prevent flushing now since we are within a callback context.
10131097 do_add_message (Builder (message), QueueKind::Retry, FlushAction::DontFlush);
10141098 }
1015- should_retry = true ;
1099+ retry = true ;
10161100 }
10171101 else {
10181102 ++total_messages_dropped_;
@@ -1032,14 +1116,9 @@ void BufferedProducer<BufferType, Allocator>::on_delivery_report(const Message&
10321116 // Increment the total successful transmissions
10331117 ++total_messages_produced_;
10341118 }
1035- // Signal producers
1119+ // Signal synchronous sender and unblock it since it's waiting for this ack to arrive.
10361120 if (tracker) {
1037- try {
1038- tracker->should_retry_ .set_value (should_retry);
1039- }
1040- catch (const std::future_error& ex) {
1041- // This is an async retry and future is not being read
1042- }
1121+ tracker->should_retry (retry);
10431122 }
10441123 // Decrement the expected acks and check to prevent underflow
10451124 if (pending_acks_ > 0 ) {
@@ -1048,7 +1127,9 @@ void BufferedProducer<BufferType, Allocator>::on_delivery_report(const Message&
10481127}
10491128
10501129template <typename BufferType, typename Allocator>
1051- void BufferedProducer<BufferType, Allocator>::swap_queues(BufferedProducer<BufferType, Allocator>::QueueType & queue1, BufferedProducer<BufferType, Allocator>::QueueType & queue2, std::mutex & mutex)
1130+ void BufferedProducer<BufferType, Allocator>::swap_queues(BufferedProducer<BufferType, Allocator>::QueueType & queue1,
1131+ BufferedProducer<BufferType, Allocator>::QueueType & queue2,
1132+ std::mutex & mutex)
10521133{
10531134 std::lock_guard<std::mutex> lock (mutex);
10541135 std::swap (queue1, queue2);
0 commit comments