Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/kafka_api_ci_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,7 @@ jobs:

# Install boost headers/libraries
vcpkg install boost-optional
vcpkg install boost-assert
vcpkg install boost-algorithm
vcpkg install boost-program-options

Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ Eventually, we worked out the ***modern-cpp-kafka***, -- a ***header-only*** lib

* Or, ***C++14***, but with pre-requirements

- Need ***boost*** headers (for `boost::optional`)
- Need ***boost*** headers (for `boost::optional`, `boost::assert`)

- For ***GCC*** compiler, it needs optimization options (e.g. `-O2`)

Expand Down
21 changes: 14 additions & 7 deletions include/kafka/KafkaClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -318,12 +318,12 @@ KafkaClient::KafkaClient(ClientType clientType,
}
catch (const std::exception& e)
{
KAFKA_THROW_ERROR(Error(RD_KAFKA_RESP_ERR__INVALID_ARG, std::string("Invalid log_level[").append(*logLevel).append("], which must be an number!").append(e.what())));
throw KafkaException(Error(RD_KAFKA_RESP_ERR__INVALID_ARG, std::string("Invalid log_level[").append(*logLevel).append("], which must be an number!").append(e.what())));
}

if (_logLevel < Log::Level::Emerg || _logLevel > Log::Level::Debug)
{
KAFKA_THROW_ERROR(Error(RD_KAFKA_RESP_ERR__INVALID_ARG, std::string("Invalid log_level[").append(*logLevel).append("], which must be a value between 0 and 7!")));
throw KafkaException(Error(RD_KAFKA_RESP_ERR__INVALID_ARG, std::string("Invalid log_level[").append(*logLevel).append("], which must be a value between 0 and 7!")));
}
}

Expand All @@ -340,7 +340,7 @@ KafkaClient::KafkaClient(ClientType clientType,
}
else
{
KAFKA_THROW_ERROR(Error(RD_KAFKA_RESP_ERR__INVALID_ARG, std::string("Invalid option[" + *enableManualEventsPoll + "] for \"enable.manual.events.poll\", which must be a bool value (true or false)!")));
throw KafkaException(Error(RD_KAFKA_RESP_ERR__INVALID_ARG, std::string("Invalid option[" + *enableManualEventsPoll + "] for \"enable.manual.events.poll\", which must be a bool value (true or false)!")));
}
}

Expand Down Expand Up @@ -415,7 +415,10 @@ KafkaClient::KafkaClient(ClientType clientType,
setInterceptors(properties.get<Interceptors>("interceptors"));

const Error result{ rd_kafka_conf_interceptor_add_on_new(rk_conf.get(), "on_new", KafkaClient::configInterceptorOnNew, nullptr) };
KAFKA_THROW_IF_WITH_ERROR(result);
if (result) {
throw KafkaException(result);
}

}

// Other Callbacks
Expand All @@ -426,13 +429,17 @@ KafkaClient::KafkaClient(ClientType clientType,
rk_conf.release(), // rk_conf's ownship would be transferred to rk, after the "rd_kafka_new()" call
errInfo.clear().str(),
errInfo.capacity()));
KAFKA_THROW_IF_WITH_ERROR(Error(rd_kafka_last_error()));

const auto error = Error(rd_kafka_last_error());
if (error) {
throw KafkaException(error);
}

