Skip to content

Commit 747b8ca

Browse files
authored
Merge pull request #5 from kenneth-jia/EnableRobustnessTest
Enable robustness test in CI
2 parents 900053e + c53eaa4 commit 747b8ca

File tree

11 files changed

+1011
-33
lines changed

11 files changed

+1011
-33
lines changed

.github/workflows/on_pull_request.yml

Lines changed: 28 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -65,11 +65,12 @@ jobs:
6565
cd $BUILD_SUB_DIR
6666
export PATH=`pwd`/kafka_2.13-2.6.0/bin:`pwd`/apache-zookeeper-3.6.2-bin/bin:$PATH
6767
# start kafka cluster
68+
rm -f test.env
6869
../../scripts/start-local-kafka-cluster.py --zookeeper-port 42181 --broker-ports 40091 40092 40093 --log-dir ./tmp/log --output-dir ./tmp/output &
69-
sleep 10
70-
kafkacat -L -b 127.0.0.1:40091
7170
# run tests
72-
export KAFKA_BROKER_LIST="127.0.0.1:40091,127.0.0.1:40092,127.0.0.1:40093"; ctest -VV
71+
for i in {1..60}; do cat test.env 2>/dev/null && break || sleep 1; done
72+
source test.env
73+
ctest -VV
7374
# stop kafka cluster
7475
kafka-server-stop.sh
7576
zookeeper-server-stop.sh
@@ -133,11 +134,12 @@ jobs:
133134
cd $BUILD_SUB_DIR
134135
export PATH=`pwd`/kafka_2.13-2.6.0/bin:`pwd`/apache-zookeeper-3.6.2-bin/bin:$PATH
135136
# start kafka cluster
137+
rm -f test.env
136138
../../scripts/start-local-kafka-cluster.py --zookeeper-port 42181 --broker-ports 40091 40092 40093 --log-dir ./tmp/log --output-dir ./tmp/output &
137-
sleep 10
138-
kafkacat -L -b 127.0.0.1:40091
139139
# run tests
140-
export KAFKA_BROKER_LIST="127.0.0.1:40091,127.0.0.1:40092,127.0.0.1:40093"; ctest -VV
140+
for i in {1..60}; do cat test.env 2>/dev/null && break || sleep 1; done
141+
source test.env
142+
ctest -VV
141143
# stop kafka cluster
142144
kafka-server-stop.sh
143145
zookeeper-server-stop.sh
@@ -201,11 +203,12 @@ jobs:
201203
cd $BUILD_SUB_DIR
202204
export PATH=`pwd`/kafka_2.13-2.6.0/bin:`pwd`/apache-zookeeper-3.6.2-bin/bin:$PATH
203205
# start kafka cluster
206+
rm -f test.env
204207
../../scripts/start-local-kafka-cluster.py --zookeeper-port 42181 --broker-ports 40091 40092 40093 --log-dir ./tmp/log --output-dir ./tmp/output &
205-
sleep 10
206-
kafkacat -L -b 127.0.0.1:40091
207208
# run tests
208-
export KAFKA_BROKER_LIST="127.0.0.1:40091,127.0.0.1:40092,127.0.0.1:40093"; ctest -VV
209+
for i in {1..60}; do cat test.env 2>/dev/null && break || sleep 1; done
210+
source test.env
211+
ctest -VV
209212
# stop kafka cluster
210213
kafka-server-stop.sh
211214
zookeeper-server-stop.sh
@@ -269,11 +272,12 @@ jobs:
269272
cd $BUILD_SUB_DIR
270273
export PATH=`pwd`/kafka_2.13-2.6.0/bin:`pwd`/apache-zookeeper-3.6.2-bin/bin:$PATH
271274
# start kafka cluster
275+
rm -f test.env
272276
../../scripts/start-local-kafka-cluster.py --zookeeper-port 42181 --broker-ports 40091 40092 40093 --log-dir ./tmp/log --output-dir ./tmp/output &
273-
sleep 10
274-
kafkacat -L -b 127.0.0.1:40091
275277
# run tests
276-
export KAFKA_BROKER_LIST="127.0.0.1:40091,127.0.0.1:40092,127.0.0.1:40093"; ctest -VV
278+
for i in {1..60}; do cat test.env 2>/dev/null && break || sleep 1; done
279+
source test.env
280+
ctest -VV
277281
# stop kafka cluster
278282
kafka-server-stop.sh
279283
zookeeper-server-stop.sh
@@ -336,11 +340,12 @@ jobs:
336340
cd $BUILD_SUB_DIR
337341
export PATH=`pwd`/kafka_2.13-2.6.0/bin:`pwd`/apache-zookeeper-3.6.2-bin/bin:$PATH
338342
# start kafka cluster
343+
rm -f test.env
339344
../../scripts/start-local-kafka-cluster.py --zookeeper-port 42181 --broker-ports 40091 40092 40093 --log-dir ./tmp/log --output-dir ./tmp/output &
340-
sleep 10
341-
kafkacat -L -b 127.0.0.1:40091
342345
# run tests
343-
export KAFKA_BROKER_LIST="127.0.0.1:40091,127.0.0.1:40092,127.0.0.1:40093"; ctest -VV
346+
for i in {1..60}; do cat test.env 2>/dev/null && break || sleep 1; done
347+
source test.env
348+
ctest -VV
344349
# stop kafka cluster
345350
kafka-server-stop.sh
346351
zookeeper-server-stop.sh
@@ -403,11 +408,12 @@ jobs:
403408
cd $BUILD_SUB_DIR
404409
export PATH=`pwd`/kafka_2.13-2.6.0/bin:`pwd`/apache-zookeeper-3.6.2-bin/bin:$PATH
405410
# start kafka cluster
411+
rm -f test.env
406412
../../scripts/start-local-kafka-cluster.py --zookeeper-port 42181 --broker-ports 40091 40092 40093 --log-dir ./tmp/log --output-dir ./tmp/output &
407-
sleep 10
408-
kafkacat -L -b 127.0.0.1:40091
409413
# run tests
410-
export KAFKA_BROKER_LIST="127.0.0.1:40091,127.0.0.1:40092,127.0.0.1:40093"; ctest -VV
414+
for i in {1..60}; do cat test.env 2>/dev/null && break || sleep 1; done
415+
source test.env
416+
ctest -VV
411417
# stop kafka cluster
412418
kafka-server-stop.sh
413419
zookeeper-server-stop.sh
@@ -471,11 +477,12 @@ jobs:
471477
cd $BUILD_SUB_DIR
472478
export PATH=`pwd`/kafka_2.13-2.6.0/bin:`pwd`/apache-zookeeper-3.6.2-bin/bin:$PATH
473479
# start kafka cluster
480+
rm -f test.env
474481
../../scripts/start-local-kafka-cluster.py --zookeeper-port 42181 --broker-ports 40091 40092 40093 --log-dir ./tmp/log --output-dir ./tmp/output &
475-
sleep 10
476-
kafkacat -L -b 127.0.0.1:40091
477482
# run tests
478-
export KAFKA_BROKER_LIST="127.0.0.1:40091,127.0.0.1:40092,127.0.0.1:40093"; ctest -VV
483+
for i in {1..60}; do cat test.env 2>/dev/null && break || sleep 1; done
484+
source test.env
485+
ctest -VV
479486
# stop kafka cluster
480487
kafka-server-stop.sh
481488
zookeeper-server-stop.sh

