Skip to content

Commit 3c72eb5

Browse files
author
accelerated
committed
Added timeout to flush and wait_for_acks
1 parent 157b7ec commit 3c72eb5

File tree

1 file changed

+47
-0
lines changed

1 file changed

+47
-0
lines changed

include/cppkafka/utils/buffered_producer.h

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,11 +190,27 @@ class CPPKAFKA_API BufferedProducer {
190190
* with respect to other threads.
191191
*/
192192
void flush();
193+
194+
/**
195+
* \brief Flushes the buffered messages and waits up to 'timeout'
196+
*
197+
* \param timeout The maximum time to wait until all acks are received
198+
*
199+
* \return True if the operation completes and all acks have been received.
200+
*/
201+
bool flush(std::chrono::milliseconds timeout);
193202

194203
/**
195204
* Waits for produced message's acknowledgements from the brokers
196205
*/
197206
void wait_for_acks();
207+
208+
/**
209+
* Waits for produced message's acknowledgements from the brokers up to 'timeout'.
210+
*
211+
* \return True if the operation completes and all acks have been received.
212+
*/
213+
bool wait_for_acks(std::chrono::milliseconds timeout);
198214

199215
/**
200216
* Clears any buffered messages
@@ -515,6 +531,12 @@ void BufferedProducer<BufferType>::flush() {
515531
wait_for_acks();
516532
}
517533

534+
template <typename BufferType>
535+
bool BufferedProducer<BufferType>::flush(std::chrono::milliseconds timeout) {
536+
async_flush();
537+
return wait_for_acks(timeout);
538+
}
539+
518540
template <typename BufferType>
519541
void BufferedProducer<BufferType>::wait_for_acks() {
520542
while (pending_acks_ > 0) {
@@ -533,6 +555,31 @@ void BufferedProducer<BufferType>::wait_for_acks() {
533555
}
534556
}
535557

558+
template <typename BufferType>
559+
bool BufferedProducer<BufferType>::wait_for_acks(std::chrono::milliseconds timeout) {
560+
auto remaining = timeout;
561+
auto start_time = std::chrono::high_resolution_clock::now();
562+
while ((pending_acks_ > 0) && (remaining.count() > 0)) {
563+
try {
564+
producer_.flush(remaining);
565+
}
566+
catch (const HandleException& ex) {
567+
// If we just hit the timeout, keep going, otherwise re-throw
568+
if (ex.get_error() == RD_KAFKA_RESP_ERR__TIMED_OUT) {
569+
// There is no time remaining
570+
return (pending_acks_ == 0);
571+
}
572+
else {
573+
throw;
574+
}
575+
}
576+
// calculate remaining time
577+
remaining = timeout - std::chrono::duration_cast<std::chrono::milliseconds>
578+
(std::chrono::high_resolution_clock::now() - start_time);
579+
}
580+
return (pending_acks_ == 0);
581+
}
582+
536583
template <typename BufferType>
537584
void BufferedProducer<BufferType>::clear() {
538585
std::lock_guard<std::mutex> lock(mutex_);

0 commit comments

Comments
 (0)