Skip to content

Commit 8510afe

Browse files
authored
Topic Provisioning - default partitions/replicas
The `TopicBuilder` now supports deferring to the broker defaults for number of partitions and replicas. * Fix typo and check for negative partitions in KafkaAdmin to avoid bogus log.
1 parent 6dae2f1 commit 8510afe

File tree

4 files changed

+88
-10
lines changed

4 files changed

+88
-10
lines changed

spring-kafka/src/main/java/org/springframework/kafka/config/TopicBuilder.java

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2019 the original author or authors.
2+
* Copyright 2019-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -20,12 +20,14 @@
2020
import java.util.HashMap;
2121
import java.util.List;
2222
import java.util.Map;
23+
import java.util.Optional;
2324

2425
import org.apache.kafka.clients.admin.NewTopic;
2526
import org.apache.kafka.common.config.TopicConfig;
2627

2728
/**
28-
* Builder for a {@link NewTopic}.
29+
* Builder for a {@link NewTopic}. Since 2.6 partitions and replicas default to
30+
* {@link Optional#empty()} indicating the broker defaults will be applied.
2931
*
3032
* @author Gary Russell
3133
* @since 2.3
@@ -35,9 +37,9 @@ public final class TopicBuilder {
3537

3638
private final String name;
3739

38-
private int partitions = 1;
40+
private Optional<Integer> partitions = Optional.empty();
3941

40-
private short replicas = 1;
42+
private Optional<Short> replicas = Optional.empty();
4143

4244
private Map<Integer, List<Integer>> replicasAssignments;
4345

@@ -48,22 +50,22 @@ private TopicBuilder(String name) {
4850
}
4951

5052
/**
51-
* Set the number of partitions (default 1).
53+
* Set the number of partitions (default broker 'num.partitions').
5254
* @param partitionCount the partitions.
5355
* @return the builder.
5456
*/
5557
public TopicBuilder partitions(int partitionCount) {
56-
this.partitions = partitionCount;
58+
this.partitions = Optional.of(partitionCount);
5759
return this;
5860
}
5961

6062
/**
61-
* Set the number of replicas (default 1).
63+
* Set the number of replicas (default broker 'default.replication.factor').
6264
* @param replicaCount the replicas (which will be cast to short).
6365
* @return the builder.
6466
*/
6567
public TopicBuilder replicas(int replicaCount) {
66-
this.replicas = (short) replicaCount;
68+
this.replicas = Optional.of((short) replicaCount);
6769
return this;
6870
}
6971

spring-kafka/src/main/java/org/springframework/kafka/core/KafkaAdmin.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -234,7 +234,7 @@ private Map<String, NewPartitions> checkPartitions(Map<String, NewTopic> topicNa
234234
NewTopic topic = topicNameToTopic.get(n);
235235
try {
236236
TopicDescription topicDescription = f.get(this.operationTimeout, TimeUnit.SECONDS);
237-
if (topic.numPartitions() < topicDescription.partitions().size()) {
237+
if (topic.numPartitions() >= 0 && topic.numPartitions() < topicDescription.partitions().size()) {
238238
LOGGER.info(() -> String.format(
239239
"Topic '%s' exists but has a different partition count: %d not %d", n,
240240
topicDescription.partitions().size(), topic.numPartitions()));

spring-kafka/src/test/java/org/springframework/kafka/core/KafkaAdminTests.java

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.util.HashMap;
2626
import java.util.Map;
2727
import java.util.Optional;
28+
import java.util.concurrent.TimeUnit;
2829
import java.util.concurrent.atomic.AtomicReference;
2930

3031
import org.apache.kafka.clients.admin.AdminClient;
@@ -109,6 +110,32 @@ public void testAddTopicsAndAddPartitions() throws Exception {
109110
adminClient.close(Duration.ofSeconds(10));
110111
}
111112

113+
@Test
114+
public void testDefaultPartsAndReplicas() throws Exception {
115+
try (AdminClient adminClient = AdminClient.create(this.admin.getConfigurationProperties())) {
116+
DescribeTopicsResult topics = adminClient.describeTopics(Arrays.asList("optBoth", "optPart", "optRepl"));
117+
Map<String, TopicDescription> results = topics.all().get(10, TimeUnit.SECONDS);
118+
var topicDescription = results.get("optBoth");
119+
assertThat(topicDescription.partitions()).hasSize(2);
120+
assertThat(topicDescription.partitions().stream()
121+
.map(tpi -> tpi.replicas())
122+
.flatMap(nodes -> nodes.stream())
123+
.count()).isEqualTo(4);
124+
topicDescription = results.get("optPart");
125+
assertThat(topicDescription.partitions()).hasSize(2);
126+
assertThat(topicDescription.partitions().stream()
127+
.map(tpi -> tpi.replicas())
128+
.flatMap(nodes -> nodes.stream())
129+
.count()).isEqualTo(2);
130+
topicDescription = results.get("optRepl");
131+
assertThat(topicDescription.partitions()).hasSize(3);
132+
assertThat(topicDescription.partitions().stream()
133+
.map(tpi -> tpi.replicas())
134+
.flatMap(nodes -> nodes.stream())
135+
.count()).isEqualTo(6);
136+
}
137+
}
138+
112139
@Test
113140
public void alreadyExists() throws Exception {
114141
AtomicReference<Method> addTopics = new AtomicReference<>();
@@ -153,7 +180,8 @@ public static class Config {
153180

154181
@Bean
155182
public EmbeddedKafkaBroker kafkaEmbedded() {
156-
return new EmbeddedKafkaBroker(3);
183+
return new EmbeddedKafkaBroker(3)
184+
.brokerProperty("default.replication.factor", 2);
157185
}
158186

159187
@Bean
@@ -192,6 +220,26 @@ public NewTopic topic3() {
192220
.build();
193221
}
194222

223+
@Bean
224+
public NewTopic topic4() {
225+
return TopicBuilder.name("optBoth")
226+
.build();
227+
}
228+
229+
@Bean
230+
public NewTopic topic5() {
231+
return TopicBuilder.name("optPart")
232+
.replicas(1)
233+
.build();
234+
}
235+
236+
@Bean
237+
public NewTopic topic6() {
238+
return TopicBuilder.name("optRepl")
239+
.partitions(3)
240+
.build();
241+
}
242+
195243
}
196244

197245
}

src/reference/asciidoc/kafka.adoc

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,34 @@ public NewTopic topic3() {
116116
----
117117
====
118118

119+
Starting with version 2.6, you can omit `.partitions()` and/or `replicas()` and the broker defaults will be applied to those properties.
120+
The broker version must be at least 2.4.0 to support this feature - see https://cwiki.apache.org/confluence/display/KAFKA/KIP-464%3A+Defaults+for+AdminClient%23createTopic[KIP-464].
121+
122+
====
123+
[source, java]
124+
----
125+
@Bean
126+
public NewTopic topic4() {
127+
return TopicBuilder.name("defaultBoth")
128+
.build();
129+
}
130+
131+
@Bean
132+
public NewTopic topic5() {
133+
return TopicBuilder.name("defaultPart")
134+
.replicas(1)
135+
.build();
136+
}
137+
138+
@Bean
139+
public NewTopic topic6() {
140+
return TopicBuilder.name("defaultRepl")
141+
.partitions(3)
142+
.build();
143+
}
144+
----
145+
====
146+
119147
IMPORTANT: When using Spring Boot, a `KafkaAdmin` bean is automatically registered so you only need the `NewTopic` `@Bean` s.
120148

121149
By default, if the broker is not available, a message is logged, but the context continues to load.

0 commit comments

Comments
 (0)