Skip to content

Commit e0aec46

Browse files
committed
GH-3124 Add testing for accessing additional configuration properties
1 parent 899469a commit e0aec46

File tree

5 files changed

+134
-3
lines changed

5 files changed

+134
-3
lines changed

binders/kafka-binder/spring-cloud-stream-binder-kafka/pom.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,10 @@
4646
<groupId>org.springframework.kafka</groupId>
4747
<artifactId>spring-kafka</artifactId>
4848
</dependency>
49+
<dependency>
50+
<groupId>com.fasterxml.jackson.datatype</groupId>
51+
<artifactId>jackson-datatype-jsr310</artifactId>
52+
</dependency>
4953
<dependency>
5054
<groupId>org.springframework.boot</groupId>
5155
<artifactId>spring-boot-test</artifactId>

binders/kafka-binder/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaMessageChannelBinder.java

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@
4040
import java.util.regex.Pattern;
4141
import java.util.stream.Collectors;
4242

43+
import com.fasterxml.jackson.databind.ObjectMapper;
44+
import com.fasterxml.jackson.databind.SerializationFeature;
4345
import org.apache.kafka.clients.consumer.Consumer;
4446
import org.apache.kafka.clients.consumer.ConsumerConfig;
4547
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
@@ -250,6 +252,9 @@ public class KafkaMessageChannelBinder extends
250252

251253
private final KafkaAdmin kafkaAdmin;
252254

