Skip to content

Commit 2be7bf8

Browse files
authored
Merge pull request #7 from kenneth-jia/FixUnstableTests
Fix unstable test cases
2 parents b451f80 + 85f1db5 commit 2be7bf8

File tree

6 files changed

+66
-84
lines changed

6 files changed

+66
-84
lines changed

.github/workflows/on_pull_request.yml

Lines changed: 21 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -45,14 +45,9 @@ jobs:
4545
./configure
4646
make -j${CPU_CORE_NUM} && sudo make install
4747
cd ../
48-
# 6. Install zookeeper
49-
wget https://mirrors.bfsu.edu.cn/apache/zookeeper/zookeeper-3.6.2/apache-zookeeper-3.6.2-bin.tar.gz
50-
tar xvzf apache-zookeeper-3.6.2-bin.tar.gz
51-
# 7. Install kafka
48+
# 6. Install kafka
5249
wget https://mirrors.bfsu.edu.cn/apache/kafka/2.6.0/kafka_2.13-2.6.0.tgz
5350
tar xvzf kafka_2.13-2.6.0.tgz
54-
# 8. Install kafkacat
55-
sudo apt install kafkacat
5651
- name: Config
5752
run: |
5853
cd $BUILD_SUB_DIR
@@ -64,10 +59,10 @@ jobs:
6459
- name: Test
6560
run: |
6661
cd $BUILD_SUB_DIR
67-
export PATH=`pwd`/kafka_2.13-2.6.0/bin:`pwd`/apache-zookeeper-3.6.2-bin/bin:$PATH
62+
export PATH=`pwd`/kafka_2.13-2.6.0/bin:$PATH
6863
# start kafka cluster
6964
rm -f test.env
70-
../../scripts/start-local-kafka-cluster.py --zookeeper-port 42181 --broker-ports 40091 40092 40093 --log-dir ./tmp/log --output-dir ./tmp/output &
65+
../../scripts/start-local-kafka-cluster.py --zookeeper-port 42181 --broker-ports 40091 40092 40093 --temp-dir ./tmp &
7166
# run tests
7267
for i in {1..60}; do cat test.env 2>/dev/null && break || sleep 1; done
7368
source test.env
@@ -114,14 +109,9 @@ jobs:
114109
./configure
115110
make -j${CPU_CORE_NUM} && sudo make install
116111
cd ../
117-
# 6. Install zookeeper
118-
wget https://mirrors.bfsu.edu.cn/apache/zookeeper/zookeeper-3.6.2/apache-zookeeper-3.6.2-bin.tar.gz
119-
tar xvzf apache-zookeeper-3.6.2-bin.tar.gz
120-
# 7. Install kafka
112+
# 6. Install kafka
121113
wget https://mirrors.bfsu.edu.cn/apache/kafka/2.6.0/kafka_2.13-2.6.0.tgz
122114
tar xvzf kafka_2.13-2.6.0.tgz
123-
# 8. Install kafkacat
124-
sudo apt install kafkacat
125115
- name: Config
126116
run: |
127117
cd $BUILD_SUB_DIR
@@ -133,10 +123,10 @@ jobs:
133123
- name: Test
134124
run: |
135125
cd $BUILD_SUB_DIR
136-
export PATH=`pwd`/kafka_2.13-2.6.0/bin:`pwd`/apache-zookeeper-3.6.2-bin/bin:$PATH
126+
export PATH=`pwd`/kafka_2.13-2.6.0/bin:$PATH
137127
# start kafka cluster
138128
rm -f test.env
139-
../../scripts/start-local-kafka-cluster.py --zookeeper-port 42181 --broker-ports 40091 40092 40093 --log-dir ./tmp/log --output-dir ./tmp/output &
129+
../../scripts/start-local-kafka-cluster.py --zookeeper-port 42181 --broker-ports 40091 40092 40093 --temp-dir ./tmp &
140130
# run tests
141131
for i in {1..60}; do cat test.env 2>/dev/null && break || sleep 1; done
142132
source test.env
@@ -183,14 +173,9 @@ jobs:
183173
./configure
184174
make && sudo make install
185175
cd ../
186-
# 6. Install zookeeper
187-
wget https://mirrors.bfsu.edu.cn/apache/zookeeper/zookeeper-3.6.2/apache-zookeeper-3.6.2-bin.tar.gz
188-
tar xvzf apache-zookeeper-3.6.2-bin.tar.gz
189-
# 7. Install kafka
176+
# 6. Install kafka
190177
wget https://mirrors.bfsu.edu.cn/apache/kafka/2.6.0/kafka_2.13-2.6.0.tgz
191178
tar xvzf kafka_2.13-2.6.0.tgz
192-
# 8. Install kafkacat
193-
sudo apt install kafkacat
194179
- name: Config
195180
run: |
196181
cd $BUILD_SUB_DIR
@@ -202,10 +187,10 @@ jobs:
202187
- name: Test
203188
run: |
204189
cd $BUILD_SUB_DIR
205-
export PATH=`pwd`/kafka_2.13-2.6.0/bin:`pwd`/apache-zookeeper-3.6.2-bin/bin:$PATH
190+
export PATH=`pwd`/kafka_2.13-2.6.0/bin:$PATH
206191
# start kafka cluster
207192
rm -f test.env
208-
../../scripts/start-local-kafka-cluster.py --zookeeper-port 42181 --broker-ports 40091 40092 40093 --log-dir ./tmp/log --output-dir ./tmp/output &
193+
../../scripts/start-local-kafka-cluster.py --zookeeper-port 42181 --broker-ports 40091 40092 40093 --temp-dir ./tmp &
209194
# run tests
210195
for i in {1..60}; do cat test.env 2>/dev/null && break || sleep 1; done
211196
source test.env
@@ -252,14 +237,9 @@ jobs:
252237
./configure
253238
make -j${CPU_CORE_NUM} && sudo make install
254239
cd ../
255-
# 6. Install zookeeper
256-
wget https://mirrors.bfsu.edu.cn/apache/zookeeper/zookeeper-3.6.2/apache-zookeeper-3.6.2-bin.tar.gz
257-
tar xvzf apache-zookeeper-3.6.2-bin.tar.gz
258-
# 7. Install kafka
240+
# 6. Install kafka
259241
wget https://mirrors.bfsu.edu.cn/apache/kafka/2.6.0/kafka_2.13-2.6.0.tgz
260242
tar xvzf kafka_2.13-2.6.0.tgz
261-
# 8. Install kafkacat
262-
sudo apt install kafkacat
263243
- name: Config
264244
run: |
265245
cd $BUILD_SUB_DIR
@@ -271,10 +251,10 @@ jobs:
271251
- name: Test
272252
run: |
273253
cd $BUILD_SUB_DIR
274-
export PATH=`pwd`/kafka_2.13-2.6.0/bin:`pwd`/apache-zookeeper-3.6.2-bin/bin:$PATH
254+
export PATH=`pwd`/kafka_2.13-2.6.0/bin:$PATH
275255
# start kafka cluster
276256
rm -f test.env
277-
../../scripts/start-local-kafka-cluster.py --zookeeper-port 42181 --broker-ports 40091 40092 40093 --log-dir ./tmp/log --output-dir ./tmp/output &
257+
../../scripts/start-local-kafka-cluster.py --zookeeper-port 42181 --broker-ports 40091 40092 40093 --temp-dir ./tmp &
278258
# run tests
279259
for i in {1..60}; do cat test.env 2>/dev/null && break || sleep 1; done
280260
source test.env
@@ -320,14 +300,9 @@ jobs:
320300
./configure
321301
make -j${CPU_CORE_NUM} && sudo make install
322302
cd ../
323-
# 6. Install zookeeper
324-
wget https://mirrors.bfsu.edu.cn/apache/zookeeper/zookeeper-3.6.2/apache-zookeeper-3.6.2-bin.tar.gz
325-
tar xvzf apache-zookeeper-3.6.2-bin.tar.gz
326-
# 7. Install kafka
303+
# 6. Install kafka
327304
wget https://mirrors.bfsu.edu.cn/apache/kafka/2.6.0/kafka_2.13-2.6.0.tgz
328305
tar xvzf kafka_2.13-2.6.0.tgz
329-
# 8. Install kafkacat
330-
sudo apt install kafkacat
331306
- name: Config
332307
run: |
333308
cd $BUILD_SUB_DIR
@@ -339,10 +314,10 @@ jobs:
339314
- name: Test
340315
run: |
341316
cd $BUILD_SUB_DIR
342-
export PATH=`pwd`/kafka_2.13-2.6.0/bin:`pwd`/apache-zookeeper-3.6.2-bin/bin:$PATH
317+
export PATH=`pwd`/kafka_2.13-2.6.0/bin:$PATH
343318
# start kafka cluster
344319
rm -f test.env
345-
../../scripts/start-local-kafka-cluster.py --zookeeper-port 42181 --broker-ports 40091 40092 40093 --log-dir ./tmp/log --output-dir ./tmp/output &
320+
../../scripts/start-local-kafka-cluster.py --zookeeper-port 42181 --broker-ports 40091 40092 40093 --temp-dir ./tmp &
346321
# run tests
347322
for i in {1..60}; do cat test.env 2>/dev/null && break || sleep 1; done
348323
source test.env
@@ -388,14 +363,9 @@ jobs:
388363
./configure
389364
make -j${CPU_CORE_NUM} && sudo make install
390365
cd ../
391-
# 6. Install zookeeper
392-
wget https://mirrors.bfsu.edu.cn/apache/zookeeper/zookeeper-3.6.2/apache-zookeeper-3.6.2-bin.tar.gz
393-
tar xvzf apache-zookeeper-3.6.2-bin.tar.gz
394-
# 7. Install kafka
366+
# 6. Install kafka
395367
wget https://mirrors.bfsu.edu.cn/apache/kafka/2.6.0/kafka_2.13-2.6.0.tgz
396368
tar xvzf kafka_2.13-2.6.0.tgz
397-
# 8. Install kafkacat
398-
sudo apt install kafkacat
399369
- name: Config
400370
run: |
401371
cd $BUILD_SUB_DIR
@@ -407,10 +377,10 @@ jobs:
407377
- name: Test
408378
run: |
409379
cd $BUILD_SUB_DIR
410-
export PATH=`pwd`/kafka_2.13-2.6.0/bin:`pwd`/apache-zookeeper-3.6.2-bin/bin:$PATH
380+
export PATH=`pwd`/kafka_2.13-2.6.0/bin:$PATH
411381
# start kafka cluster
412382
rm -f test.env
413-
../../scripts/start-local-kafka-cluster.py --zookeeper-port 42181 --broker-ports 40091 40092 40093 --log-dir ./tmp/log --output-dir ./tmp/output &
383+
../../scripts/start-local-kafka-cluster.py --zookeeper-port 42181 --broker-ports 40091 40092 40093 --temp-dir ./tmp &
414384
# run tests
415385
for i in {1..60}; do cat test.env 2>/dev/null && break || sleep 1; done
416386
source test.env
@@ -457,14 +427,9 @@ jobs:
457427
./configure
458428
make -j${CPU_CORE_NUM} && sudo make install
459429
cd ../
460-
# 6. Install zookeeper
461-
wget https://mirrors.bfsu.edu.cn/apache/zookeeper/zookeeper-3.6.2/apache-zookeeper-3.6.2-bin.tar.gz
462-
tar xvzf apache-zookeeper-3.6.2-bin.tar.gz
463-
# 7. Install kafka
430+
# 6. Install kafka
464431
wget https://mirrors.bfsu.edu.cn/apache/kafka/2.6.0/kafka_2.13-2.6.0.tgz
465432
tar xvzf kafka_2.13-2.6.0.tgz
466-
# 8. Install kafkacat
467-
sudo apt install kafkacat
468433
- name: Config
469434
run: |
470435
cd $BUILD_SUB_DIR
@@ -476,10 +441,10 @@ jobs:
476441
- name: Test
477442
run: |
478443
cd $BUILD_SUB_DIR
479-
export PATH=`pwd`/kafka_2.13-2.6.0/bin:`pwd`/apache-zookeeper-3.6.2-bin/bin:$PATH
444+
export PATH=`pwd`/kafka_2.13-2.6.0/bin:$PATH
480445
# start kafka cluster
481446
rm -f test.env
482-
../../scripts/start-local-kafka-cluster.py --zookeeper-port 42181 --broker-ports 40091 40092 40093 --log-dir ./tmp/log --output-dir ./tmp/output &
447+
../../scripts/start-local-kafka-cluster.py --zookeeper-port 42181 --broker-ports 40091 40092 40093 --temp-dir ./tmp &
483448
# run tests
484449
for i in {1..60}; do cat test.env 2>/dev/null && break || sleep 1; done
485450
source test.env

