Skip to content

Commit cd455f6

Browse files
garyrussellartembilan
authored andcommitted
GH-2538: KafkaTemplate - Fix Eager Init Cluster Id
Resolves #2538 The `KafkaTemplate` already has code to lazily fetch the cluster id when first needed. Remove the eager fetch in `afterSingletonsInstantiated`. Also, when creating a new `KafkaAdmin` propagate the `operationTimeout` from the source admin (in both the template and listener container).
1 parent 7e336d1 commit cd455f6

File tree

4 files changed

+37
-17
lines changed

4 files changed

+37
-17
lines changed

spring-kafka/src/main/java/org/springframework/kafka/core/KafkaAdmin.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2022 the original author or authors.
2+
* Copyright 2017-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -132,6 +132,15 @@ public void setOperationTimeout(int operationTimeout) {
132132
this.operationTimeout = operationTimeout;
133133
}
134134

135+
/**
136+
* Return the operation timeout in seconds.
137+
* @return the timeout.
138+
* @since 3.0.2
139+
*/
140+
public int getOperationTimeout() {
141+
return this.operationTimeout;
142+
}
143+
135144
/**
136145
* Set to true if you want the application context to fail to load if we are unable
137146
* to connect to the broker during initialization, to check/add topics.

spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2022 the original author or authors.
2+
* Copyright 2015-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -434,9 +434,10 @@ public void afterSingletonsInstantiated() {
434434
if (!producerServers.equals(adminServers)) {
435435
Map<String, Object> props = new HashMap<>(this.kafkaAdmin.getConfigurationProperties());
436436
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, producerServers);
437+
int opTo = this.kafkaAdmin.getOperationTimeout();
437438
this.kafkaAdmin = new KafkaAdmin(props);
439+
this.kafkaAdmin.setOperationTimeout(opTo);
438440
}
439-
this.clusterId = this.kafkaAdmin.clusterId();
440441
}
441442
}
442443
else if (this.micrometerEnabled) {

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -945,7 +945,9 @@ private KafkaAdmin obtainAdmin() {
945945
Map<String, Object> props = new HashMap<>(admin.getConfigurationProperties());
946946
if (!props.get(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG).equals(this.bootstrapServers)) {
947947
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
948+
int opTo = admin.getOperationTimeout();
948949
admin = new KafkaAdmin(props);
950+
admin.setOperationTimeout(opTo);
949951
}
950952
}
951953
return admin;

spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2022 the original author or authors.
2+
* Copyright 2022-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -103,12 +103,14 @@ void endToEnd(@Autowired Listener listener, @Autowired KafkaTemplate<Integer, St
103103
SimpleSpan span = spans.poll();
104104
assertThat(span.getTags()).containsEntry("spring.kafka.template.name", "template");
105105
assertThat(span.getName()).isEqualTo("observation.testT1 send");
106+
assertThat(span.getRemoteServiceName()).startsWith("Apache Kafka: ");
106107
await().until(() -> spans.peekFirst().getTags().size() == 3);
107108
span = spans.poll();
108109
assertThat(span.getTags())
109110
.containsAllEntriesOf(
110111
Map.of("spring.kafka.listener.id", "obs1-0", "foo", "some foo value", "bar", "some bar value"));
111112
assertThat(span.getName()).isEqualTo("observation.testT1 receive");
113+
assertThat(span.getRemoteServiceName()).startsWith("Apache Kafka: ");
112114
await().until(() -> spans.peekFirst().getTags().size() == 1);
113115
span = spans.poll();
114116
assertThat(span.getTags()).containsEntry("spring.kafka.template.name", "template");
@@ -179,24 +181,27 @@ public KeyValues getLowCardinalityKeyValues(KafkaRecordReceiverContext context)
179181
assertThat(admin.getConfigurationProperties())
180182
.containsEntry(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, broker.getBrokersAsString());
181183
// producer factory broker different to admin
182-
assertThat(
183-
KafkaTestUtils.getPropertyValue(template, "kafkaAdmin", KafkaAdmin.class).getConfigurationProperties())
184-
.containsEntry(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
185-
broker.getBrokersAsString() + "," + broker.getBrokersAsString());
184+
KafkaAdmin pAdmin = KafkaTestUtils.getPropertyValue(template, "kafkaAdmin", KafkaAdmin.class);
185+
assertThat(pAdmin.getOperationTimeout()).isEqualTo(admin.getOperationTimeout());
186+
assertThat(pAdmin.getConfigurationProperties())
187+
.containsEntry(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
188+
broker.getBrokersAsString() + "," + broker.getBrokersAsString());
186189
Object container = KafkaTestUtils
187190
.getPropertyValue(endpointRegistry.getListenerContainer("obs1"), "containers", List.class).get(0);
188191
// consumer factory broker different to admin
189-
assertThat(KafkaTestUtils.getPropertyValue(container, "listenerConsumer.kafkaAdmin", KafkaAdmin.class)
190-
.getConfigurationProperties())
191-
.containsEntry(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
192-
broker.getBrokersAsString() + "," + broker.getBrokersAsString() + ","
193-
+ broker.getBrokersAsString());
192+
KafkaAdmin cAdmin = KafkaTestUtils.getPropertyValue(container, "listenerConsumer.kafkaAdmin", KafkaAdmin.class);
193+
assertThat(cAdmin.getOperationTimeout()).isEqualTo(admin.getOperationTimeout());
194+
assertThat(cAdmin.getConfigurationProperties())
195+
.containsEntry(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
196+
broker.getBrokersAsString() + "," + broker.getBrokersAsString() + ","
197+
+ broker.getBrokersAsString());
194198
// broker override in annotation
195199
container = KafkaTestUtils
196200
.getPropertyValue(endpointRegistry.getListenerContainer("obs2"), "containers", List.class).get(0);
197-
assertThat(KafkaTestUtils.getPropertyValue(container, "listenerConsumer.kafkaAdmin", KafkaAdmin.class)
198-
.getConfigurationProperties())
199-
.containsEntry(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, broker.getBrokersAsString());
201+
cAdmin = KafkaTestUtils.getPropertyValue(container, "listenerConsumer.kafkaAdmin", KafkaAdmin.class);
202+
assertThat(cAdmin.getOperationTimeout()).isEqualTo(admin.getOperationTimeout());
203+
assertThat(cAdmin.getConfigurationProperties())
204+
.containsEntry(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, broker.getBrokersAsString());
200205
}
201206

202207
@Configuration
@@ -205,7 +210,10 @@ public static class Config {
205210

206211
@Bean
207212
KafkaAdmin admin(EmbeddedKafkaBroker broker) {
208-
return new KafkaAdmin(Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, broker.getBrokersAsString()));
213+
KafkaAdmin admin = new KafkaAdmin(
214+
Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, broker.getBrokersAsString()));
215+
admin.setOperationTimeout(42);
216+
return admin;
209217
}
210218

211219
@Bean

0 commit comments

Comments
 (0)