include/kafka/KafkaClient.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ class KafkaClient
116116
if (level >= LOG_EMERG && level <= _logLevel && logger)
117117
{
118118
LogBuffer<LOG_BUFFER_SIZE> logBuffer;
119-
logBuffer.print(name().c_str()).print(format, args...);
119+
logBuffer.print("%s ", name().c_str()).print(format, args...);
120120
logger(level, filename, lineno, logBuffer.c_str());
121121
}
122122
}

include/kafka/Project.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@
55
#define KAFKA_API kafka
66
#endif
77

8-
// Here is the MACRO to enable UT stubs
8+
// Here is the MACRO to enable internal stubs for UT
99
// #ifndef KAFKA_API_ENABLE_UNIT_TEST_STUBS
1010
// #define KAFKA_API_ENABLE_UNIT_TEST_STUBS
11-
// #endif
11+
// #endif
12+

scripts/start-local-kafka-cluster.py

Lines changed: 34 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -35,23 +35,20 @@ def addProcess(self, cmd, name, outFile, errFile):
3535
stdout=out,
3636
stderr=err)
3737

38-
print('name: {0}, pid: {1}'.format(name, p.pid))
39-
4038
if 'kafka' in name:
4139
kafkaPids.append(p.pid)
4240
else:
4341
zookeeperPids.append(p.pid)
4442

4543
self.processList.append((p, name))
4644

47-
4845
def run(self):
4946
anyFailure = False
5047
while self.processList:
5148
for (i, (p, name)) in enumerate(self.processList):
5249
ret = p.poll()
5350
if ret != None:
54-
print('failed: {0}, pid: {1}, ret: {2}'.format(name, p.pid, ret))
51+
print('Failed to start server: {0}, pid: {1}, ret: {2}'.format(name, p.pid, ret))
5552
self.processList.pop(i)
5653
anyFailure = True
5754
break
@@ -60,8 +57,8 @@ def run(self):
6057

6158
def terminate(self):
6259
for (p, name) in self.processList:
63-
print('terminate: {0}, pid: {1}'.format(name, p.pid))
6460
p.kill()
61+
print('{0} terminated'.format(name))
6562

