Skip to content

Commit f220062

Browse files
author
accelerated
committed
Changed purge to async_flush
1 parent 7530b9f commit f220062

File tree

1 file changed

+21
-20
lines changed

1 file changed

+21
-20
lines changed

include/cppkafka/utils/buffered_producer.h

Lines changed: 21 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -86,8 +86,8 @@ namespace cppkafka {
8686
template <typename BufferType>
8787
class CPPKAFKA_API BufferedProducer {
8888
public:
89-
enum class EmptyBufferMethod { Flush, ///< Empty the buffer and wait for acks from the broker
90-
Purge }; ///< Empty the buffer and don't wait for acks
89+
enum class FlushMethod { Sync, ///< Empty the buffer and wait for acks from the broker
90+
Async }; ///< Empty the buffer and don't wait for acks
9191
/**
9292
* Concrete builder
9393
*/
@@ -177,7 +177,7 @@ class CPPKAFKA_API BufferedProducer {
177177
*
178178
* Similar to flush, it will send all messages but will not wait for acks to complete.
179179
*/
180-
void purge();
180+
void async_flush();
181181

182182
/**
183183
* \brief Flushes the buffered messages.
@@ -230,18 +230,19 @@ class CPPKAFKA_API BufferedProducer {
230230
ssize_t get_max_buffer_size() const;
231231

232232
/**
233-
* \brief Sets the method used to empty the internal buffer when 'max_buffer_size' is reached. Default is 'Flush'
233+
* \brief Sets the method used to flush the internal buffer when 'max_buffer_size' is reached.
234+
* Default is 'Sync'
234235
*
235236
* \param method The method
236237
*/
237-
void set_buffer_empty_method(EmptyBufferMethod method);
238+
void set_flush_method(FlushMethod method);
238239

239240
/**
240-
* \brief Gets the method used to empty the internal buffer.
241+
* \brief Gets the method used to flush the internal buffer.
241242
*
242243
* \return The method
243244
*/
244-
EmptyBufferMethod get_buffer_empty_method() const;
245+
FlushMethod get_flush_method() const;
245246

246247
/**
247248
* \brief Get the number of messages not yet acked by the broker
@@ -398,7 +399,7 @@ class CPPKAFKA_API BufferedProducer {
398399
return nullptr;
399400
}
400401
template <typename BuilderType>
401-
void do_add_message(BuilderType&& builder, MessagePriority priority, bool do_empty_buffer);
402+
void do_add_message(BuilderType&& builder, MessagePriority priority, bool do_flush);
402403
template <typename BuilderType>
403404
void produce_message(BuilderType&& builder);
404405
Configuration prepare_configuration(Configuration config);
@@ -414,7 +415,7 @@ class CPPKAFKA_API BufferedProducer {
414415
ProduceFailureCallback produce_failure_callback_;
415416
FlushFailureCallback flush_failure_callback_;
416417
ssize_t max_buffer_size_{-1};
417-
EmptyBufferMethod empty_buffer_method_{EmptyBufferMethod::Flush};
418+
FlushMethod flush_method_{FlushMethod::Sync};
418419
std::atomic<size_t> pending_acks_{0};
419420
std::atomic<size_t> flushes_in_progress_{0};
420421
std::atomic<size_t> total_messages_produced_{0};
@@ -495,7 +496,7 @@ void BufferedProducer<BufferType>::produce(const Message& message) {
495496
}
496497

497498
template <typename BufferType>
498-
void BufferedProducer<BufferType>::purge() {
499+
void BufferedProducer<BufferType>::async_flush() {
499500
CounterGuard<size_t> counter_guard(flushes_in_progress_);
500501
QueueType flush_queue; // flush from temporary queue
501502
{
@@ -510,7 +511,7 @@ void BufferedProducer<BufferType>::purge() {
510511

511512
template <typename BufferType>
512513
void BufferedProducer<BufferType>::flush() {
513-
purge();
514+
async_flush();
514515
wait_for_acks();
515516
}
516517

@@ -558,21 +559,21 @@ ssize_t BufferedProducer<BufferType>::get_max_buffer_size() const {
558559
}
559560

560561
template <typename BufferType>
561-
void BufferedProducer<BufferType>::set_buffer_empty_method(EmptyBufferMethod method) {
562-
empty_buffer_method_ = method;
562+
void BufferedProducer<BufferType>::set_flush_method(FlushMethod method) {
563+
flush_method_ = method;
563564
}
564565

565566
template <typename BufferType>
566-
typename BufferedProducer<BufferType>::EmptyBufferMethod
567-
BufferedProducer<BufferType>::get_buffer_empty_method() const {
568-
return empty_buffer_method_;
567+
typename BufferedProducer<BufferType>::FlushMethod
568+
BufferedProducer<BufferType>::get_flush_method() const {
569+
return flush_method_;
569570
}
570571

571572
template <typename BufferType>
572573
template <typename BuilderType>
573574
void BufferedProducer<BufferType>::do_add_message(BuilderType&& builder,
574575
MessagePriority priority,
575-
bool do_empty_buffer) {
576+
bool do_flush) {
576577
{
577578
std::lock_guard<std::mutex> lock(mutex_);
578579
if (priority == MessagePriority::High) {
@@ -582,12 +583,12 @@ void BufferedProducer<BufferType>::do_add_message(BuilderType&& builder,
582583
messages_.emplace_back(std::forward<BuilderType>(builder));
583584
}
584585
}
585-
if (do_empty_buffer && (max_buffer_size_ >= 0) && (max_buffer_size_ <= (ssize_t)messages_.size())) {
586-
if (empty_buffer_method_ == EmptyBufferMethod::Flush) {
586+
if (do_flush && (max_buffer_size_ >= 0) && (max_buffer_size_ <= (ssize_t)messages_.size())) {
587+
if (flush_method_ == FlushMethod::Sync) {
587588
flush();
588589
}
589590
else {
590-
purge();
591+
async_flush();
591592
}
592593
}
593594
}

0 commit comments

Comments
 (0)