scripts/start-local-kafka-cluster.py

Lines changed: 24 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
import json
99
import argparse
1010
import os
11+
import shutil
12+
import glob
1113
import time
1214
from multiprocessing import Process
1315
from string import Template
@@ -95,44 +97,46 @@ def GenerateBrokerConfig(brokerId, brokerPort, zookeeperPort, logDir):
9597

9698
def StartZookeeperServer(name, propFile, outDir):
9799
cmd = '{0} {1}'.format(ZOOKEEPER_SERVER_START_BIN, propFile)
98-
processPool.addProcess(cmd, name, '{0}/{1}.out'.format(outDir, name), '{0}/{1}.err'.format(outDir, name))
100+
processPool.addProcess(cmd, name, os.path.join(outDir, name+'.out'), os.path.join(outDir, name+'.err'))
99101

100102
def StartKafkaServer(name, propFile, outDir):
101103
cmd = '{0} {1}'.format(KAFKA_SERVER_START_BIN, propFile)
102-
processPool.addProcess(cmd, name, '{0}/{1}.out'.format(outDir, name), '{0}/{1}.err'.format(outDir, name))
104+
processPool.addProcess(cmd, name, os.path.join(outDir, name+'.out'), os.path.join(outDir, name+'.err'))
103105

104106
################################################################################
105107