6663
def __del__(self):
6764
self.terminate()
@@ -86,6 +83,9 @@ def GenerateBrokerConfig(brokerId, brokerPort, zookeeperPort, logDir):
8683
zookeeper.connect=127.0.0.1:${zookeeper_port}
8784
num.partitions=5
8885
default.replication.factor=3
86+
offsets.topic.replication.factor=3
87+
offsets.commit.timeout.ms=10000
88+
unclean.leader.election.enable=false
8989
min.insync.replicas=2
9090
''')
9191
properties = brokerTemplate.substitute(broker_id=brokerId, listener_port=brokerPort, zookeeper_port=zookeeperPort, log_dir=logDir)
@@ -141,9 +141,35 @@ def main():
141141
for (i, brokerPort) in enumerate(brokerPorts):
142142
StartKafkaServer('kafka{0}'.format(i), kafkaPropFiles[i].filename, outDir)
143143

144-
print('Kafka server started... (zookeeper pid: {0}, kafka pids: {1})'.format(zookeeperPids, kafkaPids))
145-
146-
processPool.run()
144+
MAX_RETRY = 60
145+
retry = 0
146+
while retry < MAX_RETRY:
147+
time.sleep(1)
148+
149+
kafkaBrokerPids = []
150+
netstatCall = subprocess.Popen(['netstat', '-tlp'], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
151+
(out, err) = netstatCall.communicate();
152+
for brokerPort in brokerPorts:
153+
matched = re.search('tcp[4 6] +[0-9]+ +[0-9]+ +localhost:{0} +.+ +LISTEN *([0-9]+)/java.*'.format(brokerPort), out.decode('utf-8'))
154+
if matched:
155+
kafkaBrokerPids.append(matched.group(1))
156+
157+
if len(kafkaBrokerPids) != len(brokerPorts):
158+
continue
159+
160+
with open(r'test.env', 'w') as envFile:
161+
envFile.write('export KAFKA_BROKER_LIST={0}\n'.format(','.join(['127.0.0.1:{0}'.format(port) for port in brokerPorts])))
162+
envFile.write('export KAFKA_BROKER_PIDS={0}\n'.format(','.join([pid for pid in kafkaBrokerPids])))
163+
break
164+
165+
retry += 1
166+
167+
if retry < MAX_RETRY:
168+
print('Kafka cluster started with ports: {0}!'.format(brokerPorts))
169+
processPool.run()
170+
else:
171+
print('Kafka cluster failed to start with ports: {0}!'.format(brokerPorts))
172+
processPool.terminate()
147173

148174

149175
if __name__ == '__main__':

tests/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,3 +28,5 @@ message(STATUS "googletest root directory: ${GTEST_ROOT}")
2828

2929
add_subdirectory(unit)
3030
add_subdirectory(integration)
31+
add_subdirectory(robustness)
32+

tests/integration/TestKafkaConsumer.cc

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1494,6 +1494,9 @@ TEST(KafkaManualCommitConsumer, RecoverByTime)
14941494
assignedPartitions = tps;
14951495
}
14961496
});
1497+
// No message yet
1498+
auto records = KafkaTestUtility::ConsumeMessagesUntilTimeout(consumer, std::chrono::seconds(1));
1499+
EXPECT_EQ(0, records.size());
14971500

14981501
// Query where the consumer should start from
14991502
const auto offsetsToSeek = consumer.offsetsForTime(assignedPartitions, persistedTimepoint);
@@ -1509,7 +1512,7 @@ TEST(KafkaManualCommitConsumer, RecoverByTime)
15091512
}
15101513

15111514
// Process messages
1512-
const auto records = KafkaTestUtility::ConsumeMessagesUntilTimeout(consumer);
1515+
records = KafkaTestUtility::ConsumeMessagesUntilTimeout(consumer);
15131516
for (const auto& record: records)
15141517
{
15151518
messagesProcessed.emplace_back(record.key().toString(), record.value().toString());

tests/robustness/CMakeLists.txt

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
project("kafka-robustness-test")
2+
3+
# Directories
4+
include_directories(SYSTEM ${Boost_INCLUDE_DIRS})
5+
6+
include_directories(SYSTEM ${GTEST_ROOT}/include)
7+
link_directories(SYSTEM ${GTEST_ROOT}/lib)
8+
9+
# Target
10+
file(GLOB TEST_SRCS *.cc)
11+
12+
add_executable("${PROJECT_NAME}" ${TEST_SRCS})
13+
14+
target_link_libraries("${PROJECT_NAME}" modern-cpp-kafka-api gmock_main pthread)
15+
16+
add_test(NAME ${PROJECT_NAME} COMMAND ./${PROJECT_NAME})
17+
18+
if (BUILD_OPTION_USE_ASAN OR BUILD_OPTION_USE_TSAN)
19+
target_compile_options(${PROJECT_NAME} PRIVATE "-fno-sanitize=all")
20+
target_link_options(${PROJECT_NAME} PRIVATE "-fno-sanitize=all")
21+
if (BUILD_OPTION_USE_ASAN)
22+
set_property(TEST ${PROJECT_NAME} PROPERTY ENVIRONMENT ASAN_OPTIONS=detect_leaks=1)
23+
endif ()
24+
endif ()
25+

0 commit comments

Comments
 (0)