Skip to content

Commit 9c9d1e4

Browse files
committed
Add BackoffCommitter class
1 parent 9594ddc commit 9c9d1e4

File tree

5 files changed

+294
-3
lines changed

5 files changed

+294
-3
lines changed

include/cppkafka/producer.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ class CPPKAFKA_API Producer : public KafkaHandleBase {
8181
/**
8282
* The policy to use for the payload. The default policy is COPY_PAYLOAD
8383
*/
84-
enum PayloadPolicy {
84+
enum class PayloadPolicy {
8585
COPY_PAYLOAD = RD_KAFKA_MSG_F_COPY, ///< Means RD_KAFKA_MSG_F_COPY
8686
FREE_PAYLOAD = RD_KAFKA_MSG_F_FREE ///< Means RD_KAFKA_MSG_F_FREE
8787
};
Lines changed: 208 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,208 @@
1+
/*
2+
* Copyright (c) 2017, Matias Fontanini
3+
* All rights reserved.
4+
*
5+
* Redistribution and use in source and binary forms, with or without
6+
* modification, are permitted provided that the following conditions are
7+
* met:
8+
*
9+
* * Redistributions of source code must retain the above copyright
10+
* notice, this list of conditions and the following disclaimer.
11+
* * Redistributions in binary form must reproduce the above
12+
* copyright notice, this list of conditions and the following disclaimer
13+
* in the documentation and/or other materials provided with the
14+
* distribution.
15+
*
16+
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
17+
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
18+
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
19+
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
20+
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21+
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22+
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23+
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24+
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25+
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
26+
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27+
*
28+
*/
29+
30+
#ifndef CPPKAFKA_BACKOFF_COMMITTER_H
31+
#define CPPKAFKA_BACKOFF_COMMITTER_H
32+
33+
#include <chrono>
34+
#include <functional>
35+
#include <thread>
36+
#include "../consumer.h"
37+
38+
namespace cppkafka {
39+
40+
/**
41+
* \brief Allows performing synchronous commits that will backoff until successful
42+
*
43+
* This class serves as a simple wrapper around Consumer::commit, allowing to commit
44+
* messages and topic/partition/offset tuples handling errors and performing a backoff
45+
* whenever the commit fails.
46+
*
47+
* Both linear and exponential backoff policies are supported, the former one being
48+
* the default.
49+
*
50+
* Example code on how to use this:
51+
*
52+
* \code
53+
* // Create a consumer
54+
* Consumer consumer(...);
55+
*
56+
* // Create a committer using this consumer
57+
* BackoffCommitter committer(consumer);
58+
*
59+
* // Set an error callback. This is optional and allows having some feedback
60+
* // when commits fail. If the callback returns false, then this message won't
61+
* // be committer again.
62+
* committer.set_error_callback([](Error error) {
63+
* cout << "Error committing: " << error << endl;
64+
* return true;
65+
* });
66+
*
67+
* // Now commit. If there's an error, this will retry forever
68+
* committer.commit(some_message);
69+
* \endcode
70+
*/
71+
class BackoffCommitter {
72+
public:
73+
using TimeUnit = std::chrono::milliseconds;
74+
static constexpr TimeUnit DEFAULT_INITIAL_BACKOFF{100};
75+
static constexpr TimeUnit DEFAULT_BACKOFF_STEP{50};
76+
static constexpr TimeUnit DEFAULT_MAXIMUM_BACKOFF{1000};
77+
78+
/**
79+
* \brief The error callback.
80+
*
81+
* Whenever an error occurs comitting an offset, this callback will be executed using
82+
* the generated error. While the function returns true, then this is offset will be
83+
* committed again until it either succeeds or the function returns false.
84+
*/
85+
using ErrorCallback = std::function<bool(Error)>;
86+
87+
/**
88+
* The backoff policy to use
89+
*/
90+
enum class BackoffPolicy {
91+
LINEAR,
92+
EXPONENTIAL
93+
};
94+
95+
/**
96+
* \brief Constructs an instance using default values
97+
*
98+
* By default, the linear backoff policy is used
99+
*
100+
* \param consumer The consumer to use for committing offsets
101+
*/
102+
BackoffCommitter(Consumer& consumer);
103+
104+
/**
105+
* \brief Sets the backoff policy
106+
*
107+
* \param policy The backoff policy to be used
108+
*/
109+
void set_backoff_policy(BackoffPolicy policy);
110+
111+
/**
112+
* \brief Sets the initial backoff
113+
*
114+
* The first time a commit fails, this will be the delay between the request is sent
115+
* and we re-try doing so
116+
*
117+
* \param value The value to be used
118+
*/
119+
void set_initial_backoff(TimeUnit value);
120+
121+
/**
122+
* \brief Sets the backoff step
123+
*
124+
* When using the linear backoff policy, this will be the delay between sending a request
125+
* that fails and re-trying it
126+
*
127+
* \param value The value to be used
128+
*/
129+
void set_backoff_step(TimeUnit value);
130+
131+
/**
132+
* \brief Sets the maximum backoff
133+
*
134+
* The backoff used will never be larger than this number
135+
*
136+
* \param value The value to be used
137+
*/
138+
void set_maximum_backoff(TimeUnit value);
139+
140+
/**
141+
* \brief Sets the error callback
142+
*
143+
* \sa ErrorCallback
144+
* \param callback The callback to be set
145+
*/
146+
void set_error_callback(ErrorCallback callback);
147+
148+
/**
149+
* \brief Commits the given message synchronously
150+
*
151+
* This will call Consumer::commit until either the message is successfully
152+
* committed or the error callback returns false (if any is set).
153+
*
154+
* \param msg The message to be committed
155+
*/
156+
void commit(const Message& msg);
157+
158+
/**
159+
* \brief Commits the offsets on the given topic/partitions synchronously
160+
*
161+
* This will call Consumer::commit until either the offsets are successfully
162+
* committed or the error callback returns false (if any is set).
163+
*
164+
* \param topic_partitions The topic/partition list to be committed
165+
*/
166+
void commit(const TopicPartitionList& topic_partitions);
167+
private:
168+
TimeUnit increase_backoff(TimeUnit backoff);
169+
170+
template <typename T>
171+
void do_commit(const T& object) {
172+
TimeUnit backoff = initial_backoff_;
173+
while (true) {
174+
auto start = std::chrono::steady_clock::now();
175+
try {
176+
consumer_.commit(object);
177+
// If the commit succeeds, we're done
178+
return;
179+
}
180+
catch (const HandleException& ex) {
181+
// If there's a callback and it returns false for this message, abort
182+
if (callback_ && !callback_(ex.get_error())) {
183+
return;
184+
}
185+
}
186+
187+
auto end = std::chrono::steady_clock::now();
188+
auto time_elapsed = end - start;
189+
// If we still have time left, then sleep
190+
if (time_elapsed < backoff) {
191+
std::this_thread::sleep_for(backoff - time_elapsed);
192+
}
193+
// Increase out backoff depending on the policy being used
194+
backoff = increase_backoff(backoff);
195+
}
196+
}
197+
198+
Consumer& consumer_;
199+
TimeUnit initial_backoff_;
200+
TimeUnit backoff_step_;
201+
TimeUnit maximum_backoff_;
202+
ErrorCallback callback_;
203+
BackoffPolicy policy_;
204+
};
205+
206+
} // cppkafka
207+
208+
#endif // CPPKAFKA_BACKOFF_COMMITTER_H

src/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ set(SOURCES
1515
kafka_handle_base.cpp
1616
producer.cpp
1717
consumer.cpp
18+
19+
utils/backoff_committer.cpp
1820
)
1921

2022
include_directories(${CMAKE_CURRENT_SOURCE_DIR}/../include/cppkafka)

src/producer.cpp

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ using std::chrono::milliseconds;
3939
namespace cppkafka {
4040

4141
Producer::Producer(Configuration config)
42-
: KafkaHandleBase(move(config)) {
42+
: KafkaHandleBase(move(config)), message_payload_policy_(PayloadPolicy::COPY_PAYLOAD) {
4343
char error_buffer[512];
4444
auto config_handle = get_configuration().get_handle();
4545
rd_kafka_conf_set_opaque(config_handle, this);
@@ -51,7 +51,6 @@ Producer::Producer(Configuration config)
5151
}
5252
rd_kafka_set_log_level(ptr, 7);
5353
set_handle(ptr);
54-
set_payload_policy(Producer::COPY_PAYLOAD);
5554
}
5655

5756
void Producer::set_payload_policy(PayloadPolicy policy) {

src/utils/backoff_committer.cpp

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
/*
2+
* Copyright (c) 2017, Matias Fontanini
3+
* All rights reserved.
4+
*
5+
* Redistribution and use in source and binary forms, with or without
6+
* modification, are permitted provided that the following conditions are
7+
* met:
8+
*
9+
* * Redistributions of source code must retain the above copyright
10+
* notice, this list of conditions and the following disclaimer.
11+
* * Redistributions in binary form must reproduce the above
12+
* copyright notice, this list of conditions and the following disclaimer
13+
* in the documentation and/or other materials provided with the
14+
* distribution.
15+
*
16+
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
17+
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
18+
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
19+
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
20+
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21+
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22+
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23+
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24+
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25+
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
26+
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27+
*
28+
*/
29+
30+
#include <algorithm>
31+
#include "utils/backoff_committer.h"
32+
33+
using std::min;
34+
35+
namespace cppkafka {
36+
37+
BackoffCommitter::BackoffCommitter(Consumer& consumer)
38+
: consumer_(consumer), initial_backoff_(DEFAULT_INITIAL_BACKOFF),
39+
backoff_step_(DEFAULT_BACKOFF_STEP), maximum_backoff_(DEFAULT_MAXIMUM_BACKOFF),
40+
policy_(BackoffPolicy::LINEAR) {
41+
42+
}
43+
44+
void BackoffCommitter::set_backoff_policy(BackoffPolicy policy) {
45+
policy_ = policy;
46+
}
47+
48+
void BackoffCommitter::set_initial_backoff(TimeUnit value) {
49+
initial_backoff_ = value;
50+
}
51+
52+
void BackoffCommitter::set_backoff_step(TimeUnit value) {
53+
backoff_step_ = value;
54+
}
55+
56+
void BackoffCommitter::set_maximum_backoff(TimeUnit value) {
57+
maximum_backoff_ = value;
58+
}
59+
60+
void BackoffCommitter::set_error_callback(ErrorCallback callback) {
61+
callback_ = move(callback);
62+
}
63+
64+
void BackoffCommitter::commit(const Message& msg) {
65+
do_commit(msg);
66+
}
67+
68+
void BackoffCommitter::commit(const TopicPartitionList& topic_partitions) {
69+
do_commit(topic_partitions);
70+
}
71+
72+
BackoffCommitter::TimeUnit BackoffCommitter::increase_backoff(TimeUnit backoff) {
73+
if (policy_ == BackoffPolicy::LINEAR) {
74+
backoff = backoff + backoff_step_;
75+
}
76+
else {
77+
backoff = backoff * 2;
78+
}
79+
return min(backoff, maximum_backoff_);
80+
}
81+
82+
} // cppkafka

0 commit comments

Comments
 (0)