Skip to content

Commit adc9463

Browse files
garyrussellartembilan
authored andcommitted
Embedded broker support different partition counts
Add `addTopics(NewTopic... topics)` to the embedded broker to enable creation of topics with different partition counts. * Polishing according PR comments
1 parent 2502a8f commit adc9463

File tree

6 files changed

+132
-51
lines changed

6 files changed

+132
-51
lines changed

spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaBroker.java

Lines changed: 66 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import java.util.Map;
3030
import java.util.Properties;
3131
import java.util.Set;
32+
import java.util.concurrent.TimeUnit;
3233
import java.util.concurrent.atomic.AtomicBoolean;
3334
import java.util.stream.Collectors;
3435

@@ -87,6 +88,8 @@ public class EmbeddedKafkaBroker implements InitializingBean, DisposableBean {
8788

8889
public static final String SPRING_EMBEDDED_ZOOKEEPER_CONNECT = "spring.embedded.zookeeper.connect";
8990

91+
private static final int DEFAULT_ADMIN_TIMEOUT = 30;
92+
9093
private final int count;
9194

9295
private final boolean controlledShutdown;
@@ -107,6 +110,8 @@ public class EmbeddedKafkaBroker implements InitializingBean, DisposableBean {
107110

108111
private int[] kafkaPorts;
109112

113+
private int adminTimeout = DEFAULT_ADMIN_TIMEOUT;
114+
110115
public EmbeddedKafkaBroker(int count) {
111116
this(count, false);
112117
}
@@ -177,6 +182,16 @@ public EmbeddedKafkaBroker kafkaPorts(int... kafkaPorts) {
177182
return this;
178183
}
179184

185+
/**
186+
* Set the timeout in seconds for admin operations (e.g. topic creation, close).
187+
* Default 30 seconds.
188+
* @param adminTimeout the timeout.
189+
* @since 2.2
190+
*/
191+
public void setAdminTimeout(int adminTimeout) {
192+
this.adminTimeout = adminTimeout;
193+
}
194+
180195
@Override
181196
public void afterPropertiesSet() {
182197
this.zookeeper = new EmbeddedZookeeper();
@@ -218,33 +233,57 @@ private Properties createBrokerProperties(int i) {
218233
}
219234

220235
/**
221-
* Create topics in the existing broker(s) using the configured number of partitions.
236+
* Add topics to the existing broker(s) using the configured number of partitions.
237+
* The broker(s) must be running.
222238
* @param topics the topics.
223239
*/
224-
private void createKafkaTopics(Set<String> topics) {
225-
doWithAdmin(admin -> {
226-
List<NewTopic> newTopics = topics.stream()
227-
.map(t -> new NewTopic(t, this.partitionsPerTopic, (short) this.count))
228-
.collect(Collectors.toList());
229-
CreateTopicsResult createTopics = admin.createTopics(newTopics);
230-
try {
231-
createTopics.all().get();
232-
}
233-
catch (Exception e) {
234-
throw new KafkaException(e);
235-
}
236-
});
240+
public void addTopics(String... topics) {
241+
Assert.notNull(this.zookeeper, "Broker must be started before this method can be called");
242+
HashSet<String> set = new HashSet<>(Arrays.asList(topics));
243+
createKafkaTopics(set);
244+
this.topics.addAll(set);
237245
}
238246

247+
/**
248+
* Add topics to the existing broker(s).
249+
* The broker(s) must be running.
250+
* @param topics the topics.
251+
* @since 2.2
252+
*/
253+
public void addTopics(NewTopic... topics) {
254+
Assert.notNull(this.zookeeper, "Broker must be started before this method can be called");
255+
for (NewTopic topic : topics) {
256+
Assert.isTrue(this.topics.add(topic.name()), () -> "topic already exists: " + topic);
257+
Assert.isTrue(topic.replicationFactor() <= this.count
258+
&& (topic.replicasAssignments() == null
259+
|| topic.replicasAssignments().size() <= this.count),
260+
() -> "Embedded kafka does not support the requested replication factor: " + topic);
261+
}
262+
263+
doWithAdmin(admin -> createTopics(admin, Arrays.asList(topics)));
264+
}
239265

240266
/**
241-
* Add topics to the existing broker(s) using the configured number of partitions.
267+
* Create topics in the existing broker(s) using the configured number of partitions.
242268
* @param topics the topics.
243269
*/
244-
public void addTopics(String... topics) {
245-
HashSet<String> set = new HashSet<>(Arrays.asList(topics));
246-
createKafkaTopics(set);
247-
this.topics.addAll(set);
270+
private void createKafkaTopics(Set<String> topics) {
271+
doWithAdmin(admin -> {
272+
createTopics(admin,
273+
topics.stream()
274+
.map(t -> new NewTopic(t, this.partitionsPerTopic, (short) this.count))
275+
.collect(Collectors.toList()));
276+
});
277+
}
278+
279+
private void createTopics(AdminClient admin, List<NewTopic> newTopics) {
280+
CreateTopicsResult createTopics = admin.createTopics(newTopics);
281+
try {
282+
createTopics.all().get(this.adminTimeout, TimeUnit.SECONDS);
283+
}
284+
catch (Exception e) {
285+
throw new KafkaException(e);
286+
}
248287
}
249288

250289
/**
@@ -254,9 +293,16 @@ public void addTopics(String... topics) {
254293
public void doWithAdmin(java.util.function.Consumer<AdminClient> callback) {
255294
Map<String, Object> adminConfigs = new HashMap<>();
256295
adminConfigs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, getBrokersAsString());
257-
try (AdminClient admin = AdminClient.create(adminConfigs)) {
296+
AdminClient admin = null;
297+
try {
298+
admin = AdminClient.create(adminConfigs);
258299
callback.accept(admin);
259300
}
301+
finally {
302+
if (admin != null) {
303+
admin.close(this.adminTimeout, TimeUnit.SECONDS);
304+
}
305+
}
260306
}
261307

262308
@Override

spring-kafka-test/src/main/java/org/springframework/kafka/test/context/EmbeddedKafka.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
* @author Artem Bilan
5454
* @author Elliot Metsger
5555
* @author Zach Olauson
56+
* @author Gary Russell
5657
*
5758
* @since 1.3
5859
*
@@ -87,8 +88,11 @@
8788
int partitions() default 2;
8889

8990
/**
90-
* Topics that should be created
91-
* Topics may contain property placeholders, e.g. {@code topics = "${kafka.topic.one:topicOne}"}
91+
* Topics that should be created Topics may contain property placeholders, e.g.
92+
* {@code topics = "${kafka.topic.one:topicOne}"} The topics will be created with
93+
* {@link #partitions()} partitions; to provision other topics with other partition
94+
* counts call the {@code addTopics(NewTopic... topics)} method on the autowired
95+
* broker.
9296
* @return the topics to create
9397
*/
9498
String[] topics() default { };

spring-kafka-test/src/main/java/org/springframework/kafka/test/rule/KafkaEmbedded.java

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -131,15 +131,6 @@ public void afterPropertiesSet() throws Exception {
131131
before();
132132
}
133133

134-
/**
135-
* Add topics to the existing broker(s) using the configured number of partitions.
136-
* @param topics the topics.
137-
* @since 2.1.6
138-
*/
139-
public void addTopics(String... topics) {
140-
getEmbeddedKafka().addTopics(topics);
141-
}
142-
143134
/**
144135
* Create an {@link AdminClient}; invoke the callback and reliably close the
145136
* admin.

spring-kafka/src/main/java/org/springframework/kafka/listener/MessageListenerContainer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
* @author Gary Russell
3434
* @author Vladimir Tsanev
3535
*/
36-
public interface MessageListenerContainer extends SmartLifecycle {
36+
public interface MessageListenerContainer extends SmartLifecycle {
3737

3838
/**
3939
* Setup the message listener to use. Throws an {@link IllegalArgumentException}

spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.springframework.kafka.annotation;
1818

1919
import static org.assertj.core.api.Assertions.assertThat;
20+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
2021
import static org.mockito.ArgumentMatchers.anyMap;
2122
import static org.mockito.ArgumentMatchers.anyString;
2223
import static org.mockito.BDDMockito.willAnswer;
@@ -40,6 +41,7 @@
4041
import javax.validation.ValidationException;
4142
import javax.validation.constraints.Max;
4243

44+
import org.apache.kafka.clients.admin.NewTopic;
4345
import org.apache.kafka.clients.consumer.Consumer;
4446
import org.apache.kafka.clients.consumer.ConsumerConfig;
4547
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
@@ -686,6 +688,18 @@ public void testAddingTopics() {
686688
int count = embeddedKafka.getTopics().size();
687689
embeddedKafka.addTopics("testAddingTopics");
688690
assertThat(embeddedKafka.getTopics().size()).isEqualTo(count + 1);
691+
embeddedKafka.addTopics(new NewTopic("morePartitions", 10, (short) 1));
692+
assertThat(embeddedKafka.getTopics().size()).isEqualTo(count + 2);
693+
assertThatThrownBy(() -> embeddedKafka.addTopics(new NewTopic("morePartitions", 10, (short) 1)))
694+
.isInstanceOf(IllegalArgumentException.class).hasMessageContaining("exists");
695+
assertThatThrownBy(() -> embeddedKafka.addTopics(new NewTopic("morePartitions2", 10, (short) 2)))
696+
.isInstanceOf(IllegalArgumentException.class).hasMessageContaining("replication");
697+
Map<String, Object> consumerProps = new HashMap<>(this.consumerFactory.getConfigurationProperties());
698+
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "testMultiReplying");
699+
ConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
700+
Consumer<Integer, String> consumer = cf.createConsumer();
701+
assertThat(consumer.partitionsFor("morePartitions")).hasSize(10);
702+
consumer.close();
689703
}
690704

691705
@Test

src/reference/asciidoc/testing.adoc

Lines changed: 45 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -103,41 +103,66 @@ Convenient constants `EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS` and `Em
103103
With the `EmbeddedKafkaBroker.brokerProperties(Map<String, String>)` you can provide additional properties for the Kafka server(s).
104104
See https://kafka.apache.org/documentation/#brokerconfigs[Kafka Config] for more information about possible broker properties.
105105

106+
==== Configuring Topics
107+
108+
====
109+
[source, java]
110+
----
111+
public class MyTests {
112+
113+
@ClassRule
114+
private static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, false, 5, "foo", "bar");
115+
116+
@Test
117+
public void test() {
118+
embeddedKafkaRule.getEmbeddedKafka()
119+
.addTopics(new NewTopic("baz", 10, (short) 1), new NewTopic("qux", 15, (short) 1));
120+
...
121+
}
122+
123+
}
124+
----
125+
====
126+
127+
The above configuration will create topics `foo` and `bar` with 5 partitions, `baz` with 10 and `qux` with 15.
106128

107129
==== Using the Same Broker(s) for Multiple Test Classes
108130

109131
There is no built-in support for this, but it can be achieved with something similar to the following:
110132

133+
====
111134
[source, java]
112135
----
113136
public final class EmbeddedKafkaHolder {
114137
115-
private static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, false);
138+
private static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, false);
116139
117-
private static boolean started;
140+
private static boolean started;
118141
119-
public static EmbeddedKafkaRule getEmbeddedKafka() {
120-
if (!started) {
121-
try {
122-
embeddedKafka.before();
123-
}
124-
catch (Exception e) {
125-
throw new KafkaException(e);
126-
}
127-
started = true;
128-
}
129-
return embeddedKafka;
130-
}
142+
public static EmbeddedKafkaRule getEmbeddedKafka() {
143+
if (!started) {
144+
try {
145+
embeddedKafka.before();
146+
}
147+
catch (Exception e) {
148+
throw new KafkaException(e);
149+
}
150+
started = true;
151+
}
152+
return embeddedKafka;
153+
}
131154
132-
private EmbeddedKafkaHolder() {
133-
super();
134-
}
155+
private EmbeddedKafkaHolder() {
156+
super();
157+
}
135158
136159
}
137160
----
161+
====
138162

139163
And then, in each test class:
140164

165+
====
141166
[source, java]
142167
----
143168
static {
@@ -146,6 +171,7 @@ static {
146171
147172
private static EmbeddedKafkaRule embeddedKafka = EmbeddedKafkaHolder.getEmbeddedKafka();
148173
----
174+
====
149175

150176
IMPORTANT: This example provides no mechanism for shutting down the broker(s) when all tests are complete.
151177
This could be a problem if, say, you run your tests in a Gradle daemon.
@@ -333,8 +359,8 @@ public class KafkaTemplateTests {
333359
final BlockingQueue<ConsumerRecord<Integer, String>> records = new LinkedBlockingQueue<>();
334360
container.setupMessageListener(new MessageListener<Integer, String>() {
335361
336-
@Override
337-
public void onMessage(ConsumerRecord<Integer, String> record) {
362+
@Override
363+
public void onMessage(ConsumerRecord<Integer, String> record) {
338364
System.out.println(record);
339365
records.add(record);
340366
}

0 commit comments

Comments
 (0)