// Add brokers
auto brokers = properties.getProperty(Config::BOOTSTRAP_SERVERS);
if (!brokers || rd_kafka_brokers_add(getClientHandle(), brokers->c_str()) == 0)
{
KAFKA_THROW_ERROR(Error(RD_KAFKA_RESP_ERR__INVALID_ARG,\
throw KafkaException(Error(RD_KAFKA_RESP_ERR__INVALID_ARG,\
"No broker could be added successfully, BOOTSTRAP_SERVERS=[" + (brokers ? *brokers : "NA") + "]"));
}

Expand All @@ -447,7 +454,7 @@ KafkaClient::validateAndReformProperties(const Properties& properties)
// BOOTSTRAP_SERVERS property is mandatory
if (!newProperties.getProperty(Config::BOOTSTRAP_SERVERS))
{
KAFKA_THROW_ERROR(Error(RD_KAFKA_RESP_ERR__INVALID_ARG,\
throw KafkaException(Error(RD_KAFKA_RESP_ERR__INVALID_ARG,\
"Validation failed! With no property [" + std::string(Config::BOOTSTRAP_SERVERS) + "]"));
}

Expand Down
77 changes: 55 additions & 22 deletions include/kafka/KafkaConsumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ KafkaConsumer::KafkaConsumer(const Properties& properties)

if (!isTrue(enableAutoCommit) && !isFalse(enableAutoCommit))
{
KAFKA_THROW_ERROR(Error(RD_KAFKA_RESP_ERR__INVALID_ARG, std::string("Invalid property[enable.auto.commit=").append(enableAutoCommit).append("], which MUST be true(1) or false(0)!")));
throw KafkaException(Error(RD_KAFKA_RESP_ERR__INVALID_ARG, std::string("Invalid property[enable.auto.commit=").append(enableAutoCommit).append("], which MUST be true(1) or false(0)!")));
}

_enableAutoCommit = isTrue(enableAutoCommit);
Expand All @@ -405,7 +405,10 @@ KafkaConsumer::KafkaConsumer(const Properties& properties)

// Redirect the reply queue (to the client group queue)
const Error result{ rd_kafka_poll_set_consumer(getClientHandle()) };
KAFKA_THROW_IF_WITH_ERROR(result);
if (result) {
throw KafkaException(result);
}


// Initialize message-fetching queue
_rk_queue.reset(rd_kafka_queue_get_consumer(getClientHandle()));
Expand Down Expand Up @@ -460,7 +463,7 @@ KafkaConsumer::subscribe(const Topics& topics, consumer::RebalanceCallback rebal
{
if (!_userAssignment.empty())
{
KAFKA_THROW_ERROR(Error(RD_KAFKA_RESP_ERR__FAIL, "Unexpected Operation! Once assign() was used, subscribe() should not be called any more!"));
throw KafkaException(Error(RD_KAFKA_RESP_ERR__FAIL, "Unexpected Operation! Once assign() was used, subscribe() should not be called any more!"));
}

if (isCooperativeEnabled() && topics == _userSubscription)
Expand All @@ -479,7 +482,10 @@ KafkaConsumer::subscribe(const Topics& topics, consumer::RebalanceCallback rebal
auto rk_topics = rd_kafka_topic_partition_list_unique_ptr(createRkTopicPartitionList(topics));

const Error result{ rd_kafka_subscribe(getClientHandle(), rk_topics.get()) };
KAFKA_THROW_IF_WITH_ERROR(result);
if (result) {
throw KafkaException(result);
}


_pendingEvent = PendingEvent::PartitionsAssignment;

Expand All @@ -496,7 +502,7 @@ KafkaConsumer::subscribe(const Topics& topics, consumer::RebalanceCallback rebal
}

_pendingEvent.reset();
KAFKA_THROW_ERROR(Error(RD_KAFKA_RESP_ERR__TIMED_OUT, "subscribe() timed out!"));
throw KafkaException(Error(RD_KAFKA_RESP_ERR__TIMED_OUT, "subscribe() timed out!"));
}

inline void
Expand Down Expand Up @@ -524,7 +530,10 @@ KafkaConsumer::unsubscribe(std::chrono::milliseconds timeout)
_userSubscription.clear();

const Error result{ rd_kafka_unsubscribe(getClientHandle()) };
KAFKA_THROW_IF_WITH_ERROR(result);
if (result) {
throw KafkaException(result);
}


_pendingEvent = PendingEvent::PartitionsRevocation;

Expand All @@ -541,7 +550,7 @@ KafkaConsumer::unsubscribe(std::chrono::milliseconds timeout)
}

_pendingEvent.reset();
KAFKA_THROW_ERROR(Error(RD_KAFKA_RESP_ERR__TIMED_OUT, "unsubscribe() timed out!"));
throw KafkaException(Error(RD_KAFKA_RESP_ERR__TIMED_OUT, "unsubscribe() timed out!"));
}

inline Topics
Expand All @@ -551,7 +560,10 @@ KafkaConsumer::subscription() const
const Error result{ rd_kafka_subscription(getClientHandle(), &raw_topics) };
auto rk_topics = rd_kafka_topic_partition_list_unique_ptr(raw_topics);

KAFKA_THROW_IF_WITH_ERROR(result);
if (result) {
throw KafkaException(result);
}


return getTopics(rk_topics.get());
}
Expand Down Expand Up @@ -609,7 +621,10 @@ KafkaConsumer::changeAssignment(PartitionsRebalanceEvent event, const TopicParti
break;
}

KAFKA_THROW_IF_WITH_ERROR(result);
if (result) {
throw KafkaException(result);
}

}

// Assign Topic-Partitions
Expand All @@ -618,7 +633,7 @@ KafkaConsumer::assign(const TopicPartitions& topicPartitions)
{
if (!_userSubscription.empty())
{
KAFKA_THROW_ERROR(Error(RD_KAFKA_RESP_ERR__FAIL, "Unexpected Operation! Once subscribe() was used, assign() should not be called any more!"));
throw KafkaException(Error(RD_KAFKA_RESP_ERR__FAIL, "Unexpected Operation! Once subscribe() was used, assign() should not be called any more!"));
}

_userAssignment = topicPartitions;
Expand All @@ -636,7 +651,10 @@ KafkaConsumer::assignment() const

auto rk_tps = rd_kafka_topic_partition_list_unique_ptr(raw_tps);

KAFKA_THROW_IF_WITH_ERROR(result);
if (result) {
throw KafkaException(result);
}


return getTopicPartitions(rk_tps.get());
}
Expand All @@ -652,7 +670,7 @@ KafkaConsumer::seek(const TopicPartition& topicPartition, Offset offset, std::ch
auto rkt = rd_kafka_topic_unique_ptr(rd_kafka_topic_new(getClientHandle(), topicPartition.first.c_str(), nullptr));
if (!rkt)
{
KAFKA_THROW_ERROR(Error(rd_kafka_last_error()));
throw KafkaException(Error(rd_kafka_last_error()));
}

const auto end = std::chrono::steady_clock::now() + timeout;
Expand All @@ -670,8 +688,11 @@ KafkaConsumer::seek(const TopicPartition& topicPartition, Offset offset, std::ch
// If that's the case, we would retry again (normally, just after a very short while, the "seek" would succeed)
std::this_thread::yield();
} while (std::chrono::steady_clock::now() < end);

KAFKA_THROW_IF_WITH_ERROR(Error(respErr));

const auto error = Error(respErr);
if (error) {
throw KafkaException(error);
}

KAFKA_API_DO_LOG(Log::Level::Info, "seeked with topic-partition[%s], offset[%d]", topicPartitionStr.c_str(), offset);
}
Expand All @@ -691,7 +712,9 @@ KafkaConsumer::position(const TopicPartition& topicPartition) const
auto rk_tp = rd_kafka_topic_partition_list_unique_ptr(createRkTopicPartitionList({topicPartition}));

const Error error{ rd_kafka_position(getClientHandle(), rk_tp.get()) };
KAFKA_THROW_IF_WITH_ERROR(error);
if (error) {
throw KafkaException(error);
}

return rk_tp->elems[0].offset;
}
Expand Down Expand Up @@ -744,7 +767,9 @@ KafkaConsumer::getOffsets(const TopicPartitions& topicPartitions,
&beginning,
&end,
static_cast<int>(timeout.count())) };
KAFKA_THROW_IF_WITH_ERROR(error);
if (error) {
throw KafkaException(error);
}