106108
def main():
107109
parser = argparse.ArgumentParser()
108110
parser.add_argument('--zookeeper-port', help='The port for zookeeper', required=True)
109111
parser.add_argument('--broker-ports', nargs='+', help='The ports for kafka brokers', required=True)
110-
parser.add_argument('--log-dir', help='The location for kafka log files', required=True)
111-
parser.add_argument('--output-dir', help='The location for console printout logging files of zookeeper/brokers', required=True)
112+
parser.add_argument('--temp-dir', help='The location for kafka/zookeeper log files, console printout, etc', required=True)
112113
parsed = parser.parse_args()
113114

114115
zookeeperPort = parsed.zookeeper_port
115-
brokerPorts = parsed.broker_ports
116+
brokerPorts = parsed.broker_ports
116117

117-
logDir = parsed.log_dir
118-
outDir = parsed.output_dir
119-
currentDir = os.getcwd()
118+
if os.path.exists(parsed.temp_dir):
119+
shutil.rmtree(parsed.temp_dir)
120+
121+
logDir = os.path.join(parsed.temp_dir, 'log')
122+
outDir = os.path.join(parsed.temp_dir, 'out')
123+
propDir = os.path.join(parsed.temp_dir, 'properties')
120124

121125
PropFile = namedtuple('PropertiesFile', 'filename context')
126+
122127
# Generate properties files
123-
propDir = '{0}/properties'.format(currentDir)
124128
zookeeperPropFiles = []
125-
zookeeperPropFiles.append(PropFile('{0}/zookeeper.properties'.format(propDir), GenerateZookeeperConfig(zookeeperPort, '{0}/{1}'.format(logDir, 'zookeeper'))))
129+
zookeeperPropFiles.append(PropFile(os.path.join(propDir, 'zookeeper.properties'), GenerateZookeeperConfig(zookeeperPort, os.path.join(logDir, 'zookeeper'))))
126130
kafkaPropFiles = []
127131
for (i, brokerPort) in enumerate(brokerPorts):
128-
kafkaPropFiles.append(PropFile('{0}/kafka{1}.properties'.format(propDir, i), GenerateBrokerConfig(i, brokerPort, zookeeperPort, '{0}/kafka{1}'.format(logDir, i))))
132+
kafkaPropFiles.append(PropFile(os.path.join(propDir, 'kafka{0}.properties'.format(i)), GenerateBrokerConfig(i, brokerPort, zookeeperPort, os.path.join(logDir, 'kafka{0}'.format(i)))))
129133

