Skip to content

Commit 8ed6644

Browse files
committed
Improve fetchBrokerMetadata, -- to enable cgrp_update
1 parent d9f9012 commit 8ed6644

File tree

3 files changed

+30
-24
lines changed

3 files changed

+30
-24
lines changed

include/kafka/KafkaClient.h

Lines changed: 26 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ class KafkaClient
6868
/**
6969
* Set a log callback for kafka clients, which do not have a client specific logging callback configured (see `setLogger`).
7070
*/
71-
static void setGlobalLogger(Logger logger = NoneLogger)
71+
static void setGlobalLogger(Logger logger = NullLogger)
7272
{
7373
std::call_once(Global<>::initOnce, [](){}); // Then no need to init within KafkaClient constructor
7474
Global<>::logger = std::move(logger);
@@ -103,6 +103,7 @@ class KafkaClient
103103

104104
/**
105105
* Fetch matadata from a available broker.
106+
* Note: the Metadata response information may trigger a re-join if any subscribed topic has changed partition count or existence state.
106107
*/
107108
Optional<BrokerMetadata> fetchBrokerMetadata(const std::string& topic,
108109
std::chrono::milliseconds timeout = std::chrono::milliseconds(DEFAULT_METADATA_TIMEOUT_MS),
@@ -436,7 +437,7 @@ KafkaClient::setLogLevel(int level)
436437
inline void
437438
KafkaClient::onLog(int level, const char* fac, const char* buf) const
438439
{
439-
doLog(level, nullptr, 0, "%s | %s", fac, buf); // The `filename`/`lineno` here is NULL (just wouldn't help)
440+
doLog(level, "LIBRDKAFKA", 0, "%s | %s", fac, buf); // The log is coming from librdkafka
440441
}
441442

442443
inline void
@@ -461,11 +462,10 @@ KafkaClient::statsCallback(rd_kafka_t* rk, char* jsonStrBuf, size_t jsonStrLen,
461462
inline Optional<BrokerMetadata>
462463
KafkaClient::fetchBrokerMetadata(const std::string& topic, std::chrono::milliseconds timeout, bool disableErrorLogging)
463464
{
464-
Optional<BrokerMetadata> ret;
465-
auto rkt = rd_kafka_topic_unique_ptr(rd_kafka_topic_new(getClientHandle(), topic.c_str(), nullptr));
466-
467465
const rd_kafka_metadata_t* rk_metadata = nullptr;
468-
rd_kafka_resp_err_t err = rd_kafka_metadata(getClientHandle(), false, rkt.get(), &rk_metadata, convertMsDurationToInt(timeout));
466+
// Here the input parameter for `all_topics` is `true`, since we want the `cgrp_update`
467+
rd_kafka_resp_err_t err = rd_kafka_metadata(getClientHandle(), true, nullptr, &rk_metadata, convertMsDurationToInt(timeout));
468+
469469
auto guard = rd_kafka_metadata_unique_ptr(rk_metadata);
470470

471471
if (err != RD_KAFKA_RESP_ERR_NO_ERROR)
@@ -474,40 +474,47 @@ KafkaClient::fetchBrokerMetadata(const std::string& topic, std::chrono::millisec
474474
{
475475
KAFKA_API_DO_LOG(Log::Level::Err, "failed to get BrokerMetadata! error[%s]", rd_kafka_err2str(err));
476476
}
477-
return ret;
477+
return Optional<BrokerMetadata>{};
478478
}
479479

480-
if (rk_metadata->topic_cnt != 1)
480+
const rd_kafka_metadata_topic* metadata_topic = nullptr;
481+
for (int i = 0; i < rk_metadata->topic_cnt; ++i)
481482
{
482-
if (!disableErrorLogging)
483+
if (rk_metadata->topics[i].topic == topic)
483484
{
484-
KAFKA_API_DO_LOG(Log::Level::Err, "failed to construct MetaData! topic_cnt[%d]", rk_metadata->topic_cnt);
485+
metadata_topic = &rk_metadata->topics[i];
486+
break;
485487
}
486-
return ret;
487488
}
488489

489-
const rd_kafka_metadata_topic& metadata_topic = rk_metadata->topics[0];
490-
if (metadata_topic.err != 0)
490+
if (!metadata_topic || metadata_topic->err)
491491
{
492492
if (!disableErrorLogging)
493493
{
494-
KAFKA_API_DO_LOG(Log::Level::Err, "failed to construct MetaData! topic.err[%s]", rd_kafka_err2str(metadata_topic.err));
494+
if (!metadata_topic)
495+
{
496+
KAFKA_API_DO_LOG(Log::Level::Err, "failed to find BrokerMetadata for topic[%s]", topic.c_str());
497+
}
498+
else
499+
{
500+
KAFKA_API_DO_LOG(Log::Level::Err, "failed to get BrokerMetadata for topic[%s]! error[%s]", topic.c_str(), rd_kafka_err2str(metadata_topic->err));
501+
}
495502
}
496-
return ret;
503+
return Optional<BrokerMetadata>{};
497504
}
498505

499506
// Construct the BrokerMetadata
500-
BrokerMetadata metadata(metadata_topic.topic);
507+
BrokerMetadata metadata(metadata_topic->topic);
501508
metadata.setOrigNodeName(rk_metadata->orig_broker_name ? std::string(rk_metadata->orig_broker_name) : "");
502509

503510
for (int i = 0; i < rk_metadata->broker_cnt; ++i)
504511
{
505512
metadata.addNode(rk_metadata->brokers[i].id, rk_metadata->brokers[i].host, rk_metadata->brokers[i].port);
506513
}
507514

508-
for (int i = 0; i < metadata_topic.partition_cnt; ++i)
515+
for (int i = 0; i < metadata_topic->partition_cnt; ++i)
509516
{
510-
const rd_kafka_metadata_partition& metadata_partition = metadata_topic.partitions[i];
517+
const rd_kafka_metadata_partition& metadata_partition = metadata_topic->partitions[i];
511518

512519
Partition partition = metadata_partition.id;
513520

@@ -536,8 +543,7 @@ KafkaClient::fetchBrokerMetadata(const std::string& topic, std::chrono::millisec
536543
metadata.addPartitionInfo(partition, partitionInfo);
537544
}
538545

539-
ret = metadata;
540-
return ret;
546+
return metadata;
541547
}
542548

543549

include/kafka/Log.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ inline void DefaultLogger(int level, const char* /*filename*/, int /*lineno*/, c
7979
std::cout << std::endl;
8080
}
8181

82-
inline void NoneLogger(int /*level*/, const char* /*filename*/, int /*lineno*/, const char* /*msg*/)
82+
inline void NullLogger(int /*level*/, const char* /*filename*/, int /*lineno*/, const char* /*msg*/)
8383
{
8484
}
8585

tests/integration/TestKafkaConsumer.cc

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1951,9 +1951,9 @@ TEST(KafkaAutoCommitConsumer, FetchBrokerMetadataTriggersRejoin)
19511951

19521952
auto rebalanceCb = [](Consumer::RebalanceEventType et, const TopicPartitions& tps) {
19531953
if (et == Consumer::RebalanceEventType::PartitionsAssigned) {
1954-
std::cout << "[" << Utility::getCurrentTime() << "] assigned partitions: " << toString(tps) << std::endl;
1954+
std::cout << "[" << Utility::getCurrentTime() << "] newly assigned partitions: " << toString(tps) << std::endl;
19551955
} else if (et == Consumer::RebalanceEventType::PartitionsRevoked) {
1956-
std::cout << "[" << Utility::getCurrentTime() << "] unassigned partitions: " << toString(tps) << std::endl;
1956+
std::cout << "[" << Utility::getCurrentTime() << "] newly unassigned partitions: " << toString(tps) << std::endl;
19571957
}
19581958
};
19591959

@@ -1981,7 +1981,7 @@ TEST(KafkaAutoCommitConsumer, FetchBrokerMetadataTriggersRejoin)
19811981
auto assignment = consumer.assignment();
19821982
std::cout << "[" << Utility::getCurrentTime() << "] assignment: " << toString(assignment) << std::endl;
19831983

1984-
// The new created topic-partitions should be within the assignment as well
1984+
// The newly created topic-partitions should be within the assignment as well
19851985
EXPECT_EQ(1, assignment.count({topic2, 0}));
19861986
}
19871987

0 commit comments

Comments
 (0)