@@ -190,11 +190,27 @@ class CPPKAFKA_API BufferedProducer {
190190 * with respect to other threads.
191191 */
192192 void flush ();
193+
194+ /* *
195+ * \brief Flushes the buffered messages and waits up to 'timeout'
196+ *
197+ * \param timeout The maximum time to wait until all acks are received
198+ *
199+ * \return True if the operation completes and all acks have been received.
200+ */
201+ bool flush (std::chrono::milliseconds timeout);
193202
194203 /* *
195204 * Waits for produced message's acknowledgements from the brokers
196205 */
197206 void wait_for_acks ();
207+
208+ /* *
209+ * Waits for produced message's acknowledgements from the brokers up to 'timeout'.
210+ *
211+ * \return True if the operation completes and all acks have been received.
212+ */
213+ bool wait_for_acks (std::chrono::milliseconds timeout);
198214
199215 /* *
200216 * Clears any buffered messages
@@ -515,6 +531,12 @@ void BufferedProducer<BufferType>::flush() {
515531 wait_for_acks ();
516532}
517533
534+ template <typename BufferType>
535+ bool BufferedProducer<BufferType>::flush(std::chrono::milliseconds timeout) {
536+ async_flush ();
537+ return wait_for_acks (timeout);
538+ }
539+
518540template <typename BufferType>
519541void BufferedProducer<BufferType>::wait_for_acks() {
520542 while (pending_acks_ > 0 ) {
@@ -533,6 +555,31 @@ void BufferedProducer<BufferType>::wait_for_acks() {
533555 }
534556}
535557
558+ template <typename BufferType>
559+ bool BufferedProducer<BufferType>::wait_for_acks(std::chrono::milliseconds timeout) {
560+ auto remaining = timeout;
561+ auto start_time = std::chrono::high_resolution_clock::now ();
562+ while ((pending_acks_ > 0 ) && (remaining.count () > 0 )) {
563+ try {
564+ producer_.flush (remaining);
565+ }
566+ catch (const HandleException& ex) {
567+ // If we just hit the timeout, keep going, otherwise re-throw
568+ if (ex.get_error () == RD_KAFKA_RESP_ERR__TIMED_OUT) {
569+ // There is no time remaining
570+ return (pending_acks_ == 0 );
571+ }
572+ else {
573+ throw ;
574+ }
575+ }
576+ // calculate remaining time
577+ remaining = timeout - std::chrono::duration_cast<std::chrono::milliseconds>
578+ (std::chrono::high_resolution_clock::now () - start_time);
579+ }
580+ return (pending_acks_ == 0 );
581+ }
582+
536583template <typename BufferType>
537584void BufferedProducer<BufferType>::clear() {
538585 std::lock_guard<std::mutex> lock (mutex_);
0 commit comments