Skip to content

Commit 92e46aa

Browse files
author
Alexander Damian
committed
Proper implementation of flush() with timeout
1 parent e401e97 commit 92e46aa

File tree

1 file changed

+25
-59
lines changed

1 file changed

+25
-59
lines changed

include/cppkafka/utils/buffered_producer.h

Lines changed: 25 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,11 @@ class CPPKAFKA_API BufferedProducer {
206206
*/
207207
void sync_produce(const MessageBuilder& builder);
208208

209+
/**
210+
* \brief Same as sync_produce but waits up to 'timeout' for acks to be received.
211+
*/
212+
void sync_produce(const MessageBuilder& builder, std::chrono::milliseconds timeout);
213+
209214
/**
210215
* \brief Produces a message asynchronously without buffering it
211216
*
@@ -481,7 +486,7 @@ class CPPKAFKA_API BufferedProducer {
481486
return nullptr;
482487
}
483488
#endif
484-
489+
485490
private:
486491
enum class SenderType { Sync, Async };
487492
enum class QueueKind { Retry, Regular };
@@ -523,6 +528,7 @@ class CPPKAFKA_API BufferedProducer {
523528
if (sender_ == SenderType::Sync) {
524529
return retry_promise_.get_future().get();
525530
}
531+
return false;
526532
}
527533
// Signal the synchronous producer if the message should be retried or not.
528534
// Called from inside on_delivery_report(). For synchronous producers only.
@@ -663,6 +669,12 @@ void BufferedProducer<BufferType, Allocator>::produce(const MessageBuilder& buil
663669

664670
template <typename BufferType, typename Allocator>
665671
void BufferedProducer<BufferType, Allocator>::sync_produce(const MessageBuilder& builder) {
672+
sync_produce(builder, producer_.get_timeout());
673+
}
674+
675+
template <typename BufferType, typename Allocator>
676+
void BufferedProducer<BufferType, Allocator>::sync_produce(const MessageBuilder& builder,
677+
std::chrono::milliseconds timeout) {
666678
if (enable_message_retries_) {
667679
//Adding a retry tracker requires copying the builder since
668680
//we cannot modify the original instance. Cloning is a fast operation
@@ -673,14 +685,14 @@ void BufferedProducer<BufferType, Allocator>::sync_produce(const MessageBuilder&
673685
do {
674686
tracker->prepare_to_retry();
675687
produce_message(builder_clone);
676-
wait_for_acks();
688+
wait_for_acks(timeout);
677689
}
678690
while (tracker->retry_again());
679691
}
680692
else {
681693
// produce once
682694
produce_message(builder);
683-
wait_for_acks();
695+
wait_for_acks(timeout);
684696
}
685697
}
686698

@@ -714,15 +726,21 @@ void BufferedProducer<BufferType, Allocator>::async_flush() {
714726

715727
template <typename BufferType, typename Allocator>
716728
void BufferedProducer<BufferType, Allocator>::flush(bool preserve_order) {
729+
flush(producer_.get_timeout(), preserve_order);
730+
}
731+
732+
template <typename BufferType, typename Allocator>
733+
bool BufferedProducer<BufferType, Allocator>::flush(std::chrono::milliseconds timeout,
734+
bool preserve_order) {
717735
if (preserve_order) {
718736
CounterGuard<size_t> counter_guard(flushes_in_progress_);
719-
auto queue_flusher = [this](QueueType& queue, std::mutex & mutex)->void
737+
auto queue_flusher = [&timeout, this](QueueType& queue, std::mutex & mutex)->void
720738
{
721739
QueueType flush_queue; // flush from temporary queue
722740
swap_queues(queue, flush_queue, mutex);
723741
//Produce one message at a time and wait for acks until queue is empty
724742
while (!flush_queue.empty()) {
725-
sync_produce(flush_queue.front());
743+
sync_produce(flush_queue.front(), timeout);
726744
flush_queue.pop_front();
727745
}
728746
};
@@ -734,61 +752,9 @@ void BufferedProducer<BufferType, Allocator>::flush(bool preserve_order) {
734752
else {
735753
//Produce all messages at once then wait for acks.
736754
async_flush();
737-
wait_for_acks();
738-
}
739-
}
740-
741-
template <typename BufferType, typename Allocator>
742-
bool BufferedProducer<BufferType, Allocator>::flush(std::chrono::milliseconds timeout,
743-
bool preserve_order) {
744-
if (preserve_order) {
745-
CounterGuard<size_t> counter_guard(flushes_in_progress_);
746-
QueueType flush_queue; // flush from temporary queue
747-
swap_queues(messages_, flush_queue, mutex_);
748-
QueueType retry_flush_queue; // flush from temporary retry queue
749-
swap_queues(retry_messages_, retry_flush_queue, retry_mutex_);
750-
751-
auto queue_flusher = [this](QueueType& queue)->bool
752-
{
753-
//Produce one message at a time and wait for acks
754-
if (!queue.empty()) {
755-
sync_produce(queue.front());
756-
queue.pop_front();
757-
return true;
758-
}
759-
return false;
760-
};
761-
auto remaining = timeout;
762-
auto start_time = std::chrono::high_resolution_clock::now();
763-
do {
764-
if (!queue_flusher(retry_flush_queue) && !queue_flusher(flush_queue)) {
765-
break;
766-
}
767-
// calculate remaining time
768-
remaining = timeout - std::chrono::duration_cast<std::chrono::milliseconds>
769-
(std::chrono::high_resolution_clock::now() - start_time);
770-
} while (remaining.count() > 0);
771-
772-
// When timeout has expired, any remaining messages must be re-enqueue in their
773-
// original order so they can be flushed later.
774-
auto re_enqueuer = [this](QueueType& src_queue, QueueType& dst_queue, std::mutex & mutex)->bool
775-
{
776-
if (!src_queue.empty()) {
777-
std::lock_guard<std::mutex> lock(mutex);
778-
dst_queue.insert(dst_queue.begin(),
779-
std::make_move_iterator(src_queue.begin()),
780-
std::make_move_iterator(src_queue.end()));
781-
return true;
782-
}
783-
return false;
784-
};
785-
return !re_enqueuer(retry_flush_queue, retry_messages_, retry_mutex_) &&
786-
!re_enqueuer(flush_queue, messages_, mutex_);
787-
}
788-
else {
789-
async_flush();
790-
return wait_for_acks(timeout);
755+
wait_for_acks(timeout);
791756
}
757+
return pending_acks_ == 0;
792758
}
793759

794760
template <typename BufferType, typename Allocator>

0 commit comments

Comments
 (0)