Skip to content

Commit 77599e1

Browse files
committed
Improve integration tests, etc
1 parent d4b9b41 commit 77599e1

File tree

12 files changed

+508
-175
lines changed

12 files changed

+508
-175
lines changed

include/kafka/AdminClient.h

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,8 +78,13 @@ class AdminClient: public KafkaClient
7878
static std::list<ErrorWithDetail> getPerTopicResults(const rd_kafka_topic_result_t** topicResults, int topicCount);
7979
static ErrorWithDetail combineErrors(const std::list<ErrorWithDetail>& errors);
8080

81-
static constexpr int DEFAULT_COMMAND_TIMEOUT_MS = 30000;
82-
static constexpr int EVENT_POLLING_INTERVAL_MS = 100;
81+
#if __cplusplus >= 201703L
82+
static constexpr int DEFAULT_COMMAND_TIMEOUT_MS = 30000;
83+
static constexpr int EVENT_POLLING_INTERVAL_MS = 100;
84+
#else
85+
enum { DEFAULT_COMMAND_TIMEOUT_MS = 30000 };
86+
enum { EVENT_POLLING_INTERVAL_MS = 100 };
87+
#endif
8388
};
8489

8590

include/kafka/KafkaClient.h

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ class KafkaClient
116116
if (level >= LOG_EMERG && level <= _logLevel && logger)
117117
{
118118
LogBuffer<LOG_BUFFER_SIZE> logBuffer;
119-
logBuffer.print("%s %s", name().c_str(), format, args...);
119+
logBuffer.print(name().c_str()).print(format, args...);
120120
logger(level, filename, lineno, logBuffer.c_str());
121121
}
122122
}
@@ -200,7 +200,11 @@ class KafkaClient
200200
static const constexpr char* SECURITY_PROTOCOL = "security.protocol";
201201
static const constexpr char* SASL_KERBEROS_SERVICE_NAME = "sasl.kerberos.service.name";
202202

203+
#if __cplusplus >= 201703L
203204
static constexpr int DEFAULT_METADATA_TIMEOUT_MS = 10000;
205+
#else
206+
enum { DEFAULT_METADATA_TIMEOUT_MS = 10000 };
207+
#endif
204208

