Skip to content

Commit 802e47c

Browse files
authored
GH-2626: Support External Admin for Cluster Id
Resolves #2626
1 parent c2177f6 commit 802e47c

File tree

4 files changed

+96
-15
lines changed

4 files changed

+96
-15
lines changed

spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java

Lines changed: 34 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,7 @@ public class KafkaTemplate<K, V> implements KafkaOperations<K, V>, ApplicationCo
148148

149149
private ObservationRegistry observationRegistry = ObservationRegistry.NOOP;
150150

151+
@Nullable
151152
private KafkaAdmin kafkaAdmin;
152153

153154
private String clusterId;
@@ -419,22 +420,44 @@ public void setObservationConvention(KafkaTemplateObservationConvention observat
419420
this.observationConvention = observationConvention;
420421
}
421422

423+
/**
424+
* Return the {@link KafkaAdmin}, used to find the cluster id for observation, if
425+
* present.
426+
* @return the kafkaAdmin
427+
* @since 3.0.5
428+
*/
429+
@Nullable
430+
public KafkaAdmin getKafkaAdmin() {
431+
return this.kafkaAdmin;
432+
}
433+
434+
/**
435+
* Set the {@link KafkaAdmin}, used to find the cluster id for observation, if
436+
* present.
437+
* @param kafkaAdmin the admin.
438+
*/
439+
public void setKafkaAdmin(KafkaAdmin kafkaAdmin) {
440+
this.kafkaAdmin = kafkaAdmin;
441+
}
442+
422443
@Override
423444
public void afterSingletonsInstantiated() {
424445
if (this.observationEnabled && this.applicationContext != null) {
425446
this.observationRegistry = this.applicationContext.getBeanProvider(ObservationRegistry.class)
426447
.getIfUnique(() -> this.observationRegistry);
427-
this.kafkaAdmin = this.applicationContext.getBeanProvider(KafkaAdmin.class).getIfUnique();
428-
if (this.kafkaAdmin != null) {
429-
Object producerServers = this.producerFactory.getConfigurationProperties()
430-
.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG);
431-
String adminServers = this.kafkaAdmin.getBootstrapServers();
432-
if (!producerServers.equals(adminServers)) {
433-
Map<String, Object> props = new HashMap<>(this.kafkaAdmin.getConfigurationProperties());
434-
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, producerServers);
435-
int opTo = this.kafkaAdmin.getOperationTimeout();
436-
this.kafkaAdmin = new KafkaAdmin(props);
437-
this.kafkaAdmin.setOperationTimeout(opTo);
448+
if (this.kafkaAdmin == null) {
449+
this.kafkaAdmin = this.applicationContext.getBeanProvider(KafkaAdmin.class).getIfUnique();
450+
if (this.kafkaAdmin != null) {
451+
Object producerServers = this.producerFactory.getConfigurationProperties()
452+
.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG);
453+
String adminServers = this.kafkaAdmin.getBootstrapServers();
454+
if (!producerServers.equals(adminServers)) {
455+
Map<String, Object> props = new HashMap<>(this.kafkaAdmin.getConfigurationProperties());
456+
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, producerServers);
457+
int opTo = this.kafkaAdmin.getOperationTimeout();
458+
this.kafkaAdmin = new KafkaAdmin(props);
459+
this.kafkaAdmin.setOperationTimeout(opTo);
460+
}
438461
}
439462
}
440463
}

spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import org.springframework.context.ApplicationEventPublisherAware;
4747
import org.springframework.core.log.LogAccessor;
4848
import org.springframework.kafka.core.ConsumerFactory;
49+
import org.springframework.kafka.core.KafkaAdmin;
4950
import org.springframework.kafka.event.ContainerStoppedEvent;
5051
import org.springframework.kafka.support.KafkaHeaders;
5152
import org.springframework.kafka.support.TopicPartitionOffset;
@@ -132,6 +133,9 @@ public abstract class AbstractMessageListenerContainer<K, V>
132133
@NonNull
133134
private Function<MessageListenerContainer, String> threadNameSupplier = container -> container.getListenerId();
134135

