Skip to content

Commit d6b9c73

Browse files
committed
(fix) close producer
1 parent b00ff54 commit d6b9c73

File tree

3 files changed

+10
-17
lines changed

3 files changed

+10
-17
lines changed

src/test/java/cn/leancloud/kafka/consumer/integration/Bootstrap.java

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -48,13 +48,14 @@ public static void main(String[] args) throws Exception {
4848
final List<TestRunner> runners = prepareRunners();
4949

5050
try {
51-
for (TestRunner runner : runners) {
52-
runner.runTest(new SingleConsumerTest());
53-
runner.runTest(new TwoConsumerTest());
54-
runner.runTest(new JoinLeaveGroupTest());
51+
for (int i = 0; i < runners.size(); i++) {
52+
try (TestRunner runner = runners.get(i)){
53+
runner.runTest(new SingleConsumerTest());
54+
runner.runTest(new TwoConsumerTest());
55+
runner.runTest(new JoinLeaveGroupTest());
56+
}
5557
}
5658
} finally {
57-
shutdown(runners);
5859
workerPool.shutdown();
5960
}
6061
}
@@ -108,12 +109,6 @@ private static List<TestRunner> prepareRunners() {
108109
return runners;
109110
}
110111

111-
private static void shutdown(List<TestRunner> runners) throws Exception {
112-
for (TestRunner runner : runners) {
113-
runner.close();
114-
}
115-
}
116-
117112
private static class AutoCommitWithWorkerPoolConsumerFactory implements ConsumerFactory {
118113
@Override
119114
public String type() {

src/test/java/cn/leancloud/kafka/consumer/integration/TestContext.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,4 @@ public TestingProducer producer() {
2222
public String topic() {
2323
return topic;
2424
}
25-
26-
public TestStatistics newStatistics() {
27-
return new TestStatistics();
28-
}
2925
}

src/test/java/cn/leancloud/kafka/consumer/integration/TestRunner.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,22 +9,24 @@
99

1010
public class TestRunner implements Closeable {
1111
private static final Logger logger = LoggerFactory.getLogger(TestRunner.class);
12+
private static final Duration defaultProducerSendMsgInterval = Duration.ofMillis(100);
13+
private static final int defaultConcurrentProducerCount = 2;
1214

1315
private final TestingProducer producer;
1416
private final ConsumerFactory factory;
1517
private final TestContext context;
1618

1719
TestRunner(String topic, ConsumerFactory factory) {
1820
this.factory = factory;
19-
this.producer = new TestingProducer(Duration.ofMillis(100), 2);
21+
this.producer = new TestingProducer(defaultProducerSendMsgInterval, defaultConcurrentProducerCount);
2022
this.context = new TestContext(topic, producer, factory);
2123
}
2224

2325
void runTest(IntegrationTest test) throws Exception {
2426
logger.info("\n\n\n------------------------------------ Start {} with consumer: {} ------------------------------------\n",
2527
test.name(), factory.type());
2628

27-
final TestStatistics statistics = context.newStatistics();
29+
final TestStatistics statistics = new TestStatistics();
2830
try {
2931
test.runTest(context, statistics);
3032

0 commit comments

Comments
 (0)