Skip to content

Commit 8cfd459

Browse files
author
Alexander Damian
committed
Call flush termination callbacks from sync_produce
1 parent f117720 commit 8cfd459

File tree

1 file changed

+51
-11
lines changed

1 file changed

+51
-11
lines changed

include/cppkafka/utils/buffered_producer.h

Lines changed: 51 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -691,6 +691,7 @@ class CPPKAFKA_API BufferedProducer {
691691
void do_add_message(BuilderType&& builder, QueueKind queue_kind, FlushAction flush_action);
692692
template <typename BuilderType>
693693
void produce_message(BuilderType&& builder);
694+
bool sync_produce(const MessageBuilder& builder, std::chrono::milliseconds timeout, bool throw_on_error);
694695
Configuration prepare_configuration(Configuration config);
695696
void on_delivery_report(const Message& message);
696697
template <typename BuilderType>
@@ -787,12 +788,19 @@ void BufferedProducer<BufferType, Allocator>::produce(const MessageBuilder& buil
787788

788789
template <typename BufferType, typename Allocator>
789790
void BufferedProducer<BufferType, Allocator>::sync_produce(const MessageBuilder& builder) {
790-
sync_produce(builder, infinite_timeout);
791+
sync_produce(builder, infinite_timeout, true);
791792
}
792793

793794
template <typename BufferType, typename Allocator>
794795
bool BufferedProducer<BufferType, Allocator>::sync_produce(const MessageBuilder& builder,
795796
std::chrono::milliseconds timeout) {
797+
return sync_produce(builder, infinite_timeout, true);
798+
}
799+
800+
template <typename BufferType, typename Allocator>
801+
bool BufferedProducer<BufferType, Allocator>::sync_produce(const MessageBuilder& builder,
802+
std::chrono::milliseconds timeout,
803+
bool throw_on_error) {
796804
if (enable_message_retries_) {
797805
//Adding a retry tracker requires copying the builder since
798806
//we cannot modify the original instance. Cloning is a fast operation
@@ -802,12 +810,32 @@ bool BufferedProducer<BufferType, Allocator>::sync_produce(const MessageBuilder&
802810
// produce until we succeed or we reach max retry limit
803811
auto endTime = std::chrono::steady_clock::now() + timeout;
804812
do {
805-
tracker->prepare_to_retry();
806-
produce_message(builder_clone);
807-
//Wait w/o timeout since we must get the ack to avoid a race condition.
808-
//Otherwise retry_again() will block as the producer won't get flushed
809-
//and the delivery callback will never be invoked.
810-
wait_for_current_thread_acks();
813+
try {
814+
tracker->prepare_to_retry();
815+
produce_message(builder_clone);
816+
//Wait w/o timeout since we must get the ack to avoid a race condition.
817+
//Otherwise retry_again() will block as the producer won't get flushed
818+
//and the delivery callback will never be invoked.
819+
wait_for_current_thread_acks();
820+
}
821+
catch (const HandleException& ex) {
822+
// If we have a flush failure callback and it returns true, we retry producing this message later
823+
CallbackInvoker<FlushFailureCallback> callback("flush failure", flush_failure_callback_, &producer_);
824+
if (!callback || callback(builder, ex.get_error())) {
825+
if (tracker && tracker->has_retries_left()) {
826+
tracker->decrement_retries();
827+
continue;
828+
}
829+
}
830+
++total_messages_dropped_;
831+
// Call the flush termination callback
832+
CallbackInvoker<FlushTerminationCallback>("flush termination", flush_termination_callback_, &producer_)
833+
(builder, ex.get_error());
834+
if (throw_on_error) {
835+
throw;
836+
}
837+
break;
838+
}
811839
}
812840
while (tracker->retry_again() &&
813841
((timeout == infinite_timeout) ||
@@ -816,10 +844,22 @@ bool BufferedProducer<BufferType, Allocator>::sync_produce(const MessageBuilder&
816844
}
817845
else {
818846
// produce once
819-
produce_message(builder);
820-
wait_for_current_thread_acks(timeout);
821-
return !ack_monitor_.has_current_thread_pending_acks();
847+
try {
848+
produce_message(builder);
849+
wait_for_current_thread_acks(timeout);
850+
return !ack_monitor_.has_current_thread_pending_acks();
851+
}
852+
catch (const HandleException& ex) {
853+
++total_messages_dropped_;
854+
// Call the flush termination callback
855+
CallbackInvoker<FlushTerminationCallback>("flush termination", flush_termination_callback_, &producer_)
856+
(builder, ex.get_error());
857+
if (throw_on_error) {
858+
throw;
859+
}
860+
}
822861
}
862+
return false;
823863
}
824864

825865
template <typename BufferType, typename Allocator>
@@ -851,7 +891,7 @@ bool BufferedProducer<BufferType, Allocator>::flush(std::chrono::milliseconds ti
851891
if (preserve_order) {
852892
//When preserving order, we must ensure that each message
853893
//gets delivered before producing the next one.
854-
sync_produce(flush_queue.front(), timeout);
894+
sync_produce(flush_queue.front(), timeout, false);
855895
}
856896
else {
857897
//Produce as fast as possible w/o waiting. If one or more

0 commit comments

Comments
 (0)