@@ -208,8 +208,14 @@ class CPPKAFKA_API BufferedProducer {
208208
209209 /* *
210210 * \brief Same as sync_produce but waits up to 'timeout' for acks to be received.
211+ *
212+ * If retries are enabled, the timeout will limit the amount of time to wait
213+ * before all retries are completed.
214+ *
215+ * \returns True if succeeded, false otherwise. If retries are enabled, false
216+ * indicates there are still retries left.
211217 */
212- void sync_produce (const MessageBuilder& builder, std::chrono::milliseconds timeout);
218+ bool sync_produce (const MessageBuilder& builder, std::chrono::milliseconds timeout);
213219
214220 /* *
215221 * \brief Produces a message asynchronously without buffering it
@@ -592,6 +598,10 @@ class CPPKAFKA_API BufferedProducer {
592598 void async_produce (BuilderType&& message, bool throw_on_error);
593599 static void swap_queues (QueueType & queue1, QueueType & queue2, std::mutex & mutex);
594600
601+ // Static members
602+ static const std::chrono::milliseconds infinite_timeout;
603+ static const std::chrono::milliseconds no_timeout;
604+
595605 // Members
596606 Producer producer_;
597607 QueueType messages_;
@@ -618,6 +628,14 @@ class CPPKAFKA_API BufferedProducer {
618628#endif
619629};
620630
631+ // Full blocking wait as per RdKafka
632+ template <typename BufferType, typename Allocator>
633+ const std::chrono::milliseconds
634+ BufferedProducer<BufferType, Allocator>::infinite_timeout = std::chrono::milliseconds(-1 );
635+ template <typename BufferType, typename Allocator>
636+ const std::chrono::milliseconds
637+ BufferedProducer<BufferType, Allocator>::no_timeout = std::chrono::milliseconds::zero();
638+
621639template <typename BufferType>
622640Producer::PayloadPolicy get_default_payload_policy () {
623641 return Producer::PayloadPolicy::COPY_PAYLOAD;
@@ -669,11 +687,11 @@ void BufferedProducer<BufferType, Allocator>::produce(const MessageBuilder& buil
669687
670688template <typename BufferType, typename Allocator>
671689void BufferedProducer<BufferType, Allocator>::sync_produce(const MessageBuilder& builder) {
672- sync_produce (builder, producer_. get_timeout () );
690+ sync_produce (builder, infinite_timeout );
673691}
674692
675693template <typename BufferType, typename Allocator>
676- void BufferedProducer<BufferType, Allocator>::sync_produce(const MessageBuilder& builder,
694+ bool BufferedProducer<BufferType, Allocator>::sync_produce(const MessageBuilder& builder,
677695 std::chrono::milliseconds timeout) {
678696 if (enable_message_retries_) {
679697 // Adding a retry tracker requires copying the builder since
@@ -682,17 +700,25 @@ void BufferedProducer<BufferType, Allocator>::sync_produce(const MessageBuilder&
682700 MessageBuilder builder_clone (builder.clone ());
683701 TrackerPtr tracker = add_tracker (SenderType::Sync, builder_clone);
684702 // produce until we succeed or we reach max retry limit
703+ auto endTime = std::chrono::steady_clock::now () + timeout;
685704 do {
686705 tracker->prepare_to_retry ();
687706 produce_message (builder_clone);
688- wait_for_acks (timeout);
707+ // Wait w/o timeout since we must get the ack to avoid a race condition.
708+ // Otherwise retry_again() will block as the producer won't get flushed
709+ // and the delivery callback will never be invoked.
710+ wait_for_acks ();
689711 }
690- while (tracker->retry_again ());
712+ while (tracker->retry_again () &&
713+ ((timeout == infinite_timeout) ||
714+ (std::chrono::steady_clock::now () >= endTime)));
715+ return !tracker->has_retries_left ();
691716 }
692717 else {
693718 // produce once
694719 produce_message (builder);
695720 wait_for_acks (timeout);
721+ return (pending_acks_ == 0 );
696722 }
697723}
698724
@@ -703,12 +729,12 @@ void BufferedProducer<BufferType, Allocator>::produce(const Message& message) {
703729
704730template <typename BufferType, typename Allocator>
705731void BufferedProducer<BufferType, Allocator>::async_flush() {
706- flush (std::chrono::milliseconds::zero () , false );
732+ flush (no_timeout , false );
707733}
708734
709735template <typename BufferType, typename Allocator>
710736void BufferedProducer<BufferType, Allocator>::flush(bool preserve_order) {
711- flush (producer_. get_timeout () , preserve_order);
737+ flush (infinite_timeout , preserve_order);
712738}
713739
714740template <typename BufferType, typename Allocator>
@@ -749,7 +775,8 @@ bool BufferedProducer<BufferType, Allocator>::flush(std::chrono::milliseconds ti
749775
750776template <typename BufferType, typename Allocator>
751777void BufferedProducer<BufferType, Allocator>::wait_for_acks() {
752- wait_for_acks (producer_.get_timeout ());
778+ // block until all acks have been received
779+ wait_for_acks (infinite_timeout);
753780}
754781
755782template <typename BufferType, typename Allocator>
@@ -773,7 +800,8 @@ bool BufferedProducer<BufferType, Allocator>::wait_for_acks(std::chrono::millise
773800 // calculate remaining time
774801 remaining = timeout - std::chrono::duration_cast<std::chrono::milliseconds>
775802 (std::chrono::high_resolution_clock::now () - start_time);
776- } while ((pending_acks_ > 0 ) && (remaining.count () > 0 ));
803+ } while ((pending_acks_ > 0 ) &&
804+ ((remaining.count () > 0 ) || (timeout == infinite_timeout)));
777805 return (pending_acks_ == 0 );
778806}
779807
0 commit comments