Skip to content

Commit fc21a15

Browse files
authored
KAFKA-19027 Replace ConsumerGroupCommandTestUtils#generator by ClusterTestDefaults (apache#19347)
jira: https://issues.apache.org/jira/browse/KAFKA-19027 [KAFKA-18329](https://issues.apache.org/jira/browse/KAFKA-18329) will remove old coordinator, so `ConsumerGroupCommandTestUtils#generator` creates only one config now. Hence, we should use ClusterTestDefaults to write more readable code for tests. Reviewers: Ken Huang <[email protected]>, PoAn Yang <[email protected]>, Chia-Ping Tsai <[email protected]>
1 parent b3bc674 commit fc21a15

File tree

6 files changed

+173
-162
lines changed

6 files changed

+173
-162
lines changed

tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java

Lines changed: 1 addition & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -20,16 +20,12 @@
2020
import org.apache.kafka.clients.consumer.KafkaConsumer;
2121
import org.apache.kafka.common.TopicPartition;
2222
import org.apache.kafka.common.errors.WakeupException;
23-
import org.apache.kafka.common.test.api.ClusterConfig;
2423
import org.apache.kafka.common.utils.Utils;
25-
import org.apache.kafka.server.common.Feature;
2624

2725
import java.time.Duration;
2826
import java.util.ArrayList;
2927
import java.util.Collections;
30-
import java.util.HashMap;
3128
import java.util.List;
32-
import java.util.Map;
3329
import java.util.Set;
3430
import java.util.concurrent.ExecutorService;
3531
import java.util.concurrent.Executors;
@@ -38,58 +34,15 @@
3834
import java.util.function.Consumer;
3935
import java.util.function.Supplier;
4036

41-
import static org.apache.kafka.common.test.api.Type.CO_KRAFT;
42-
import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG;
43-
import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG;
44-
import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG;
45-
import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG;
46-
import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG;
4737

