Skip to content

Commit ff9fbe1

Browse files
committed
Improve Error
1 parent 718530e commit ff9fbe1

File tree

7 files changed

+114
-80
lines changed

7 files changed

+114
-80
lines changed

.github/workflows/kafka_api_ci_tests.yml

Lines changed: 55 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ jobs:
2121
BUILD_CXX: ${{ matrix.build-cxx }}
2222
BUILD_TYPE: ${{ matrix.build-type }}
2323
CXX_STANDARD: ${{ matrix.cxx-standard }}
24+
ENABLE_UT_STUBS: ${{ matrix.enable-ut-stubs }}
2425
CHECK_OPTION: ${{ matrix.check-option }}
2526
GENERATE_DOC: ${{ matrix.generate-doc }}
2627
WITH_INSTALLATION: ${{ matrix.with-installation }}
@@ -29,58 +30,60 @@ jobs:
2930
strategy:
3031
matrix:
3132
include:
32-
- os: macos-10.15
33-
build-cxx: clang++
34-
test-labels: UT|IT
35-
36-
- os: ubuntu-20.04
37-
build-cxx: g++
38-
build-type: Debug
39-
test-labels: UT|IT
40-
41-
- os: ubuntu-20.04
42-
build-cxx: g++
43-
build-type: Release
44-
test-labels: RT
45-
46-
- os: ubuntu-20.04
47-
build-cxx: g++
48-
build-type: Release
49-
cxx-standard: 14
50-
test-labels: UT|IT
51-
52-
- os: ubuntu-20.04
53-
build-cxx: g++
54-
check-option: asan
55-
test-labels: UT|IT
56-
57-
- os: ubuntu-18.04
58-
build-cxx: g++
59-
check-option: tsan
60-
test-labels: UT|IT
61-
62-
- os: ubuntu-20.04
63-
build-cxx: g++
64-
check-option: ubsan
65-
test-labels: UT|IT
66-
67-
- os: ubuntu-20.04
68-
build-cxx: clang++
69-
test-labels: UT|IT
33+
- os: macos-10.15
34+
build-cxx: clang++
35+
test-labels: UT|IT
36+
37+
- os: ubuntu-20.04
38+
build-cxx: g++
39+
build-type: Debug
40+
test-labels: UT|IT
41+
enable-ut-stubs: true
42+
43+
- os: ubuntu-20.04
44+
build-cxx: g++
45+
build-type: Release
46+
test-labels: RT
47+
48+
- os: ubuntu-20.04
49+
build-cxx: g++
50+
build-type: Release
51+
cxx-standard: 14
52+
test-labels: UT|IT
53+
54+
- os: ubuntu-20.04
55+
build-cxx: g++
56+
check-option: asan
57+
test-labels: UT|IT
58+
59+
- os: ubuntu-18.04
60+
build-cxx: g++
61+
check-option: tsan
62+
test-labels: UT|IT
63+
64+
- os: ubuntu-20.04
65+
build-cxx: g++
66+
check-option: ubsan
67+
test-labels: UT|IT
68+
69+
- os: ubuntu-20.04
70+
build-cxx: clang++
71+
test-labels: UT|IT
7072
generate-doc: true
7173
with-installation: true
7274

73-
- os: ubuntu-20.04
74-
build-cxx: clang++
75-
check-option: clang-tidy
75+
- os: ubuntu-20.04
76+
build-cxx: clang++
77+
check-option: clang-tidy
78+
enable-ut-stubs: true
7679

77-
- os: ubuntu-18.04
78-
build-cxx: g++
79-
test-labels: UT|IT
80+
- os: ubuntu-18.04
81+
build-cxx: g++
82+
test-labels: UT|IT
8083

81-
- os: ubuntu-18.04
82-
build-cxx: clang++
83-
test-labels: RT
84+
- os: ubuntu-18.04
85+
build-cxx: clang++
86+
test-labels: RT
8487

8588
steps:
8689
- uses: actions/checkout@v2
@@ -157,8 +160,12 @@ jobs:
157160
export CMAKE_BUILD_TYPE=""
158161
fi
159162
163+
if [[ ${ENABLE_UT_STUBS} ]]; then
164+
export BUILD_OPTION="${BUILD_OPTION} -DBUILD_OPTION_ENABLE_UT_STUBS=ON"
165+
fi
166+
160167
if [[ ${CHECK_OPTION} == 'clang-tidy' ]]; then
161-
export BUILD_OPTION='-DBUILD_OPTION_CLANG_TIDY=ON'
168+
export BUILD_OPTION="${BUILD_OPTION} -DBUILD_OPTION_CLANG_TIDY=ON"
162169
fi
163170
164171
if [[ ${CHECK_OPTION} == 'asan' ]]; then

CMakeLists.txt

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,14 @@ elseif (CMAKE_CXX_COMPILER_ID STREQUAL "MSVC")
8282
endif ()
8383

8484