255+
// used strictly for serializeing additional configuration properties. See 'doGetAdditionalConfigurationProperties'
256+
private ObjectMapper objectMapper = new ObjectMapper();
257+
253258
public KafkaMessageChannelBinder(
254259
KafkaBinderConfigurationProperties configurationProperties,
255260
KafkaTopicProvisioner provisioningProvider) {
@@ -284,6 +289,8 @@ public KafkaMessageChannelBinder(
284289
this.dlqPartitionFunction = dlqPartitionFunction;
285290
this.dlqDestinationResolver = dlqDestinationResolver;
286291
this.kafkaAdmin = new KafkaAdmin(new HashMap<>(provisioningProvider.getAdminClientProperties()));
292+
this.objectMapper.findAndRegisterModules();
293+
this.objectMapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
287294
}
288295

289296
@Override
@@ -851,14 +858,30 @@ private void propagateContainerProperties(ContainerProperties containerPropertie
851858
containerProperties.setStopImmediate(listener.isImmediateStop());
852859
}
853860

861+
/**
862+
* Returns an unmodifiable copy of {@link ContainerProperties} associated with the destination name
863+
* which corresponds to a particular binding which could be accessed under 'containerProperties' key.
864+
*
865+
* @param destinationName the name of the destination (or binding name if destination is not specified)
866+
* @return map of
867+
*/
868+
@Override
869+
protected Map<String, Object> doGetAdditionalConfigurationProperties(String destinationName) {
870+
ContainerProperties kafkaContainerProperties = this.kafkaMessageListenerContainers.iterator().next().getContainerProperties();
871+
Map mapOfContainerProperties = this.objectMapper.convertValue(kafkaContainerProperties, Map.class);
872+
Map<String, Object> additionalConfigurationProperties = new HashMap<>();
873+
additionalConfigurationProperties.put("containerProperties", mapOfContainerProperties);
874+
return additionalConfigurationProperties;
875+
}
876+
854877
private BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> createDestResolver(
855-
KafkaConsumerProperties extension) {
878+
KafkaConsumerProperties extension) {
856879

857880
Integer dlqPartitions = extension.getDlqPartitions();
858881
if (extension.isEnableDlq()) {
859882
return (rec, ex) -> dlqPartitions == null || dlqPartitions > 1
860-
? new TopicPartition(extension.getDlqName(), rec.partition())
861-
: new TopicPartition(extension.getDlqName(), 0);
883+
? new TopicPartition(extension.getDlqName(), rec.partition())
884+
: new TopicPartition(extension.getDlqName(), 0);
862885
}
863886
else {
864887
return null;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/*
2+
* Copyright 2024-2025 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.cloud.stream.binder.kafka;
18+
19+
import java.util.Map;
20+
import java.util.function.Consumer;
21+
22+
import org.junit.jupiter.api.Test;
23+
24+
import org.springframework.beans.factory.annotation.Autowired;
25+
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
26+
import org.springframework.boot.test.context.SpringBootTest;
27+
import org.springframework.cloud.stream.binder.Binding;
28+
import org.springframework.cloud.stream.binding.BindingService;
29+
import org.springframework.context.annotation.Bean;
30+
import org.springframework.context.annotation.Configuration;
31+
import org.springframework.kafka.test.context.EmbeddedKafka;
32+
import org.springframework.test.annotation.DirtiesContext;
33+
34+
import static org.assertj.core.api.Assertions.assertThat;
35+
36+
/**
37+
* @author Oleg Zhurakousky
38+
*/
39+
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE, properties = {
40+
"spring.cloud.function.definition=barConsumer;fooConsumer",
41+
"spring.kafka.listener.immediate-stop=true",
42+
"spring.cloud.stream.bindings.fooConsumer-in-0.destination=foo"
43+
})
44+
@EmbeddedKafka
45+
@DirtiesContext
46+
public class KafkaConfigurationTests {
47+
48+
49+
@Autowired
50+
private BindingService bindingService;
51+
52+
@Test
53+
void testKafkaContainerConfigurationPropagation() throws Exception {
54+
Binding<?> fooDestination = this.bindingService.getConsumerBindings("fooConsumer-in-0").iterator().next();
55+
Map<String, Object> fooAdditionalConfigurationProperties = fooDestination.getAdditionalConfigurationProperties();
56+
assertThat(((Map) fooAdditionalConfigurationProperties.get("containerProperties")).get("stopImmediate")).isEqualTo(true);
57+
58+
Binding<?> barDestination = this.bindingService.getConsumerBindings("barConsumer-in-0").iterator().next();
59+
Map<String, Object> barAdditionalConfigurationProperties = barDestination.getAdditionalConfigurationProperties();
60+
assertThat(((Map) barAdditionalConfigurationProperties.get("containerProperties")).get("stopImmediate")).isEqualTo(true);
61+
}
62+
63+
@EnableAutoConfiguration
64+
@Configuration
65+
public static class Config {
66+
67+
@Bean
68+
Consumer<String> barConsumer() {
69+
return message -> {
70+
};
71+
}
72+
@Bean
73+
Consumer<String> fooConsumer() {
74+
return message -> {
75+
};
76+
}
77+
}
78+
}

core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/AbstractMessageChannelBinder.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -582,6 +582,11 @@ public String getBinderType() {
582582
return resolveBinderType(getBindingName(), bsp);
583583
}
584584

585+
@Override
586+
public Map<String, Object> getAdditionalConfigurationProperties() {
587+
return doGetAdditionalConfigurationProperties(this.getName());
588+
}
589+
585590
@Override
586591
protected void afterUnbind() {
587592
try {
@@ -617,6 +622,17 @@ else if (e instanceof ProvisioningException provisioningException) {
617622
}
618623
}
619624

625+
/**
626+
* This method must be implemented by an individual binders to produce an
627+
* immutable version of additional configuration properties primarily for testing and diagnosing/debugging issues.
628+
* @return map pf additional configuration properties as key/vaalue.
629+
*/
630+
protected Map<String, Object> doGetAdditionalConfigurationProperties(String name) {
631+
logger.warn("This method must be implemented by an individual binders to produce " +
632+
"an immutable version of additional configuration properties primarily for testing and diagnosing/debugging issues");
633+
return null;
634+
}
635+
620636
@Override
621637
public Binding<PollableSource<MessageHandler>> bindPollableConsumer(String name,
622638
String group, final PollableSource<MessageHandler> inboundBindTarget,

core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/Binding.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,16 @@ default <P> P getExtension() {
5050
return null;
5151
}
5252

53+
/**
54+
* Will return additional configuration properties associated with this binding.
55+
* Must be implemented by individual binders and is primarily designed for testing and
56+
* debugging/diagnosing issues.
57+
* @return additional configuration properties associated with this binding.
58+
*/
59+
default Map<String, Object> getAdditionalConfigurationProperties() {
60+
return Collections.emptyMap();
61+
}
62+
5363
/**
5464
* Starts the target component represented by this instance. NOTE: At the time the
5565
* instance is created the component is already started. This operation is typically

0 commit comments

Comments
 (0)