Skip to content

Commit 2287e09

Browse files
author
Alexander Damian
committed
Express async_flush in terms of flush since the logic is identical except for the timeout
1 parent 92e46aa commit 2287e09

File tree

1 file changed

+28
-51
lines changed

1 file changed

+28
-51
lines changed

include/cppkafka/utils/buffered_producer.h

Lines changed: 28 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -703,25 +703,7 @@ void BufferedProducer<BufferType, Allocator>::produce(const Message& message) {
703703

704704
template <typename BufferType, typename Allocator>
705705
void BufferedProducer<BufferType, Allocator>::async_flush() {
706-
CounterGuard<size_t> counter_guard(flushes_in_progress_);
707-
auto queue_flusher = [this](QueueType& queue, std::mutex & mutex)->void
708-
{
709-
QueueType flush_queue; // flush from temporary queue
710-
swap_queues(queue, flush_queue, mutex);
711-
712-
while (!flush_queue.empty()) {
713-
async_produce(std::move(flush_queue.front()), false);
714-
flush_queue.pop_front();
715-
}
716-
};
717-
//Produce retry queue first since these messages were produced first
718-
queue_flusher(retry_messages_, retry_mutex_);
719-
//Produce recently enqueued messages
720-
queue_flusher(messages_, mutex_);
721-
// Flush the producer but don't wait. It is necessary to poll
722-
// the producer at least once during this operation because
723-
// async_produce() will not.
724-
wait_for_acks(std::chrono::milliseconds(0));
706+
flush(std::chrono::milliseconds::zero(), false);
725707
}
726708

727709
template <typename BufferType, typename Allocator>
@@ -732,47 +714,42 @@ void BufferedProducer<BufferType, Allocator>::flush(bool preserve_order) {
732714
template <typename BufferType, typename Allocator>
733715
bool BufferedProducer<BufferType, Allocator>::flush(std::chrono::milliseconds timeout,
734716
bool preserve_order) {
735-
if (preserve_order) {
736-
CounterGuard<size_t> counter_guard(flushes_in_progress_);
737-
auto queue_flusher = [&timeout, this](QueueType& queue, std::mutex & mutex)->void
738-
{
739-
QueueType flush_queue; // flush from temporary queue
740-
swap_queues(queue, flush_queue, mutex);
741-
//Produce one message at a time and wait for acks until queue is empty
742-
while (!flush_queue.empty()) {
717+
CounterGuard<size_t> counter_guard(flushes_in_progress_);
718+
auto queue_flusher = [timeout, preserve_order, this]
719+
(QueueType& queue, std::mutex & mutex)->void
720+
{
721+
QueueType flush_queue; // flush from temporary queue
722+
swap_queues(queue, flush_queue, mutex);
723+
//Produce one message at a time and wait for acks until queue is empty
724+
while (!flush_queue.empty()) {
725+
if (preserve_order) {
726+
//When preserving order, we must ensure that each message
727+
//gets delivered before producing the next one.
743728
sync_produce(flush_queue.front(), timeout);
744-
flush_queue.pop_front();
745729
}
746-
};
747-
//Produce retry queue first since these messages were produced first
748-
queue_flusher(retry_messages_, retry_mutex_);
749-
//Produce recently enqueued messages
750-
queue_flusher(messages_, mutex_);
751-
}
752-
else {
753-
//Produce all messages at once then wait for acks.
754-
async_flush();
730+
else {
731+
//Produce as fast as possible w/o waiting. If one or more
732+
//messages fail, they will be re-enqueued for retry
733+
//on the next flush cycle, which causes re-ordering.
734+
async_produce(flush_queue.front(), false);
735+
}
736+
flush_queue.pop_front();
737+
}
738+
};
739+
//Produce retry queue first since these messages were produced first.
740+
queue_flusher(retry_messages_, retry_mutex_);
741+
//Produce recently enqueued messages
742+
queue_flusher(messages_, mutex_);
743+
if (!preserve_order) {
744+
//Wait for acks from the messages produced above via async_produce
755745
wait_for_acks(timeout);
756746
}
757747
return pending_acks_ == 0;
758748
}
759749

760750
template <typename BufferType, typename Allocator>
761751
void BufferedProducer<BufferType, Allocator>::wait_for_acks() {
762-
while (pending_acks_ > 0) {
763-
try {
764-
producer_.flush();
765-
}
766-
catch (const HandleException& ex) {
767-
// If we just hit the timeout, keep going, otherwise re-throw
768-
if (ex.get_error() == RD_KAFKA_RESP_ERR__TIMED_OUT) {
769-
continue;
770-
}
771-
else {
772-
throw;
773-
}
774-
}
775-
}
752+
wait_for_acks(producer_.get_timeout());
776753
}
777754

778755
template <typename BufferType, typename Allocator>

0 commit comments

Comments
 (0)