85+
#---------------------------
86+
# Build Option: UT stubs
87+
#---------------------------
88+
if (BUILD_OPTION_ENABLE_UT_STUBS)
89+
add_definitions(-DKAFKA_API_ENABLE_UNIT_TEST_STUBS)
90+
endif ()
91+
92+
8593
#---------------------------
8694
# Build Option: clang-tidy
8795
#---------------------------

include/kafka/AdminCommon.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ namespace KAFKA_API::clients::admin {
1313
*/
1414
struct CreateTopicsResult
1515
{
16-
explicit CreateTopicsResult(Error err): error(std::move(err)) {}
16+
explicit CreateTopicsResult(const Error& err): error(err) {}
1717

1818
/**
1919
* The result error.
@@ -26,7 +26,7 @@ struct CreateTopicsResult
2626
*/
2727
struct DeleteTopicsResult
2828
{
29-
explicit DeleteTopicsResult(Error err): error(std::move(err)) {}
29+
explicit DeleteTopicsResult(const Error& err): error(err) {}
3030

3131
/**
3232
* The result error.
@@ -39,7 +39,7 @@ struct DeleteTopicsResult
3939
*/
4040
struct DeleteRecordsResult
4141
{
42-
explicit DeleteRecordsResult(Error err): error(std::move(err)) {}
42+
explicit DeleteRecordsResult(const Error& err): error(err) {}
4343

4444
/**
4545
* The result error.
@@ -52,7 +52,7 @@ struct DeleteRecordsResult
5252
*/
5353
struct ListTopicsResult
5454
{
55-
explicit ListTopicsResult(Error err): error(std::move(err)) {}
55+
explicit ListTopicsResult(const Error& err): error(err) {}
5656
explicit ListTopicsResult(Topics names): topics(std::move(names)) {}
5757

5858
/**

include/kafka/Error.h

Lines changed: 30 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,30 @@ class Error
3636
// The error with brief info
3737
explicit Error(rd_kafka_resp_err_t respErr): _respErr(respErr) {}
3838
// The error with detailed message
39-
Error(rd_kafka_resp_err_t respErr, std::string message)
40-
: _respErr(respErr), _message(std::move(message)) {}
41-
Error(rd_kafka_resp_err_t respErr, std::string message, bool fatal)
42-
: _respErr(respErr), _message(std::move(message)), _fatal(fatal) {}
39+
Error(rd_kafka_resp_err_t respErr, std::string message, bool fatal = false)
40+
: _respErr(respErr), _message(std::move(message)), _isFatal(fatal) {}
41+
// Copy constructor
42+
Error(const Error& error) { *this = error; }
4343

44+
// Assignment operator
45+
Error& operator=(const Error& error)
46+
{
47+
if (this == &error) return *this;
48+
49+
_rkError.reset();
50+
51+
_respErr = static_cast<rd_kafka_resp_err_t>(error.value());
52+
_message = error._message;
53+
_isFatal = error.isFatal();
54+
_txnRequiresAbort = error.transactionRequiresAbort();
55+
_isRetriable = error.isRetriable();
56+
57+
return *this;
58+
}
59+
60+
/**
61+
* Check if the error is valid.
62+
*/
4463
explicit operator bool() const { return static_cast<bool>(value()); }
4564

4665
/**
@@ -80,9 +99,7 @@ class Error
8099
{
81100
std::ostringstream oss;
82101

83-
oss << rd_kafka_err2str(static_cast<rd_kafka_resp_err_t>(value())) << " [" << value() << "]";
84-
85-
if (auto fatal = isFatal()) oss << " | " << (*fatal ? "fatal" : "non-fatal");
102+
oss << rd_kafka_err2str(static_cast<rd_kafka_resp_err_t>(value())) << " [" << value() << "]" << (isFatal() ? " fatal" : "");
86103
if (transactionRequiresAbort()) oss << " | transaction-requires-abort";
87104
if (auto retriable = isRetriable()) oss << " | " << (*retriable ? "retriable" : "non-retriable");
88105
if (_message) oss << " | " << *_message;
@@ -93,17 +110,17 @@ class Error
93110
/**
94111
* Fatal error indicates that the client instance is no longer usable.
95112
*/
96-
Optional<bool> isFatal() const
113+
bool isFatal() const
97114
{
98-
return _rkError ? rd_kafka_error_is_fatal(_rkError.get()) : _fatal;
115+
return _rkError ? rd_kafka_error_is_fatal(_rkError.get()) : _isFatal;
99116
}
100117

101118
/**
102119
* Show whether the operation may be retried.
103120
*/
104121
Optional<bool> isRetriable() const
105122
{
106-
return _rkError ? rd_kafka_error_is_retriable(_rkError.get()) : Optional<bool>{};
123+
return _rkError ? rd_kafka_error_is_retriable(_rkError.get()) : _isRetriable;
107124
}
108125

109126
/**
@@ -122,7 +139,9 @@ class Error
122139
rd_kafka_error_shared_ptr _rkError; // For error with rich info
123140
rd_kafka_resp_err_t _respErr{}; // For error with a simple response code
124141
Optional<std::string> _message; // Additional detailed message (if any)
125-
Optional<bool> _fatal; // Fatal flag (if any)
142+
bool _isFatal = false;
143+
bool _txnRequiresAbort = false;
144+
Optional<bool> _isRetriable; // Retriable flag (if any)
126145
};
127146

128147
} // end of KAFKA_API

include/kafka/KafkaClient.h

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,12 @@ class KafkaClient
154154
*/
155155
#define KAFKA_API_LOG(lvl, ...) KafkaClient::doGlobalLog(lvl, __FILE__, __LINE__, ##__VA_ARGS__)
156156

157+
#if COMPILER_SUPPORTS_CPP_17
158+
static constexpr int DEFAULT_METADATA_TIMEOUT_MS = 10000;
159+
#else
160+
enum { DEFAULT_METADATA_TIMEOUT_MS = 10000 };
161+
#endif
162+
157163
protected:
158164
// There're 3 derived classes: KafkaConsumer, KafkaProducer, AdminClient
159165
enum class ClientType { KafkaConsumer, KafkaProducer, AdminClient };
@@ -247,12 +253,6 @@ class KafkaClient
247253
static const constexpr char* LOG_LEVEL = "log_level";
248254
static const constexpr char* DEBUG = "debug";
249255

250-
#if COMPILER_SUPPORTS_CPP_17
251-
static constexpr int DEFAULT_METADATA_TIMEOUT_MS = 10000;
252-
#else
253-
enum { DEFAULT_METADATA_TIMEOUT_MS = 10000 };
254-
#endif
255-
256256
protected:
257257
struct Pollable
258258
{

include/kafka/KafkaProducer.h

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,14 @@ class KafkaProducer: public KafkaClient
153153
const consumer::ConsumerGroupMetadata& groupMetadata,
154154
std::chrono::milliseconds timeout);
155155

156+
#if COMPILER_SUPPORTS_CPP_17
157+
static constexpr int DEFAULT_INIT_TRANSACTIONS_TIMEOUT_MS = 10000;
158+
static constexpr int DEFAULT_COMMIT_TRANSACTION_TIMEOUT_MS = 10000;
159+
#else
160+
enum { DEFAULT_INIT_TRANSACTIONS_TIMEOUT_MS = 10000 };
161+
enum { DEFAULT_COMMIT_TRANSACTION_TIMEOUT_MS = 10000 };
162+
#endif
163+
156164
private:
157165
void pollCallbacks(int timeoutMs)
158166
{
@@ -177,16 +185,6 @@ class KafkaProducer: public KafkaClient
177185

178186
enum class ActionWhileQueueIsFull { Block, NoBlock };
179187

180-
static constexpr int CALLBACK_POLLING_INTERVAL_MS = 10;
181-
182-
#if COMPILER_SUPPORTS_CPP_17
183-
static constexpr int DEFAULT_INIT_TRANSACTIONS_TIMEOUT_MS = 10000;
184-
static constexpr int DEFAULT_COMMIT_TRANSACTION_TIMEOUT_MS = 10000;
185-
#else
186-
enum { DEFAULT_INIT_TRANSACTIONS_TIMEOUT_MS = 10000 };
187-
enum { DEFAULT_COMMIT_TRANSACTION_TIMEOUT_MS = 10000 };
188-
#endif
189-
190188
// Validate properties (and fix it if necesary)
191189
static Properties validateAndReformProperties(const Properties& properties);
192190

@@ -252,7 +250,7 @@ KafkaProducer::registerConfigCallbacks(rd_kafka_conf_t* conf)
252250
}
253251
else
254252
{
255-
assert(clientPtrSize == sizeof(client));
253+
assert(clientPtrSize == sizeof(client)); // NOLINT
256254
client->KAFKA_API_DO_LOG(Log::Level::Err, "failed to stub ut_handle_ProduceResponse! error[%s]", errInfo.c_str());
257255
}
258256
}

tests/unit/TestKafkaException.cc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ TEST(KafkaException, Basic)
3535
}
3636
catch (const kafka::KafkaException& e)
3737
{
38+
std::cout << e.what() << std::endl;
3839
std::regex reMatch(R"(.*Broker: Request timed out \[7\] \(some_filename:100\))");
3940
EXPECT_TRUE(std::regex_match(e.what(), reMatch));
4041
}
@@ -47,6 +48,7 @@ TEST(KafkaException, Basic)
4748
}
4849
catch (const kafka::KafkaException& e)
4950
{
51+
std::cout << e.what() << std::endl;
5052
std::regex reMatch(R"(.*Broker: Request timed out \[7\] \| something wrong here \(some_filename:100\))");
5153
EXPECT_TRUE(std::regex_match(e.what(), reMatch));
5254
}

0 commit comments

Comments
 (0)