Skip to content

Commit bb900f2

Browse files
committed
Allow clearing buffered messages on buffered producer
1 parent 5e84da2 commit bb900f2

File tree

2 files changed

+17
-4
lines changed

2 files changed

+17
-4
lines changed

include/cppkafka/utils/buffered_producer.h

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@
33

44
#include <string>
55
#include <queue>
6-
#include <type_traits>
76
#include <cstdint>
7+
#include <algorithm>
88
#include <unordered_set>
99
#include <unordered_map>
1010
#include <map>
@@ -93,6 +93,11 @@ class BufferedProducer {
9393
*/
9494
void wait_for_acks();
9595

96+
/**
97+
* Clears any buffered messages
98+
*/
99+
void clear();
100+
96101
/**
97102
* Gets the Producer object
98103
*/
@@ -119,8 +124,7 @@ class BufferedProducer {
119124
*/
120125
void set_produce_failure_callback(ProduceFailureCallback callback);
121126
private:
122-
// Pick the most appropriate index type depending on the platform we're using
123-
using IndexType = std::conditional<sizeof(void*) == 8, uint64_t, uint32_t>::type;
127+
using QueueType = std::queue<Builder>;
124128

125129
template <typename BuilderType>
126130
void do_add_message(BuilderType&& builder);
@@ -129,7 +133,7 @@ class BufferedProducer {
129133
void on_delivery_report(const Message& message);
130134

131135
Producer producer_;
132-
std::queue<Builder> messages_;
136+
QueueType messages_;
133137
ProduceFailureCallback produce_failure_callback_;
134138
size_t expected_acks_{0};
135139
size_t messages_acked_{0};
@@ -187,6 +191,12 @@ void BufferedProducer<BufferType>::wait_for_acks() {
187191
messages_acked_ = 0;
188192
}
189193

194+
template <typename BufferType>
195+
void BufferedProducer<BufferType>::clear() {
196+
QueueType tmp;
197+
std::swap(tmp, messages_);
198+
}
199+
190200
template <typename BufferType>
191201
template <typename BuilderType>
192202
void BufferedProducer<BufferType>::do_add_message(BuilderType&& builder) {

tests/producer_test.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -238,6 +238,9 @@ TEST_F(ProducerTest, BufferedProducer) {
238238
producer.flush();
239239
producer.produce(MessageBuilder(KAFKA_TOPIC).partition(partition).payload(payload));
240240
producer.wait_for_acks();
241+
// Add another one but then clear it
242+
producer.add_message(producer.make_builder(KAFKA_TOPIC).partition(partition).payload(payload));
243+
producer.clear();
241244
runner.try_join();
242245

243246
const auto& messages = runner.get_messages();

0 commit comments

Comments
 (0)