Skip to content

Commit e653cc4

Browse files
garyrussellartembilan
authored andcommitted
GH-1538: Embedded broker get results for addTopics
Resolves #1538 **cherry-pick to 2.5.x** # Conflicts: # spring-kafka/src/test/java/org/springframework/kafka/listener/ABSwitchClusterTests.java Add docs.
1 parent fa7917d commit e653cc4

File tree

3 files changed

+109
-9
lines changed

3 files changed

+109
-9
lines changed

spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaBroker.java

Lines changed: 89 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.net.InetSocketAddress;
2323
import java.nio.file.Files;
2424
import java.time.Duration;
25+
import java.util.AbstractMap.SimpleEntry;
2526
import java.util.ArrayList;
2627
import java.util.Arrays;
2728
import java.util.Collection;
@@ -33,8 +34,11 @@
3334
import java.util.Properties;
3435
import java.util.Set;
3536
import java.util.UUID;
37+
import java.util.concurrent.ExecutionException;
3638
import java.util.concurrent.TimeUnit;
39+
import java.util.concurrent.TimeoutException;
3740
import java.util.concurrent.atomic.AtomicBoolean;
41+
import java.util.function.Function;
3842
import java.util.stream.Collectors;
3943

4044
import org.apache.commons.logging.LogFactory;
@@ -417,22 +421,100 @@ private void createTopics(AdminClient admin, List<NewTopic> newTopics) {
417421
}
418422
}
419423

424+
/**
425+
* Add topics to the existing broker(s) using the configured number of partitions.
426+
* The broker(s) must be running.
427+
* @param topicsToAdd the topics.
428+
* @return the results; null values indicate success.
429+
* @since 2.5.4
430+
*/
431+
public Map<String, Exception> addTopicsWithResults(String... topicsToAdd) {
432+
Assert.notNull(this.zookeeper, "Broker must be started before this method can be called");
433+
HashSet<String> set = new HashSet<>(Arrays.asList(topicsToAdd));
434+
this.topics.addAll(set);
435+
return createKafkaTopicsWithResults(set);
436+
}
437+
438+
/**
439+
* Add topics to the existing broker(s) and returning a map of results.
440+
* The broker(s) must be running.
441+
* @param topicsToAdd the topics.
442+
* @return the results; null values indicate success.
443+
* @since 2.5.4
444+
*/
445+
public Map<String, Exception> addTopicsWithResults(NewTopic... topicsToAdd) {
446+
Assert.notNull(this.zookeeper, "Broker must be started before this method can be called");
447+
for (NewTopic topic : topicsToAdd) {
448+
Assert.isTrue(this.topics.add(topic.name()), () -> "topic already exists: " + topic);
449+
Assert.isTrue(topic.replicationFactor() <= this.count
450+
&& (topic.replicasAssignments() == null
451+
|| topic.replicasAssignments().size() <= this.count),
452+
() -> "Embedded kafka does not support the requested replication factor: " + topic);
453+
}
454+
455+
return doWithAdminFunction(admin -> createTopicsWithResults(admin, Arrays.asList(topicsToAdd)));
456+
}
457+
458+
/**
459+
* Create topics in the existing broker(s) using the configured number of partitions
460+
* and returning a map of results.
461+
* @param topicsToCreate the topics.
462+
* @return the results; null values indicate success.
463+
* @since 2.5.4
464+
*/
465+
private Map<String, Exception> createKafkaTopicsWithResults(Set<String> topicsToCreate) {
466+
return doWithAdminFunction(admin -> {
467+
return createTopicsWithResults(admin,
468+
topicsToCreate.stream()
469+
.map(t -> new NewTopic(t, this.partitionsPerTopic, (short) this.count))
470+
.collect(Collectors.toList()));
471+
});
472+
}
473+
474+
private Map<String, Exception> createTopicsWithResults(AdminClient admin, List<NewTopic> newTopics) {
475+
CreateTopicsResult createTopics = admin.createTopics(newTopics);
476+
Map<String, Exception> results = new HashMap<>();
477+
createTopics.values()
478+
.entrySet()
479+
.stream()
480+
.map(entry -> {
481+
Exception result;
482+
try {
483+
entry.getValue().get(this.adminTimeout.getSeconds(), TimeUnit.SECONDS);
484+
result = null;
485+
}
486+
catch (InterruptedException | ExecutionException | TimeoutException e) {
487+
result = e;
488+
}
489+
return new SimpleEntry<>(entry.getKey(), result);
490+
})
491+
.forEach(entry -> results.put(entry.getKey(), entry.getValue()));
492+
return results;
493+
}
494+
420495
/**
421496
* Create an {@link AdminClient}; invoke the callback and reliably close the admin.
422497
* @param callback the callback.
423498
*/
424499
public void doWithAdmin(java.util.function.Consumer<AdminClient> callback) {
425500
Map<String, Object> adminConfigs = new HashMap<>();
426501
adminConfigs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, getBrokersAsString());
427-
AdminClient admin = null;
428-
try {
429-
admin = AdminClient.create(adminConfigs);
502+
try (AdminClient admin = AdminClient.create(adminConfigs)) {
430503
callback.accept(admin);
431504
}
432-
finally {
433-
if (admin != null) {
434-
admin.close(this.adminTimeout);
435-
}
505+
}
506+
507+
/**
508+
* Create an {@link AdminClient}; invoke the callback and reliably close the admin.
509+
* @param callback the callback.
510+
* @return a map of results.
511+
* @since 2.5.4
512+
*/
513+
public <T> T doWithAdminFunction(Function<AdminClient, T> callback) {
514+
Map<String, Object> adminConfigs = new HashMap<>();
515+
adminConfigs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, getBrokersAsString());
516+
try (AdminClient admin = AdminClient.create(adminConfigs)) {
517+
return callback.apply(admin);
436518
}
437519
}
438520

spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import java.util.Set;
4040
import java.util.concurrent.ConcurrentHashMap;
4141
import java.util.concurrent.CountDownLatch;
42+
import java.util.concurrent.ExecutionException;
4243
import java.util.concurrent.TimeUnit;
4344
import java.util.concurrent.atomic.AtomicBoolean;
4445
import java.util.concurrent.atomic.AtomicReference;
@@ -60,6 +61,7 @@
6061
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
6162
import org.apache.kafka.clients.producer.ProducerConfig;
6263
import org.apache.kafka.clients.producer.ProducerRecord;
64+
import org.apache.kafka.common.errors.TopicExistsException;
6365
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
6466
import org.apache.kafka.common.serialization.ByteArraySerializer;
6567
import org.junit.jupiter.api.Test;
@@ -761,9 +763,22 @@ public void testConverterBean() throws Exception {
761763
@Test
762764
public void testAddingTopics() {
763765
int count = this.embeddedKafka.getTopics().size();
764-
this.embeddedKafka.addTopics("testAddingTopics");
766+
Map<String, Exception> results = this.embeddedKafka.addTopicsWithResults("testAddingTopics");
767+
assertThat(results).hasSize(1);
768+
assertThat(results.keySet().iterator().next()).isEqualTo("testAddingTopics");
769+
assertThat(results.get("testAddingTopics")).isNull();
765770
assertThat(this.embeddedKafka.getTopics().size()).isEqualTo(count + 1);
766-
this.embeddedKafka.addTopics(new NewTopic("morePartitions", 10, (short) 1));
771+
results = this.embeddedKafka.addTopicsWithResults("testAddingTopics");
772+
assertThat(results).hasSize(1);
773+
assertThat(results.keySet().iterator().next()).isEqualTo("testAddingTopics");
774+
assertThat(results.get("testAddingTopics"))
775+
.isInstanceOf(ExecutionException.class)
776+
.hasCauseInstanceOf(TopicExistsException.class);
777+
assertThat(this.embeddedKafka.getTopics().size()).isEqualTo(count + 1);
778+
results = this.embeddedKafka.addTopicsWithResults(new NewTopic("morePartitions", 10, (short) 1));
779+
assertThat(results).hasSize(1);
780+
assertThat(results.keySet().iterator().next()).isEqualTo("morePartitions");
781+
assertThat(results.get("morePartitions")).isNull();
767782
assertThat(this.embeddedKafka.getTopics().size()).isEqualTo(count + 2);
768783
assertThatIllegalArgumentException()
769784
.isThrownBy(() -> this.embeddedKafka.addTopics(new NewTopic("morePartitions", 10, (short) 1)))

src/reference/asciidoc/testing.adoc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,9 @@ public class MyTests {
142142
----
143143
====
144144

145+
By default, `addTopics` will throw an exception when problems arise (such as adding a topic that already exists).
146+
Version 2.6 added a new version of that method that returns a `Map<String, Exception>`; the key is the topic name and the value is `null` for success, or an `Exception` for a failure.
147+
145148
==== Using the Same Brokers for Multiple Test Classes
146149

147150
There is no built-in support for doing so, but you can use the same broker for multiple test classes with something similar to the following:

0 commit comments

Comments
 (0)