 #include <deque>
 #include <cstdint>
 #include <algorithm>
-#include <unordered_set>
-#include <unordered_map>
 #include <map>
 #include <mutex>
 #include <atomic>
 #include <future>
+#include <thread>
 #include <boost/optional.hpp>
 #include "../producer.h"
 #include "../detail/callback_invoker.h"
@@ -53,8 +52,9 @@ namespace cppkafka {
  * This class allows buffering messages and flushing them synchronously while also allowing
  * to produce them just as you would using the Producer class.
  *
- * When calling either flush or wait_for_acks, the buffered producer will block until all
- * produced messages (either buffered or sent directly) are acknowledged by the kafka brokers.
+ * When calling either flush or wait_for_acks/wait_for_current_thread_acks, the buffered producer
+ * will block until all produced messages (either buffered or sent directly) are acknowledged
+ * by the kafka brokers.
  *
  * When producing messages, this class will handle cases where the producer's queue is full so it
  * will poll until the production is successful.
@@ -185,8 +185,9 @@ class CPPKAFKA_API BufferedProducer {
     /**
      * \brief Produces a message asynchronously without buffering it
      *
-     * The message will still be tracked so that a call to flush or wait_for_acks will actually
-     * wait for it to be acknowledged.
+     * The message will still be tracked so that a call to flush or
+     * wait_for_acks/wait_for_current_thread_acks will actually wait for it
+     * to be acknowledged.
      *
      * \param builder The builder that contains the message to be produced
      *
@@ -220,8 +221,9 @@ class CPPKAFKA_API BufferedProducer {
     /**
      * \brief Produces a message asynchronously without buffering it
      *
-     * The message will still be tracked so that a call to flush or wait_for_acks will actually
-     * wait for it to be acknowledged.
+     * The message will still be tracked so that a call to flush or
+     * wait_for_acks/wait_for_current_thread_acks will actually wait for it
+     * to be acknowledged.
      *
      * \param message The message to be produced
      *
@@ -241,7 +243,7 @@ class CPPKAFKA_API BufferedProducer {
      * \brief Flushes the buffered messages.
      *
      * This will send all messages and keep waiting until all of them are acknowledged (this is
-     * done by calling wait_for_acks).
+     * done by calling wait_for_acks/wait_for_current_thread_acks).
      *
      * \param preserve_order If set to True, each message in the queue will be flushed only when the previous
      *                       message ack is received. This may result in performance degradation as messages
@@ -267,16 +269,30 @@ class CPPKAFKA_API BufferedProducer {
     bool flush(std::chrono::milliseconds timeout, bool preserve_order = false);
 
     /**
-     * Waits for produced message's acknowledgements from the brokers
+     * \brief Waits for produced message's acknowledgements from the brokers
      */
     void wait_for_acks();
 
     /**
-     * Waits for produced message's acknowledgements from the brokers up to 'timeout'.
+     * \brief Waits for acknowledgements from brokers for messages produced
+     *        on the current thread only
+     */
+    void wait_for_current_thread_acks();
+
+    /**
+     * \brief Waits for produced message's acknowledgements from the brokers up to 'timeout'.
      *
      * \return True if the operation completes and all acks have been received.
      */
     bool wait_for_acks(std::chrono::milliseconds timeout);
+
+    /**
+     * \brief Waits for acknowledgements from brokers for messages produced
+     *        on the current thread only. Times out after 'timeout' milliseconds.
+     *
+     * \return True if the operation completes and all acks have been received.
+     */
+    bool wait_for_current_thread_acks(std::chrono::milliseconds timeout);
 
     /**
      * Clears any buffered messages
@@ -327,12 +343,20 @@ class CPPKAFKA_API BufferedProducer {
     FlushMethod get_flush_method() const;
 
     /**
-     * \brief Get the number of messages not yet acked by the broker
+     * \brief Get the number of messages not yet acked by the broker.
      *
      * \return The number of messages
      */
     size_t get_pending_acks() const;
 
+    /**
+     * \brief Get the number of pending acks for messages produced on the
+     *        current thread only.
+     *
+     * \return The number of messages
+     */
+    size_t get_current_thread_pending_acks() const;
+
     /**
      * \brief Get the total number of messages successfully produced since the beginning
      *
@@ -358,9 +382,10 @@ class CPPKAFKA_API BufferedProducer {
     size_t get_flushes_in_progress() const;
 
     /**
-     * \brief Sets the maximum number of retries per message until giving up
+     * \brief Sets the maximum number of retries per message until giving up. Default is 5.
      *
-     * Default is 5
+     * \remark It is recommended to set the RdKafka option message.send.max.retries=0
+     *         to prevent re-ordering of messages inside RdKafka.
      */
     void set_max_number_retries(size_t max_number_retries);
 
@@ -495,8 +520,9 @@ class CPPKAFKA_API BufferedProducer {
 
 private:
     enum class SenderType { Sync, Async };
-    enum class QueueKind { Retry, Regular };
+    enum class QueueKind { Retry, Produce };
     enum class FlushAction { DontFlush, DoFlush };
+    enum class Threads { All, Current };
 
     // Simple RAII type which increments a counter on construction and
     // decrements it on destruction, meant to be used as reference counting.
@@ -569,6 +595,79 @@ class CPPKAFKA_API BufferedProducer {
     };
     using TrackerPtr = std::shared_ptr<Tracker>;
 
+    // The AckMonitor is responsible for properly counting the
+    // outstanding unacknowledged messages for each thread as well
+    // as the total acks. Counting acks on a per-thread basis is
+    // critical in a multi-threaded producer since we don't want one
+    // producer having to wait for all concurrent pending acks. Each
+    // producer should only wait for its own acks.
+    struct AckMonitor
+    {
+        // Increments the number of sent acks
+        void increment_pending_acks() {
+            while (!flag_.test_and_set()) {
+                // save the last ack number for this thread so we only
+                // wait up to this number.
+                last_ack_[std::this_thread::get_id()] = ++sent_acks_;
+                flag_.clear();
+                break;
+            }
+        }
+        // Increments the number of received acks,
+        // reducing the total pending acks.
+        void decrement_pending_acks() {
+            while (!flag_.test_and_set()) {
+                ++received_acks_;
+                flag_.clear();
+                break;
+            }
+        }
+        // Returns true if there are any pending acks overall.
+        bool has_pending_acks() const {
+            return get_pending_acks() > 0;
+        }
+        // Returns true if there are any pending acks on this thread.
+        bool has_current_thread_pending_acks() const {
+            return get_current_thread_pending_acks() > 0;
+        }
+        // Returns total pending acks. This is the difference between
+        // total produced and total received.
+        ssize_t get_pending_acks() const {
+            ssize_t rc = 0;
+            while (!flag_.test_and_set()) {
+                rc = get_pending_acks_impl();
+                flag_.clear();
+                break;
+            }
+            return rc;
+        }
+        // Returns the total pending acks for this thread
+        ssize_t get_current_thread_pending_acks() const {
+            ssize_t rc = 0;
+            while (!flag_.test_and_set()) {
+                rc = get_current_thread_pending_acks_impl();
+                flag_.clear();
+                break;
+            }
+            return rc;
+        }
+    private:
+        ssize_t get_pending_acks_impl() const {
+            return (sent_acks_ - received_acks_);
+        }
+        ssize_t get_current_thread_pending_acks_impl() const {
+            auto it = last_ack_.find(std::this_thread::get_id());
+            if (it != last_ack_.end()) {
+                return (it->second > received_acks_) ? it->second - received_acks_ : 0;
+            }
+            return 0;
+        }
+        mutable std::atomic_flag flag_{0};
+        ssize_t sent_acks_{0};
+        ssize_t received_acks_{0};
+        std::map<std::thread::id, ssize_t> last_ack_; // last ack number expected for this thread
+    };
+
     // Returns existing tracker or creates new one
     template <typename BuilderType>
     TrackerPtr add_tracker(SenderType sender, BuilderType& builder) {
@@ -597,6 +696,7 @@ class CPPKAFKA_API BufferedProducer {
     template <typename BuilderType>
     void async_produce(BuilderType&& message, bool throw_on_error);
     static void swap_queues(QueueType& queue1, QueueType& queue2, std::mutex& mutex);
+    bool wait_for_acks_impl(Threads threads, std::chrono::milliseconds timeout);
 
     // Static members
     static const std::chrono::milliseconds infinite_timeout;
@@ -616,7 +716,7 @@ class CPPKAFKA_API BufferedProducer {
     QueueFullCallback queue_full_callback_;
     ssize_t max_buffer_size_{-1};
     FlushMethod flush_method_{FlushMethod::Sync};
-    std::atomic<size_t> pending_acks_{0};
+    AckMonitor ack_monitor_;
     std::atomic<size_t> flushes_in_progress_{0};
     std::atomic<size_t> total_messages_produced_{0};
     std::atomic<size_t> total_messages_dropped_{0};
@@ -667,7 +767,7 @@ template <typename BufferType, typename Allocator>
 void BufferedProducer<BufferType, Allocator>::add_message(Builder builder) {
     add_tracker(SenderType::Async, builder);
     // post message unto the producer queue
-    do_add_message(move(builder), QueueKind::Regular, FlushAction::DoFlush);
+    do_add_message(move(builder), QueueKind::Produce, FlushAction::DoFlush);
 }
 
 template <typename BufferType, typename Allocator>
@@ -707,7 +807,7 @@ bool BufferedProducer<BufferType, Allocator>::sync_produce(const MessageBuilder&
             // Wait w/o timeout since we must get the ack to avoid a race condition.
             // Otherwise retry_again() will block as the producer won't get flushed
             // and the delivery callback will never be invoked.
-            wait_for_acks();
+            wait_for_current_thread_acks();
         }
         while (tracker->retry_again() &&
                ((timeout == infinite_timeout) ||
@@ -717,8 +817,8 @@ bool BufferedProducer<BufferType, Allocator>::sync_produce(const MessageBuilder&
     else {
         // produce once
         produce_message(builder);
-        wait_for_acks(timeout);
-        return (pending_acks_ == 0);
+        wait_for_current_thread_acks(timeout);
+        return !ack_monitor_.has_current_thread_pending_acks();
     }
 }
 
@@ -768,21 +868,41 @@ bool BufferedProducer<BufferType, Allocator>::flush(std::chrono::milliseconds ti
     queue_flusher(messages_, mutex_);
     if (!preserve_order) {
         // Wait for acks from the messages produced above via async_produce
-        wait_for_acks(timeout);
+        wait_for_current_thread_acks(timeout);
     }
-    return pending_acks_ == 0;
+    return !ack_monitor_.has_current_thread_pending_acks();
 }
 
 template <typename BufferType, typename Allocator>
 void BufferedProducer<BufferType, Allocator>::wait_for_acks() {
     // block until all acks have been received
-    wait_for_acks(infinite_timeout);
+    wait_for_acks_impl(Threads::All, infinite_timeout);
+}
+
+template <typename BufferType, typename Allocator>
+void BufferedProducer<BufferType, Allocator>::wait_for_current_thread_acks() {
+    // block until all acks from the current thread have been received
+    wait_for_acks_impl(Threads::Current, infinite_timeout);
 }
 
 template <typename BufferType, typename Allocator>
 bool BufferedProducer<BufferType, Allocator>::wait_for_acks(std::chrono::milliseconds timeout) {
+    // block until all acks have been received
+    return wait_for_acks_impl(Threads::All, timeout);
+}
+
+template <typename BufferType, typename Allocator>
+bool BufferedProducer<BufferType, Allocator>::wait_for_current_thread_acks(std::chrono::milliseconds timeout) {
+    // block until all acks from the current thread have been received
+    return wait_for_acks_impl(Threads::Current, timeout);
+}
+
+template <typename BufferType, typename Allocator>
+bool BufferedProducer<BufferType, Allocator>::wait_for_acks_impl(Threads threads,
+                                                                 std::chrono::milliseconds timeout) {
     auto remaining = timeout;
     auto start_time = std::chrono::high_resolution_clock::now();
+    bool pending_acks = true;
     do {
         try {
             producer_.flush(remaining);
@@ -791,7 +911,10 @@ bool BufferedProducer<BufferType, Allocator>::wait_for_acks(std::chrono::millise
             // If we just hit the timeout, keep going, otherwise re-throw
             if (ex.get_error() == RD_KAFKA_RESP_ERR__TIMED_OUT) {
                 // There is no time remaining
-                return (pending_acks_ == 0);
+                pending_acks = (threads == Threads::All) ?
+                    ack_monitor_.has_pending_acks() :
+                    ack_monitor_.has_current_thread_pending_acks();
+                return !pending_acks;
             }
             else {
                 throw;
@@ -800,9 +923,11 @@ bool BufferedProducer<BufferType, Allocator>::wait_for_acks(std::chrono::millise
         // calculate remaining time
         remaining = timeout - std::chrono::duration_cast<std::chrono::milliseconds>
                               (std::chrono::high_resolution_clock::now() - start_time);
-    } while ((pending_acks_ > 0) &&
-             ((remaining.count() > 0) || (timeout == infinite_timeout)));
-    return (pending_acks_ == 0);
+        pending_acks = (threads == Threads::All) ?
+            ack_monitor_.has_pending_acks() :
+            ack_monitor_.has_current_thread_pending_acks();
+    } while (pending_acks && ((remaining.count() > 0) || (timeout == infinite_timeout)));
+    return !pending_acks;
 }
 
 template <typename BufferType, typename Allocator>
@@ -864,9 +989,9 @@ void BufferedProducer<BufferType, Allocator>::do_add_message(BuilderType&& build
         std::lock_guard<std::mutex> lock(mutex_);
         messages_.emplace_back(std::forward<BuilderType>(builder));
     }
-    // Flush the queues only if a regular message is added. Retry messages may be added
+    // Flush the queues only if a produced message is added. Retry messages may be added
     // from on_delivery_report() during which flush()/async_flush() cannot be called.
-    if (queue_kind == QueueKind::Regular &&
+    if (queue_kind == QueueKind::Produce &&
         flush_action == FlushAction::DoFlush &&
         (max_buffer_size_ >= 0) &&
         (max_buffer_size_ <= (ssize_t)get_buffer_size())) {
@@ -891,7 +1016,12 @@ const Producer& BufferedProducer<BufferType, Allocator>::get_producer() const {
 
 template <typename BufferType, typename Allocator>
 size_t BufferedProducer<BufferType, Allocator>::get_pending_acks() const {
-    return pending_acks_;
+    return ack_monitor_.get_pending_acks();
+}
+
+template <typename BufferType, typename Allocator>
+size_t BufferedProducer<BufferType, Allocator>::get_current_thread_pending_acks() const {
+    return ack_monitor_.get_current_thread_pending_acks();
 }
 
 template <typename BufferType, typename Allocator>
@@ -980,7 +1110,7 @@ void BufferedProducer<BufferType, Allocator>::produce_message(BuilderType&& buil
             producer_.produce(builder);
             internal_guard.release();
             // Sent successfully
-            ++pending_acks_;
+            ack_monitor_.increment_pending_acks();
             break;
         }
         catch (const HandleException& ex) {
@@ -1092,9 +1222,7 @@ void BufferedProducer<BufferType, Allocator>::on_delivery_report(const Message&
         tracker->should_retry(retry);
     }
     // Decrement the expected acks and check to prevent underflow
-    if (pending_acks_ > 0) {
-        --pending_acks_;
-    }
+    ack_monitor_.decrement_pending_acks();
 }
 
 template <typename BufferType, typename Allocator>
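
The sketch below is not part of this commit; it only illustrates how the new per-thread ack API introduced above might be used from several producer threads. The broker address, topic name, and payloads are placeholders, and the setup assumes the usual cppkafka Configuration/MessageBuilder types.

```cpp
// Minimal usage sketch for wait_for_current_thread_acks() (not part of the commit).
#include <cppkafka/utils/buffered_producer.h>
#include <string>
#include <thread>
#include <vector>

using namespace cppkafka;

int main() {
    // Placeholder broker address
    Configuration config = {{ "metadata.broker.list", "localhost:9092" }};
    BufferedProducer<std::string> producer(config);

    auto worker = [&](int id) {
        std::string payload = "message from thread " + std::to_string(id);
        for (int i = 0; i < 100; ++i) {
            // Buffer the message; it is tracked once it is actually sent
            producer.add_message(MessageBuilder("my_topic").payload(payload));
        }
        // Send the buffered messages without blocking; acks for messages sent
        // here are counted against the calling thread.
        producer.async_flush();
        // Wait only for this thread's outstanding acks instead of blocking
        // on every thread's pending messages.
        producer.wait_for_current_thread_acks();
    };

    std::vector<std::thread> threads;
    for (int i = 0; i < 4; ++i) {
        threads.emplace_back(worker, i);
    }
    for (auto& t : threads) {
        t.join();
    }
    return 0;
}
```

This is what the per-thread bookkeeping in AckMonitor::last_ack_ enables: each thread only waits until received_acks_ catches up with the last ack number it enqueued, so one slow producer thread no longer stalls the others.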