result[topicPartition] = (atBeginning ? beginning : end);
}
Expand All @@ -765,7 +790,9 @@ KafkaConsumer::commit(const TopicPartitionOffsets& topicPartitionOffsets, Commit
error = Error{};
}

KAFKA_THROW_IF_WITH_ERROR(error);
if (error) {
throw KafkaException(error);
}
}

// Fetch committed offset
Expand All @@ -775,7 +802,9 @@ KafkaConsumer::committed(const TopicPartition& topicPartition)
auto rk_tps = rd_kafka_topic_partition_list_unique_ptr(createRkTopicPartitionList({topicPartition}));

const Error error {rd_kafka_committed(getClientHandle(), rk_tps.get(), TIMEOUT_INFINITE) };
KAFKA_THROW_IF_WITH_ERROR(error);
if (error) {
throw KafkaException(error);
}

return rk_tps->elems[0].offset;
}
Expand Down Expand Up @@ -820,7 +849,7 @@ KafkaConsumer::poll(std::chrono::milliseconds timeout)
auto msgReceived = rd_kafka_consume_batch_queue(_rk_queue.get(), convertMsDurationToInt(timeout), msgPtrArray.data(), _maxPollRecords);
if (msgReceived < 0)
{
KAFKA_THROW_ERROR(Error(rd_kafka_last_error()));
throw KafkaException(Error(rd_kafka_last_error()));
}

