Skip to content

Commit e352a20

Browse files
committed
Improve KafkaRecoverableProducer
1 parent 3111fe8 commit e352a20

File tree

1 file changed

+9
-16
lines changed

1 file changed

+9
-16
lines changed

include/kafka/addons/KafkaRecoverableProducer.h

Lines changed: 9 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,12 @@
22

33
#include <kafka/Project.h>
44

5-
#include <kafka/ClientCommon.h>
6-
#include <kafka/KafkaClient.h>
75
#include <kafka/KafkaProducer.h>
8-
#include <kafka/Types.h>
96

10-
#include <deque>
7+
#include <atomic>
8+
#include <memory>
119
#include <mutex>
12-
#include <vector>
10+
#include <thread>
1311

1412

1513
namespace KAFKA_API { namespace clients { namespace producer {
@@ -20,8 +18,8 @@ class KafkaRecoverableProducer
2018
explicit KafkaRecoverableProducer(const Properties& properties)
2119
: _properties(properties), _running(true)
2220
{
23-
_properties.put("enable.manual.events.poll", "true");
24-
_properties.put("error_cb", [this](const Error& error) { if (error.isFatal()) _fatalError = std::make_unique<Error>(error); });
21+
_properties.put(Config::ENABLE_MANUAL_EVENTS_POLL, "true");
22+
_properties.put(Config::ERROR_CB, [this](const Error& error) { if (error.isFatal()) _fatalError = std::make_unique<Error>(error); });
2523

2624
_producer = createProducer();
2725

@@ -60,8 +58,8 @@ class KafkaRecoverableProducer
6058
{
6159
const std::lock_guard<std::mutex> lock(_producerMutex);
6260

63-
_logLevel = level;
64-
_producer->setLogLevel(*_logLevel);
61+
_properties.put(Config::LOG_LEVEL, std::to_string(level));
62+
_producer->setLogLevel(level);
6563
}
6664

6765
/**
@@ -295,16 +293,11 @@ class KafkaRecoverableProducer
295293

296294
std::unique_ptr<KafkaProducer> createProducer()
297295
{
298-
auto producer = std::make_unique<KafkaProducer>(_properties);
299-
300-
if (_logLevel) producer->setLogLevel(*_logLevel);
301-
302-
return producer;
296+
return std::make_unique<KafkaProducer>(_properties);
303297
}
304298

305299
// Configurations for producer
306-
Properties _properties;
307-
Optional<int> _logLevel;
300+
Properties _properties;
308301

309302
std::unique_ptr<Error> _fatalError;
310303

0 commit comments

Comments
 (0)