Skip to content

Commit 0eb7a51

Browse files
committed
GH-3148 Add KafkaBindingRebalanceListener to the shared.beans
Resolves #3148
1 parent 7ebcc06 commit 0eb7a51

File tree

3 files changed

+161
-0
lines changed

3 files changed

+161
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
/*
2+
* Copyright 2023-present the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.cloud.stream.binder.kafka.integration;
18+
19+
import java.io.IOException;
20+
import java.lang.reflect.Field;
21+
import java.util.Collection;
22+
import java.util.Map;
23+
import java.util.Properties;
24+
import java.util.function.Consumer;
25+
26+
import org.apache.kafka.common.TopicPartition;
27+
import org.junit.jupiter.api.Test;
28+
29+
import org.springframework.beans.factory.annotation.Autowired;
30+
import org.springframework.beans.factory.config.YamlPropertiesFactoryBean;
31+
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
32+
import org.springframework.boot.test.context.SpringBootTest;
33+
import org.springframework.cloud.stream.binder.Binder;
34+
import org.springframework.cloud.stream.binder.DefaultBinderFactory;
35+
import org.springframework.cloud.stream.binder.kafka.KafkaBindingRebalanceListener;
36+
import org.springframework.context.ApplicationContext;
37+
import org.springframework.context.ConfigurableApplicationContext;
38+
import org.springframework.context.annotation.Bean;
39+
import org.springframework.context.annotation.Configuration;
40+
import org.springframework.context.annotation.PropertySource;
41+
import org.springframework.core.env.PropertiesPropertySource;
42+
import org.springframework.core.io.support.EncodedResource;
43+
import org.springframework.core.io.support.PropertySourceFactory;
44+
import org.springframework.kafka.test.context.EmbeddedKafka;
45+
import org.springframework.util.ReflectionUtils;
46+
47+
import static org.assertj.core.api.Assertions.assertThat;
48+
import static org.assertj.core.api.Assertions.fail;
49+
50+
/**
51+
*
52+
*/
53+
@SpringBootTest(classes = KafkaMultiBinderCustomConfigurationTests3148.SampleApplication.class,
54+
webEnvironment = SpringBootTest.WebEnvironment.NONE
55+
)
56+
@EmbeddedKafka(controlledShutdown = true)
57+
public class KafkaMultiBinderCustomConfigurationTests3148 {
58+
59+
@Autowired
60+
ApplicationContext applicationContext;
61+
62+
@Autowired
63+
private DefaultBinderFactory binderFactory;
64+
65+
@Test
66+
public void test() {
67+
ConfigurableApplicationContext kafkaContext = getBinderContext("test-binder");
68+
69+
KafkaBindingRebalanceListener kafkaBindingRebalanceListener = kafkaContext.getBean(KafkaBindingRebalanceListener.class);
70+
assertThat(kafkaBindingRebalanceListener).isNotNull();
71+
}
72+
73+
private ConfigurableApplicationContext getBinderContext(String binderName) {
74+
Field binderInstanceCacheField = ReflectionUtils.findField(DefaultBinderFactory.class, "binderInstanceCache");
75+
assertThat(binderInstanceCacheField).isNotNull();
76+
ReflectionUtils.makeAccessible(binderInstanceCacheField);
77+
try {
78+
Map<String, Map.Entry<Binder<?, ?, ?>, ConfigurableApplicationContext>> binderInstanceCache =
79+
(Map<String, Map.Entry<Binder<?, ?, ?>, ConfigurableApplicationContext>>) binderInstanceCacheField.get(this.binderFactory);
80+
return binderInstanceCache.get(binderName).getValue();
81+
} catch (Exception e) {
82+
fail();
83+
}
84+
return null;
85+
}
86+
@Configuration
87+
@EnableAutoConfiguration
88+
@PropertySource(value= "classpath:test3148.yml", factory = KafkaMultiBinderCustomConfigurationTests3148.YamlPropertySourceFactory.class)
89+
public static class SampleApplication {
90+
91+
@Bean
92+
public Consumer<String> testConsumer() {
93+
return consumer -> {};
94+
}
95+
96+
@Bean
97+
public KafkaBindingRebalanceListener rebalanceListener() {
98+
return new KafkaBindingRebalanceListener() {
99+
public void onPartitionsAssigned(String bindingName, org.apache.kafka.clients.consumer.Consumer<?, ?> consumer,
100+
Collection<TopicPartition> partitions, boolean initial) {
101+
// do nothing
102+
}
103+
};
104+
}
105+
}
106+
107+
108+
109+
public static class YamlPropertySourceFactory implements PropertySourceFactory {
110+
111+
@Override
112+
public org.springframework.core.env.PropertySource<?> createPropertySource(String name, EncodedResource encodedResource)
113+
throws IOException {
114+
YamlPropertiesFactoryBean factory = new YamlPropertiesFactoryBean();
115+
factory.setResources(encodedResource.getResource());
116+
117+
Properties properties = factory.getObject();
118+
119+
return new PropertiesPropertySource(encodedResource.getResource().getFilename(), properties);
120+
}
121+
}
122+
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
spring:
2+
cloud:
3+
function:
4+
definition: testConsumer
5+
stream:
6+
kafka:
7+
binder:
8+
brokers: localhost:9092
9+
binders:
10+
test-binder:
11+
type: 'kafka'
12+
environment:
13+
spring.cloud.stream.kafka.binder:
14+
consumer-properties:
15+
max.poll.records: 1 # < at least one property here
16+
bindings:
17+
testConsumer-in-0:
18+
binder: test-binder
19+
group: test-group
20+
destination: test-topic
21+
#spring:
22+
# application:
23+
# name: "spring-cloud-stream-multibinder"
24+
# cloud:
25+
# function:
26+
# definition: testConsumer
27+
# stream:
28+
# bindings:
29+
# testConsumer-in-0:
30+
# destination: "foo"
31+
# binder: kafka1
32+
# binders:
33+
# kafka1:
34+
# type: kafka
35+
# environment:
36+
# spring.cloud.stream.kafka.binder:
37+
# consumer-properties:
38+
# max.poll.records: 1

core/spring-cloud-stream/src/main/resources/META-INF/shared.beans

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,3 +15,4 @@ org.springframework.cloud.stream.binder.test.InputDestination
1515
org.springframework.cloud.stream.binder.test.OutputDestination
1616
org.springframework.cloud.stream.config.BindingHandlerAdvise
1717
io.micrometer.observation.ObservationRegistry
18+
org.springframework.cloud.stream.binder.kafka.KafkaBindingRebalanceListener

0 commit comments

Comments
 (0)