Skip to content

Commit d9f9012

Browse files
committed
Add test for fetchBrokerMetadata
1 parent d3e68d5 commit d9f9012

File tree

1 file changed

+47
-1
lines changed

1 file changed

+47
-1
lines changed

tests/integration/TestKafkaConsumer.cc

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1302,7 +1302,7 @@ TEST(KafkaAutoCommitConsumer, WrongOperation_AssignThenSubscribe)
13021302
EXPECT_KAFKA_THROW(consumer.subscribe({topic}), RD_KAFKA_RESP_ERR__FAIL);
13031303
}
13041304

1305-
TEST(KafkaClient, GetBrokerMetadata)
1305+
TEST(KafkaClient, FetchBrokerMetadata)
13061306
{
13071307
const Topic topic = Utility::getRandomString();
13081308
KafkaTestUtility::CreateKafkaTopic(topic, 5, 3);
@@ -1939,3 +1939,49 @@ TEST(KafkaAutoCommitConsumer, CooperativeRebalance)
19391939
KafkaTestUtility::JoiningThread consumer4Thread(startConsumer, "consumer4", 10);
19401940
}
19411941

1942+
TEST(KafkaAutoCommitConsumer, FetchBrokerMetadataTriggersRejoin)
1943+
{
1944+
const std::string topicPrefix = Utility::getRandomString();
1945+
const std::string topicPattern = "^" + topicPrefix + "\\.*";
1946+
1947+
Topic topic1 = topicPrefix + "_1";
1948+
Topic topic2 = topicPrefix + "_2";
1949+
1950+
KafkaTestUtility::CreateKafkaTopic(topic1, 1, 1);
1951+
1952+
auto rebalanceCb = [](Consumer::RebalanceEventType et, const TopicPartitions& tps) {
1953+
if (et == Consumer::RebalanceEventType::PartitionsAssigned) {
1954+
std::cout << "[" << Utility::getCurrentTime() << "] assigned partitions: " << toString(tps) << std::endl;
1955+
} else if (et == Consumer::RebalanceEventType::PartitionsRevoked) {
1956+
std::cout << "[" << Utility::getCurrentTime() << "] unassigned partitions: " << toString(tps) << std::endl;
1957+
}
1958+
};
1959+
1960+
Properties props = KafkaTestUtility::GetKafkaClientCommonConfig()
1961+
.put(ConsumerConfig::PARTITION_ASSIGNMENT_STRATEGY, "cooperative-sticky");
1962+
1963+
KafkaAutoCommitConsumer consumer(props);
1964+
1965+
// Subscribe to the topic pattern
1966+
consumer.subscribe({topicPattern}, rebalanceCb);
1967+
1968+
consumer.poll(std::chrono::seconds(1));
1969+
1970+
// Create one more topic (with the same subscription pattern)
1971+
KafkaTestUtility::CreateKafkaTopic(topic2, 1, 1);
1972+
1973+
// Should be able to get the metadata for the new topic
1974+
// Note: here the Metadata response information would trigger a re-join as well
1975+
auto metadata2 = consumer.fetchBrokerMetadata(topic2);
1976+
ASSERT_TRUE(metadata2);
1977+
std::cout << "[" << Utility::getCurrentTime() << "] brokerMetadata for topic[" << topic2 << "]: " << metadata2->toString() << std::endl;
1978+
1979+
consumer.poll(std::chrono::seconds(1));
1980+
1981+
auto assignment = consumer.assignment();
1982+
std::cout << "[" << Utility::getCurrentTime() << "] assignment: " << toString(assignment) << std::endl;
1983+
1984+
// The new created topic-partitions should be within the assignment as well
1985+
EXPECT_EQ(1, assignment.count({topic2, 0}));
1986+
}
1987+

0 commit comments

Comments
 (0)