Skip to content

Commit 307ac51

Browse files
committed
Use unified Error for all interfaces
1 parent 73d6bc5 commit 307ac51

24 files changed

+454
-415
lines changed

.clang-tidy

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ Checks: "*,\
3131
-cppcoreguidelines-no-malloc,\
3232
-cppcoreguidelines-non-private-member-variables-in-classes,\
3333
-cppcoreguidelines-pro-bounds-constant-array-index,\
34+
-cppcoreguidelines-pro-type-union-access,\
3435
-misc-non-private-member-variables-in-classes,\
3536
-readability-magic-numbers,\
3637
-readability-implicit-bool-conversion,\

examples/kafka_async_producer_copy_payload.cc

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,11 @@ int main(int argc, char **argv)
3636
// Send the message.
3737
producer.send(record,
3838
// The delivery report handler
39-
[](const kafka::Producer::RecordMetadata& metadata, std::error_code ec) {
40-
if (!ec) {
39+
[](const kafka::Producer::RecordMetadata& metadata, const kafka::Error& error) {
40+
if (!error) {
4141
std::cout << "% Message delivered: " << metadata.toString() << std::endl;
4242
} else {
43-
std::cerr << "% Message delivery failed: " << ec.message() << std::endl;
43+
std::cerr << "% Message delivery failed: " << error.message() << std::endl;
4444
}
4545
},
4646
// The memory block given by record.value() would be copied

examples/kafka_async_producer_not_copy_payload.cc

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,11 +42,11 @@ int main(int argc, char **argv)
4242
// Note: Here we capture the shared_pointer of `line`,
4343
// which holds the content for `record.value()`.
4444
// It makes sure the memory block is valid until the lambda finishes.
45-
[line](const kafka::Producer::RecordMetadata& metadata, std::error_code ec) {
46-
if (!ec) {
45+
[line](const kafka::Producer::RecordMetadata& metadata, const kafka::Error& error) {
46+
if (!error) {
4747
std::cout << "% Message delivered: " << metadata.toString() << std::endl;
4848
} else {
49-
std::cerr << "% Message delivery failed: " << ec.message() << std::endl;
49+
std::cerr << "% Message delivery failed: " << error.message() << std::endl;
5050
}
5151
});
5252

include/kafka/AdminClient.h

Lines changed: 61 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -24,25 +24,54 @@ namespace Admin
2424
/**
2525
* The result of AdminClient::createTopics().
2626
*/
27-
using CreateTopicsResult = SimpleError;
27+
struct CreateTopicsResult
28+
{
29+
explicit CreateTopicsResult(Error err): error(std::move(err)) {}
30+
31+
/**
32+
* The result error.
33+
*/
34+
Error error;
35+
};
2836

2937
/**
3038
* The result of AdminClient::deleteTopics().
3139
*/
32-
using DeleteTopicsResult = SimpleError;
40+
struct DeleteTopicsResult
41+
{
42+
explicit DeleteTopicsResult(Error err): error(std::move(err)) {}
43+
44+
/**
45+
* The result error.
46+
*/
47+
Error error;
48+
};
3349

3450
/**
3551
* The result of AdminClient::deleteRecords().
3652
*/
37-
using DeleteRecordsResult = SimpleError;
53+
struct DeleteRecordsResult
54+
{
55+
explicit DeleteRecordsResult(Error err): error(std::move(err)) {}
56+
57+
/**
58+
* The result error.
59+
*/
60+
Error error;
61+
};
3862

3963
/**
4064
* The result of AdminClient::listTopics().
4165
*/
42-
struct ListTopicsResult: public SimpleError
66+
struct ListTopicsResult
4367
{
44-
ListTopicsResult(rd_kafka_resp_err_t respErr, std::string detailedMsg): SimpleError(respErr, std::move(detailedMsg)) {}
45-
explicit ListTopicsResult(Topics names): SimpleError(RD_KAFKA_RESP_ERR_NO_ERROR, "Success"), topics(std::move(names)) {}
68+
explicit ListTopicsResult(Error err): error(std::move(err)) {}
69+
explicit ListTopicsResult(Topics names): topics(std::move(names)) {}
70+
71+
/**
72+
* The result error.
73+
*/
74+
Error error;
4675

4776
/**
4877
* The topics fetched.
@@ -92,9 +121,9 @@ class AdminClient: public KafkaClient
92121
std::chrono::milliseconds timeout = std::chrono::milliseconds(DEFAULT_COMMAND_TIMEOUT_MS));
93122

94123
private:
95-
static std::list<SimpleError> getPerTopicResults(const rd_kafka_topic_result_t** topicResults, std::size_t topicCount);
96-
static std::list<SimpleError> getPerTopicPartitionResults(const rd_kafka_topic_partition_list_t* partitionResults);
97-
static SimpleError combineErrors(const std::list<SimpleError>& errors);
124+
static std::list<Error> getPerTopicResults(const rd_kafka_topic_result_t** topicResults, std::size_t topicCount);
125+
static std::list<Error> getPerTopicPartitionResults(const rd_kafka_topic_partition_list_t* partitionResults);
126+
static Error combineErrors(const std::list<Error>& errors);
98127

99128
#if COMPILER_SUPPORTS_CPP_17
100129
static constexpr int DEFAULT_COMMAND_TIMEOUT_MS = 30000;
@@ -106,10 +135,10 @@ class AdminClient: public KafkaClient
106135
};
107136

108137

109-
inline std::list<SimpleError>
138+
inline std::list<Error>
110139
AdminClient::getPerTopicResults(const rd_kafka_topic_result_t** topicResults, std::size_t topicCount)
111140
{
112-
std::list<SimpleError> errors;
141+
std::list<Error> errors;
113142

114143
for (std::size_t i = 0; i < topicCount; ++i)
115144
{
@@ -123,10 +152,10 @@ AdminClient::getPerTopicResults(const rd_kafka_topic_result_t** topicResults, st
123152
return errors;
124153
}
125154

126-
inline std::list<SimpleError>
155+
inline std::list<Error>
127156
AdminClient::getPerTopicPartitionResults(const rd_kafka_topic_partition_list_t* partitionResults)
128157
{
129-
std::list<SimpleError> errors;
158+
std::list<Error> errors;
130159

131160
for (int i = 0; i < (partitionResults ? partitionResults->cnt : 0); ++i)
132161
{
@@ -139,8 +168,8 @@ AdminClient::getPerTopicPartitionResults(const rd_kafka_topic_partition_list_t*
139168
return errors;
140169
}
141170

142-
inline SimpleError
143-
AdminClient::combineErrors(const std::list<SimpleError>& errors)
171+
inline Error
172+
AdminClient::combineErrors(const std::list<Error>& errors)
144173
{
145174
if (!errors.empty())
146175
{
@@ -152,10 +181,10 @@ AdminClient::combineErrors(const std::list<SimpleError>& errors)
152181
detailedMsg += error.message();
153182
});
154183

155-
return SimpleError(static_cast<rd_kafka_resp_err_t>(errors.front().errorCode().value()), detailedMsg);
184+
return Error{static_cast<rd_kafka_resp_err_t>(errors.front().value()), detailedMsg};
156185
}
157186

158-
return SimpleError(RD_KAFKA_RESP_ERR_NO_ERROR, "Success");
187+
return Error{RD_KAFKA_RESP_ERR_NO_ERROR, "Success"};
159188
}
160189

161190
inline Admin::CreateTopicsResult
@@ -174,7 +203,7 @@ AdminClient::createTopics(const Topics& topics,
174203
rkNewTopics.emplace_back(rd_kafka_NewTopic_new(topic.c_str(), numPartitions, replicationFactor, errInfo.str(), errInfo.capacity()));
175204
if (!rkNewTopics.back())
176205
{
177-
return Admin::CreateTopicsResult(RD_KAFKA_RESP_ERR__INVALID_ARG, rd_kafka_err2str(RD_KAFKA_RESP_ERR__INVALID_ARG));
206+
return Admin::CreateTopicsResult(Error{RD_KAFKA_RESP_ERR__INVALID_ARG, rd_kafka_err2str(RD_KAFKA_RESP_ERR__INVALID_ARG)});
178207
}
179208

180209
for (const auto& conf: topicConfig.map())
@@ -184,7 +213,7 @@ AdminClient::createTopics(const Topics& topics,
184213
{
185214
std::string errMsg = "Invalid config[" + conf.first + "=" + conf.second + "]";
186215
KAFKA_API_DO_LOG(Log::Level::Err, errMsg.c_str());
187-
return Admin::CreateTopicsResult(RD_KAFKA_RESP_ERR__INVALID_ARG, errMsg);
216+
return Admin::CreateTopicsResult(Error{RD_KAFKA_RESP_ERR__INVALID_ARG, errMsg});
188217
}
189218
}
190219
}
@@ -215,10 +244,10 @@ AdminClient::createTopics(const Topics& topics,
215244

216245
if (!rk_ev)
217246
{
218-
return Admin::CreateTopicsResult(RD_KAFKA_RESP_ERR__TIMED_OUT, "No response within the time limit");
247+
return Admin::CreateTopicsResult(Error{RD_KAFKA_RESP_ERR__TIMED_OUT, "No response within the time limit"});
219248
}
220249

221-
std::list<SimpleError> errors;
250+
std::list<Error> errors;
222251

223252
if (rd_kafka_resp_err_t respErr = rd_kafka_event_error(rk_ev.get()))
224253
{
@@ -235,20 +264,20 @@ AdminClient::createTopics(const Topics& topics,
235264
// Return the error if any
236265
if (!errors.empty())
237266
{
238-
return combineErrors(errors);
267+
return Admin::CreateTopicsResult{combineErrors(errors)};
239268
}
240269

241270
// Update metedata
242271
do
243272
{
244273
auto listResult = listTopics();
245-
if (!listResult.errorCode())
274+
if (!listResult.error)
246275
{
247-
return Admin::CreateTopicsResult(RD_KAFKA_RESP_ERR_NO_ERROR, "Success");
276+
return Admin::CreateTopicsResult(Error{RD_KAFKA_RESP_ERR_NO_ERROR, "Success"});
248277
}
249278
} while (std::chrono::steady_clock::now() < end);
250279

251-
return Admin::CreateTopicsResult(RD_KAFKA_RESP_ERR__TIMED_OUT, "Updating metadata timed out");
280+
return Admin::CreateTopicsResult(Error{RD_KAFKA_RESP_ERR__TIMED_OUT, "Updating metadata timed out"});
252281
}
253282

254283
inline Admin::DeleteTopicsResult
@@ -288,10 +317,10 @@ AdminClient::deleteTopics(const Topics& topics, std::chrono::milliseconds timeou
288317

289318
if (!rk_ev)
290319
{
291-
return Admin::DeleteTopicsResult(RD_KAFKA_RESP_ERR__TIMED_OUT, "No response within the time limit");
320+
return Admin::DeleteTopicsResult(Error{RD_KAFKA_RESP_ERR__TIMED_OUT, "No response within the time limit"});
292321
}
293322

294-
std::list<SimpleError> errors;
323+
std::list<Error> errors;
295324

296325
if (rd_kafka_resp_err_t respErr = rd_kafka_event_error(rk_ev.get()))
297326
{
@@ -305,7 +334,7 @@ AdminClient::deleteTopics(const Topics& topics, std::chrono::milliseconds timeou
305334

306335
errors.splice(errors.end(), getPerTopicResults(res_topics, res_topic_cnt));
307336

308-
return combineErrors(errors);
337+
return Admin::DeleteTopicsResult(combineErrors(errors));
309338
}
310339

311340
inline Admin::ListTopicsResult
@@ -317,7 +346,7 @@ AdminClient::listTopics(std::chrono::milliseconds timeout)
317346

318347
if (err != RD_KAFKA_RESP_ERR_NO_ERROR)
319348
{
320-
return Admin::ListTopicsResult(err, rd_kafka_err2str(err));
349+
return Admin::ListTopicsResult(Error{err, rd_kafka_err2str(err)});
321350
}
322351

323352
Topics names;
@@ -357,10 +386,10 @@ AdminClient::deleteRecords(const TopicPartitionOffsets& topicPartitionOffsets,
357386

358387
if (!rk_ev)
359388
{
360-
return Admin::DeleteRecordsResult(RD_KAFKA_RESP_ERR__TIMED_OUT, "No response within the time limit");
389+
return Admin::DeleteRecordsResult(Error{RD_KAFKA_RESP_ERR__TIMED_OUT, "No response within the time limit"});
361390
}
362391

363-
std::list<SimpleError> errors;
392+
std::list<Error> errors;
364393

365394
if (rd_kafka_resp_err_t respErr = rd_kafka_event_error(rk_ev.get()))
366395
{
@@ -372,7 +401,7 @@ AdminClient::deleteRecords(const TopicPartitionOffsets& topicPartitionOffsets,
372401

373402
errors.splice(errors.end(), getPerTopicPartitionResults(res_offsets));
374403

375-
return combineErrors(errors);
404+
return Admin::DeleteRecordsResult(combineErrors(errors));
376405
}
377406

378407
} // end of KAFKA_API

include/kafka/Consumer.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
#include "kafka/Project.h"
44

5+
#include "kafka/Error.h"
56
#include "kafka/RdKafkaHelper.h"
67
#include "kafka/Types.h"
78

@@ -37,7 +38,7 @@ namespace Consumer
3738
/**
3839
* A callback interface that the user can implement to trigger custom actions when a commit request completes.
3940
*/
40-
using OffsetCommitCallback = std::function<void(const TopicPartitionOffsets& topicPartitionOffsets, std::error_code ec)>;
41+
using OffsetCommitCallback = std::function<void(const TopicPartitionOffsets& topicPartitionOffsets, const Error& error)>;
4142

4243
/**
4344
* Null OffsetCommitCallback

include/kafka/ConsumerRecord.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ class ConsumerRecord
7979
* 2. Failure
8080
* - [Error Codes] (https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ErrorCodes)
8181
*/
82-
std::error_code error() const { return ErrorCode(_rk_msg->err); }
82+
Error error() const { return Error{_rk_msg->err}; }
8383

8484
/**
8585
* Obtains explanatory string.

0 commit comments

Comments
 (0)