@@ -691,6 +691,7 @@ class CPPKAFKA_API BufferedProducer {
691691 void do_add_message (BuilderType&& builder, QueueKind queue_kind, FlushAction flush_action);
692692 template <typename BuilderType>
693693 void produce_message (BuilderType&& builder);
694+ bool sync_produce (const MessageBuilder& builder, std::chrono::milliseconds timeout, bool throw_on_error);
694695 Configuration prepare_configuration (Configuration config);
695696 void on_delivery_report (const Message& message);
696697 template <typename BuilderType>
@@ -787,12 +788,19 @@ void BufferedProducer<BufferType, Allocator>::produce(const MessageBuilder& buil
787788
788789template <typename BufferType, typename Allocator>
789790void BufferedProducer<BufferType, Allocator>::sync_produce(const MessageBuilder& builder) {
790- sync_produce (builder, infinite_timeout);
791+ sync_produce (builder, infinite_timeout, true );
791792}
792793
793794template <typename BufferType, typename Allocator>
794795bool BufferedProducer<BufferType, Allocator>::sync_produce(const MessageBuilder& builder,
795796 std::chrono::milliseconds timeout) {
797+ return sync_produce (builder, infinite_timeout, true );
798+ }
799+
800+ template <typename BufferType, typename Allocator>
801+ bool BufferedProducer<BufferType, Allocator>::sync_produce(const MessageBuilder& builder,
802+ std::chrono::milliseconds timeout,
803+ bool throw_on_error) {
796804 if (enable_message_retries_) {
797805 // Adding a retry tracker requires copying the builder since
798806 // we cannot modify the original instance. Cloning is a fast operation
@@ -802,12 +810,32 @@ bool BufferedProducer<BufferType, Allocator>::sync_produce(const MessageBuilder&
802810 // produce until we succeed or we reach max retry limit
803811 auto endTime = std::chrono::steady_clock::now () + timeout;
804812 do {
805- tracker->prepare_to_retry ();
806- produce_message (builder_clone);
807- // Wait w/o timeout since we must get the ack to avoid a race condition.
808- // Otherwise retry_again() will block as the producer won't get flushed
809- // and the delivery callback will never be invoked.
810- wait_for_current_thread_acks ();
813+ try {
814+ tracker->prepare_to_retry ();
815+ produce_message (builder_clone);
816+ // Wait w/o timeout since we must get the ack to avoid a race condition.
817+ // Otherwise retry_again() will block as the producer won't get flushed
818+ // and the delivery callback will never be invoked.
819+ wait_for_current_thread_acks ();
820+ }
821+ catch (const HandleException& ex) {
822+ // If we have a flush failure callback and it returns true, we retry producing this message later
823+ CallbackInvoker<FlushFailureCallback> callback (" flush failure" , flush_failure_callback_, &producer_);
824+ if (!callback || callback (builder, ex.get_error ())) {
825+ if (tracker && tracker->has_retries_left ()) {
826+ tracker->decrement_retries ();
827+ continue ;
828+ }
829+ }
830+ ++total_messages_dropped_;
831+ // Call the flush termination callback
832+ CallbackInvoker<FlushTerminationCallback>(" flush termination" , flush_termination_callback_, &producer_)
833+ (builder, ex.get_error ());
834+ if (throw_on_error) {
835+ throw ;
836+ }
837+ break ;
838+ }
811839 }
812840 while (tracker->retry_again () &&
813841 ((timeout == infinite_timeout) ||
@@ -816,10 +844,22 @@ bool BufferedProducer<BufferType, Allocator>::sync_produce(const MessageBuilder&
816844 }
817845 else {
818846 // produce once
819- produce_message (builder);
820- wait_for_current_thread_acks (timeout);
821- return !ack_monitor_.has_current_thread_pending_acks ();
847+ try {
848+ produce_message (builder);
849+ wait_for_current_thread_acks (timeout);
850+ return !ack_monitor_.has_current_thread_pending_acks ();
851+ }
852+ catch (const HandleException& ex) {
853+ ++total_messages_dropped_;
854+ // Call the flush termination callback
855+ CallbackInvoker<FlushTerminationCallback>(" flush termination" , flush_termination_callback_, &producer_)
856+ (builder, ex.get_error ());
857+ if (throw_on_error) {
858+ throw ;
859+ }
860+ }
822861 }
862+ return false ;
823863}
824864
825865template <typename BufferType, typename Allocator>
@@ -851,7 +891,7 @@ bool BufferedProducer<BufferType, Allocator>::flush(std::chrono::milliseconds ti
851891 if (preserve_order) {
852892 // When preserving order, we must ensure that each message
853893 // gets delivered before producing the next one.
854- sync_produce (flush_queue.front (), timeout);
894+ sync_produce (flush_queue.front (), timeout, false );
855895 }
856896 else {
857897 // Produce as fast as possible w/o waiting. If one or more
0 commit comments