Skip to content

Commit 7d5ec6f

Browse files
committed
Different behavior with Empty/NULL fields
1 parent d2130b4 commit 7d5ec6f

File tree

6 files changed

+103
-10
lines changed

6 files changed

+103
-10
lines changed

.github/workflows/kafka_api_ci_tests.yml

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,11 @@ jobs:
7777
export PATH=/snap/bin:$PATH
7878
fi
7979
80+
# Install clang-tidy
81+
if [[ ${CHECK_OPTION} == *"clang-tidy"* ]]; then
82+
sudo apt install -y clang-tidy
83+
fi
84+
8085
# 2. Install googletest (v1.10.0)
8186
wget -nv https://github.com/google/googletest/archive/release-1.10.0.tar.gz
8287
tar -xzf release-1.10.0.tar.gz
@@ -120,7 +125,7 @@ jobs:
120125
fi
121126
122127
if [[ ${CHECK_OPTION} == *"clang-tidy"* ]]; then
123-
export BUILD_OPTION='-DBUILD_OPTION_CLANG_TIEY=ON'
128+
export BUILD_OPTION='-DBUILD_OPTION_CLANG_TIDY=ON'
124129
fi
125130
126131
if [[ ${CHECK_OPTION} == *"asan"* ]]; then

include/kafka/BrokerMetadata.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,8 @@ struct BrokerMetadata {
5555
*/
5656
struct PartitionInfo
5757
{
58-
void setLeader(Node::Id id) { leader = id; }
58+
explicit PartitionInfo(Node::Id leaderId): leader(leaderId) {}
59+
5960
void addReplica(Node::Id id) { replicas.emplace_back(id); }
6061
void addInSyncReplica(Node::Id id) { inSyncReplicas.emplace_back(id); }
6162

@@ -148,7 +149,7 @@ BrokerMetadata::toString(const PartitionInfo& partitionInfo) const
148149
{
149150
std::ostringstream oss;
150151

151-
auto streamNodes = [this](std::ostringstream& ss, const std::vector<Node::Id> nodeIds) -> std::ostringstream& {
152+
auto streamNodes = [this](std::ostringstream& ss, const std::vector<Node::Id>& nodeIds) -> std::ostringstream& {
152153
bool isTheFirst = true;
153154
for (const auto id: nodeIds)
154155
{

include/kafka/KafkaClient.h

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -514,8 +514,6 @@ KafkaClient::fetchBrokerMetadata(const std::string& topic, std::chrono::millisec
514514

515515
Partition partition = metadata_partition.id;
516516

517-
BrokerMetadata::PartitionInfo partitionInfo;
518-
519517
if (metadata_partition.err != 0)
520518
{
521519
if (!disableErrorLogging)
@@ -526,7 +524,7 @@ KafkaClient::fetchBrokerMetadata(const std::string& topic, std::chrono::millisec
526524
continue;
527525
}
528526

529-
partitionInfo.setLeader(metadata_partition.leader);
527+
BrokerMetadata::PartitionInfo partitionInfo(metadata_partition.leader);
530528

531529
for (int j = 0; j < metadata_partition.replica_cnt; ++j)
532530
{

include/kafka/Types.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,8 @@ class ConstBuffer
9090
std::size_t size() const { return _size; }
9191
std::string toString() const
9292
{
93+
if (_size == 0) return _data ? "[empty]" : "[NULL]";
94+
9395
std::ostringstream oss;
9496

9597
auto printChar = [&oss](const unsigned char c) {

tests/integration/TestKafkaConsumer.cc

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,95 @@ TEST(KafkaAutoCommitConsumer, PollWithHeaders)
174174
consumer.close();
175175
}
176176

177+
TEST(KafkaAutoCommitConsumer, RecordWithEmptyOrNullFields)
178+
{
179+
auto sendMessages = [](const Kafka::ProducerRecord& record, std::size_t repeat, const std::string& partitioner) {
180+
Kafka::KafkaSyncProducer producer(KafkaTestUtility::GetKafkaClientCommonConfig()
181+
.put(ProducerConfig::PARTITIONER, partitioner));
182+
producer.setLogLevel(LOG_CRIT);
183+
for (std::size_t i = 0; i < repeat; ++i) {
184+
producer.send(record);
185+
}
186+
};
187+
188+
std::cout << "[" << Utility::getCurrentTime() << "] Try with messages with empty fields" << std::endl;
189+
{
190+
const Topic topic = Utility::getRandomString();
191+
KafkaTestUtility::CreateKafkaTopic(topic, 5, 3);
192+
193+
const std::string emptyStr{};
194+
const auto emptyField = ConstBuffer(emptyStr.c_str(), emptyStr.size());
195+
196+
auto recordWithEmptyFields = Kafka::ProducerRecord(topic, emptyField, emptyField);
197+
198+
// With the murmur2_random partitioner, NULL (NOTE: not empty) keys are randomly partitioned.
199+
// This is the default for `modern-cpp-kafka` API (also is the default partitioner in the Java Producer)
200+
sendMessages(recordWithEmptyFields, 10, "murmur2_random");
201+
202+
// The auto-commit consumer
203+
KafkaAutoCommitConsumer consumer(KafkaTestUtility::GetKafkaClientCommonConfig()
204+
.put(ConsumerConfig::AUTO_OFFSET_RESET, "earliest"));
205+
// Subscribe topics
206+
consumer.subscribe({topic});
207+
208+
// Poll all messages
209+
auto records = KafkaTestUtility::ConsumeMessagesUntilTimeout(consumer, std::chrono::seconds(1));
210+
211+
// Check the key/value (should be empty: 0 length, but not NULL)
212+
std::map<int, int> counts;
213+
for (const auto& record: records)
214+
{
215+
std::cout << record.toString() << std::endl;
216+
217+
EXPECT_TRUE(record.key().size() == 0);
218+
EXPECT_TRUE(record.value().size() == 0);
219+
EXPECT_TRUE(record.key().data() != nullptr);
220+
EXPECT_TRUE(record.value().data() != nullptr);
221+
222+
counts[record.partition()] += 1;
223+
}
224+
// Should be hashed to the same partition (with empty key)
225+
EXPECT_TRUE(counts.size() == 1);
226+
}
227+
228+
std::cout << "[" << Utility::getCurrentTime() << "] Try with messages with NULL fields" << std::endl;
229+
{
230+
const Topic topic = Utility::getRandomString();
231+
KafkaTestUtility::CreateKafkaTopic(topic, 5, 3);
232+
233+
auto recordWithNullFields = Kafka::ProducerRecord(topic, Kafka::NullKey, Kafka::NullValue);
234+
235+
// With the murmur2_random partitioner, NULL (NOTE: not empty) keys are randomly partitioned.
236+
// This is the default for `modern-cpp-kafka` API (also is the default partitioner in the Java Producer)
237+
sendMessages(recordWithNullFields, 10, "murmur2_random");
238+
239+
// The auto-commit consumer
240+
KafkaAutoCommitConsumer consumer(KafkaTestUtility::GetKafkaClientCommonConfig()
241+
.put(ConsumerConfig::AUTO_OFFSET_RESET, "earliest"));
242+
// Subscribe topics
243+
consumer.subscribe({topic});
244+
245+
// Poll all messages
246+
auto records = KafkaTestUtility::ConsumeMessagesUntilTimeout(consumer, std::chrono::seconds(1));
247+
248+
// Check the key/value (should be NULL)
249+
std::map<int, int> counts;
250+
for (const auto& record: records)
251+
{
252+
std::cout << record.toString() << std::endl;
253+
254+
EXPECT_TRUE(record.key().size() == 0);
255+
EXPECT_TRUE(record.value().size() == 0);
256+
EXPECT_TRUE(record.key().data() == nullptr);
257+
EXPECT_TRUE(record.value().data() == nullptr);
258+
259+
counts[record.partition()] += 1;
260+
}
261+
// Should be hashed to random partitions (with NULL key)
262+
EXPECT_TRUE(counts.size() > 1);
263+
}
264+
}
265+
177266
TEST(KafkaAutoCommitConsumer, SeekAndPoll)
178267
{
179268
const Topic topic = Utility::getRandomString();

tests/unit/TestBrokerMetadata.cc

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,7 @@ TEST(BrokerMetadata, Basic)
3939
// Add info for partitions
4040
for (Kafka::Partition partition = 0; partition < numPartition; ++partition)
4141
{
42-
Kafka::BrokerMetadata::PartitionInfo partitionInfo;
43-
partitionInfo.setLeader(nodes[partition].id);
42+
Kafka::BrokerMetadata::PartitionInfo partitionInfo(nodes[partition].id);
4443
for (const auto& node: nodes)
4544
{
4645
partitionInfo.addReplica(node.id);
@@ -83,8 +82,7 @@ TEST(BrokerMetadata, IncompleteInfo)
8382
// Add info for partitions
8483
for (Kafka::Partition partition = 0; partition < numPartition; ++partition)
8584
{
86-
Kafka::BrokerMetadata::PartitionInfo partitionInfo;
87-
partitionInfo.setLeader(nodes[partition].id);
85+
Kafka::BrokerMetadata::PartitionInfo partitionInfo(nodes[partition].id);
8886
for (const auto& node: nodes)
8987
{
9088
partitionInfo.addReplica(node.id);

0 commit comments

Comments (0)