diff --git a/.github/workflows/kafka_api_ci_tests.yml b/.github/workflows/kafka_api_ci_tests.yml index ee952b28..42705d4e 100644 --- a/.github/workflows/kafka_api_ci_tests.yml +++ b/.github/workflows/kafka_api_ci_tests.yml @@ -305,6 +305,7 @@ jobs: # Install boost headers/libraries vcpkg install boost-optional + vcpkg install boost-assert vcpkg install boost-algorithm vcpkg install boost-program-options diff --git a/README.md b/README.md index c54da9f9..b99ce390 100644 --- a/README.md +++ b/README.md @@ -71,7 +71,7 @@ Eventually, we worked out the ***modern-cpp-kafka***, -- a ***header-only*** lib * Or, ***C++14***, but with pre-requirements - - Need ***boost*** headers (for `boost::optional`) + - Need ***boost*** headers (for `boost::optional`, `boost::source_location`) - For ***GCC*** compiler, it needs optimization options (e.g. `-O2`) diff --git a/include/kafka/KafkaClient.h b/include/kafka/KafkaClient.h index 6d07d824..b152ade5 100644 --- a/include/kafka/KafkaClient.h +++ b/include/kafka/KafkaClient.h @@ -318,12 +318,12 @@ KafkaClient::KafkaClient(ClientType clientType, } catch (const std::exception& e) { - KAFKA_THROW_ERROR(Error(RD_KAFKA_RESP_ERR__INVALID_ARG, std::string("Invalid log_level[").append(*logLevel).append("], which must be an number!").append(e.what()))); + throw KafkaException(Error(RD_KAFKA_RESP_ERR__INVALID_ARG, std::string("Invalid log_level[").append(*logLevel).append("], which must be a number!").append(e.what()))); } if (_logLevel < Log::Level::Emerg || _logLevel > Log::Level::Debug) { - KAFKA_THROW_ERROR(Error(RD_KAFKA_RESP_ERR__INVALID_ARG, std::string("Invalid log_level[").append(*logLevel).append("], which must be a value between 0 and 7!"))); + throw KafkaException(Error(RD_KAFKA_RESP_ERR__INVALID_ARG, std::string("Invalid log_level[").append(*logLevel).append("], which must be a value between 0 and 7!"))); } } @@ -340,7 +340,7 @@ KafkaClient::KafkaClient(ClientType clientType, } else { - 
KAFKA_THROW_ERROR(Error(RD_KAFKA_RESP_ERR__INVALID_ARG, std::string("Invalid option[" + *enableManualEventsPoll + "] for \"enable.manual.events.poll\", which must be a bool value (true or false)!"))); + throw KafkaException(Error(RD_KAFKA_RESP_ERR__INVALID_ARG, std::string("Invalid option[" + *enableManualEventsPoll + "] for \"enable.manual.events.poll\", which must be a bool value (true or false)!"))); } } @@ -415,7 +415,10 @@ KafkaClient::KafkaClient(ClientType clientType, setInterceptors(properties.get("interceptors")); const Error result{ rd_kafka_conf_interceptor_add_on_new(rk_conf.get(), "on_new", KafkaClient::configInterceptorOnNew, nullptr) }; - KAFKA_THROW_IF_WITH_ERROR(result); + if (result) { + throw KafkaException(result); + } + } // Other Callbacks @@ -426,13 +429,17 @@ KafkaClient::KafkaClient(ClientType clientType, rk_conf.release(), // rk_conf's ownship would be transferred to rk, after the "rd_kafka_new()" call errInfo.clear().str(), errInfo.capacity())); - KAFKA_THROW_IF_WITH_ERROR(Error(rd_kafka_last_error())); + + const auto error = Error(rd_kafka_last_error()); + if (error) { + throw KafkaException(error); + } // Add brokers auto brokers = properties.getProperty(Config::BOOTSTRAP_SERVERS); if (!brokers || rd_kafka_brokers_add(getClientHandle(), brokers->c_str()) == 0) { - KAFKA_THROW_ERROR(Error(RD_KAFKA_RESP_ERR__INVALID_ARG,\ + throw KafkaException(Error(RD_KAFKA_RESP_ERR__INVALID_ARG,\ "No broker could be added successfully, BOOTSTRAP_SERVERS=[" + (brokers ? *brokers : "NA") + "]")); } @@ -447,7 +454,7 @@ KafkaClient::validateAndReformProperties(const Properties& properties) // BOOTSTRAP_SERVERS property is mandatory if (!newProperties.getProperty(Config::BOOTSTRAP_SERVERS)) { - KAFKA_THROW_ERROR(Error(RD_KAFKA_RESP_ERR__INVALID_ARG,\ + throw KafkaException(Error(RD_KAFKA_RESP_ERR__INVALID_ARG,\ "Validation failed! 
With no property [" + std::string(Config::BOOTSTRAP_SERVERS) + "]")); } diff --git a/include/kafka/KafkaConsumer.h b/include/kafka/KafkaConsumer.h index 32dccd59..d8afeaba 100644 --- a/include/kafka/KafkaConsumer.h +++ b/include/kafka/KafkaConsumer.h @@ -391,7 +391,7 @@ KafkaConsumer::KafkaConsumer(const Properties& properties) if (!isTrue(enableAutoCommit) && !isFalse(enableAutoCommit)) { - KAFKA_THROW_ERROR(Error(RD_KAFKA_RESP_ERR__INVALID_ARG, std::string("Invalid property[enable.auto.commit=").append(enableAutoCommit).append("], which MUST be true(1) or false(0)!"))); + throw KafkaException(Error(RD_KAFKA_RESP_ERR__INVALID_ARG, std::string("Invalid property[enable.auto.commit=").append(enableAutoCommit).append("], which MUST be true(1) or false(0)!"))); } _enableAutoCommit = isTrue(enableAutoCommit); @@ -405,7 +405,10 @@ KafkaConsumer::KafkaConsumer(const Properties& properties) // Redirect the reply queue (to the client group queue) const Error result{ rd_kafka_poll_set_consumer(getClientHandle()) }; - KAFKA_THROW_IF_WITH_ERROR(result); + if (result) { + throw KafkaException(result); + } + // Initialize message-fetching queue _rk_queue.reset(rd_kafka_queue_get_consumer(getClientHandle())); @@ -460,7 +463,7 @@ KafkaConsumer::subscribe(const Topics& topics, consumer::RebalanceCallback rebal { if (!_userAssignment.empty()) { - KAFKA_THROW_ERROR(Error(RD_KAFKA_RESP_ERR__FAIL, "Unexpected Operation! Once assign() was used, subscribe() should not be called any more!")); + throw KafkaException(Error(RD_KAFKA_RESP_ERR__FAIL, "Unexpected Operation! 
Once assign() was used, subscribe() should not be called any more!")); } if (isCooperativeEnabled() && topics == _userSubscription) @@ -479,7 +482,10 @@ KafkaConsumer::subscribe(const Topics& topics, consumer::RebalanceCallback rebal auto rk_topics = rd_kafka_topic_partition_list_unique_ptr(createRkTopicPartitionList(topics)); const Error result{ rd_kafka_subscribe(getClientHandle(), rk_topics.get()) }; - KAFKA_THROW_IF_WITH_ERROR(result); + if (result) { + throw KafkaException(result); + } + _pendingEvent = PendingEvent::PartitionsAssignment; @@ -496,7 +502,7 @@ KafkaConsumer::subscribe(const Topics& topics, consumer::RebalanceCallback rebal } _pendingEvent.reset(); - KAFKA_THROW_ERROR(Error(RD_KAFKA_RESP_ERR__TIMED_OUT, "subscribe() timed out!")); + throw KafkaException(Error(RD_KAFKA_RESP_ERR__TIMED_OUT, "subscribe() timed out!")); } inline void @@ -524,7 +530,10 @@ KafkaConsumer::unsubscribe(std::chrono::milliseconds timeout) _userSubscription.clear(); const Error result{ rd_kafka_unsubscribe(getClientHandle()) }; - KAFKA_THROW_IF_WITH_ERROR(result); + if (result) { + throw KafkaException(result); + } + _pendingEvent = PendingEvent::PartitionsRevocation; @@ -541,7 +550,7 @@ KafkaConsumer::unsubscribe(std::chrono::milliseconds timeout) } _pendingEvent.reset(); - KAFKA_THROW_ERROR(Error(RD_KAFKA_RESP_ERR__TIMED_OUT, "unsubscribe() timed out!")); + throw KafkaException(Error(RD_KAFKA_RESP_ERR__TIMED_OUT, "unsubscribe() timed out!")); } inline Topics @@ -551,7 +560,10 @@ KafkaConsumer::subscription() const const Error result{ rd_kafka_subscription(getClientHandle(), &raw_topics) }; auto rk_topics = rd_kafka_topic_partition_list_unique_ptr(raw_topics); - KAFKA_THROW_IF_WITH_ERROR(result); + if (result) { + throw KafkaException(result); + } + return getTopics(rk_topics.get()); } @@ -609,7 +621,10 @@ KafkaConsumer::changeAssignment(PartitionsRebalanceEvent event, const TopicParti break; } - KAFKA_THROW_IF_WITH_ERROR(result); + if (result) { + throw 
KafkaException(result); + } + } // Assign Topic-Partitions @@ -618,7 +633,7 @@ KafkaConsumer::assign(const TopicPartitions& topicPartitions) { if (!_userSubscription.empty()) { - KAFKA_THROW_ERROR(Error(RD_KAFKA_RESP_ERR__FAIL, "Unexpected Operation! Once subscribe() was used, assign() should not be called any more!")); + throw KafkaException(Error(RD_KAFKA_RESP_ERR__FAIL, "Unexpected Operation! Once subscribe() was used, assign() should not be called any more!")); } _userAssignment = topicPartitions; @@ -636,7 +651,10 @@ KafkaConsumer::assignment() const auto rk_tps = rd_kafka_topic_partition_list_unique_ptr(raw_tps); - KAFKA_THROW_IF_WITH_ERROR(result); + if (result) { + throw KafkaException(result); + } + return getTopicPartitions(rk_tps.get()); } @@ -652,7 +670,7 @@ KafkaConsumer::seek(const TopicPartition& topicPartition, Offset offset, std::ch auto rkt = rd_kafka_topic_unique_ptr(rd_kafka_topic_new(getClientHandle(), topicPartition.first.c_str(), nullptr)); if (!rkt) { - KAFKA_THROW_ERROR(Error(rd_kafka_last_error())); + throw KafkaException(Error(rd_kafka_last_error())); } const auto end = std::chrono::steady_clock::now() + timeout; @@ -670,8 +688,11 @@ KafkaConsumer::seek(const TopicPartition& topicPartition, Offset offset, std::ch // If that's the case, we would retry again (normally, just after a very short while, the "seek" would succeed) std::this_thread::yield(); } while (std::chrono::steady_clock::now() < end); - - KAFKA_THROW_IF_WITH_ERROR(Error(respErr)); + + const auto error = Error(respErr); + if (error) { + throw KafkaException(error); + } KAFKA_API_DO_LOG(Log::Level::Info, "seeked with topic-partition[%s], offset[%d]", topicPartitionStr.c_str(), offset); } @@ -691,7 +712,9 @@ KafkaConsumer::position(const TopicPartition& topicPartition) const auto rk_tp = rd_kafka_topic_partition_list_unique_ptr(createRkTopicPartitionList({topicPartition})); const Error error{ rd_kafka_position(getClientHandle(), rk_tp.get()) }; - 
KAFKA_THROW_IF_WITH_ERROR(error); + if (error) { + throw KafkaException(error); + } return rk_tp->elems[0].offset; } @@ -744,7 +767,9 @@ KafkaConsumer::getOffsets(const TopicPartitions& topicPartitions, &beginning, &end, static_cast(timeout.count())) }; - KAFKA_THROW_IF_WITH_ERROR(error); + if (error) { + throw KafkaException(error); + } result[topicPartition] = (atBeginning ? beginning : end); } @@ -765,7 +790,9 @@ KafkaConsumer::commit(const TopicPartitionOffsets& topicPartitionOffsets, Commit error = Error{}; } - KAFKA_THROW_IF_WITH_ERROR(error); + if (error) { + throw KafkaException(error); + } } // Fetch committed offset @@ -775,7 +802,9 @@ KafkaConsumer::committed(const TopicPartition& topicPartition) auto rk_tps = rd_kafka_topic_partition_list_unique_ptr(createRkTopicPartitionList({topicPartition})); const Error error {rd_kafka_committed(getClientHandle(), rk_tps.get(), TIMEOUT_INFINITE) }; - KAFKA_THROW_IF_WITH_ERROR(error); + if (error) { + throw KafkaException(error); + } return rk_tps->elems[0].offset; } @@ -820,7 +849,7 @@ KafkaConsumer::poll(std::chrono::milliseconds timeout) auto msgReceived = rd_kafka_consume_batch_queue(_rk_queue.get(), convertMsDurationToInt(timeout), msgPtrArray.data(), _maxPollRecords); if (msgReceived < 0) { - KAFKA_THROW_ERROR(Error(rd_kafka_last_error())); + throw KafkaException(Error(rd_kafka_last_error())); } // Wrap messages with ConsumerRecord @@ -840,7 +869,9 @@ KafkaConsumer::pauseOrResumePartitions(const TopicPartitions& topicPartitions, P const Error error{ (op == PauseOrResumeOperation::Pause) ? rd_kafka_pause_partitions(getClientHandle(), rk_tpos.get()) : rd_kafka_resume_partitions(getClientHandle(), rk_tpos.get()) }; - KAFKA_THROW_IF_WITH_ERROR(error); + if (error) { + throw KafkaException(error); + } const char* opString = (op == PauseOrResumeOperation::Pause) ? 
"pause" : "resume"; int cnt = 0; @@ -861,7 +892,7 @@ KafkaConsumer::pauseOrResumePartitions(const TopicPartitions& topicPartitions, P if (cnt == 0 && op == PauseOrResumeOperation::Pause) { const std::string errMsg = std::string("No partition could be ") + opString + std::string("d among TopicPartitions[") + toString(topicPartitions) + std::string("]"); - KAFKA_THROW_ERROR(Error(RD_KAFKA_RESP_ERR__INVALID_ARG, errMsg)); + throw KafkaException(Error(RD_KAFKA_RESP_ERR__INVALID_ARG, errMsg)); } } @@ -1012,7 +1043,9 @@ KafkaConsumer::commitAsync(const TopicPartitionOffsets& topicPartitionOffsets, c getCommitCbQueue(), &KafkaConsumer::offsetCommitCallback, new consumer::OffsetCommitCallback(offsetCommitCallback)) }; - KAFKA_THROW_IF_WITH_ERROR(error); + if (error) { + throw KafkaException(error); + } } inline void diff --git a/include/kafka/KafkaException.h b/include/kafka/KafkaException.h index a4fd3d5d..8770e0e4 100644 --- a/include/kafka/KafkaException.h +++ b/include/kafka/KafkaException.h @@ -5,6 +5,7 @@ #include #include #include +#include #include @@ -12,7 +13,6 @@ #include #include - namespace KAFKA_API { /** @@ -21,10 +21,10 @@ namespace KAFKA_API { class KafkaException: public std::exception { public: - KafkaException(const char* filename, std::size_t lineno, const Error& error) + KafkaException(const SourceLocation& location = SourceLocation::current(), const Error& error) : _when(std::chrono::system_clock::now()), - _filename(filename), - _lineno(lineno), + _filename(location.file_name()), + _lineno(location.line()), _error(std::make_shared(error)) {} @@ -52,9 +52,5 @@ class KafkaException: public std::exception mutable std::string _what; }; - -#define KAFKA_THROW_ERROR(error) throw KafkaException(__FILE__, __LINE__, error) -#define KAFKA_THROW_IF_WITH_ERROR(error) if (error) KAFKA_THROW_ERROR(error) - } // end of KAFKA_API diff --git a/include/kafka/KafkaProducer.h b/include/kafka/KafkaProducer.h index 6012fd52..f4013ed7 100644 --- 
a/include/kafka/KafkaProducer.h +++ b/include/kafka/KafkaProducer.h @@ -273,7 +273,7 @@ KafkaProducer::validateAndReformProperties(const Properties& properties) } errMsg += "."; - KAFKA_THROW_ERROR(Error(RD_KAFKA_RESP_ERR__INVALID_ARG, errMsg)); + throw KafkaException(Error(RD_KAFKA_RESP_ERR__INVALID_ARG, errMsg)); } // For "idempotence" feature @@ -285,7 +285,7 @@ KafkaProducer::validateAndReformProperties(const Properties& properties) { if (std::stoi(*maxInFlight) > KAFKA_IDEMP_MAX_INFLIGHT) { - KAFKA_THROW_ERROR(Error(RD_KAFKA_RESP_ERR__INVALID_ARG,\ + throw KafkaException(Error(RD_KAFKA_RESP_ERR__INVALID_ARG,\ "`max.in.flight` must be set <= " + std::to_string(KAFKA_IDEMP_MAX_INFLIGHT) + " when `enable.idempotence` is `true`")); } } @@ -294,7 +294,7 @@ KafkaProducer::validateAndReformProperties(const Properties& properties) { if (*acks != "all" && *acks != "-1") { - KAFKA_THROW_ERROR(Error(RD_KAFKA_RESP_ERR__INVALID_ARG,\ + throw KafkaException(Error(RD_KAFKA_RESP_ERR__INVALID_ARG,\ "`acks` must be set to `all`/`-1` when `enable.idempotence` is `true`")); } } @@ -391,7 +391,9 @@ KafkaProducer::send(const producer::ProducerRecord& record, assert(uvCount == rkVUs.size()); const Error sendResult{ rd_kafka_produceva(rk, rkVUs.data(), rkVUs.size()) }; - KAFKA_THROW_IF_WITH_ERROR(sendResult); + if (sendResult){ + throw KafkaException(sendResult); + } // KafkaProducer::deliveryCallback would delete the "opaque" deliveryCbOpaque.release(); @@ -419,7 +421,9 @@ KafkaProducer::syncSend(const producer::ProducerRecord& record) std::unique_lock lock(mtx); delivered.wait(lock, [&deliveryResult]{ return static_cast(deliveryResult); }); - KAFKA_THROW_IF_WITH_ERROR(*deliveryResult); // NOLINT + if (*deliveryResult) { + throw KafkaException(*deliveryResult); + } return recordMetadata; } @@ -461,28 +465,40 @@ inline void KafkaProducer::initTransactions(std::chrono::milliseconds timeout) { const Error result{ rd_kafka_init_transactions(getClientHandle(), 
static_cast(timeout.count())) }; - KAFKA_THROW_IF_WITH_ERROR(result); + if (result) { + throw KafkaException(result); + } + } inline void KafkaProducer::beginTransaction() { const Error result{ rd_kafka_begin_transaction(getClientHandle()) }; - KAFKA_THROW_IF_WITH_ERROR(result); + if (result) { + throw KafkaException(result); + } + } inline void KafkaProducer::commitTransaction(std::chrono::milliseconds timeout) { const Error result{ rd_kafka_commit_transaction(getClientHandle(), static_cast(timeout.count())) }; - KAFKA_THROW_IF_WITH_ERROR(result); + if (result) { + throw KafkaException(result); + } + } inline void KafkaProducer::abortTransaction(std::chrono::milliseconds timeout) { const Error result{ rd_kafka_abort_transaction(getClientHandle(), static_cast(timeout.count())) }; - KAFKA_THROW_IF_WITH_ERROR(result); + if (result) { + throw KafkaException(result); + } + } inline void @@ -495,7 +511,10 @@ KafkaProducer::sendOffsetsToTransaction(const TopicPartitionOffsets& t rk_tpos.get(), groupMetadata.rawHandle(), static_cast(timeout.count())) }; - KAFKA_THROW_IF_WITH_ERROR(result); + if (result) { + throw KafkaException(result); + } + } } } } // end of KAFKA_API::clients::producer diff --git a/include/kafka/Properties.h b/include/kafka/Properties.h index e2920150..8a6ea8f4 100644 --- a/include/kafka/Properties.h +++ b/include/kafka/Properties.h @@ -157,7 +157,7 @@ class Properties auto search = _kvMap.find(key); if (search == _kvMap.end()) { - KAFKA_THROW_ERROR(Error(RD_KAFKA_RESP_ERR__INVALID_ARG, "Failed to get \"" + key + "\" from Properties!")); + throw KafkaException(Error(RD_KAFKA_RESP_ERR__INVALID_ARG, "Failed to get property: key \"" + key + "\" not found!")); } const ValueType& v = search->second; diff --git a/include/kafka/Types.h b/include/kafka/Types.h index 3d04b7e0..53479a75 100644 --- a/include/kafka/Types.h +++ b/include/kafka/Types.h @@ -27,6 +27,14 @@ template using Optional = boost::optional; #endif +// Use `boost::source_location`, which 
is the fallback when the compiler doesn't support `std::source_location` +#if COMPILER_SUPPORTS_CPP_20 +#include +using SourceLocation = std::source_location; +#else +#include +using SourceLocation = boost::source_location; +#endif namespace KAFKA_API { diff --git a/tests/unit/TestKafkaException.cc b/tests/unit/TestKafkaException.cc index c3fad87e..2aaeb30d 100644 --- a/tests/unit/TestKafkaException.cc +++ b/tests/unit/TestKafkaException.cc @@ -65,7 +65,7 @@ TEST(KafkaException, Macros) // Try KAFKA_THROW_IF_WITH_ERROR (with error) EXPECT_KAFKA_THROW(KAFKA_THROW_IF_WITH_ERROR(kafka::Error(RD_KAFKA_RESP_ERR__TIMED_OUT)), RD_KAFKA_RESP_ERR__TIMED_OUT); - // Try KAFKA_THROW_ERROR (with no specified error message) + // Try throw KafkaException (with no specified error message) try { KAFKA_THROW_IF_WITH_ERROR(kafka::Error(RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT)); @@ -77,10 +77,10 @@ TEST(KafkaException, Macros) EXPECT_TRUE(std::regex_match(e.what(), reMatch)); } - // Try KAFKA_THROW_ERROR (with specified error message) + // Try throw KafkaException (with specified error message) try { - KAFKA_THROW_ERROR(kafka::Error(RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT, "something wrong here")); + throw KafkaException(kafka::Error(RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT, "something wrong here")); EXPECT_FALSE(true); } catch (const kafka::KafkaException& e)