Skip to content

Commit 97229eb

Browse files
committed
Added a high-priority queue to BufferedProducer to avoid message re-ordering
1 parent 4ba6b38 commit 97229eb

File tree

1 file changed

+49
-12
lines changed

1 file changed

+49
-12
lines changed

include/cppkafka/utils/buffered_producer.h

Lines changed: 49 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -530,6 +530,7 @@ class CPPKAFKA_API BufferedProducer {
530530
// Members
531531
Producer producer_;
532532
QueueType messages_;
533+
QueueType hi_pri_messages_;
533534
mutable std::mutex mutex_;
534535
ProduceSuccessCallback produce_success_callback_;
535536
ProduceFailureCallback produce_failure_callback_;
@@ -565,7 +566,8 @@ template <typename BufferType, typename Allocator>
565566
BufferedProducer<BufferType, Allocator>::BufferedProducer(Configuration config,
566567
const Allocator& alloc)
567568
: producer_(prepare_configuration(std::move(config))),
568-
messages_(alloc) {
569+
messages_(alloc),
570+
hi_pri_messages_(alloc) {
569571
producer_.set_payload_policy(get_default_payload_policy<BufferType>());
570572
#ifdef KAFKA_TEST_INSTANCE
571573
test_params_ = nullptr;
@@ -625,10 +627,16 @@ template <typename BufferType, typename Allocator>
625627
void BufferedProducer<BufferType, Allocator>::async_flush() {
626628
CounterGuard<size_t> counter_guard(flushes_in_progress_);
627629
QueueType flush_queue; // flush from temporary queue
630+
QueueType hi_pri_flush_queue; // flush from hi-priority temporary queue
628631
{
629632
std::lock_guard<std::mutex> lock(mutex_);
633+
std::swap(hi_pri_messages_, hi_pri_flush_queue);
630634
std::swap(messages_, flush_queue);
631635
}
636+
while (!hi_pri_flush_queue.empty()) {
637+
async_produce(std::move(hi_pri_flush_queue.front()), false);
638+
hi_pri_flush_queue.pop_front();
639+
}
632640
while (!flush_queue.empty()) {
633641
async_produce(std::move(flush_queue.front()), false);
634642
flush_queue.pop_front();
@@ -640,10 +648,16 @@ void BufferedProducer<BufferType, Allocator>::flush(bool preserve_order) {
640648
if (preserve_order) {
641649
CounterGuard<size_t> counter_guard(flushes_in_progress_);
642650
QueueType flush_queue; // flush from temporary queue
651+
QueueType hi_pri_flush_queue; // flush from hi-priority temporary queue
643652
{
644653
std::lock_guard<std::mutex> lock(mutex_);
654+
std::swap(hi_pri_messages_, hi_pri_flush_queue);
645655
std::swap(messages_, flush_queue);
646656
}
657+
while (!hi_pri_flush_queue.empty()) {
658+
sync_produce(hi_pri_flush_queue.front());
659+
hi_pri_flush_queue.pop_front();
660+
}
647661
while (!flush_queue.empty()) {
648662
sync_produce(flush_queue.front());
649663
flush_queue.pop_front();
@@ -661,25 +675,45 @@ bool BufferedProducer<BufferType, Allocator>::flush(std::chrono::milliseconds ti
661675
if (preserve_order) {
662676
CounterGuard<size_t> counter_guard(flushes_in_progress_);
663677
QueueType flush_queue; // flush from temporary queue
678+
QueueType hi_pri_flush_queue; // flush from hi-priority temporary queue
664679
{
665680
std::lock_guard<std::mutex> lock(mutex_);
681+
std::swap(hi_pri_messages_, hi_pri_flush_queue);
666682
std::swap(messages_, flush_queue);
667683
}
668684
auto remaining = timeout;
669-
auto start_time = std::chrono::high_resolution_clock::now();
685+
auto start_time = std::chrono::high_resolution_clock::now();
670686
do {
671-
sync_produce(flush_queue.front());
672-
flush_queue.pop_front();
687+
if (!hi_pri_flush_queue.empty()) {
688+
sync_produce(hi_pri_flush_queue.front());
689+
hi_pri_flush_queue.pop_front();
690+
}
691+
else if (!flush_queue.empty()) {
692+
sync_produce(flush_queue.front());
693+
flush_queue.pop_front();
694+
}
695+
else {
696+
break;
697+
}
673698
// calculate remaining time
674699
remaining = timeout - std::chrono::duration_cast<std::chrono::milliseconds>
675700
(std::chrono::high_resolution_clock::now() - start_time);
676-
} while (!flush_queue.empty() && (remaining.count() > 0));
701+
} while (remaining.count() > 0);
677702

678703
// Re-enqueue remaining messages in original order
679-
if (!flush_queue.empty()) {
704+
if (!hi_pri_flush_queue.empty() || !flush_queue.empty()) {
680705
std::lock_guard<std::mutex> lock(mutex_);
681-
messages_.insert(messages_.begin(), std::make_move_iterator(flush_queue.begin()), std::make_move_iterator(flush_queue.end()));
682-
}
706+
if (!!hi_pri_flush_queue.empty()) {
707+
hi_pri_messages_.insert(hi_pri_messages_.begin(),
708+
std::make_move_iterator(hi_pri_flush_queue.begin()),
709+
std::make_move_iterator(hi_pri_flush_queue.end()));
710+
}
711+
if (!flush_queue.empty()) {
712+
messages_.insert(messages_.begin(),
713+
std::make_move_iterator(flush_queue.begin()),
714+
std::make_move_iterator(flush_queue.end()));
715+
}
716+
}
683717
}
684718
else {
685719
async_flush();
@@ -735,11 +769,13 @@ void BufferedProducer<BufferType, Allocator>::clear() {
735769
std::lock_guard<std::mutex> lock(mutex_);
736770
QueueType tmp;
737771
std::swap(tmp, messages_);
772+
QueueType hi_pri_tmp;
773+
std::swap(hi_pri_tmp, hi_pri_messages_);
738774
}
739775

740776
template <typename BufferType, typename Allocator>
741777
size_t BufferedProducer<BufferType, Allocator>::get_buffer_size() const {
742-
return messages_.size();
778+
return messages_.size() + hi_pri_messages_.size();
743779
}
744780

745781
template <typename BufferType, typename Allocator>
@@ -774,20 +810,21 @@ void BufferedProducer<BufferType, Allocator>::do_add_message(BuilderType&& build
774810
{
775811
std::lock_guard<std::mutex> lock(mutex_);
776812
if (priority == MessagePriority::High) {
777-
messages_.emplace_front(std::forward<BuilderType>(builder));
813+
hi_pri_messages_.emplace_back(std::forward<BuilderType>(builder));
778814
}
779815
else {
780816
messages_.emplace_back(std::forward<BuilderType>(builder));
781817
}
782818
}
783-
if (do_flush && (max_buffer_size_ >= 0) && (max_buffer_size_ <= (ssize_t)messages_.size())) {
819+
820+
if (priority == MessagePriority::Low && do_flush && (max_buffer_size_ >= 0) && (max_buffer_size_ <= get_buffer_size())) {
784821
if (flush_method_ == FlushMethod::Sync) {
785822
flush();
786823
}
787824
else {
788825
async_flush();
789826
}
790-
}
827+
}
791828
}
792829

793830
template <typename BufferType, typename Allocator>

0 commit comments

Comments
 (0)