Skip to content

Commit 4e54723

Browse files
GH-3483: Allow @OverRide of KafkaAdmin createAdmin()
Signed-off-by: Anders Swanson <[email protected]>
1 parent 233f934 commit 4e54723

File tree

2 files changed

+16
-14
lines changed

2 files changed

+16
-14
lines changed

spring-kafka/src/main/java/org/springframework/kafka/core/KafkaAdmin.java

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import java.util.stream.Collectors;
3737

3838
import org.apache.commons.logging.LogFactory;
39+
import org.apache.kafka.clients.admin.Admin;
3940
import org.apache.kafka.clients.admin.AdminClient;
4041
import org.apache.kafka.clients.admin.AdminClientConfig;
4142
import org.apache.kafka.clients.admin.AlterConfigOp;
@@ -68,7 +69,7 @@
6869
import org.springframework.util.Assert;
6970

7071
/**
71-
* An admin that delegates to an {@link AdminClient} to create topics defined
72+
* An admin that delegates to an {@link Admin} to create topics defined
7273
* in the application context.
7374
*
7475
* @author Gary Russell
@@ -114,9 +115,9 @@ public class KafkaAdmin extends KafkaResourceFactory
114115
private String clusterId;
115116

116117
/**
117-
* Create an instance with an {@link AdminClient} based on the supplied
118+
* Create an instance with an {@link Admin} based on the supplied
118119
* configuration.
119-
* @param config the configuration for the {@link AdminClient}.
120+
* @param config the configuration for the {@link Admin}.
120121
*/
121122
public KafkaAdmin(Map<String, Object> config) {
122123
this.configs = new HashMap<>(config);
@@ -251,7 +252,7 @@ public void afterSingletonsInstantiated() {
251252
public final boolean initialize() {
252253
Collection<NewTopic> newTopics = newTopics();
253254
if (!newTopics.isEmpty()) {
254-
AdminClient adminClient = null;
255+
Admin adminClient = null;
255256
try {
256257
adminClient = createAdmin();
257258
}
@@ -347,7 +348,7 @@ protected Collection<NewTopic> newTopics() {
347348
@Nullable
348349
public String clusterId() {
349350
if (this.clusterId == null) {
350-
try (AdminClient client = createAdmin()) {
351+
try (Admin client = createAdmin()) {
351352
this.clusterId = client.describeCluster().clusterId().get(this.operationTimeout, TimeUnit.SECONDS);
352353
if (this.clusterId == null) {
353354
this.clusterId = "null";
@@ -365,14 +366,14 @@ public String clusterId() {
365366

366367
@Override
367368
public void createOrModifyTopics(NewTopic... topics) {
368-
try (AdminClient client = createAdmin()) {
369+
try (Admin client = createAdmin()) {
369370
addOrModifyTopicsIfNeeded(client, Arrays.asList(topics));
370371
}
371372
}
372373

373374
@Override
374375
public Map<String, TopicDescription> describeTopics(String... topicNames) {
375-
try (AdminClient admin = createAdmin()) {
376+
try (Admin admin = createAdmin()) {
376377
Map<String, TopicDescription> results = new HashMap<>();
377378
DescribeTopicsResult topics = admin.describeTopics(Arrays.asList(topicNames));
378379
try {
@@ -389,7 +390,7 @@ public Map<String, TopicDescription> describeTopics(String... topicNames) {
389390
}
390391
}
391392

392-
AdminClient createAdmin() {
393+
protected Admin createAdmin() {
393394
return AdminClient.create(getAdminConfig());
394395
}
395396

@@ -409,7 +410,7 @@ protected Map<String, Object> getAdminConfig() {
409410
return configs2;
410411
}
411412

412-
private void addOrModifyTopicsIfNeeded(AdminClient adminClient, Collection<NewTopic> topics) {
413+
private void addOrModifyTopicsIfNeeded(Admin adminClient, Collection<NewTopic> topics) {
413414
if (!topics.isEmpty()) {
414415
Map<String, NewTopic> topicNameToTopic = new HashMap<>();
415416
topics.forEach(t -> topicNameToTopic.compute(t.name(), (k, v) -> t));
@@ -439,7 +440,7 @@ private void addOrModifyTopicsIfNeeded(AdminClient adminClient, Collection<NewTo
439440
}
440441

441442
private Map<ConfigResource, List<ConfigEntry>> checkTopicsForConfigMismatches(
442-
AdminClient adminClient, Collection<NewTopic> topics) {
443+
Admin adminClient, Collection<NewTopic> topics) {
443444

444445
List<ConfigResource> configResources = topics.stream()
445446
.map(topic -> new ConfigResource(Type.TOPIC, topic.name()))
@@ -484,7 +485,7 @@ private Map<ConfigResource, List<ConfigEntry>> checkTopicsForConfigMismatches(
484485
}
485486
}
486487

487-
private void adjustConfigMismatches(AdminClient adminClient, Collection<NewTopic> topics,
488+
private void adjustConfigMismatches(Admin adminClient, Collection<NewTopic> topics,
488489
Map<ConfigResource, List<ConfigEntry>> mismatchingConfigs) {
489490
for (Map.Entry<ConfigResource, List<ConfigEntry>> mismatchingConfigsOfTopic : mismatchingConfigs.entrySet()) {
490491
ConfigResource topicConfigResource = mismatchingConfigsOfTopic.getKey();
@@ -556,7 +557,7 @@ else if (topic.numPartitions() > topicDescription.partitions().size()) {
556557
return topicsToModify;
557558
}
558559

559-
private void addTopics(AdminClient adminClient, List<NewTopic> topicsToAdd) {
560+
private void addTopics(Admin adminClient, List<NewTopic> topicsToAdd) {
560561
CreateTopicsResult topicResults = adminClient.createTopics(topicsToAdd);
561562
try {
562563
topicResults.all().get(this.operationTimeout, TimeUnit.SECONDS);
@@ -579,7 +580,7 @@ private void addTopics(AdminClient adminClient, List<NewTopic> topicsToAdd) {
579580
}
580581
}
581582

582-
private void createMissingPartitions(AdminClient adminClient, Map<String, NewPartitions> topicsToModify) {
583+
private void createMissingPartitions(Admin adminClient, Map<String, NewPartitions> topicsToModify) {
583584
CreatePartitionsResult partitionsResult = adminClient.createPartitions(topicsToModify);
584585
try {
585586
partitionsResult.all().get(this.operationTimeout, TimeUnit.SECONDS);

spring-kafka/src/test/java/org/springframework/kafka/core/KafkaAdminTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import java.util.concurrent.atomic.AtomicReference;
3535

3636
import org.apache.kafka.clients.CommonClientConfigs;
37+
import org.apache.kafka.clients.admin.Admin;
3738
import org.apache.kafka.clients.admin.AdminClient;
3839
import org.apache.kafka.clients.admin.AdminClientConfig;
3940
import org.apache.kafka.clients.admin.AlterConfigOp;
@@ -286,7 +287,7 @@ void nullClusterId() {
286287
KafkaAdmin admin = new KafkaAdmin(Map.of()) {
287288

288289
@Override
289-
AdminClient createAdmin() {
290+
protected Admin createAdmin() {
290291
return mock;
291292
}
292293

0 commit comments

Comments
 (0)