
Commit c53eaa4

Add robustness test cases
1 parent 3c31018 commit c53eaa4

10 files changed: +992 -30 lines changed


.github/workflows/on_pull_request.yml

Lines changed: 14 additions & 21 deletions
@@ -65,11 +65,10 @@ jobs:
     cd $BUILD_SUB_DIR
     export PATH=`pwd`/kafka_2.13-2.6.0/bin:`pwd`/apache-zookeeper-3.6.2-bin/bin:$PATH
     # start kafka cluster
+    rm -f test.env
     ../../scripts/start-local-kafka-cluster.py --zookeeper-port 42181 --broker-ports 40091 40092 40093 --log-dir ./tmp/log --output-dir ./tmp/output &
-    sleep 10
-    kafkacat -L -b 127.0.0.1:40091
     # run tests
-    cat test.env
+    for i in {1..60}; do cat test.env 2>/dev/null && break || sleep 1; done
     source test.env
     ctest -VV
     # stop kafka cluster

@@ -135,11 +134,10 @@ jobs:
     cd $BUILD_SUB_DIR
     export PATH=`pwd`/kafka_2.13-2.6.0/bin:`pwd`/apache-zookeeper-3.6.2-bin/bin:$PATH
     # start kafka cluster
+    rm -f test.env
     ../../scripts/start-local-kafka-cluster.py --zookeeper-port 42181 --broker-ports 40091 40092 40093 --log-dir ./tmp/log --output-dir ./tmp/output &
-    sleep 10
-    kafkacat -L -b 127.0.0.1:40091
     # run tests
-    cat test.env
+    for i in {1..60}; do cat test.env 2>/dev/null && break || sleep 1; done
     source test.env
     ctest -VV
     # stop kafka cluster

@@ -205,11 +203,10 @@ jobs:
     cd $BUILD_SUB_DIR
     export PATH=`pwd`/kafka_2.13-2.6.0/bin:`pwd`/apache-zookeeper-3.6.2-bin/bin:$PATH
     # start kafka cluster
+    rm -f test.env
     ../../scripts/start-local-kafka-cluster.py --zookeeper-port 42181 --broker-ports 40091 40092 40093 --log-dir ./tmp/log --output-dir ./tmp/output &
-    sleep 10
-    kafkacat -L -b 127.0.0.1:40091
     # run tests
-    cat test.env
+    for i in {1..60}; do cat test.env 2>/dev/null && break || sleep 1; done
     source test.env
     ctest -VV
     # stop kafka cluster

@@ -275,11 +272,10 @@ jobs:
     cd $BUILD_SUB_DIR
     export PATH=`pwd`/kafka_2.13-2.6.0/bin:`pwd`/apache-zookeeper-3.6.2-bin/bin:$PATH
     # start kafka cluster
+    rm -f test.env
     ../../scripts/start-local-kafka-cluster.py --zookeeper-port 42181 --broker-ports 40091 40092 40093 --log-dir ./tmp/log --output-dir ./tmp/output &
-    sleep 10
-    kafkacat -L -b 127.0.0.1:40091
     # run tests
-    cat test.env
+    for i in {1..60}; do cat test.env 2>/dev/null && break || sleep 1; done
     source test.env
     ctest -VV
     # stop kafka cluster

@@ -344,11 +340,10 @@ jobs:
     cd $BUILD_SUB_DIR
     export PATH=`pwd`/kafka_2.13-2.6.0/bin:`pwd`/apache-zookeeper-3.6.2-bin/bin:$PATH
     # start kafka cluster
+    rm -f test.env
     ../../scripts/start-local-kafka-cluster.py --zookeeper-port 42181 --broker-ports 40091 40092 40093 --log-dir ./tmp/log --output-dir ./tmp/output &
-    sleep 10
-    kafkacat -L -b 127.0.0.1:40091
     # run tests
-    cat test.env
+    for i in {1..60}; do cat test.env 2>/dev/null && break || sleep 1; done
     source test.env
     ctest -VV
     # stop kafka cluster

@@ -413,11 +408,10 @@ jobs:
     cd $BUILD_SUB_DIR
     export PATH=`pwd`/kafka_2.13-2.6.0/bin:`pwd`/apache-zookeeper-3.6.2-bin/bin:$PATH
     # start kafka cluster
+    rm -f test.env
     ../../scripts/start-local-kafka-cluster.py --zookeeper-port 42181 --broker-ports 40091 40092 40093 --log-dir ./tmp/log --output-dir ./tmp/output &
-    sleep 10
-    kafkacat -L -b 127.0.0.1:40091
     # run tests
-    cat test.env
+    for i in {1..60}; do cat test.env 2>/dev/null && break || sleep 1; done
     source test.env
     ctest -VV
     # stop kafka cluster

@@ -483,11 +477,10 @@ jobs:
     cd $BUILD_SUB_DIR
     export PATH=`pwd`/kafka_2.13-2.6.0/bin:`pwd`/apache-zookeeper-3.6.2-bin/bin:$PATH
     # start kafka cluster
+    rm -f test.env
     ../../scripts/start-local-kafka-cluster.py --zookeeper-port 42181 --broker-ports 40091 40092 40093 --log-dir ./tmp/log --output-dir ./tmp/output &
-    sleep 10
-    kafkacat -L -b 127.0.0.1:40091
     # run tests
-    cat test.env
+    for i in {1..60}; do cat test.env 2>/dev/null && break || sleep 1; done
     source test.env
     ctest -VV
     # stop kafka cluster

include/kafka/KafkaClient.h

Lines changed: 1 addition & 1 deletion
@@ -116,7 +116,7 @@ class KafkaClient
        if (level >= LOG_EMERG && level <= _logLevel && logger)
        {
            LogBuffer<LOG_BUFFER_SIZE> logBuffer;
-           logBuffer.print(name().c_str()).print(format, args...);
+           logBuffer.print("%s ", name().c_str()).print(format, args...);
            logger(level, filename, lineno, logBuffer.c_str());
        }
    }
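
A note on the KafkaClient.h hunk: previously the client name returned by `name()` was handed to `LogBuffer::print` as the format string itself, so a name containing `%` could be misinterpreted as conversion specifiers, and no separator followed the name in the log line. The fix formats the name through an explicit `"%s "`. The snippet below is a minimal standalone sketch of the same pattern using plain `snprintf`; it assumes `LogBuffer::print` is printf-style and uses a made-up client name.

```cpp
#include <cstdio>
#include <string>

int main()
{
    // Hypothetical client name containing '%', e.g. built from a user-supplied id.
    const std::string clientName = "consumer-50%";
    char buf[128];

    // Unsafe pattern (what the old code did, conceptually): using the name itself as
    // the format string would make snprintf parse "%" sequences inside it.
    //   std::snprintf(buf, sizeof(buf), clientName.c_str());   // don't do this

    // Safe pattern (what the fix does): the name is passed as data to a "%s " format,
    // which also adds the separating space before the actual log message.
    std::snprintf(buf, sizeof(buf), "%s ", clientName.c_str());
    std::printf("%s<log message here>\n", buf);
    return 0;
}
```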

scripts/start-local-kafka-cluster.py

Lines changed: 32 additions & 7 deletions
@@ -83,6 +83,9 @@ def GenerateBrokerConfig(brokerId, brokerPort, zookeeperPort, logDir):
     zookeeper.connect=127.0.0.1:${zookeeper_port}
     num.partitions=5
     default.replication.factor=3
+    offsets.topic.replication.factor=3
+    offsets.commit.timeout.ms=10000
+    unclean.leader.election.enable=false
     min.insync.replicas=2
     ''')
     properties = brokerTemplate.substitute(broker_id=brokerId, listener_port=brokerPort, zookeeper_port=zookeeperPort, log_dir=logDir)

@@ -138,13 +141,35 @@ def main():
     for (i, brokerPort) in enumerate(brokerPorts):
         StartKafkaServer('kafka{0}'.format(i), kafkaPropFiles[i].filename, outDir)

-    print('Kafka server started... (zookeeper pid: {0}, kafka pids: {1})'.format(zookeeperPids, kafkaPids))
-
-    with open(r'test.env', 'w') as envFile:
-        envFile.write('export KAFKA_BROKER_LIST={0}\n'.format(','.join(['127.0.0.1:{0}'.format(port) for port in brokerPorts])))
-        envFile.write('export KAFKA_BROKER_PIDS={0}\n'.format(','.join([str(pid) for pid in kafkaPids])))
-
-    processPool.run()
+    MAX_RETRY = 60
+    retry = 0
+    while retry < MAX_RETRY:
+        time.sleep(1)
+
+        kafkaBrokerPids = []
+        netstatCall = subprocess.Popen(['netstat', '-tlp'], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
+        (out, err) = netstatCall.communicate()
+        for brokerPort in brokerPorts:
+            matched = re.search('tcp[4 6] +[0-9]+ +[0-9]+ +localhost:{0} +.+ +LISTEN *([0-9]+)/java.*'.format(brokerPort), out.decode('utf-8'))
+            if matched:
+                kafkaBrokerPids.append(matched.group(1))
+
+        if len(kafkaBrokerPids) != len(brokerPorts):
+            retry += 1
+            continue
+
+        with open(r'test.env', 'w') as envFile:
+            envFile.write('export KAFKA_BROKER_LIST={0}\n'.format(','.join(['127.0.0.1:{0}'.format(port) for port in brokerPorts])))
+            envFile.write('export KAFKA_BROKER_PIDS={0}\n'.format(','.join(kafkaBrokerPids)))
+        break
+
+    if retry < MAX_RETRY:
+        print('Kafka cluster started with ports: {0}!'.format(brokerPorts))
+        processPool.run()
+    else:
+        print('Kafka cluster failed to start with ports: {0}!'.format(brokerPorts))
+        processPool.terminate()


 if __name__ == '__main__':
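
With this change the script writes test.env only after every broker port is seen in LISTEN state, exporting KAFKA_BROKER_LIST and KAFKA_BROKER_PIDS; the workflow above then sources that file before running ctest. Purely as an illustration of how those two variables could be consumed by a robustness test, here is a hedged C++ sketch that pauses and resumes the local brokers by PID. The environment-variable names come from the script, but the helper itself and the SIGSTOP/SIGCONT mechanism are assumptions for this sketch, not the project's actual KafkaTestUtility implementation.

```cpp
// Hypothetical helper: suspend/resume the local Kafka brokers listed in test.env.
// Assumes a POSIX system; PIDs come from the KAFKA_BROKER_PIDS variable the script writes.
#include <signal.h>     // kill(), SIGSTOP, SIGCONT (POSIX)
#include <cstdlib>      // std::getenv
#include <sstream>
#include <string>
#include <vector>

static std::vector<pid_t> brokerPidsFromEnv()
{
    std::vector<pid_t> pids;
    if (const char* env = std::getenv("KAFKA_BROKER_PIDS"))   // e.g. "12345,12346,12347"
    {
        std::stringstream ss(env);
        for (std::string token; std::getline(ss, token, ',');)
        {
            pids.push_back(static_cast<pid_t>(std::stol(token)));
        }
    }
    return pids;
}

int main()
{
    const auto pids = brokerPidsFromEnv();
    if (pids.empty()) return 1;                    // test.env not sourced

    for (pid_t pid : pids) kill(pid, SIGSTOP);     // "pause" brokers (assumed mechanism)
    // ... exercise client timeout handling while brokers are unreachable ...
    for (pid_t pid : pids) kill(pid, SIGCONT);     // "resume" brokers
    return 0;
}
```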

tests/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
@@ -28,3 +28,5 @@ message(STATUS "googletest root directory: ${GTEST_ROOT}")

 add_subdirectory(unit)
 add_subdirectory(integration)
+add_subdirectory(robustness)
+
tests/integration/TestKafkaConsumer.cc

Lines changed: 4 additions & 1 deletion
@@ -1494,6 +1494,9 @@ TEST(KafkaManualCommitConsumer, RecoverByTime)
            assignedPartitions = tps;
        }
    });
+   // No message yet
+   auto records = KafkaTestUtility::ConsumeMessagesUntilTimeout(consumer, std::chrono::seconds(1));
+   EXPECT_EQ(0, records.size());

    // Query where the consumer should start from
    const auto offsetsToSeek = consumer.offsetsForTime(assignedPartitions, persistedTimepoint);

@@ -1509,7 +1512,7 @@
    }

    // Process messages
-   const auto records = KafkaTestUtility::ConsumeMessagesUntilTimeout(consumer);
+   records = KafkaTestUtility::ConsumeMessagesUntilTimeout(consumer);
    for (const auto& record: records)
    {
        messagesProcessed.emplace_back(record.key().toString(), record.value().toString());

tests/robustness/CMakeLists.txt

Lines changed: 25 additions & 0 deletions
@@ -0,0 +1,25 @@
project("kafka-robustness-test")

# Directories
include_directories(SYSTEM ${Boost_INCLUDE_DIRS})

include_directories(SYSTEM ${GTEST_ROOT}/include)
link_directories(SYSTEM ${GTEST_ROOT}/lib)

# Target
file(GLOB TEST_SRCS *.cc)

add_executable("${PROJECT_NAME}" ${TEST_SRCS})

target_link_libraries("${PROJECT_NAME}" modern-cpp-kafka-api gmock_main pthread)

add_test(NAME ${PROJECT_NAME} COMMAND ./${PROJECT_NAME})

if (BUILD_OPTION_USE_ASAN OR BUILD_OPTION_USE_TSAN)
    target_compile_options(${PROJECT_NAME} PRIVATE "-fno-sanitize=all")
    target_link_options(${PROJECT_NAME} PRIVATE "-fno-sanitize=all")
    if (BUILD_OPTION_USE_ASAN)
        set_property(TEST ${PROJECT_NAME} PROPERTY ENVIRONMENT ASAN_OPTIONS=detect_leaks=1)
    endif ()
endif ()

Lines changed: 152 additions & 0 deletions
@@ -0,0 +1,152 @@
#include "../utils/TestUtility.h"

#include "kafka/AdminClient.h"

#include "gtest/gtest.h"

#include <iostream>


using namespace KAFKA_API;


TEST(AdminClient, BrokersTimeout)
{
    Topic topic = Utility::getRandomString();
    const int numPartitions = 5;
    const int replicaFactor = 3;

    {
        AdminClient adminClient(KafkaTestUtility::GetKafkaClientCommonConfig());
        std::cout << "[" << Utility::getCurrentTime() << "] " << adminClient.name() << " started" << std::endl;

        KafkaTestUtility::PauseBrokers();

        // Fetch metadata, -- timeout
        std::cout << "[" << Utility::getCurrentTime() << "] will FetchMetadata" << std::endl;
        {
            auto fetchResult = adminClient.fetchBrokerMetadata(topic, std::chrono::seconds(1));
            std::cout << "[" << Utility::getCurrentTime() << "] FetchMetadata: result[" << (fetchResult ? fetchResult->toString() : "NA") << "]" << std::endl;
            EXPECT_FALSE(fetchResult);
        }

        // List Topics, -- timeout
        std::cout << "[" << Utility::getCurrentTime() << "] will ListTopics" << std::endl;
        {
            auto listResult = adminClient.listTopics(std::chrono::seconds(1));
            std::cout << "[" << Utility::getCurrentTime() << "] ListTopics: result[" << listResult.detail << "]" << std::endl;
            EXPECT_TRUE(listResult.error.value() == RD_KAFKA_RESP_ERR__TRANSPORT || listResult.error.value() == RD_KAFKA_RESP_ERR__TIMED_OUT);
            EXPECT_EQ(0, listResult.topics.size());
        }

        // Create Topics, -- timeout
        std::cout << "[" << Utility::getCurrentTime() << "] will CreateTopics" << std::endl;
        {
            auto createResult = adminClient.createTopics({topic}, numPartitions, replicaFactor, Properties(), std::chrono::seconds(3));
            std::cout << "[" << Utility::getCurrentTime() << "] CreateTopics: result[" << createResult.detail << "]" << std::endl;
            EXPECT_TRUE(createResult.error);
        }
    }

    KafkaTestUtility::ResumeBrokers();

    // Since the brokers might not be ready within such a short time, we sometimes have to retry...
    constexpr int maxRetry = 5;
    for (int i = 0; i < maxRetry; ++i)
    {
        AdminClient adminClient(KafkaTestUtility::GetKafkaClientCommonConfig());

        // Create Topics, -- success
        std::cout << "[" << Utility::getCurrentTime() << "] will CreateTopics" << std::endl;
        auto createResult = adminClient.createTopics({topic}, numPartitions, replicaFactor);
        std::cout << "[" << Utility::getCurrentTime() << "] CreateTopics: result[" << createResult.detail << "]" << std::endl;
        if (!createResult.error || createResult.error.value() == RD_KAFKA_RESP_ERR_TOPIC_ALREADY_EXISTS)
        {
            break;
        }

        std::this_thread::sleep_for(std::chrono::seconds(1));

        auto listResult = adminClient.listTopics(std::chrono::seconds(1));
        if (!listResult.error && listResult.topics.count(topic) == 1)
        {
            auto metadata = adminClient.fetchBrokerMetadata(topic, std::chrono::seconds(5), false);
            std::cout << "[" << Utility::getCurrentTime() << "] broker metadata: " << (metadata ? metadata->toString() : "NA") << std::endl;
        }

        EXPECT_NE(maxRetry - 1, i);
    }

    KafkaTestUtility::WaitMetadataSyncUpBetweenBrokers();

    {
        AdminClient adminClient(KafkaTestUtility::GetKafkaClientCommonConfig());

        // List Topics, -- success
        std::cout << "[" << Utility::getCurrentTime() << "] will ListTopics" << std::endl;
        {
            auto listResult = adminClient.listTopics();
            std::cout << "[" << Utility::getCurrentTime() << "] ListTopics: result[" << listResult.detail << "]" << std::endl;
            EXPECT_FALSE(listResult.error);
            EXPECT_EQ(1, listResult.topics.count(topic));
        }

        // Fetch metadata, -- success
        std::cout << "[" << Utility::getCurrentTime() << "] will FetchMetadata" << std::endl;
        {
            auto fetchResult = adminClient.fetchBrokerMetadata(topic, std::chrono::seconds(5));
            std::cout << "[" << Utility::getCurrentTime() << "] FetchMetadata: result[" << (fetchResult ? fetchResult->toString() : "NA") << "]" << std::endl;
            EXPECT_TRUE(fetchResult);
        }
    }

    KafkaTestUtility::PauseBrokers();

    {
        AdminClient adminClient(KafkaTestUtility::GetKafkaClientCommonConfig());

        // Delete Topics, -- timeout
        std::cout << "[" << Utility::getCurrentTime() << "] will DeleteTopics" << std::endl;
        {
            auto deleteResult = adminClient.deleteTopics({topic}, std::chrono::seconds(5));
            std::cout << "[" << Utility::getCurrentTime() << "] DeleteTopics: result[" << deleteResult.detail << "]" << std::endl;
            EXPECT_TRUE(deleteResult.error);
        }
    }

    KafkaTestUtility::ResumeBrokers();

    // Since the brokers might not be ready within such a short time, we sometimes have to retry...
    for (int i = 0; i < maxRetry; ++i)
    {
        AdminClient adminClient(KafkaTestUtility::GetKafkaClientCommonConfig());
        // Delete Topics, -- success
        std::cout << "[" << Utility::getCurrentTime() << "] will DeleteTopics" << std::endl;
        auto deleteResult = adminClient.deleteTopics({topic});
        std::cout << "[" << Utility::getCurrentTime() << "] DeleteTopics: result[" << deleteResult.detail << "]" << std::endl;
        // In some edge cases, the result might be "timed out", while in fact the topic had already been deleted.
        // Then, even if we keep retrying, we could only get a response of "unknown topic or partition", and this should be treated as "SUCCESS".
        if (!deleteResult.error || deleteResult.error.value() == RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART)
        {
            break;
        }

        std::this_thread::sleep_for(std::chrono::seconds(1));
        EXPECT_NE(maxRetry - 1, i);
    }

    KafkaTestUtility::WaitMetadataSyncUpBetweenBrokers();

    {
        AdminClient adminClient(KafkaTestUtility::GetKafkaClientCommonConfig());

        // List Topics, -- success
        std::cout << "[" << Utility::getCurrentTime() << "] will ListTopics" << std::endl;
        {
            auto listResult = adminClient.listTopics(std::chrono::seconds(1));
            std::cout << "[" << Utility::getCurrentTime() << "] ListTopics: result[" << listResult.detail << "]" << std::endl;
            EXPECT_FALSE(listResult.error);
            EXPECT_EQ(0, listResult.topics.count(topic));
        }
    }
}
