Skip to content

Commit 785213e

Browse files
committed
Add error notification interface for Kafka clients
1 parent 8effbf2 commit 785213e

File tree

2 files changed

+54
-8
lines changed

2 files changed

+54
-8
lines changed

include/kafka/Error.h

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,12 @@ class Error
3434
// The error with rich info
3535
explicit Error(rd_kafka_error_t* error = nullptr): _rkError(error, RkErrorDeleter) {}
3636
// The error with brief info
37-
explicit Error(rd_kafka_resp_err_t respErr): _rkRespErr(respErr) {}
37+
explicit Error(rd_kafka_resp_err_t respErr): _respErr(respErr) {}
3838
// The error with detailed message
39-
Error(rd_kafka_resp_err_t respErr, std::string message): _rkRespErr(respErr), _message(std::move(message)) {}
39+
Error(rd_kafka_resp_err_t respErr, std::string message)
40+
: _respErr(respErr), _message(std::move(message)) {}
41+
Error(rd_kafka_resp_err_t respErr, std::string message, bool fatal)
42+
: _respErr(respErr), _message(std::move(message)), _fatal(fatal) {}
4043

4144
explicit operator bool() const { return static_cast<bool>(value()); }
4245

@@ -58,7 +61,7 @@ class Error
5861
*/
5962
int value() const
6063
{
61-
return static_cast<int>(_rkError ? rd_kafka_error_code(_rkError.get()) : _rkRespErr);
64+
return static_cast<int>(_rkError ? rd_kafka_error_code(_rkError.get()) : _respErr);
6265
}
6366

6467
/**
@@ -67,15 +70,15 @@ class Error
6770
std::string message() const
6871
{
6972
return _message ? *_message :
70-
(_rkError ? rd_kafka_error_string(_rkError.get()) : rd_kafka_err2str(_rkRespErr));
73+
(_rkError ? rd_kafka_error_string(_rkError.get()) : rd_kafka_err2str(_respErr));
7174
}
7275

7376
/**
7477
* Fatal error indicates that the client instance is no longer usable.
7578
*/
7679
Optional<bool> isFatal() const
7780
{
78-
return _rkError ? rd_kafka_error_is_fatal(_rkError.get()) : Optional<bool>{};
81+
return _rkError ? rd_kafka_error_is_fatal(_rkError.get()) : _fatal;
7982
}
8083

8184
/**
@@ -87,9 +90,10 @@ class Error
8790
}
8891

8992
private:
90-
rd_kafka_error_shared_ptr _rkError; // For error with rich info
91-
rd_kafka_resp_err_t _rkRespErr{}; // For error with a simple response code
92-
Optional<std::string> _message; // For additional detailed message
93+
rd_kafka_error_shared_ptr _rkError; // For error with rich info
94+
rd_kafka_resp_err_t _respErr{}; // For error with a simple response code
95+
Optional<std::string> _message; // Additional detailed message (if any)
96+
Optional<bool> _fatal; // Fatal flag (if any)
9397
};
9498

9599
} // end of KAFKA_API

include/kafka/KafkaClient.h

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ class KafkaClient
3232
protected:
3333
using ConfigCallbacksRegister = std::function<void(rd_kafka_conf_t*)>;
3434
using StatsCallback = std::function<void(const std::string&)>;
35+
using ErrorCallback = std::function<void(const Error&)>;
3536

3637
enum class ClientType { KafkaConsumer, KafkaProducer, AdminClient };
3738
static std::string getClientTypeString(ClientType type)
@@ -91,6 +92,11 @@ class KafkaClient
9192
*/
9293
void setStatsCallback(StatsCallback cb) { _statsCb = std::move(cb); }
9394

95+
/**
96+
* Set callback for error notification.
97+
*/
98+
void setErrorCallback(ErrorCallback cb) { _errorCb = std::move(cb); }
99+
94100
/**
95101
* Return the properties which took effect.
96102
*/
@@ -172,6 +178,9 @@ class KafkaClient
172178
// Statistics callback (for librdkafka)
173179
static int statsCallback(rd_kafka_t* rk, char* jsonStrBuf, size_t jsonStrLen, void* opaque);
174180

181+
// Error callback (for librdkafka)
182+
static void errorCallback(rd_kafka_t* rk, int err, const char* reason, void* opaque);
183+
175184
// Validate properties (and fix it if necesary)
176185
static Properties validateAndReformProperties(const Properties& origProperties);
177186

@@ -191,6 +200,7 @@ class KafkaClient
191200
Logger _logger;
192201
Properties _properties;
193202
StatsCallback _statsCb;
203+
ErrorCallback _errorCb;
194204
rd_kafka_unique_ptr _rk;
195205

196206
// Log callback (for class instance)
@@ -199,6 +209,9 @@ class KafkaClient
199209
// Stats callback (for class instance)
200210
void onStats(const std::string& jsonString);
201211

212+
// Error callback (for class instance)
213+
void onError(const Error& error);
214+
202215
static const constexpr char* BOOTSTRAP_SERVERS = "bootstrap.servers";
203216
static const constexpr char* CLIENT_ID = "client.id";
204217
static const constexpr char* LOG_LEVEL = "log_level";
@@ -341,6 +354,9 @@ KafkaClient::KafkaClient(ClientType clientType,
341354
// Statistics Callback
342355
rd_kafka_conf_set_stats_cb(rk_conf.get(), KafkaClient::statsCallback);
343356

357+
// Error Callback
358+
rd_kafka_conf_set_error_cb(rk_conf.get(), KafkaClient::errorCallback);
359+
344360
// Other Callbacks
345361
if (registerCallbacks)
346362
{
@@ -466,6 +482,32 @@ KafkaClient::statsCallback(rd_kafka_t* rk, char* jsonStrBuf, size_t jsonStrLen,
466482
return 0;
467483
}
468484

485+
inline void
486+
KafkaClient::onError(const Error& error)
487+
{
488+
if (_errorCb) _errorCb(error);
489+
}
490+
491+
inline void
492+
KafkaClient::errorCallback(rd_kafka_t* rk, int err, const char* reason, void* /*opaque*/)
493+
{
494+
auto respErr = static_cast<rd_kafka_resp_err_t>(err);
495+
496+
Error error;
497+
if (respErr != RD_KAFKA_RESP_ERR__FATAL)
498+
{
499+
error = Error{respErr, reason};
500+
}
501+
else
502+
{
503+
LogBuffer<LOG_BUFFER_SIZE> errInfo;
504+
respErr = rd_kafka_fatal_error(rk, errInfo.str(), errInfo.capacity());
505+
error = Error{respErr, errInfo.c_str(), true};
506+
}
507+
508+
kafkaClient(rk).onError(error);
509+
}
510+
469511
inline Optional<BrokerMetadata>
470512
KafkaClient::fetchBrokerMetadata(const std::string& topic, std::chrono::milliseconds timeout, bool disableErrorLogging)
471513
{

0 commit comments

Comments
 (0)