136+
@Nullable
137+
private KafkaAdmin kafkaAdmin;
138+
135139
/**
136140
* Construct an instance with the provided factory and properties.
137141
* @param consumerFactory the factory.
@@ -471,6 +475,26 @@ public void setThreadNameSupplier(Function<MessageListenerContainer, String> thr
471475
this.threadNameSupplier = threadNameSupplier;
472476
}
473477

478+
/**
479+
* Return the {@link KafkaAdmin}, used to find the cluster id for observation, if
480+
* present.
481+
* @return the kafkaAdmin
482+
* @since 3.0.5
483+
*/
484+
@Nullable
485+
public KafkaAdmin getKafkaAdmin() {
486+
return this.kafkaAdmin;
487+
}
488+
489+
/**
490+
* Set the {@link KafkaAdmin}, used to find the cluster id for observation, if
491+
* present.
492+
* @param kafkaAdmin the admin.
493+
*/
494+
public void setKafkaAdmin(KafkaAdmin kafkaAdmin) {
495+
this.kafkaAdmin = kafkaAdmin;
496+
}
497+
474498
protected RecordInterceptor<K, V> getRecordInterceptor() {
475499
return this.recordInterceptor;
476500
}

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -950,7 +950,8 @@ private Object determineBootstrapServers(Properties consumerProperties) {
950950

951951
@Nullable
952952
private KafkaAdmin obtainAdmin() {
953-
if (this.containerProperties.isObservationEnabled()) {
953+
KafkaAdmin customAdmin = KafkaMessageListenerContainer.this.thisOrParentContainer.getKafkaAdmin();
954+
if (customAdmin == null && this.containerProperties.isObservationEnabled()) {
954955
ApplicationContext applicationContext = getApplicationContext();
955956
if (applicationContext != null) {
956957
KafkaAdmin admin = applicationContext.getBeanProvider(KafkaAdmin.class).getIfUnique();
@@ -966,6 +967,9 @@ private KafkaAdmin obtainAdmin() {
966967
return admin;
967968
}
968969
}
970+
else {
971+
return customAdmin;
972+
}
969973
return null;
970974
}
971975

spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import static org.assertj.core.api.Assertions.assertThat;
2020
import static org.awaitility.Awaitility.await;
21+
import static org.mockito.Mockito.mock;
2122

2223
import java.util.Arrays;
2324
import java.util.Deque;
@@ -48,6 +49,7 @@
4849
import org.springframework.kafka.core.KafkaAdmin;
4950
import org.springframework.kafka.core.KafkaTemplate;
5051
import org.springframework.kafka.core.ProducerFactory;
52+
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
5153
import org.springframework.kafka.support.micrometer.KafkaListenerObservation.DefaultKafkaListenerObservationConvention;
5254
import org.springframework.kafka.support.micrometer.KafkaTemplateObservation.DefaultKafkaTemplateObservationConvention;
5355
import org.springframework.kafka.test.EmbeddedKafkaBroker;
@@ -81,15 +83,16 @@
8183
*
8284
*/
8385
@SpringJUnitConfig
84-
@EmbeddedKafka(topics = { "observation.testT1", "observation.testT2" })
86+
@EmbeddedKafka(topics = { "observation.testT1", "observation.testT2", "ObservationTests.testT3" })
8587
@DirtiesContext
8688
public class ObservationTests {
8789

8890
@Test
8991
void endToEnd(@Autowired Listener listener, @Autowired KafkaTemplate<Integer, String> template,
9092
@Autowired SimpleTracer tracer, @Autowired KafkaListenerEndpointRegistry rler,
9193
@Autowired MeterRegistry meterRegistry, @Autowired EmbeddedKafkaBroker broker,
92-
@Autowired KafkaListenerEndpointRegistry endpointRegistry, @Autowired KafkaAdmin admin)
94+
@Autowired KafkaListenerEndpointRegistry endpointRegistry, @Autowired KafkaAdmin admin,
95+
@Autowired KafkaTemplate<Integer, String> customTemplate, @Autowired Config config)
9396
throws InterruptedException, ExecutionException, TimeoutException {
9497

9598
template.send("observation.testT1", "test").get(10, TimeUnit.SECONDS);
@@ -186,9 +189,12 @@ public KeyValues getLowCardinalityKeyValues(KafkaRecordReceiverContext context)
186189
assertThat(pAdmin.getConfigurationProperties())
187190
.containsEntry(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
188191
broker.getBrokersAsString() + "," + broker.getBrokersAsString());
192+
// custom admin
193+
assertThat(customTemplate.getKafkaAdmin()).isSameAs(config.mockAdmin);
194+
195+
// consumer factory broker different to admin
189196
Object container = KafkaTestUtils
190197
.getPropertyValue(endpointRegistry.getListenerContainer("obs1"), "containers", List.class).get(0);
191-
// consumer factory broker different to admin
192198
KafkaAdmin cAdmin = KafkaTestUtils.getPropertyValue(container, "listenerConsumer.kafkaAdmin", KafkaAdmin.class);
193199
assertThat(cAdmin.getOperationTimeout()).isEqualTo(admin.getOperationTimeout());
194200
assertThat(cAdmin.getConfigurationProperties())
@@ -202,12 +208,19 @@ public KeyValues getLowCardinalityKeyValues(KafkaRecordReceiverContext context)
202208
assertThat(cAdmin.getOperationTimeout()).isEqualTo(admin.getOperationTimeout());
203209
assertThat(cAdmin.getConfigurationProperties())
204210
.containsEntry(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, broker.getBrokersAsString());
211+
// custom admin
212+
container = KafkaTestUtils
213+
.getPropertyValue(endpointRegistry.getListenerContainer("obs3"), "containers", List.class).get(0);
214+
cAdmin = KafkaTestUtils.getPropertyValue(container, "listenerConsumer.kafkaAdmin", KafkaAdmin.class);
215+
assertThat(cAdmin).isSameAs(config.mockAdmin);
205216
}
206217

207218
@Configuration
208219
@EnableKafka
209220
public static class Config {
210221

222+
KafkaAdmin mockAdmin = mock(KafkaAdmin.class);
223+
211224
@Bean
212225
KafkaAdmin admin(EmbeddedKafkaBroker broker) {
213226
KafkaAdmin admin = new KafkaAdmin(
@@ -239,6 +252,14 @@ KafkaTemplate<Integer, String> template(ProducerFactory<Integer, String> pf) {
239252
return template;
240253
}
241254

255+
@Bean
256+
KafkaTemplate<Integer, String> customTemplate(ProducerFactory<Integer, String> pf) {
257+
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
258+
template.setObservationEnabled(true);
259+
template.setKafkaAdmin(this.mockAdmin);
260+
return template;
261+
}
262+
242263
@Bean
243264
ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory(
244265
ConsumerFactory<Integer, String> cf) {
@@ -247,6 +268,11 @@ ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerF
247268
new ConcurrentKafkaListenerContainerFactory<>();
248269
factory.setConsumerFactory(cf);
249270
factory.getContainerProperties().setObservationEnabled(true);
271+
factory.setContainerCustomizer(container -> {
272+
if (container.getListenerId().equals("obs3")) {
273+
((AbstractMessageListenerContainer<Integer, String>) container).setKafkaAdmin(this.mockAdmin);
274+
}
275+
});
250276
return factory;
251277
}
252278

@@ -339,6 +365,10 @@ void listen2(ConsumerRecord<?, ?> in) {
339365
this.latch2.countDown();
340366
}
341367

368+
@KafkaListener(id = "obs3", topics = "observation.testT3")
369+
void listen3(ConsumerRecord<Integer, String> in) {
370+
}
371+
342372
}
343373

344374
}

0 commit comments

Comments
 (0)