Skip to content

Commit eb46b88

Browse files
mfontanini authored and committed
Bug fixes for sync flush and add_tracker (#91)
* Fixes for sync flush and also add_tracker
* Added flag for flush
1 parent b8f4be5 commit eb46b88

File tree

2 files changed

+73
-18
lines changed

2 files changed

+73
-18
lines changed

include/cppkafka/utils/buffered_producer.h

Lines changed: 59 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -185,20 +185,28 @@ class CPPKAFKA_API BufferedProducer {
185185
* This will send all messages and keep waiting until all of them are acknowledged (this is
186186
* done by calling wait_for_acks).
187187
*
188+
* \param preserve_order If set to True, each message in the queue will be flushed only when the previous
189+
* message ack is received. This may result in performance degradation as messages
190+
* are sent one at a time. This calls sync_produce() on each message in the buffer.
191+
* If set to False, all messages are flushed in one batch before waiting for acks,
192+
* however message reordering may occur if librdkafka setting 'messages.sent.max.retries > 0'.
193+
*
188194
* \remark Although it is possible to call flush from multiple threads concurrently, better
189195
* performance is achieved when called from the same thread or when serialized
190196
* with respect to other threads.
191197
*/
192-
void flush();
198+
void flush(bool preserve_order = false);
193199

194200
/**
195201
* \brief Flushes the buffered messages and waits up to 'timeout'
196202
*
197203
* \param timeout The maximum time to wait until all acks are received
198204
*
205+
* \param preserve_order True to preserve message ordering, False otherwise. See flush above for more details.
206+
*
199207
* \return True if the operation completes and all acks have been received.
200208
*/
201-
bool flush(std::chrono::milliseconds timeout);
209+
bool flush(std::chrono::milliseconds timeout, bool preserve_order = false);
202210

203211
/**
204212
* Waits for produced message's acknowledgements from the brokers
@@ -404,13 +412,15 @@ class CPPKAFKA_API BufferedProducer {
404412
};
405413
using TrackerPtr = std::shared_ptr<Tracker>;
406414

415+
// Returns existing tracker or creates new one
407416
template <typename BuilderType>
408-
TrackerPtr add_tracker(BuilderType& builder) {
409-
if (has_internal_data_ && !builder.internal()) {
410-
// Add message tracker only if it hasn't been added before
411-
TrackerPtr tracker = std::make_shared<Tracker>(SenderType::Async, max_number_retries_);
412-
builder.internal(tracker);
413-
return tracker;
417+
TrackerPtr add_tracker(SenderType sender, BuilderType& builder) {
418+
if (has_internal_data_) {
419+
if (!builder.internal()) {
420+
// Add message tracker only if it hasn't been added before
421+
builder.internal(std::make_shared<Tracker>(sender, max_number_retries_));
422+
}
423+
return std::static_pointer_cast<Tracker>(builder.internal());
414424
}
415425
return nullptr;
416426
}
@@ -469,15 +479,15 @@ void BufferedProducer<BufferType>::add_message(const MessageBuilder& builder) {
469479

470480
template <typename BufferType>
471481
void BufferedProducer<BufferType>::add_message(Builder builder) {
472-
add_tracker(builder);
482+
add_tracker(SenderType::Async, builder);
473483
do_add_message(move(builder), MessagePriority::Low, true);
474484
}
475485

476486
template <typename BufferType>
477487
void BufferedProducer<BufferType>::produce(const MessageBuilder& builder) {
478488
if (has_internal_data_) {
479489
MessageBuilder builder_clone(builder.clone());
480-
add_tracker(builder_clone);
490+
add_tracker(SenderType::Async, builder_clone);
481491
async_produce(builder_clone, true);
482492
}
483493
else {
@@ -489,7 +499,7 @@ template <typename BufferType>
489499
void BufferedProducer<BufferType>::sync_produce(const MessageBuilder& builder) {
490500
if (has_internal_data_) {
491501
MessageBuilder builder_clone(builder.clone());
492-
TrackerPtr tracker = add_tracker(builder_clone);
502+
TrackerPtr tracker = add_tracker(SenderType::Sync, builder_clone);
493503
// produce until we succeed or we reach max retry limit
494504
std::future<bool> should_retry;
495505
do {
@@ -526,15 +536,47 @@ void BufferedProducer<BufferType>::async_flush() {
526536
}
527537

528538
template <typename BufferType>
529-
void BufferedProducer<BufferType>::flush() {
530-
async_flush();
531-
wait_for_acks();
539+
void BufferedProducer<BufferType>::flush(bool preserve_order) {
540+
if (preserve_order) {
541+
CounterGuard<size_t> counter_guard(flushes_in_progress_);
542+
QueueType flush_queue; // flush from temporary queue
543+
{
544+
std::lock_guard<std::mutex> lock(mutex_);
545+
std::swap(messages_, flush_queue);
546+
}
547+
while (!flush_queue.empty()) {
548+
sync_produce(flush_queue.front());
549+
flush_queue.pop_front();
550+
}
551+
}
552+
else {
553+
async_flush();
554+
wait_for_acks();
555+
}
532556
}
533557

534558
template <typename BufferType>
535-
bool BufferedProducer<BufferType>::flush(std::chrono::milliseconds timeout) {
536-
async_flush();
537-
return wait_for_acks(timeout);
559+
bool BufferedProducer<BufferType>::flush(std::chrono::milliseconds timeout,
560+
bool preserve_order) {
561+
if (preserve_order) {
562+
CounterGuard<size_t> counter_guard(flushes_in_progress_);
563+
QueueType flush_queue; // flush from temporary queue
564+
{
565+
std::lock_guard<std::mutex> lock(mutex_);
566+
std::swap(messages_, flush_queue);
567+
}
568+
auto start_time = std::chrono::high_resolution_clock::now();
569+
while (!flush_queue.empty() &&
570+
(std::chrono::duration_cast<std::chrono::milliseconds>
571+
(std::chrono::high_resolution_clock::now() - start_time) < timeout)) {
572+
sync_produce(flush_queue.front());
573+
flush_queue.pop_front();
574+
}
575+
}
576+
else {
577+
async_flush();
578+
return wait_for_acks(timeout);
579+
}
538580
}
539581

540582
template <typename BufferType>

tests/producer_test.cpp

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,19 @@ void flusher_run(BufferedProducer<string>& producer,
7979
producer.flush();
8080
}
8181

82+
// Test helper: polls the producer and async-flushes whenever the buffer
// holds at least 'num_flush' messages; once 'exit_flag' is set, performs
// a final flush and blocks until all acks arrive.
// NOTE(review): 'exit_flag' is a plain int shared across threads — the
// test presumably tolerates this race, but std::atomic<int> would be safer.
void async_flusher_run(BufferedProducer<string>& producer,
                       int& exit_flag,
                       int num_flush) {
    const auto poll_interval = milliseconds(10);
    while (!exit_flag) {
        if (producer.get_buffer_size() >= static_cast<size_t>(num_flush)) {
            producer.async_flush();
        }
        this_thread::sleep_for(poll_interval);
    }
    producer.async_flush();
    producer.wait_for_acks();
}
94+
8295
void clear_run(BufferedProducer<string>& producer,
8396
condition_variable& clear) {
8497
mutex m;
@@ -377,7 +390,7 @@ TEST_CASE("replay async messages with errors", "[producer][buffered_producer][as
377390
ErrorProducer<string> producer(make_producer_config(),
378391
BufferedProducer<string>::TestParameters{false, true});
379392
producer.set_max_number_retries(num_retries);
380-
thread flusher_thread(flusher_run, ref(producer), ref(exit_flag), 0);
393+
thread flusher_thread(async_flusher_run, ref(producer), ref(exit_flag), 0);
381394
string payload = "Hello world";
382395
producer.produce(MessageBuilder(KAFKA_TOPICS[0]).payload(payload));
383396
this_thread::sleep_for(milliseconds(2000));

0 commit comments

Comments
 (0)