Skip to content

Commit 7530b9f

Browse files
author
accelerated
committed
added method to empty the buffer when max limit is reached
1 parent 3cf9bb5 commit 7530b9f

File tree

1 file changed

+37
-4
lines changed

1 file changed

+37
-4
lines changed

include/cppkafka/utils/buffered_producer.h

Lines changed: 37 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,8 @@ namespace cppkafka {
8686
template <typename BufferType>
8787
class CPPKAFKA_API BufferedProducer {
8888
public:
89+
enum class EmptyBufferMethod { Flush, ///< Empty the buffer and wait for acks from the broker
90+
Purge }; ///< Empty the buffer and don't wait for acks
8991
/**
9092
* Concrete builder
9193
*/
@@ -227,6 +229,20 @@ class CPPKAFKA_API BufferedProducer {
227229
*/
228230
ssize_t get_max_buffer_size() const;
229231

232+
/**
233+
* \brief Sets the method used to empty the internal buffer when 'max_buffer_size' is reached. Default is 'Flush'
234+
*
235+
* \param method The method
236+
*/
237+
void set_buffer_empty_method(EmptyBufferMethod method);
238+
239+
/**
240+
* \brief Gets the method used to empty the internal buffer.
241+
*
242+
* \return The method
243+
*/
244+
EmptyBufferMethod get_buffer_empty_method() const;
245+
230246
/**
231247
* \brief Get the number of messages not yet acked by the broker
232248
*
@@ -382,7 +398,7 @@ class CPPKAFKA_API BufferedProducer {
382398
return nullptr;
383399
}
384400
template <typename BuilderType>
385-
void do_add_message(BuilderType&& builder, MessagePriority priority, bool do_flush);
401+
void do_add_message(BuilderType&& builder, MessagePriority priority, bool do_empty_buffer);
386402
template <typename BuilderType>
387403
void produce_message(BuilderType&& builder);
388404
Configuration prepare_configuration(Configuration config);
@@ -398,6 +414,7 @@ class CPPKAFKA_API BufferedProducer {
398414
ProduceFailureCallback produce_failure_callback_;
399415
FlushFailureCallback flush_failure_callback_;
400416
ssize_t max_buffer_size_{-1};
417+
EmptyBufferMethod empty_buffer_method_{EmptyBufferMethod::Flush};
401418
std::atomic<size_t> pending_acks_{0};
402419
std::atomic<size_t> flushes_in_progress_{0};
403420
std::atomic<size_t> total_messages_produced_{0};
@@ -540,11 +557,22 @@ ssize_t BufferedProducer<BufferType>::get_max_buffer_size() const {
540557
return max_buffer_size_;
541558
}
542559

560+
template <typename BufferType>
561+
void BufferedProducer<BufferType>::set_buffer_empty_method(EmptyBufferMethod method) {
562+
empty_buffer_method_ = method;
563+
}
564+
565+
template <typename BufferType>
566+
typename BufferedProducer<BufferType>::EmptyBufferMethod
567+
BufferedProducer<BufferType>::get_buffer_empty_method() const {
568+
return empty_buffer_method_;
569+
}
570+
543571
template <typename BufferType>
544572
template <typename BuilderType>
545573
void BufferedProducer<BufferType>::do_add_message(BuilderType&& builder,
546574
MessagePriority priority,
547-
bool do_flush) {
575+
bool do_empty_buffer) {
548576
{
549577
std::lock_guard<std::mutex> lock(mutex_);
550578
if (priority == MessagePriority::High) {
@@ -554,8 +582,13 @@ void BufferedProducer<BufferType>::do_add_message(BuilderType&& builder,
554582
messages_.emplace_back(std::forward<BuilderType>(builder));
555583
}
556584
}
557-
if (do_flush && (max_buffer_size_ >= 0) && (max_buffer_size_ <= (ssize_t)messages_.size())) {
558-
flush();
585+
if (do_empty_buffer && (max_buffer_size_ >= 0) && (max_buffer_size_ <= (ssize_t)messages_.size())) {
586+
if (empty_buffer_method_ == EmptyBufferMethod::Flush) {
587+
flush();
588+
}
589+
else {
590+
purge();
591+
}
559592
}
560593
}
561594

0 commit comments

Comments
 (0)