Skip to content

Commit 02cb4b6

Browse files
committed
Improve global logger
1 parent 0d6b4fa commit 02cb4b6

File tree

6 files changed

+87
-93
lines changed

6 files changed

+87
-93
lines changed

include/kafka/ClientCommon.h

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,6 @@
33
#include <kafka/Project.h>
44

55
#include <kafka/Error.h>
6-
#include <kafka/RdKafkaHelper.h>
7-
#include <kafka/Types.h>
8-
9-
#include <librdkafka/rdkafka.h>
106

117
#include <functional>
128

@@ -28,6 +24,19 @@ namespace KAFKA_API { namespace clients {
2824
*/
2925
using StatsCallback = std::function<void(const std::string&)>;
3026

27+
/**
28+
* SASL OAUTHBEARER token info.
29+
*/
30+
struct SaslOauthbearerToken
31+
{
32+
using KeyValuePairs = std::map<std::string, std::string>;
33+
34+
std::string value;
35+
std::chrono::microseconds mdLifetime{};
36+
std::string mdPrincipalName;
37+
KeyValuePairs extensions;
38+
};
39+
3140
/**
3241
* Callback type for OAUTHBEARER token refresh.
3342
*/

include/kafka/KafkaClient.h

Lines changed: 12 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
#include <climits>
2121
#include <functional>
2222
#include <memory>
23-
#include <mutex>
2423
#include <string>
2524
#include <thread>
2625
#include <vector>
@@ -46,15 +45,6 @@ class KafkaClient
4645
*/
4746
const std::string& name() const { return _clientName; }
4847

49-
/**
50-
* Set a log callback for kafka clients, which do not have a client specific logging callback configured (see `setLogCallback`).
51-
*/
52-
static void setGlobalLogger(Logger logger = NullLogger)
53-
{
54-
std::call_once(Global<>::initOnce, [](){}); // Then no need to init within KafkaClient constructor
55-
Global<>::logger = std::move(logger);
56-
}
57-
5848
/**
5949
* Set log level for the kafka client (the default value: 5).
6050
*/
@@ -90,12 +80,11 @@ class KafkaClient
9080
template<class ...Args>
9181
void doLog(int level, const char* filename, int lineno, const char* format, Args... args) const
9282
{
93-
const auto& logger = _logCb ? _logCb : Global<>::logger;
94-
if (level >= 0 && level <= _logLevel && logger)
83+
if (level >= 0 && level <= _logLevel && _logCb)
9584
{
9685
LogBuffer<LOG_BUFFER_SIZE> logBuffer;
9786
logBuffer.print("%s ", name().c_str()).print(format, args...);
98-
logger(level, filename, lineno, logBuffer.c_str());
87+
_logCb(level, filename, lineno, logBuffer.c_str());
9988
}
10089
}
10190

@@ -106,28 +95,6 @@ class KafkaClient
10695

10796
#define KAFKA_API_DO_LOG(lvl, ...) doLog(lvl, __FILE__, __LINE__, ##__VA_ARGS__)
10897

109-
template<class ...Args>
110-
static void doGlobalLog(int level, const char* filename, int lineno, const char* format, Args... args)
111-
{
112-
if (!Global<>::logger) return;
113-
114-
LogBuffer<LOG_BUFFER_SIZE> logBuffer;
115-
logBuffer.print(format, args...);
116-
Global<>::logger(level, filename, lineno, logBuffer.c_str());
117-
}
118-
static void doGlobalLog(int level, const char* filename, int lineno, const char* msg)
119-
{
120-
doGlobalLog(level, filename, lineno, "%s", msg);
121-
}
122-
123-
/**
124-
* Log for kafka clients, with the callback which `setGlobalLogger` assigned.
125-
*
126-
* E.g,
127-
* KAFKA_API_LOG(Log::Level::Err, "something wrong happened! %s", detailedInfo.c_str());
128-
*/
129-
#define KAFKA_API_LOG(lvl, ...) KafkaClient::doGlobalLog(lvl, __FILE__, __LINE__, ##__VA_ARGS__)
130-
13198
#if COMPILER_SUPPORTS_CPP_17
13299
static constexpr int DEFAULT_METADATA_TIMEOUT_MS = 10000;
133100
#else
@@ -169,14 +136,6 @@ class KafkaClient
169136
// Buffer size for single line logging
170137
static const constexpr int LOG_BUFFER_SIZE = 1024;
171138

172-
// Global logger
173-
template <typename T = void>
174-
struct Global
175-
{
176-
static Logger logger;
177-
static std::once_flag initOnce;
178-
};
179-
180139
// Validate properties (and fix it if necesary)
181140
static Properties validateAndReformProperties(const Properties& properties);
182141

@@ -197,8 +156,8 @@ class KafkaClient
197156
std::string _clientName;
198157

199158
std::atomic<int> _logLevel = {Log::Level::Notice};
200-
Logger _logCb;
201159

160+
LogCallback _logCb = DefaultLogger;
202161
StatsCallback _statsCb;
203162
ErrorCallback _errorCb;
204163
OauthbearerTokenRefreshCallback _oauthbearerTokenRefreshCb;
@@ -247,10 +206,6 @@ class KafkaClient
247206
void interceptThreadStart(const std::string& threadName, const std::string& threadType);
248207
void interceptThreadExit(const std::string& threadName, const std::string& threadType);
249208

250-
static const constexpr char* BOOTSTRAP_SERVERS = "bootstrap.servers";
251-
static const constexpr char* CLIENT_ID = "client.id";
252-
static const constexpr char* LOG_LEVEL = "log_level";
253-
254209
protected:
255210
struct Pollable
256211
{
@@ -331,11 +286,6 @@ class KafkaClient
331286
std::unique_ptr<PollThread> _pollThread;
332287
};
333288

334-
template <typename T>
335-
Logger KafkaClient::Global<T>::logger;
336-
337-
template <typename T>
338-
std::once_flag KafkaClient::Global<T>::initOnce;
339289

340290
inline
341291
KafkaClient::KafkaClient(ClientType clientType,
@@ -345,23 +295,20 @@ KafkaClient::KafkaClient(ClientType clientType,
345295
static const std::set<std::string> PRIVATE_PROPERTY_KEYS = { "max.poll.records" };
346296

347297
// Save clientID
348-
if (auto clientId = properties.getProperty(CLIENT_ID))
298+
if (auto clientId = properties.getProperty(Config::CLIENT_ID))
349299
{
350300
_clientId = *clientId;
351301
_clientName = getClientTypeString(clientType) + "[" + _clientId + "]";
352302
}
353303

354-
// Init global logger
355-
std::call_once(Global<>::initOnce, [](){ Global<>::logger = DefaultLogger; });
356-
357304
// Log Callback
358305
if (properties.contains("log_cb"))
359306
{
360307
setLogCallback(properties.get<LogCallback>("log_cb"));
361308
}
362309

363310
// Save LogLevel
364-
if (auto logLevel = properties.getProperty(LOG_LEVEL))
311+
if (auto logLevel = properties.getProperty(Config::LOG_LEVEL))
365312
{
366313
try
367314
{
@@ -480,7 +427,7 @@ KafkaClient::KafkaClient(ClientType clientType,
480427
KAFKA_THROW_IF_WITH_ERROR(Error(rd_kafka_last_error()));
481428

482429
// Add brokers
483-
auto brokers = properties.getProperty(BOOTSTRAP_SERVERS);
430+
auto brokers = properties.getProperty(Config::BOOTSTRAP_SERVERS);
484431
if (!brokers || rd_kafka_brokers_add(getClientHandle(), brokers->c_str()) == 0)
485432
{
486433
KAFKA_THROW_ERROR(Error(RD_KAFKA_RESP_ERR__INVALID_ARG,\
@@ -496,22 +443,22 @@ KafkaClient::validateAndReformProperties(const Properties& properties)
496443
auto newProperties = properties;
497444

498445
// BOOTSTRAP_SERVERS property is mandatory
499-
if (!newProperties.getProperty(BOOTSTRAP_SERVERS))
446+
if (!newProperties.getProperty(Config::BOOTSTRAP_SERVERS))
500447
{
501448
KAFKA_THROW_ERROR(Error(RD_KAFKA_RESP_ERR__INVALID_ARG,\
502-
"Validation failed! With no property [" + std::string(BOOTSTRAP_SERVERS) + "]"));
449+
"Validation failed! With no property [" + std::string(Config::BOOTSTRAP_SERVERS) + "]"));
503450
}
504451

505452
// If no "client.id" configured, generate a random one for user
506-
if (!newProperties.getProperty(CLIENT_ID))
453+
if (!newProperties.getProperty(Config::CLIENT_ID))
507454
{
508-
newProperties.put(CLIENT_ID, utility::getRandomString());
455+
newProperties.put(Config::CLIENT_ID, utility::getRandomString());
509456
}
510457

511458
// If no "log_level" configured, use Log::Level::Notice as default
512-
if (!newProperties.getProperty(LOG_LEVEL))
459+
if (!newProperties.getProperty(Config::LOG_LEVEL))
513460
{
514-
newProperties.put(LOG_LEVEL, std::to_string(static_cast<int>(Log::Level::Notice)));
461+
newProperties.put(Config::LOG_LEVEL, std::to_string(static_cast<int>(Log::Level::Notice)));
515462
}
516463

517464
return newProperties;

include/kafka/Log.h

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
#include <cassert>
1010
#include <functional>
1111
#include <iostream>
12+
#include <mutex>
1213

1314

1415
namespace KAFKA_API {
@@ -36,6 +37,8 @@ struct Log
3637
}
3738
};
3839

40+
41+
// Log Buffer
3942
template <std::size_t MAX_CAPACITY>
4043
class LogBuffer
4144
{
@@ -72,17 +75,66 @@ class LogBuffer
7275
char* _wptr;
7376
};
7477

75-
using Logger = std::function<void(int, const char*, int, const char* msg)>;
7678

79+
// Default Logger
7780
inline void DefaultLogger(int level, const char* /*filename*/, int /*lineno*/, const char* msg)
7881
{
7982
std::cout << "[" << utility::getCurrentTime() << "]" << Log::levelString(static_cast<std::size_t>(level)) << " " << msg;
8083
std::cout << std::endl;
8184
}
8285

86+
// Null Logger
8387
inline void NullLogger(int /*level*/, const char* /*filename*/, int /*lineno*/, const char* /*msg*/)
8488
{
8589
}
8690

91+
92+
// Global Logger
93+
template <typename T = void>
94+
struct GlobalLogger
95+
{
96+
static clients::LogCallback logCb;
97+
static std::once_flag initOnce;
98+
99+
static const constexpr int LOG_BUFFER_SIZE = 1024;
100+
101+
template<class ...Args>
102+
static void doLog(int level, const char* filename, int lineno, const char* format, Args... args)
103+
{
104+
if (!GlobalLogger<>::logCb) return;
105+
106+
LogBuffer<LOG_BUFFER_SIZE> logBuffer;
107+
logBuffer.print(format, args...);
108+
GlobalLogger<>::logCb(level, filename, lineno, logBuffer.c_str());
109+
}
110+
};
111+
112+
template <typename T>
113+
clients::LogCallback GlobalLogger<T>::logCb;
114+
115+
template <typename T>
116+
std::once_flag GlobalLogger<T>::initOnce;
117+
118+
/**
119+
* Set a global log interface for kafka API (Note: it takes no effect on Kafka clients).
120+
*/
121+
inline void setGlobalLogger(clients::LogCallback cb)
122+
{
123+
std::call_once(GlobalLogger<>::initOnce, [](){}); // Then no need to init within the first KAFKA_API_LOG call.
124+
GlobalLogger<>::logCb = std::move(cb);
125+
}
126+
127+
/**
128+
* Log for kafka API (Note: not for Kafka client instances).
129+
*
130+
* E.g,
131+
* KAFKA_API_LOG(Log::Level::Err, "something wrong happened! %s", detailedInfo.c_str());
132+
*/
133+
#define KAFKA_API_LOG(level, ...) do { \
134+
std::call_once(GlobalLogger<>::initOnce, [](){ GlobalLogger<>::logCb = DefaultLogger; }); \
135+
GlobalLogger<>::doLog(level, __FILE__, __LINE__, ##__VA_ARGS__); \
136+
} while (0)
137+
138+
87139
} // end of KAFKA_API
88140

include/kafka/Types.h

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -199,20 +199,5 @@ inline std::string toString(const TopicPartitionOffsets& tpos)
199199
return ret;
200200
}
201201

202-
203-
/**
204-
* SASL OAUTHBEARER token info.
205-
*/
206-
struct SaslOauthbearerToken
207-
{
208-
using KeyValuePairs = std::map<std::string, std::string>;
209-
210-
std::string value;
211-
std::chrono::microseconds mdLifetime{};
212-
std::string mdPrincipalName;
213-
KeyValuePairs extensions;
214-
};
215-
216-
217202
} // end of KAFKA_API
218203

tools/KafkaConsoleConsumer.cc

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -86,8 +86,7 @@ void RunConsumer(const std::string& topic, const kafka::clients::Config& props)
8686
using namespace kafka::clients;
8787
using namespace kafka::clients::consumer;
8888

89-
// Create a manual-commit consumer
90-
KafkaClient::setGlobalLogger(kafka::Logger());
89+
// Create a auto-commit consumer
9190
KafkaConsumer consumer(props);
9291

9392
// Subscribe to topic
@@ -142,9 +141,8 @@ int main (int argc, char **argv)
142141
// Use Ctrl-C to terminate the program
143142
signal(SIGINT, stopRunning); // NOLINT
144143

145-
// Prepare consumer properties
146-
//
147144
using namespace kafka::clients;
145+
// Prepare consumer properties
148146
Config props;
149147
props.put(Config::BOOTSTRAP_SERVERS, boost::algorithm::join(args->brokerList, ","));
150148
// Get client id
@@ -156,6 +154,8 @@ int main (int argc, char **argv)
156154
{
157155
props.put(prop.first, prop.second);
158156
}
157+
// Disable logging
158+
props.put(Config::LOG_CB, kafka::NullLogger);
159159

160160
// Start consumer
161161
try

tools/KafkaConsoleProducer.cc

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -97,9 +97,10 @@ int main (int argc, char **argv)
9797
{
9898
props.put(prop.first, prop.second);
9999
}
100+
// Disable logging
101+
props.put(Config::LOG_CB, kafka::NullLogger);
100102

101-
// Create a sync-send producer
102-
KafkaClient::setGlobalLogger(kafka::Logger());
103+
// Create a producer
103104
KafkaProducer producer(props);
104105

105106
auto startPromptLine = []() { std::cout << "> "; };
@@ -131,9 +132,9 @@ int main (int argc, char **argv)
131132
startPromptLine();
132133
}
133134
}
134-
catch (const std::exception& e)
135+
catch (const kafka::KafkaException& e)
135136
{
136-
std::cout << e.what() << std::endl;
137+
std::cerr << "Exception thrown by producer: " << e.what() << std::endl;
137138
return EXIT_FAILURE;
138139
}
139140

0 commit comments

Comments
 (0)