205209
protected:
206210
class Pollable
@@ -471,7 +475,7 @@ KafkaClient::fetchBrokerMetadata(const std::string& topic, std::chrono::millisec
471475
{
472476
if (!disableErrorLogging)
473477
{
474-
KAFKA_API_DO_LOG(LOG_ERR, "Failed to get BrokerMetadata! error[%s]", rd_kafka_err2str(err));
478+
KAFKA_API_DO_LOG(LOG_ERR, "failed to get BrokerMetadata! error[%s]", rd_kafka_err2str(err));
475479
}
476480
return ret;
477481
}
@@ -480,7 +484,7 @@ KafkaClient::fetchBrokerMetadata(const std::string& topic, std::chrono::millisec
480484
{
481485
if (!disableErrorLogging)
482486
{
483-
KAFKA_API_DO_LOG(LOG_ERR, "Failed to construct MetaData! topic_cnt[%d]", rk_metadata->topic_cnt);
487+
KAFKA_API_DO_LOG(LOG_ERR, "failed to construct MetaData! topic_cnt[%d]", rk_metadata->topic_cnt);
484488
}
485489
return ret;
486490
}
@@ -490,7 +494,7 @@ KafkaClient::fetchBrokerMetadata(const std::string& topic, std::chrono::millisec
490494
{
491495
if (!disableErrorLogging)
492496
{
493-
KAFKA_API_DO_LOG(LOG_ERR, "Failed to construct MetaData! topic.err[%s]", rd_kafka_err2str(metadata_topic.err));
497+
KAFKA_API_DO_LOG(LOG_ERR, "failed to construct MetaData! topic.err[%s]", rd_kafka_err2str(metadata_topic.err));
494498
}
495499
return ret;
496500
}
@@ -516,7 +520,7 @@ KafkaClient::fetchBrokerMetadata(const std::string& topic, std::chrono::millisec
516520
{
517521
if (!disableErrorLogging)
518522
{
519-
KAFKA_API_DO_LOG(LOG_ERR, "Got error[%s] while constructing BrokerMetadata for topic[%s]-partition[%d]", rd_kafka_err2str(metadata_partition.err), topic.c_str(), partition);
523+
KAFKA_API_DO_LOG(LOG_ERR, "got error[%s] while constructing BrokerMetadata for topic[%s]-partition[%d]", rd_kafka_err2str(metadata_partition.err), topic.c_str(), partition);
520524
}
521525

522526
continue;

include/kafka/KafkaConsumer.h

Lines changed: 53 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
#include <cassert>
1313
#include <chrono>
1414
#include <functional>
15+
#include <iterator>
1516
#include <memory>
1617

1718

@@ -173,6 +174,17 @@ class KafkaConsumer: public KafkaClient
173174
*/
174175
std::map<TopicPartition, Offset> endOffsets(const TopicPartitions& tps) const { return getOffsets(tps, false); }
175176

177+
/**
178+
* Get the offsets for the given partitions by time-point.
179+
* Throws KafkaException with errors:
180+
* - RD_KAFKA_RESP_ERR__TIMED_OUT: Not all offsets could be fetched in time.
181+
* - RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION: All partitions are unknown.
182+
* - RD_KAFKA_RESP_ERR__LEADER_NOT_AVAILABLE: Unable to query leaders from the given partitions.
183+
*/
184+
std::map<TopicPartition, Offset> offsetsForTime(const TopicPartitions& tps,
185+
std::chrono::time_point<std::chrono::system_clock> timepoint,
186+
std::chrono::milliseconds timeout = std::chrono::milliseconds(DEFAULT_QUERY_TIMEOUT_MS)) const;
187+
176188
/**
177189
* Get the last committed offset for the given partition (whether the commit happened by this process or another). This offset will be used as the position for the consumer in the event of a failure.
178190
* This call will block to do a remote call to get the latest committed offsets from the server.
@@ -229,8 +241,15 @@ class KafkaConsumer: public KafkaClient
229241
static const constexpr char* ENABLE_AUTO_COMMIT = "enable.auto.commit";
230242
static const constexpr char* AUTO_COMMIT_INTERVAL_MS = "auto.commit.interval.ms";
231243

232-
static constexpr int DEFAULT_SEEK_TIMEOUT_MS = 10000;
233-
static constexpr int SEEK_RETRY_INTERVAL_MS = 5000;
244+
#if __cplusplus >= 201703L
245+
static constexpr int DEFAULT_QUERY_TIMEOUT_MS = 10000;
246+
static constexpr int DEFAULT_SEEK_TIMEOUT_MS = 10000;
247+
static constexpr int SEEK_RETRY_INTERVAL_MS = 5000;
248+
#else
249+
enum { DEFAULT_QUERY_TIMEOUT_MS = 10000 };
250+
enum { DEFAULT_SEEK_TIMEOUT_MS = 10000 };
251+
enum { SEEK_RETRY_INTERVAL_MS = 5000 };
252+
#endif
234253

235254
const OffsetCommitOption _offsetCommitOption;
236255

@@ -512,6 +531,38 @@ KafkaConsumer::position(const TopicPartition& tp) const
512531
return rk_tp->elems[0].offset;
513532
}
514533

534+
inline std::map<TopicPartition, Offset>
535+
KafkaConsumer::offsetsForTime(const TopicPartitions& tps,
536+
std::chrono::time_point<std::chrono::system_clock> timepoint,
537+
std::chrono::milliseconds timeout) const
538+
{
539+
if (tps.empty()) return TopicPartitionOffsets();
540+
541+
auto msSinceEpoch = std::chrono::duration_cast<std::chrono::milliseconds>(timepoint.time_since_epoch()).count();
542+
543+
auto rk_tpos = rd_kafka_topic_partition_list_unique_ptr(createRkTopicPartitionList(tps));
544+
545+
for (int i = 0; i < rk_tpos->cnt; ++i)
546+
{
547+
rd_kafka_topic_partition_t& rk_tp = rk_tpos->elems[i];
548+
// Here the `msSinceEpoch` would be overridden by the offset result (after the call to `rd_kafka_offsets_for_times`)
549+
rk_tp.offset = msSinceEpoch;
550+
}
551+
552+
rd_kafka_resp_err_t err = rd_kafka_offsets_for_times(getClientHandle(), rk_tpos.get(), timeout.count());
553+
KAFKA_THROW_IF_WITH_ERROR(err);
554+
555+
auto results = getTopicPartitionOffsets(rk_tpos.get());
556+
557+
// Remove invalid results (which are not updated with a valid offset)
558+
for (auto it = results.begin(); it != results.end(); )
559+
{
560+
it = ((it->second == msSinceEpoch) ? results.erase(it) : std::next(it));
561+
}
562+
563+
return results;
564+
}
565+
515566
inline std::map<TopicPartition, Offset>
516567
KafkaConsumer::getOffsets(const TopicPartitions& tps, bool atBeginning) const
517568
{

include/kafka/KafkaException.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ class KafkaException: public std::exception
4646

4747
#define KAFKA_THROW(respErr) throw KafkaException(__FILE__, __LINE__, ErrorCode(respErr))
4848
#define KAFKA_THROW_WITH_MSG(respErr, ...) throw KafkaException(__FILE__, __LINE__, ErrorCode(respErr), __VA_ARGS__)
49-
#define KAFKA_THROW_IF_WITH_ERROR(respErr) if ((respErr) != RD_KAFKA_RESP_ERR_NO_ERROR) KAFKA_THROW(respErr)
49+
#define KAFKA_THROW_IF_WITH_ERROR(respErr) if (respErr != RD_KAFKA_RESP_ERR_NO_ERROR) KAFKA_THROW(respErr)
5050

5151
} // end of KAFKA_API
5252

include/kafka/KafkaProducer.h

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -283,14 +283,14 @@ class KafkaProducer: public KafkaClient
283283
* Stub for ProduceResponse handling.
284284
* Note: Only for internal unit tests
285285
*/
286-
void stubHandleProduceResponse(HandleProduceResponseCb cb = HandleProduceResponseCb()) { _handleProduceRespCb = cb; }
286+
void stubHandleProduceResponse(HandleProduceResponseCb cb = HandleProduceResponseCb()) { _handleProduceRespCb = std::move(cb); }
287287

288288
private:
289289
static rd_kafka_resp_err_t handleProduceResponse(rd_kafka_t* rk, int32_t brokerId, uint64_t msgSeq, rd_kafka_resp_err_t err)
290290
{
291-
KafkaClient* client = static_cast<KafkaClient*>(rd_kafka_opaque(rk));
292-
KafkaProducer* producer = dynamic_cast<KafkaProducer*>(client);
293-
auto respCb = producer->_handleProduceRespCb;
291+
auto* client = static_cast<KafkaClient*>(rd_kafka_opaque(rk));
292+
auto* producer = dynamic_cast<KafkaProducer*>(client);
293+
auto respCb = producer->_handleProduceRespCb;
294294
return respCb ? respCb(rk, brokerId, msgSeq, err) : err;
295295
}
296296

@@ -307,13 +307,13 @@ KafkaProducer::registerConfigCallbacks(rd_kafka_conf_t* conf)
307307
#ifdef KAFKA_API_ENABLE_UNIT_TEST_STUBS
308308
// UT stub for ProduceResponse
309309
LogBuffer<LOG_BUFFER_SIZE> errInfo;
310-
if (rd_kafka_conf_set(conf, "ut_handle_ProduceResponse", reinterpret_cast<char*>(&handleProduceResponse), errInfo.str(), errInfo.capacity()))
310+
if (rd_kafka_conf_set(conf, "ut_handle_ProduceResponse", reinterpret_cast<char*>(&handleProduceResponse), errInfo.str(), errInfo.capacity())) // NOLINT
311311
{
312312
KafkaClient* client = nullptr;
313313
size_t clientPtrSize = 0;
314-
if (rd_kafka_conf_get(conf, "opaque", reinterpret_cast<char*>(&client), &clientPtrSize))
314+
if (rd_kafka_conf_get(conf, "opaque", reinterpret_cast<char*>(&client), &clientPtrSize)) // NOLINT
315315
{
316-
LOG(LOG_CRIT, "failed to stub ut_handle_ProduceResponse! error[%s]. Meanwhile, failed to get the Kafka client!", errInfo.c_str());
316+
KAFKA_API_LOG(LOG_CRIT, "failed to stub ut_handle_ProduceResponse! error[%s]. Meanwhile, failed to get the Kafka client!", errInfo.c_str());
317317
}
318318
else
319319
{
@@ -397,7 +397,7 @@ KafkaProducer::sendMessage(const ProducerRecord& record,
397397
const auto* topic = record.topic().c_str();
398398
const auto partition = record.partition();
399399
const auto msgFlags = (static_cast<unsigned int>(option == SendOption::ToCopyRecordValue ? RD_KAFKA_MSG_F_COPY : 0)
400-
| static_cast<unsigned int>(action == ActionWhileQueueIsFull::Block ? RD_KAFKA_MSG_F_BLOCK : 0));
400+
| static_cast<unsigned int>(action == ActionWhileQueueIsFull::Block ? RD_KAFKA_MSG_F_BLOCK : 0));
401401
const auto* keyPtr = record.key().data();
402402
const auto keyLen = record.key().size();
403403
const auto* valuePtr = record.value().data();

include/kafka/Project.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,3 +5,7 @@
55
#define KAFKA_API kafka
66
#endif
77

8+
// Here is the MACRO to enable UT stubs
9+
// #ifndef KAFKA_API_ENABLE_UNIT_TEST_STUBS
10+
// #define KAFKA_API_ENABLE_UNIT_TEST_STUBS
11+
// #endif

include/kafka/Timestamp.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,11 @@ struct Timestamp
4141
(tstype == RD_KAFKA_TIMESTAMP_LOG_APPEND_TIME ? Type::LogAppendTime : Type::NotAvailable);
4242
}
4343

44+
operator std::chrono::time_point<std::chrono::system_clock>() const // NOLINT
45+
{
46+
return std::chrono::time_point<std::chrono::system_clock>(std::chrono::milliseconds(msSinceEpoch));
47+
}
48+
4449
static std::string toString(Type t)
4550
{
4651
switch (t)

tests/integration/TestAdminClient.cc

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,11 @@
77
#include <iostream>
88

99
using namespace KAFKA_API;
10-
using namespace KafkaTestUtility;
1110

1211

1312
TEST(AdminClient, createListDeleteTopics)
1413
{
15-
AdminClient adminClient(getKafkaClientCommonConfig());
14+
AdminClient adminClient(KafkaTestUtility::GetKafkaClientCommonConfig());
1615
std::cout << "[" << Utility::getCurrentTime() << "] " << adminClient.name() << " started" << std::endl;
1716

1817
const Topics topics = {Utility::getRandomString(), Utility::getRandomString()};
@@ -83,7 +82,7 @@ TEST(AdminClient, DuplicatedCreateDeleteTopics)
8382
const int numPartitions = 5;
8483
const int replicaFactor = 3;
8584

86-
AdminClient adminClient(getKafkaClientCommonConfig());
85+
AdminClient adminClient(KafkaTestUtility::GetKafkaClientCommonConfig());
8786
std::cout << "[" << Utility::getCurrentTime() << "] " << adminClient.name() << " started" << std::endl;
8887

8988
constexpr int MAX_REPEAT = 10;

0 commit comments

Comments
 (0)