Commit b81072d

New addon: KafkaRecoverableProducer
1 parent ff9fbe1 commit b81072d

File tree: 2 files changed (+525, −0)

Lines changed: 359 additions & 0 deletions
@@ -0,0 +1,359 @@
#pragma once

#include "kafka/Project.h"

#include "kafka/KafkaClient.h"
#include "kafka/KafkaProducer.h"
#include "kafka/Types.h"

#include <atomic>
#include <chrono>
#include <deque>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

namespace KAFKA_API::clients {

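/**
 * KafkaRecoverableProducer wraps a KafkaProducer: a background thread keeps polling events, and
 * whenever a fatal error is reported it purges, closes and re-creates the internal producer with
 * the same properties and callbacks, so the handle stays usable across fatal errors.
 *
 * A minimal usage sketch (not part of this commit; it assumes `KAFKA_API` expands to the default
 * `kafka`, and the broker address, topic and payload are made-up placeholders):
 *
 *     kafka::Properties props({{"bootstrap.servers", "192.168.0.1:9092"}});
 *     kafka::clients::KafkaRecoverableProducer producer(props);
 *
 *     std::string payload = "some message";
 *     kafka::clients::producer::ProducerRecord record("some-topic",
 *                                                      kafka::NullKey,
 *                                                      kafka::Value(payload.c_str(), payload.size()));
 *
 *     producer.send(record,
 *                   [](const kafka::clients::producer::RecordMetadata& metadata, const kafka::Error& error) {
 *                       if (error) std::cerr << error.message() << std::endl;    // <iostream> assumed, for brevity
 *                   });
 *
 *     producer.close();
 */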
class KafkaRecoverableProducer
{
public:
    explicit KafkaRecoverableProducer(const Properties& properties)
        : _properties(properties), _running(true)
    {
        _errorCb = [this](const Error& error) {
            if (error.isFatal()) _fatalError = std::make_unique<Error>(error);
        };

        _producer = createProducer();

        _pollThread = std::thread([this]() { keepPolling(); });
    }

    ~KafkaRecoverableProducer()
    {
        {
            std::lock_guard<std::mutex> lock(_producerMutex);

            _producer->close();

            _running = false;
        }

        // Join outside the lock: when handling a fatal error, the poll thread needs to acquire
        // `_producerMutex` (and observe `_running == false`) before it can exit.
        if (_pollThread.joinable()) _pollThread.join();
    }

    /**
     * Get the client id.
     */
    const std::string& clientId() const
    {
        std::lock_guard<std::mutex> lock(_producerMutex);

        return _producer->clientId();
    }

    /**
     * Get the client name (i.e. client type + id).
     */
    const std::string& name() const
    {
        std::lock_guard<std::mutex> lock(_producerMutex);

        return _producer->name();
    }

    /**
     * Set the log callback for the kafka client (it's a per-client setting).
     */
    void setLogger(const Logger& logger)
    {
        std::lock_guard<std::mutex> lock(_producerMutex);

        _logger = logger;
        _producer->setLogger(*_logger);
    }

    /**
     * Set log level for the kafka client (the default value: 5).
     */
    void setLogLevel(int level)
    {
        std::lock_guard<std::mutex> lock(_producerMutex);

        _logLevel = level;
        _producer->setLogLevel(*_logLevel);
    }

    /**
     * Set the callback to receive the periodic statistics info.
     * Note: 1) It only works while the "statistics.interval.ms" property is configured with a non-0 value.
     *       2) The callback would be triggered periodically, receiving the internal statistics info (in JSON format) emitted from librdkafka.
     */
    void setStatsCallback(const KafkaClient::StatsCallback& cb)
    {
        std::lock_guard<std::mutex> lock(_producerMutex);

        _statsCb = cb;
        _producer->setStatsCallback(*_statsCb);
    }

    /**
     * Set the error callback for the kafka client.
     * Note: fatal errors are additionally recorded, to trigger re-creation of the internal producer.
     */
    void setErrorCallback(const KafkaClient::ErrorCallback& cb)
    {
        std::lock_guard<std::mutex> lock(_producerMutex);

        _errorCb = [cb, this](const Error& error) {
            cb(error);

            if (error.isFatal()) _fatalError = std::make_unique<Error>(error);
        };
        _producer->setErrorCallback(*_errorCb);
    }

    /**
     * Return the properties which took effect.
     */
    const Properties& properties() const
    {
        std::lock_guard<std::mutex> lock(_producerMutex);

        return _producer->properties();
    }

    /**
     * Fetch the effective property (including properties internally set by librdkafka).
     */
    Optional<std::string> getProperty(const std::string& name) const
    {
        std::lock_guard<std::mutex> lock(_producerMutex);

        return _producer->getProperty(name);
    }

    /**
     * Fetch metadata from an available broker.
     * Note: the Metadata response information may trigger a re-join if any subscribed topic has changed partition count or existence state.
     */
    Optional<BrokerMetadata> fetchBrokerMetadata(const std::string& topic,
                                                 std::chrono::milliseconds timeout = std::chrono::milliseconds(KafkaClient::DEFAULT_METADATA_TIMEOUT_MS),
                                                 bool disableErrorLogging = false)
    {
        std::lock_guard<std::mutex> lock(_producerMutex);

        return _producer->fetchBrokerMetadata(topic, timeout, disableErrorLogging);
    }

    /**
     * Invoking this method makes all buffered records immediately available to send, and blocks on the completion of the requests associated with these records.
     *
     * Possible error values:
     *   - RD_KAFKA_RESP_ERR__TIMED_OUT: The `timeout` was reached before all outstanding requests were completed.
     */
    Error flush(std::chrono::milliseconds timeout = std::chrono::milliseconds::max())
    {
        std::lock_guard<std::mutex> lock(_producerMutex);

        return _producer->flush(timeout);
    }

    /**
     * Purge messages currently handled by the KafkaProducer.
     */
    Error purge()
    {
        std::lock_guard<std::mutex> lock(_producerMutex);

        return _producer->purge();
    }

    /**
     * Close this producer. This method would wait up to `timeout` for the producer to complete the sending of all incomplete requests (before purging them).
     */
    void close(std::chrono::milliseconds timeout = std::chrono::milliseconds::max())
    {
        std::lock_guard<std::mutex> lock(_producerMutex);

        _producer->close(timeout);
    }

    /**
     * Synchronously send a record to a topic.
     * Throws KafkaException with errors:
     *   Local errors:
     *     - RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC:     The topic doesn't exist
     *     - RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION: The partition doesn't exist
     *     - RD_KAFKA_RESP_ERR__INVALID_ARG:       Invalid topic (the topic is null, or its length is too long (> 512))
     *     - RD_KAFKA_RESP_ERR__MSG_TIMED_OUT:     No ack received within the time limit
     *   Broker errors:
     *     - [Error Codes](https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ErrorCodes)
     */
    producer::RecordMetadata syncSend(const producer::ProducerRecord& record)
    {
        std::lock_guard<std::mutex> lock(_producerMutex);

        return _producer->syncSend(record);
    }

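    // A hedged example of the synchronous path (`producer`/`record` as sketched in the class comment
    // above): the call either returns the RecordMetadata of the delivered message, or throws KafkaException.
    //
    //     try {
    //         const auto metadata = producer.syncSend(record);
    //         std::cout << "delivered: " << metadata.toString() << std::endl;
    //     } catch (const kafka::KafkaException& e) {
    //         std::cerr << "send failed: " << e.what() << std::endl;
    //     }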

    /**
     * Asynchronously send a record to a topic.
     *
     * Note:
     *   - If a callback is provided, it's guaranteed to be triggered (before closing the producer).
     *   - If any error occurred, an exception would be thrown.
     *   - Make sure the memory block (for the ProducerRecord's value) is valid until the delivery callback finishes; otherwise, use the option `KafkaProducer::SendOption::ToCopyRecordValue`.
     *
     * Possible errors:
     *   Local errors:
     *     - RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC:     The topic doesn't exist
     *     - RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION: The partition doesn't exist
     *     - RD_KAFKA_RESP_ERR__INVALID_ARG:       Invalid topic (the topic is null, or its length is too long (> 512))
     *     - RD_KAFKA_RESP_ERR__MSG_TIMED_OUT:     No ack received within the time limit
     *     - RD_KAFKA_RESP_ERR__QUEUE_FULL:        The message buffering queue is full
     *   Broker errors:
     *     - [Error Codes](https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ErrorCodes)
     */
    void send(const producer::ProducerRecord& record,
              const producer::Callback& deliveryCb,
              KafkaProducer::SendOption option = KafkaProducer::SendOption::NoCopyRecordValue)
    {
        std::lock_guard<std::mutex> lock(_producerMutex);

        _producer->send(record, deliveryCb, option);
    }

    /**
     * Asynchronously send a record to a topic.
     *
     * Note:
     *   - If a callback is provided, it's guaranteed to be triggered (before closing the producer).
     *   - The output reference parameter `error` will be set if an error occurred.
     *   - Make sure the memory block (for the ProducerRecord's value) is valid until the delivery callback finishes; otherwise, use the option `KafkaProducer::SendOption::ToCopyRecordValue`.
     *
     * Possible errors:
     *   Local errors:
     *     - RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC:     The topic doesn't exist
     *     - RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION: The partition doesn't exist
     *     - RD_KAFKA_RESP_ERR__INVALID_ARG:       Invalid topic (the topic is null, or its length is too long (> 512))
     *     - RD_KAFKA_RESP_ERR__MSG_TIMED_OUT:     No ack received within the time limit
     *     - RD_KAFKA_RESP_ERR__QUEUE_FULL:        The message buffering queue is full
     *   Broker errors:
     *     - [Error Codes](https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ErrorCodes)
     */
    void send(const producer::ProducerRecord& record,
              const producer::Callback& deliveryCb,
              Error& error,
              KafkaProducer::SendOption option = KafkaProducer::SendOption::NoCopyRecordValue)
    {
        std::lock_guard<std::mutex> lock(_producerMutex);

        _producer->send(record, deliveryCb, error, option);
    }

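    // A hedged sketch of the non-throwing overload: the per-call `error` reports queueing failures
    // (e.g. RD_KAFKA_RESP_ERR__QUEUE_FULL), while delivery results still arrive via the callback.
    // `ToCopyRecordValue` is useful when the payload buffer may be reused before delivery completes.
    //
    //     kafka::Error sendError;
    //     producer.send(record,
    //                   [](const kafka::clients::producer::RecordMetadata&, const kafka::Error& deliveryError) {
    //                       if (deliveryError) std::cerr << deliveryError.message() << std::endl;
    //                   },
    //                   sendError,
    //                   KafkaProducer::SendOption::ToCopyRecordValue);
    //     if (sendError) std::cerr << "enqueue failed: " << sendError.message() << std::endl;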

    /**
     * Needs to be called before any other methods when the `transactional.id` is set in the configuration.
     */
    void initTransactions(std::chrono::milliseconds timeout = std::chrono::milliseconds(KafkaProducer::DEFAULT_INIT_TRANSACTIONS_TIMEOUT_MS))
    {
        std::lock_guard<std::mutex> lock(_producerMutex);

        _producer->initTransactions(timeout);
    }

    /**
     * Should be called before the start of each new transaction.
     */
    void beginTransaction()
    {
        std::lock_guard<std::mutex> lock(_producerMutex);

        _producer->beginTransaction();
    }

    /**
     * Commit the ongoing transaction.
     */
    void commitTransaction(std::chrono::milliseconds timeout = std::chrono::milliseconds(KafkaProducer::DEFAULT_COMMIT_TRANSACTION_TIMEOUT_MS))
    {
        std::lock_guard<std::mutex> lock(_producerMutex);

        _producer->commitTransaction(timeout);
    }

    /**
     * Abort the ongoing transaction.
     */
    void abortTransaction(std::chrono::milliseconds timeout = std::chrono::milliseconds::max())
    {
        std::lock_guard<std::mutex> lock(_producerMutex);

        _producer->abortTransaction(timeout);
    }

    /**
     * Send a list of specified offsets to the consumer group coordinator, and mark those offsets as part of the current transaction.
     */
    void sendOffsetsToTransaction(const TopicPartitionOffsets& topicPartitionOffsets,
                                  const consumer::ConsumerGroupMetadata& groupMetadata,
                                  std::chrono::milliseconds timeout)
    {
        std::lock_guard<std::mutex> lock(_producerMutex);

        _producer->sendOffsetsToTransaction(topicPartitionOffsets, groupMetadata, timeout);
    }

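    // A hedged sketch of the expected transactional call order (requires the `transactional.id`
    // property at construction; `record`/`deliveryCb` are placeholders as in the comments above):
    //
    //     producer.initTransactions();   // once, before any other transactional call
    //     producer.beginTransaction();
    //     producer.send(record, deliveryCb);
    //     producer.commitTransaction();  // or producer.abortTransaction() on failure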

#ifdef KAFKA_API_ENABLE_UNIT_TEST_STUBS
    void mockFatalError()
    {
        _fatalError = std::make_unique<Error>(RD_KAFKA_RESP_ERR__FATAL, "fake fatal error", true);
    }
#endif

private:
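    // Background poll loop: drives delivery/stats/error callbacks via pollEvents(); once the error
    // callback has recorded a fatal error, it purges and closes the broken producer, then re-creates
    // it (under `_producerMutex`, so it doesn't race with the public forwarding methods above).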
    void keepPolling()
    {
        while (_running)
        {
            _producer->pollEvents(std::chrono::milliseconds(1));
            if (_fatalError)
            {
                KAFKA_API_LOG(Log::Level::Notice, "met fatal error[%s], would re-initialize the internal producer", _fatalError->toString().c_str());

                std::lock_guard<std::mutex> lock(_producerMutex);

                if (!_running) return;

                _producer->purge();
                _producer->close();

                _fatalError.reset();

                _producer = createProducer();
            }
        }
    }

    std::unique_ptr<KafkaProducer> createProducer()
    {
        auto producer = std::make_unique<KafkaProducer>(_properties, KafkaClient::EventsPollingOption::Manual);

        if (_logger)   producer->setLogger(*_logger);
        if (_logLevel) producer->setLogLevel(*_logLevel);
        if (_statsCb)  producer->setStatsCallback(*_statsCb);
        if (_errorCb)  producer->setErrorCallback(*_errorCb);

        return producer;
    }

    // Configurations for the producer
    Properties                           _properties;
    Optional<Logger>                     _logger;
    Optional<int>                        _logLevel;
    Optional<KafkaClient::StatsCallback> _statsCb;
    Optional<KafkaClient::ErrorCallback> _errorCb;

    std::unique_ptr<Error> _fatalError;

    std::atomic<bool> _running;
    std::thread       _pollThread;

    mutable std::mutex             _producerMutex;
    std::unique_ptr<KafkaProducer> _producer;
};

} // end of KAFKA_API::clients