130-
os.makedirs(propDir, exist_ok=True)
134+
os.makedirs(propDir)
131135
for propFile in (set(zookeeperPropFiles) | set(kafkaPropFiles)):
132136
with open(propFile.filename, 'w') as f:
133137
f.write(propFile.context)
134138

135-
os.makedirs(outDir, exist_ok=True)
139+
os.makedirs(outDir)
136140

137141
StartZookeeperServer('zookeeper', zookeeperPropFiles[0].filename, outDir)
138142

@@ -155,21 +159,25 @@ def main():
155159
kafkaBrokerPids.append(matched.group(1))
156160

157161
if len(kafkaBrokerPids) != len(brokerPorts):
162+
retry += 1
158163
continue
159164

160165
with open(r'test.env', 'w') as envFile:
161166
envFile.write('export KAFKA_BROKER_LIST={0}\n'.format(','.join(['127.0.0.1:{0}'.format(port) for port in brokerPorts])))
162167
envFile.write('export KAFKA_BROKER_PIDS={0}\n'.format(','.join([pid for pid in kafkaBrokerPids])))
163168
break
164169

165-
retry += 1
166-
167170
if retry < MAX_RETRY:
168171
print('Kafka cluster started with ports: {0}!'.format(brokerPorts))
169172
processPool.run()
170173
else:
171174
print('Kafka cluster failed to start with ports: {0}!'.format(brokerPorts))
172-
processPoll.terminate()
175+
processPool.terminate()
176+
for filename in glob.glob(os.path.join(outDir, '*')):
177+
with open(filename, 'r') as f:
178+
print('^^^^^^^^^^ {0} ^^^^^^^^^^'.format(os.path.basename(filename)))
179+
print(f.read())
180+
print('vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv')
173181