// Wrap messages with ConsumerRecord
Expand All @@ -840,7 +869,9 @@ KafkaConsumer::pauseOrResumePartitions(const TopicPartitions& topicPartitions, P
const Error error{ (op == PauseOrResumeOperation::Pause) ?
rd_kafka_pause_partitions(getClientHandle(), rk_tpos.get())
: rd_kafka_resume_partitions(getClientHandle(), rk_tpos.get()) };
KAFKA_THROW_IF_WITH_ERROR(error);
if (error) {
throw KafkaException(error);
}

const char* opString = (op == PauseOrResumeOperation::Pause) ? "pause" : "resume";
int cnt = 0;
Expand All @@ -861,7 +892,7 @@ KafkaConsumer::pauseOrResumePartitions(const TopicPartitions& topicPartitions, P
if (cnt == 0 && op == PauseOrResumeOperation::Pause)
{
const std::string errMsg = std::string("No partition could be ") + opString + std::string("d among TopicPartitions[") + toString(topicPartitions) + std::string("]");
KAFKA_THROW_ERROR(Error(RD_KAFKA_RESP_ERR__INVALID_ARG, errMsg));
throw KafkaException(Error(RD_KAFKA_RESP_ERR__INVALID_ARG, errMsg));
}
}

Expand Down Expand Up @@ -1012,7 +1043,9 @@ KafkaConsumer::commitAsync(const TopicPartitionOffsets& topicPartitionOffsets, c
getCommitCbQueue(),
&KafkaConsumer::offsetCommitCallback,
new consumer::OffsetCommitCallback(offsetCommitCallback)) };
KAFKA_THROW_IF_WITH_ERROR(error);
if (error) {
throw KafkaException(error);
}
}

inline void
Expand Down
12 changes: 4 additions & 8 deletions include/kafka/KafkaException.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@
#include <kafka/Error.h>
#include <kafka/RdKafkaHelper.h>
#include <kafka/Utility.h>
#include <kafka/Type.h>

#include <librdkafka/rdkafka.h>

#include <chrono>
#include <exception>
#include <string>


namespace KAFKA_API {

/**
Expand All @@ -21,10 +21,10 @@ namespace KAFKA_API {
class KafkaException: public std::exception
{
public:
KafkaException(const char* filename, std::size_t lineno, const Error& error)
KafkaException(const SourceLocation& location = SourceLocation::current(), const Error& error)
: _when(std::chrono::system_clock::now()),
_filename(filename),
_lineno(lineno),
_filename(location.file_name()),
_lineno(location.line()),
_error(std::make_shared<Error>(error))
{}

Expand Down Expand Up @@ -52,9 +52,5 @@ class KafkaException: public std::exception
mutable std::string _what;
};


#define KAFKA_THROW_ERROR(error) throw KafkaException(__FILE__, __LINE__, error)
#define KAFKA_THROW_IF_WITH_ERROR(error) if (error) KAFKA_THROW_ERROR(error)

} // end of KAFKA_API

Loading