4838
/**
49-
* The old test framework {@link kafka.api.BaseConsumerTest#getTestGroupProtocolParametersAll} test for the following cases:
50-
* <ul>
51-
* <li>(KRAFT server) with (group.coordinator.new.enable=true) with (classic group protocol) = 1 case</li>
52-
* <li>(KRAFT server) with (group.coordinator.new.enable=true) with (consumer group protocol) = 1 case</li>
53-
* </ul>
54-
* <p>
55-
* The new test framework run seven cases for the following cases:
56-
* <ul>
57-
* <li>(KRAFT / CO_KRAFT servers) with (group.coordinator.new.enable=false) with (classic group protocol) = 2 cases</li>
58-
* <li>(KRAFT / CO_KRAFT servers) with (group.coordinator.new.enable=true) with (classic group protocol) = 2 cases</li>
59-
* <li>(KRAFT / CO_KRAFT servers) with (group.coordinator.new.enable=true) with (consumer group protocol) = 2 cases</li>
60-
* </ul>
61-
* <p>
62-
* We can reduce the number of cases as same as the old test framework by using the following methods:
63-
* <ul>
64-
* <li>(CO_KRAFT servers) with (group.coordinator.new.enable=true) with (classic / consumer group protocols) = 2 cases</li>
65-
* </ul>
66-
* <ul>
67-
* <li>(KRAFT server) with (group.coordinator.new.enable=false) with (classic group protocol) = 1 case</li>
68-
* </ul>
39+
* This class provides methods to build and manage consumer instances.
6940
*/
7041
class ConsumerGroupCommandTestUtils {
7142

7243
private ConsumerGroupCommandTestUtils() {
7344
}
7445

75-
static List<ClusterConfig> generator() {
76-
Map<String, String> serverProperties = new HashMap<>();
77-
serverProperties.put(OFFSETS_TOPIC_PARTITIONS_CONFIG, "1");
78-
serverProperties.put(OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "1");
79-
serverProperties.put(GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, "1000");
80-
serverProperties.put(CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, "500");
81-
serverProperties.put(CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, "500");
82-
83-
return Collections.singletonList(ClusterConfig.defaultBuilder()
84-
.setTypes(Collections.singleton(CO_KRAFT))
85-
.setServerProperties(serverProperties)
86-
.setTags(Collections.singletonList("kraftGroupCoordinator"))
87-
.setFeatures(Utils.mkMap(
88-
Utils.mkEntry(Feature.TRANSACTION_VERSION, (short) 2),
89-
Utils.mkEntry(Feature.GROUP_VERSION, (short) 1)))
90-
.build());
91-
}
92-
9346
static <T> AutoCloseable buildConsumers(int numberOfConsumers,
9447
Set<TopicPartition> partitions,
9548
Supplier<KafkaConsumer<T, T>> consumerSupplier) {

tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java

Lines changed: 23 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,10 @@
2626
import org.apache.kafka.common.protocol.Errors;
2727
import org.apache.kafka.common.serialization.StringDeserializer;
2828
import org.apache.kafka.common.test.ClusterInstance;
29-
import org.apache.kafka.common.test.api.ClusterConfig;
30-
import org.apache.kafka.common.test.api.ClusterTemplate;
29+
import org.apache.kafka.common.test.api.ClusterConfigProperty;
30+
import org.apache.kafka.common.test.api.ClusterTest;
31+
import org.apache.kafka.common.test.api.ClusterTestDefaults;
32+
import org.apache.kafka.common.test.api.Type;
3133
import org.apache.kafka.test.TestUtils;
3234
import org.apache.kafka.tools.ToolsTestUtils;
3335

@@ -36,7 +38,6 @@
3638
import java.util.Arrays;
3739
import java.util.HashMap;
3840
import java.util.HashSet;
39-
import java.util.List;
4041
import java.util.Map;
4142
import java.util.Objects;
4243
import java.util.Set;
@@ -56,6 +57,9 @@
5657
import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
5758
import static org.apache.kafka.common.GroupState.EMPTY;
5859
import static org.apache.kafka.common.GroupState.STABLE;
60+
import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG;
61+
import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG;
62+
import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG;
5963
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
6064
import static org.junit.jupiter.api.Assertions.assertEquals;
6165
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
@@ -64,19 +68,24 @@
6468
import static org.junit.jupiter.api.Assertions.assertThrows;
6569
import static org.junit.jupiter.api.Assertions.assertTrue;
6670

67-
public class DeleteConsumerGroupsTest {
68-
69-
private static List<ClusterConfig> generator() {
70-
return ConsumerGroupCommandTestUtils.generator();
71+
@ClusterTestDefaults(
72+
types = {Type.CO_KRAFT},
73+
serverProperties = {
74+
@ClusterConfigProperty(key = OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
75+
@ClusterConfigProperty(key = CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, value = "500"),
76+
@ClusterConfigProperty(key = CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, value = "500"),
7177
}
78+
)
79+
public class DeleteConsumerGroupsTest {
7280

7381
@Test
7482
public void testDeleteWithTopicOption() {
7583
String[] cgcArgs = new String[]{"--bootstrap-server", "localhost:62241", "--delete", "--group", getDummyGroupId(), "--topic"};
7684
assertThrows(OptionException.class, () -> ConsumerGroupCommandOptions.fromArgs(cgcArgs));
7785
}
7886

79-
@ClusterTemplate("generator")
87+
88+
@ClusterTest
8089
public void testDeleteCmdNonExistingGroup(ClusterInstance cluster) {
8190
String missingGroupId = getDummyGroupId();
8291
String[] cgcArgs = new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--delete", "--group", missingGroupId};
@@ -87,7 +96,7 @@ public void testDeleteCmdNonExistingGroup(ClusterInstance cluster) {
8796
}
8897
}
8998

90-
@ClusterTemplate("generator")
99+
@ClusterTest
91100
public void testDeleteNonExistingGroup(ClusterInstance cluster) {
92101
String missingGroupId = getDummyGroupId();
93102
String[] cgcArgs = new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--delete", "--group", missingGroupId};
@@ -101,7 +110,7 @@ public void testDeleteNonExistingGroup(ClusterInstance cluster) {
101110
}
102111
}
103112

104-
@ClusterTemplate("generator")
113+
@ClusterTest
105114
public void testDeleteNonEmptyGroup(ClusterInstance cluster) throws Exception {
106115
for (GroupProtocol groupProtocol : cluster.supportedGroupProtocols()) {
107116
String groupId = composeGroupId(groupProtocol);
@@ -134,7 +143,7 @@ public void testDeleteNonEmptyGroup(ClusterInstance cluster) throws Exception {
134143
}
135144
}
136145

137-
@ClusterTemplate("generator")
146+
@ClusterTest
138147
void testDeleteEmptyGroup(ClusterInstance cluster) throws Exception {
139148
for (GroupProtocol groupProtocol : cluster.supportedGroupProtocols()) {
140149
String groupId = composeGroupId(groupProtocol);
@@ -168,7 +177,7 @@ void testDeleteEmptyGroup(ClusterInstance cluster) throws Exception {
168177
}
169178
}
170179

171-
@ClusterTemplate("generator")
180+
@ClusterTest
172181
public void testDeleteCmdAllGroups(ClusterInstance cluster) throws Exception {
173182
for (GroupProtocol groupProtocol : cluster.supportedGroupProtocols()) {
174183
String topicName = composeTopicName(groupProtocol);
@@ -206,7 +215,7 @@ public void testDeleteCmdAllGroups(ClusterInstance cluster) throws Exception {
206215
}
207216
}
208217

209-
@ClusterTemplate("generator")
218+
@ClusterTest
210219
public void testDeleteCmdWithMixOfSuccessAndError(ClusterInstance cluster) throws Exception {
211220
for (GroupProtocol groupProtocol : cluster.supportedGroupProtocols()) {
212221
String groupId = composeGroupId(groupProtocol);
@@ -239,7 +248,7 @@ public void testDeleteCmdWithMixOfSuccessAndError(ClusterInstance cluster) throw
239248
}
240249
}
241250

242-
@ClusterTemplate("generator")
251+
@ClusterTest
243252
public void testDeleteWithMixOfSuccessAndError(ClusterInstance cluster) throws Exception {
244253
for (GroupProtocol groupProtocol : cluster.supportedGroupProtocols()) {
245254
String groupId = composeGroupId(groupProtocol);

tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java

Lines changed: 28 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -32,24 +32,40 @@
3232
import org.apache.kafka.common.protocol.Errors;
3333
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
3434
import org.apache.kafka.common.test.ClusterInstance;
35-
import org.apache.kafka.common.test.api.ClusterConfig;
36-
import org.apache.kafka.common.test.api.ClusterTemplate;
35+
import org.apache.kafka.common.test.api.ClusterConfigProperty;
36+
import org.apache.kafka.common.test.api.ClusterTest;
37+
import org.apache.kafka.common.test.api.ClusterTestDefaults;
38+
import org.apache.kafka.common.test.api.Type;
3739
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
3840

3941
import org.junit.jupiter.api.Assertions;
4042

4143
import java.time.Duration;
4244
import java.util.Collections;
4345
import java.util.HashMap;
44-
import java.util.List;
4546
import java.util.Map;
4647
import java.util.Map.Entry;
4748

49+
import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG;
50+
import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG;
51+
import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG;
52+
import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG;
53+
import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG;
4854
import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS;
4955
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
5056
import static org.junit.jupiter.api.Assertions.assertEquals;
5157
import static org.junit.jupiter.api.Assertions.assertNull;
5258

59+
@ClusterTestDefaults(
60+
types = {Type.CO_KRAFT},
61+
serverProperties = {
62+
@ClusterConfigProperty(key = OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
63+
@ClusterConfigProperty(key = OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
64+
@ClusterConfigProperty(key = GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, value = "1000"),
65+
@ClusterConfigProperty(key = CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, value = "500"),
66+
@ClusterConfigProperty(key = CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, value = "500"),
67+
}
68+
)
5369
public class DeleteOffsetsConsumerGroupCommandIntegrationTest {
5470
public static final String TOPIC_PREFIX = "foo.";
5571
public static final String GROUP_PREFIX = "test.group.";
@@ -59,11 +75,7 @@ public class DeleteOffsetsConsumerGroupCommandIntegrationTest {
5975
this.clusterInstance = clusterInstance;
6076
}
6177

62-
private static List<ClusterConfig> generator() {
63-
return ConsumerGroupCommandTestUtils.generator();
64-
}
65-
66-
@ClusterTemplate("generator")
78+
@ClusterTest
6779
public void testDeleteOffsetsNonExistingGroup() {
6880
String group = "missing.group";
6981
String topic = "foo:1";
@@ -73,7 +85,7 @@ public void testDeleteOffsetsNonExistingGroup() {
7385
}
7486
}
7587

76-
@ClusterTemplate("generator")
88+
@ClusterTest
7789
public void testDeleteOffsetsOfStableConsumerGroupWithTopicPartition() {
7890
for (GroupProtocol groupProtocol : clusterInstance.supportedGroupProtocols()) {
7991
String topic = TOPIC_PREFIX + groupProtocol.name();
@@ -85,7 +97,7 @@ public void testDeleteOffsetsOfStableConsumerGroupWithTopicPartition() {
8597
}
8698
}
8799

88-
@ClusterTemplate("generator")
100+
@ClusterTest
89101
public void testDeleteOffsetsOfStableConsumerGroupWithTopicOnly() {
90102
for (GroupProtocol groupProtocol : clusterInstance.supportedGroupProtocols()) {
91103
String topic = TOPIC_PREFIX + groupProtocol.name();
@@ -97,7 +109,7 @@ public void testDeleteOffsetsOfStableConsumerGroupWithTopicOnly() {
97109
}
98110
}
99111

100-
@ClusterTemplate("generator")
112+
@ClusterTest
101113
public void testDeleteOffsetsOfStableConsumerGroupWithUnknownTopicPartition() {
102114
for (GroupProtocol groupProtocol : clusterInstance.supportedGroupProtocols()) {
103115
String topic = TOPIC_PREFIX + groupProtocol.name();
@@ -107,7 +119,7 @@ public void testDeleteOffsetsOfStableConsumerGroupWithUnknownTopicPartition() {
107119
}
108120
}
109121

110-
@ClusterTemplate("generator")
122+
@ClusterTest
111123
public void testDeleteOffsetsOfStableConsumerGroupWithUnknownTopicOnly() {
112124
for (GroupProtocol groupProtocol : clusterInstance.supportedGroupProtocols()) {
113125
String topic = TOPIC_PREFIX + groupProtocol.name();
@@ -117,7 +129,7 @@ public void testDeleteOffsetsOfStableConsumerGroupWithUnknownTopicOnly() {
117129
}
118130
}
119131

120-
@ClusterTemplate("generator")
132+
@ClusterTest
121133
public void testDeleteOffsetsOfEmptyConsumerGroupWithTopicPartition() {
122134
for (GroupProtocol groupProtocol : clusterInstance.supportedGroupProtocols()) {
123135
String topic = TOPIC_PREFIX + groupProtocol.name();
@@ -129,7 +141,7 @@ public void testDeleteOffsetsOfEmptyConsumerGroupWithTopicPartition() {
129141
}
130142
}
131143

132-
@ClusterTemplate("generator")
144+
@ClusterTest
133145
public void testDeleteOffsetsOfEmptyConsumerGroupWithTopicOnly() {
134146
for (GroupProtocol groupProtocol : clusterInstance.supportedGroupProtocols()) {
135147
String topic = TOPIC_PREFIX + groupProtocol.name();
@@ -141,7 +153,7 @@ public void testDeleteOffsetsOfEmptyConsumerGroupWithTopicOnly() {
141153
}
142154
}
143155

144-
@ClusterTemplate("generator")
156+
@ClusterTest
145157
public void testDeleteOffsetsOfEmptyConsumerGroupWithUnknownTopicPartition() {
146158
for (GroupProtocol groupProtocol : clusterInstance.supportedGroupProtocols()) {
147159
String topic = TOPIC_PREFIX + groupProtocol.name();
@@ -151,7 +163,7 @@ public void testDeleteOffsetsOfEmptyConsumerGroupWithUnknownTopicPartition() {
151163
}
152164
}
153165

154-
@ClusterTemplate("generator")
166+
@ClusterTest
155167
public void testDeleteOffsetsOfEmptyConsumerGroupWithUnknownTopicOnly() {
156168
for (GroupProtocol groupProtocol : clusterInstance.supportedGroupProtocols()) {
157169
String topic = TOPIC_PREFIX + groupProtocol.name();

0 commit comments

Comments
 (0)