174182

175183
if __name__ == '__main__':

tests/integration/TestAdminClient.cc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,8 @@ TEST(AdminClient, createListDeleteTopics)
7070
}
7171
EXPECT_TRUE(areTopicsSuccessfullyCreated);
7272

73+
KafkaTestUtility::WaitMetadataSyncUpBetweenBrokers();
74+
7375
// Delete Topics
7476
auto deleteResult = adminClient.deleteTopics(topics);
7577
std::cout << "[" << Utility::getCurrentTime() << "] " << adminClient.name() << " topics deleted, result: " << deleteResult.detail << std::endl;

tests/integration/TestKafkaConsumer.cc

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ TEST(KafkaAutoCommitConsumer, PollWithHeaders)
159159
auto value = record.lastHeaderValue("k1");
160160
// The last header value for "k1", should be "v3", instead of "v1"
161161
ASSERT_EQ(sizeof(int), value.size());
162-
EXPECT_EQ(v3, *static_cast<const int*>(value.data()));
162+
EXPECT_EQ(0, std::memcmp(&v3, value.data(), value.size()));
163163

164164
// Nothing for a nonexist key
165165
value = record.lastHeaderValue("nonexist");
@@ -185,8 +185,9 @@ TEST(KafkaAutoCommitConsumer, SeekAndPoll)
185185

186186
// The auto-commit consumer
187187
const auto props = KafkaTestUtility::GetKafkaClientCommonConfig()
188-
.put(ConsumerConfig::MAX_POLL_RECORDS, "1") // Only poll 1 message each time
189-
.put(ConsumerConfig::AUTO_OFFSET_RESET, "earliest"); // Seek to the earliest offset at the beginning
188+
.put(ConsumerConfig::SESSION_TIMEOUT_MS, "30000")
189+
.put(ConsumerConfig::MAX_POLL_RECORDS, "1") // Only poll 1 message each time
190+
.put(ConsumerConfig::AUTO_OFFSET_RESET, "earliest"); // Seek to the earliest offset at the beginning
190191

191192
KafkaAutoCommitConsumer consumer(props);
192193

@@ -345,8 +346,9 @@ TEST(KafkaManualCommitConsumer, OffsetCommitCallback)
345346

