Skip to content

Commit 6ab911c

Browse files
authored
Merge pull request #3 from leancloud/feat/integration-test
integration test
2 parents bc24a2e + 1e84ca7 commit 6ab911c

File tree

10 files changed

+448
-183
lines changed

10 files changed

+448
-183
lines changed

src/test/java/cn/leancloud/kafka/consumer/integration/Bootstrap.java

Lines changed: 249 additions & 49 deletions
Large diffs are not rendered by default.

src/test/java/cn/leancloud/kafka/consumer/integration/ConsumerFactory.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import cn.leancloud.kafka.consumer.LcKafkaConsumer;
44

55
public interface ConsumerFactory {
6-
String getName();
6+
String type();
7+
78
LcKafkaConsumer<Integer, String> buildConsumer(String consumerName, TestStatistics statistics);
89
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package cn.leancloud.kafka.consumer.integration;
2+
3+
public interface IntegrationTest {
4+
String name();
5+
6+
void runTest(TestContext context, TestStatistics statistics) throws Exception;
7+
}
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
package cn.leancloud.kafka.consumer.integration;
2+
3+
import cn.leancloud.kafka.consumer.LcKafkaConsumer;
4+
5+
import java.util.ArrayList;
6+
import java.util.Collections;
7+
import java.util.List;
8+
import java.util.concurrent.CompletableFuture;
9+
import java.util.concurrent.ThreadLocalRandom;
10+
import java.util.concurrent.atomic.AtomicBoolean;
11+
12+
import static java.util.concurrent.TimeUnit.SECONDS;
13+
import static org.awaitility.Awaitility.await;
14+
15+
class JoinLeaveGroupTest implements IntegrationTest{
16+
@Override
17+
public String name() {
18+
return "Join-Leave-Group";
19+
}
20+
21+
@Override
22+
public void runTest(TestContext cxt, TestStatistics statistics) throws Exception {
23+
final AtomicBoolean stopProducer = new AtomicBoolean();
24+
final CompletableFuture<Integer> producerSentCountFuture = cxt.producer().startNonStopTest(cxt.topic(), stopProducer);
25+
final LcKafkaConsumer<Integer, String> lingerConsumer = cxt.factory().buildConsumer("linger-consumer", statistics);
26+
lingerConsumer.subscribe(Collections.singletonList(cxt.topic()));
27+
28+
final List<LcKafkaConsumer<Integer, String>> postJoinConsumers = new ArrayList<>();
29+
for (int i = 0; i < 5; ++i) {
30+
final LcKafkaConsumer<Integer, String> consumer = cxt.factory().buildConsumer("consumer-" + i, statistics);
31+
consumer.subscribe(Collections.singletonList(cxt.topic()));
32+
postJoinConsumers.add(consumer);
33+
Thread.sleep(ThreadLocalRandom.current().nextInt(1000, 5000));
34+
}
35+
36+
for (LcKafkaConsumer<Integer, String> consumer : postJoinConsumers) {
37+
consumer.close();
38+
Thread.sleep(ThreadLocalRandom.current().nextInt(1000, 5000));
39+
}
40+
41+
stopProducer.set(true);
42+
int totalSent = producerSentCountFuture.get();
43+
44+
try {
45+
statistics.recordTotalSent(totalSent);
46+
await().atMost(10, SECONDS)
47+
.pollInterval(1, SECONDS)
48+
.until(() -> statistics.getReceiveRecordsCount() == totalSent);
49+
} finally {
50+
lingerConsumer.close();
51+
}
52+
}
53+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package cn.leancloud.kafka.consumer.integration;
2+
3+
import cn.leancloud.kafka.consumer.LcKafkaConsumer;
4+
5+
import java.time.Duration;
6+
import java.util.Collections;
7+
8+
import static java.util.concurrent.TimeUnit.SECONDS;
9+
import static org.awaitility.Awaitility.await;
10+
11+
class SingleConsumerTest implements IntegrationTest{
12+
@Override
13+
public String name() {
14+
return "Single-Consumer-Test";
15+
}
16+
17+
@Override
18+
public void runTest(TestContext cxt, TestStatistics statistics) throws Exception {
19+
final LcKafkaConsumer<Integer, String> consumer = cxt.factory().buildConsumer(name(), statistics);
20+
consumer.subscribe(Collections.singletonList(cxt.topic()));
21+
final int totalSent = cxt.producer().startFixedDurationTest(cxt.topic(), Duration.ofSeconds(10)).get();
22+
23+
try {
24+
statistics.recordTotalSent(totalSent);
25+
await().atMost(10, SECONDS)
26+
.pollInterval(1, SECONDS)
27+
.until(() -> statistics.getReceiveRecordsCount() == totalSent);
28+
} finally {
29+
consumer.close();
30+
}
31+
}
32+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package cn.leancloud.kafka.consumer.integration;
2+
3+
public class TestContext {
4+
private final TestingProducer producer;
5+
private final ConsumerFactory factory;
6+
private final String topic;
7+
8+
TestContext(String topic, TestingProducer producer, ConsumerFactory factory) {
9+
this.topic = topic;
10+
this.producer = producer;
11+
this.factory = factory;
12+
}
13+
14+
public ConsumerFactory factory() {
15+
return factory;
16+
}
17+
18+
public TestingProducer producer() {
19+
return producer;
20+
}
21+
22+
public String topic() {
23+
return topic;
24+
}
25+
26+
public TestStatistics newStatistics() {
27+
return new TestStatistics();
28+
}
29+
}
Lines changed: 21 additions & 102 deletions
Original file line numberDiff line numberDiff line change
@@ -1,131 +1,50 @@
11
package cn.leancloud.kafka.consumer.integration;
22

3-
import cn.leancloud.kafka.consumer.LcKafkaConsumer;
43
import org.slf4j.Logger;
54
import org.slf4j.LoggerFactory;
65

76
import java.io.Closeable;
87
import java.io.IOException;
98
import java.time.Duration;
10-
import java.util.ArrayList;
11-
import java.util.Collections;
12-
import java.util.List;
13-
import java.util.concurrent.CompletableFuture;
14-
import java.util.concurrent.ThreadLocalRandom;
15-
import java.util.concurrent.atomic.AtomicBoolean;
16-
17-
import static java.util.concurrent.TimeUnit.SECONDS;
18-
import static org.awaitility.Awaitility.await;
199

2010
public class TestRunner implements Closeable {
2111
private static final Logger logger = LoggerFactory.getLogger(TestRunner.class);
2212

2313
private final TestingProducer producer;
2414
private final ConsumerFactory factory;
25-
private final String topic;
26-
private final TestStatistics statistics;
15+
private final TestContext context;
2716

28-
TestRunner(ConsumerFactory factory) {
29-
this.topic = "Testing";
17+
TestRunner(String topic, ConsumerFactory factory) {
3018
this.factory = factory;
3119
this.producer = new TestingProducer(Duration.ofMillis(100), 2);
32-
this.statistics = new TestStatistics();
33-
}
34-
35-
void startTest() throws Exception {
36-
runSingleConsumerTest();
37-
38-
runTwoConsumersInSameGroupTest();
39-
40-
runJoinLeaveGroupTest();
20+
this.context = new TestContext(topic, producer, factory);
4121
}
4222

43-
@Override
44-
public void close() throws IOException {
45-
producer.close();
46-
}
47-
48-
private void runSingleConsumerTest() throws Exception {
49-
logger.info("Run single consumer test.");
50-
51-
final LcKafkaConsumer<Integer, String> client = factory.buildConsumer("cn/leancloud/kafka/consumer", statistics);
52-
client.subscribe(Collections.singletonList(topic));
53-
final int totalSent = producer.startFixedDurationTest(topic, Duration.ofSeconds(10)).get();
23+
void runTest(IntegrationTest test) throws Exception {
24+
logger.info("\n\n\n------------------------------------ Start {} with consumer: {} ------------------------------------\n",
25+
test.name(), factory.type());
5426

27+
final TestStatistics statistics = context.newStatistics();
5528
try {
56-
waitTestDone(totalSent);
57-
} finally {
58-
client.close();
59-
}
60-
61-
logger.info("Single consumer test finished. \n");
62-
}
63-
64-
private void runTwoConsumersInSameGroupTest() throws Exception {
65-
logger.info("Run two consumers in same group test.");
66-
67-
final LcKafkaConsumer<Integer, String> client = factory.buildConsumer("consumer-1", statistics);
68-
final LcKafkaConsumer<Integer, String> client2 = factory.buildConsumer("consumer-2", statistics);
69-
client.subscribe(Collections.singletonList(topic));
70-
client2.subscribe(Collections.singletonList(topic));
71-
final int totalSent = producer.startFixedDurationTest(topic, Duration.ofSeconds(10)).get();
72-
73-
try {
74-
waitTestDone(totalSent);
75-
} finally {
76-
client.close();
77-
client2.close();
78-
}
29+
test.runTest(context, statistics);
7930

80-
logger.info("Two consumers in same group test finished. \n");
81-
}
82-
83-
private void runJoinLeaveGroupTest() throws Exception {
84-
logger.info("Run join leave group test.");
85-
86-
final AtomicBoolean stopProducer = new AtomicBoolean();
87-
final CompletableFuture<Integer> producerSentCountFuture = producer.startNonStopTest(topic, stopProducer);
88-
final LcKafkaConsumer<Integer, String> lingerConsumer = factory.buildConsumer("linger-consumer", statistics);
89-
lingerConsumer.subscribe(Collections.singletonList(topic));
90-
91-
List<LcKafkaConsumer<Integer, String>> postJoinConsumers = new ArrayList<>();
92-
for (int i = 0; i < 5; ++i) {
93-
final LcKafkaConsumer<Integer, String> consumer = factory.buildConsumer("consumer-" + i, statistics);
94-
consumer.subscribe(Collections.singletonList(topic));
95-
postJoinConsumers.add(consumer);
96-
Thread.sleep(ThreadLocalRandom.current().nextInt(1000, 5000));
97-
}
98-
99-
for (LcKafkaConsumer<Integer, String> consumer : postJoinConsumers) {
100-
consumer.close();
101-
Thread.sleep(ThreadLocalRandom.current().nextInt(1000, 5000));
102-
}
103-
104-
stopProducer.set(true);
105-
int totalSent = producerSentCountFuture.get();
106-
107-
try {
108-
waitTestDone(totalSent);
31+
logger.info("{} test finished. sent: {}, received: {}, duplicates: {}",
32+
test.name(), statistics.getTotalSentCount(),
33+
statistics.getReceiveRecordsCount(), statistics.getDuplicateRecordsCount());
34+
} catch (Exception ex) {
35+
logger.error("{} test got unexpected exception. sent: {}, received: {}, duplicates: {}", test.name(),
36+
statistics.getTotalSentCount(), statistics.getReceiveRecordsCount(), statistics.getDuplicateRecordsCount(), ex);
37+
throw ex;
10938
} finally {
110-
lingerConsumer.close();
39+
statistics.clear();
11140
}
11241

113-
logger.info("Join leave group test finished. \n");
42+
logger.info("------------------------------------ {} with consumer: {} finished ------------------------------------\n",
43+
test.name(), factory.type());
11444
}
11545

116-
private void waitTestDone(int totalSent) {
117-
try {
118-
await().atMost(10, SECONDS)
119-
.pollInterval(1, SECONDS)
120-
.until(() -> statistics.getReceiveRecordsCount() >= totalSent);
121-
122-
logger.info("Integration test finished. sent: {}, received: {}, duplicates: {}",
123-
totalSent, statistics.getReceiveRecordsCount(), statistics.getDuplicateRecordsCount());
124-
} catch (Exception ex) {
125-
logger.error("Integration test got unexpected exception. sent: {}, received: {}, duplicates: {}",
126-
totalSent, statistics.getReceiveRecordsCount(), statistics.getDuplicateRecordsCount(), ex);
127-
} finally {
128-
statistics.clear();
129-
}
46+
@Override
47+
public void close() throws IOException {
48+
producer.close();
13049
}
13150
}

src/test/java/cn/leancloud/kafka/consumer/integration/TestStatistics.java

Lines changed: 12 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -2,18 +2,19 @@
22

33
import org.apache.kafka.clients.consumer.ConsumerRecord;
44

5-
import java.util.Objects;
65
import java.util.Set;
76
import java.util.concurrent.ConcurrentHashMap;
87
import java.util.concurrent.atomic.AtomicInteger;
98

10-
public final class TestStatistics {
9+
final class TestStatistics {
1110
private final Set<String> receivedRecords;
1211
private final AtomicInteger duplicateRecordsCounter;
12+
private final AtomicInteger totalSentCounter;
1313

1414
TestStatistics() {
1515
this.receivedRecords = ConcurrentHashMap.newKeySet();
1616
this.duplicateRecordsCounter = new AtomicInteger();
17+
this.totalSentCounter = new AtomicInteger();
1718
}
1819

1920
boolean recordReceivedRecord(ConsumerRecord<Integer, String> record) {
@@ -24,6 +25,10 @@ boolean recordReceivedRecord(ConsumerRecord<Integer, String> record) {
2425
return true;
2526
}
2627

28+
int recordTotalSent(int sent) {
29+
return totalSentCounter.addAndGet(sent);
30+
}
31+
2732
int getReceiveRecordsCount() {
2833
return receivedRecords.size();
2934
}
@@ -32,30 +37,13 @@ int getDuplicateRecordsCount() {
3237
return duplicateRecordsCounter.get();
3338
}
3439

40+
int getTotalSentCount() {
41+
return totalSentCounter.get();
42+
}
43+
3544
void clear() {
3645
receivedRecords.clear();
3746
duplicateRecordsCounter.set(0);
38-
}
39-
40-
@Override
41-
public boolean equals(Object o) {
42-
if (this == o) return true;
43-
if (o == null || getClass() != o.getClass()) return false;
44-
final TestStatistics that = (TestStatistics) o;
45-
return Objects.equals(receivedRecords, that.receivedRecords) &&
46-
Objects.equals(duplicateRecordsCounter, that.duplicateRecordsCounter);
47-
}
48-
49-
@Override
50-
public int hashCode() {
51-
return Objects.hash(receivedRecords, duplicateRecordsCounter);
52-
}
53-
54-
@Override
55-
public String toString() {
56-
return "TestStatistics{" +
57-
"receivedRecords=" + receivedRecords +
58-
", duplicateRecordsCounter=" + duplicateRecordsCounter +
59-
'}';
47+
totalSentCounter.set(0);
6048
}
6149
}

0 commit comments

Comments
 (0)