Skip to content

Commit 3cf9bb5

Browse files
author
accelerated
committed
Added purge (aka async_flush) functionality
1 parent 0c7a3b0 commit 3cf9bb5

File tree

1 file changed

+13
-1
lines changed

1 file changed

+13
-1
lines changed

include/cppkafka/utils/buffered_producer.h

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,13 @@ class CPPKAFKA_API BufferedProducer {
169169
* \remark This method throws cppkafka::HandleException on failure
170170
*/
171171
void produce(const Message& message);
172+
173+
/**
174+
* \brief Flushes all buffered messages and returns immediately.
175+
*
176+
* Similar to flush, it will send all messages but will not wait for acks to complete.
177+
*/
178+
void purge();
172179

173180
/**
174181
* \brief Flushes the buffered messages.
@@ -471,7 +478,7 @@ void BufferedProducer<BufferType>::produce(const Message& message) {
471478
}
472479

473480
template <typename BufferType>
474-
void BufferedProducer<BufferType>::flush() {
481+
void BufferedProducer<BufferType>::purge() {
475482
CounterGuard<size_t> counter_guard(flushes_in_progress_);
476483
QueueType flush_queue; // flush from temporary queue
477484
{
@@ -482,6 +489,11 @@ void BufferedProducer<BufferType>::flush() {
482489
async_produce(std::move(flush_queue.front()), false);
483490
flush_queue.pop_front();
484491
}
492+
}
493+
494+
template <typename BufferType>
495+
void BufferedProducer<BufferType>::flush() {
496+
purge();
485497
wait_for_acks();
486498
}
487499

0 commit comments

Comments
 (0)