346347
// The manual-commit consumer
347348
const auto props = KafkaTestUtility::GetKafkaClientCommonConfig()
348-
.put(ConsumerConfig::AUTO_OFFSET_RESET, "earliest") // Seek to the earliest offset at the beginning
349-
.put(ConsumerConfig::MAX_POLL_RECORDS, "1"); // Only poll 1 message each time
349+
.put(ConsumerConfig::SESSION_TIMEOUT_MS, "30000")
350+
.put(ConsumerConfig::AUTO_OFFSET_RESET, "earliest") // Seek to the earliest offset at the beginning
351+
.put(ConsumerConfig::MAX_POLL_RECORDS, "1"); // Only poll 1 message each time
350352

351353
KafkaManualCommitConsumer consumer(props);
352354

@@ -378,9 +380,8 @@ TEST(KafkaManualCommitConsumer, OffsetCommitCallback)
378380
});
379381
}
380382

381-
382383
KafkaTestUtility::WaitUntil([&commitCbCount, expectedCnt = messages.size()](){ return expectedCnt == commitCbCount; },
383-
KafkaTestUtility::MAX_OFFSET_COMMIT_TIMEOUT);
384+
KafkaTestUtility::MAX_OFFSET_COMMIT_TIMEOUT);
384385

385386
EXPECT_EQ(messages.size(), commitCbCount);
386387

@@ -466,8 +467,9 @@ TEST(KafkaManualCommitConsumer, OffsetCommitCallback_ManuallyPollEvents)
466467

467468
// The manual-commit consumer
468469
const auto props = KafkaTestUtility::GetKafkaClientCommonConfig()
469-
.put(ConsumerConfig::AUTO_OFFSET_RESET, "earliest") // Seek to the earliest offset at the beginning
470-
.put(ConsumerConfig::MAX_POLL_RECORDS, "1"); // Only poll 1 message each time
470+
.put(ConsumerConfig::SESSION_TIMEOUT_MS, "30000")
471+
.put(ConsumerConfig::AUTO_OFFSET_RESET, "earliest") // Seek to the earliest offset at the beginning
472+
.put(ConsumerConfig::MAX_POLL_RECORDS, "1"); // Only poll 1 message each time
471473

472474
KafkaManualCommitConsumer consumer(props, KafkaClient::EventsPollingOption::Manual);
473475

@@ -508,6 +510,8 @@ TEST(KafkaManualCommitConsumer, OffsetCommitCallback_ManuallyPollEvents)
508510
} while (std::chrono::steady_clock::now() < end && commitCbCount != messages.size());
509511

510512
EXPECT_EQ(messages.size(), commitCbCount);
513+
514+
consumer.close();
511515
}
512516

513517
TEST(KafkaManualCommitConsumer, OffsetCommitAndPosition)
@@ -538,7 +542,9 @@ TEST(KafkaManualCommitConsumer, OffsetCommitAndPosition)
538542

539543
// Start consumer a few times, but only commit the offset for the first message each time
540544
{
541-
auto props = KafkaTestUtility::GetKafkaClientCommonConfig().put(ConsumerConfig::MAX_POLL_RECORDS, "1"); // Only poll 1 message each time
545+
auto props = KafkaTestUtility::GetKafkaClientCommonConfig()
546+
.put(ConsumerConfig::SESSION_TIMEOUT_MS, "30000")
547+
.put(ConsumerConfig::MAX_POLL_RECORDS, "1"); // Only poll 1 message each time
542548

543549
KafkaManualCommitConsumer consumer(props);
544550
std::cout << "[" << Utility::getCurrentTime() << "] " << consumer.name() << " started" << std::endl;

tests/robustness/TestAdminClient.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ TEST(AdminClient, BrokersTimeout)
6464
{
6565
break;
6666
}
67-
67+
6868
std::this_thread::sleep_for(std::chrono::seconds(1));
6969

7070
auto listResult = adminClient.listTopics(std::chrono::seconds(1));
@@ -150,3 +150,4 @@ TEST(AdminClient, BrokersTimeout)
150150
}
151151
}
152152
}
153+

0 commit comments

Comments
 (0)