Skip to content

Commit 6270601

Browse files
authored
GH-2870: EmbeddedKafka: register BeanDefinition (#2872)
* GH-2870: EmbeddedKafka: register BeanDefinition Fixes #2870 The `EmbeddedKafkaContextCustomizer` uses `beanFactory.initializeBean()` which is too early according to the `ApplicationContext` lifecycle since it is not refreshed yet for `ContextCustomizer` * Rework the logic in the `EmbeddedKafkaContextCustomizer` to register a `BeanDefinition` for an `EmbeddedKafkaBroker` to include it into standard `ApplicationContext` lifecycle * Also mark `@EmbeddedKafka` with a `@DisabledInAotMode` to disallow this kind of tests in native images * * Call `embeddedKafkaBroker.afterPropertiesSet()` explicitly in the `EmbeddedKafkaContextCustomizer` since we need to have broker started and system props initialized before the rest of `ApplicationContext`
1 parent 9a64395 commit 6270601

File tree

6 files changed

+68
-86
lines changed

6 files changed

+68
-86
lines changed

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ ext {
7575
springBootVersion = '3.0.9' // docs module
7676
springDataVersion = '2023.1.0-RC1'
7777
springRetryVersion = '2.0.4'
78-
springVersion = '6.1.0-RC1'
78+
springVersion = '6.1.0-SNAPSHOT'
7979
zookeeperVersion = '3.6.4'
8080

8181
idPrefix = 'kafka'

spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaKraftBroker.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import java.util.concurrent.ExecutionException;
3434
import java.util.concurrent.TimeUnit;
3535
import java.util.concurrent.TimeoutException;
36+
import java.util.concurrent.atomic.AtomicBoolean;
3637
import java.util.concurrent.atomic.AtomicReference;
3738
import java.util.function.Function;
3839
import java.util.stream.Collectors;
@@ -92,6 +93,8 @@ public class EmbeddedKafkaKraftBroker implements EmbeddedKafkaBroker {
9293

9394
private final Properties brokerProperties = new Properties();
9495

96+
private final AtomicBoolean initialized = new AtomicBoolean();
97+
9598
private KafkaClusterTestKit cluster;
9699

97100
private int[] kafkaPorts;
@@ -191,9 +194,11 @@ public void setAdminTimeout(int adminTimeout) {
191194

192195
@Override
193196
public void afterPropertiesSet() {
194-
overrideExitMethods();
195-
addDefaultBrokerPropsIfAbsent(this.brokerProperties, this.count);
196-
start();
197+
if (this.initialized.compareAndSet(false, true)) {
198+
overrideExitMethods();
199+
addDefaultBrokerPropsIfAbsent(this.brokerProperties, this.count);
200+
start();
201+
}
197202
}
198203

199204

spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaZKBroker.java

Lines changed: 40 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import java.util.concurrent.ExecutionException;
3838
import java.util.concurrent.TimeUnit;
3939
import java.util.concurrent.TimeoutException;
40+
import java.util.concurrent.atomic.AtomicBoolean;
4041
import java.util.concurrent.atomic.AtomicReference;
4142
import java.util.function.Function;
4243
import java.util.stream.Collectors;
@@ -111,6 +112,8 @@ public class EmbeddedKafkaZKBroker implements EmbeddedKafkaBroker {
111112

112113
private final Map<String, Object> brokerProperties = new HashMap<>();
113114

115+
private final AtomicBoolean initialized = new AtomicBoolean();
116+
114117
private EmbeddedZookeeper zookeeper;
115118

116119
private String zkConnect;
@@ -289,45 +292,47 @@ public synchronized EmbeddedKafkaZKBroker zkSessionTimeout(int zkSessionTimeout)
289292

290293
@Override
291294
public void afterPropertiesSet() {
292-
overrideExitMethods();
293-
try {
294-
this.zookeeper = new EmbeddedZookeeper(this.zkPort);
295-
}
296-
catch (IOException | InterruptedException e) {
297-
throw new IllegalStateException("Failed to create embedded Zookeeper", e);
298-
}
299-
this.zkConnect = LOOPBACK + ":" + this.zookeeper.getPort();
300-
this.kafkaServers.clear();
301-
boolean userLogDir = this.brokerProperties.get(KafkaConfig.LogDirProp()) != null && this.count == 1;
302-
for (int i = 0; i < this.count; i++) {
303-
Properties brokerConfigProperties = createBrokerProperties(i);
304-
brokerConfigProperties.setProperty(KafkaConfig.ReplicaSocketTimeoutMsProp(), "1000");
305-
brokerConfigProperties.setProperty(KafkaConfig.ControllerSocketTimeoutMsProp(), "1000");
306-
brokerConfigProperties.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp(), "1");
307-
brokerConfigProperties.setProperty(KafkaConfig.ReplicaHighWatermarkCheckpointIntervalMsProp(),
308-
String.valueOf(Long.MAX_VALUE));
309-
this.brokerProperties.forEach(brokerConfigProperties::put);
310-
if (!this.brokerProperties.containsKey(KafkaConfig.NumPartitionsProp())) {
311-
brokerConfigProperties.setProperty(KafkaConfig.NumPartitionsProp(), "" + this.partitionsPerTopic);
295+
if (this.initialized.compareAndSet(false, true)) {
296+
overrideExitMethods();
297+
try {
298+
this.zookeeper = new EmbeddedZookeeper(this.zkPort);
299+
}
300+
catch (IOException | InterruptedException e) {
301+
throw new IllegalStateException("Failed to create embedded Zookeeper", e);
312302
}
313-
if (!userLogDir) {
314-
logDir(brokerConfigProperties);
303+
this.zkConnect = LOOPBACK + ":" + this.zookeeper.getPort();
304+
this.kafkaServers.clear();
305+
boolean userLogDir = this.brokerProperties.get(KafkaConfig.LogDirProp()) != null && this.count == 1;
306+
for (int i = 0; i < this.count; i++) {
307+
Properties brokerConfigProperties = createBrokerProperties(i);
308+
brokerConfigProperties.setProperty(KafkaConfig.ReplicaSocketTimeoutMsProp(), "1000");
309+
brokerConfigProperties.setProperty(KafkaConfig.ControllerSocketTimeoutMsProp(), "1000");
310+
brokerConfigProperties.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp(), "1");
311+
brokerConfigProperties.setProperty(KafkaConfig.ReplicaHighWatermarkCheckpointIntervalMsProp(),
312+
String.valueOf(Long.MAX_VALUE));
313+
this.brokerProperties.forEach(brokerConfigProperties::put);
314+
if (!this.brokerProperties.containsKey(KafkaConfig.NumPartitionsProp())) {
315+
brokerConfigProperties.setProperty(KafkaConfig.NumPartitionsProp(), "" + this.partitionsPerTopic);
316+
}
317+
if (!userLogDir) {
318+
logDir(brokerConfigProperties);
319+
}
320+
KafkaServer server = TestUtils.createServer(new KafkaConfig(brokerConfigProperties), Time.SYSTEM);
321+
this.kafkaServers.add(server);
322+
if (this.kafkaPorts[i] == 0) {
323+
this.kafkaPorts[i] = TestUtils.boundPort(server, SecurityProtocol.PLAINTEXT);
324+
}
315325
}
316-
KafkaServer server = TestUtils.createServer(new KafkaConfig(brokerConfigProperties), Time.SYSTEM);
317-
this.kafkaServers.add(server);
318-
if (this.kafkaPorts[i] == 0) {
319-
this.kafkaPorts[i] = TestUtils.boundPort(server, SecurityProtocol.PLAINTEXT);
326+
createKafkaTopics(this.topics);
327+
if (this.brokerListProperty == null) {
328+
this.brokerListProperty = System.getProperty(BROKER_LIST_PROPERTY);
320329
}
330+
if (this.brokerListProperty != null) {
331+
System.setProperty(this.brokerListProperty, getBrokersAsString());
332+
}
333+
System.setProperty(SPRING_EMBEDDED_KAFKA_BROKERS, getBrokersAsString());
334+
System.setProperty(SPRING_EMBEDDED_ZOOKEEPER_CONNECT, getZookeeperConnectionString());
321335
}
322-
createKafkaTopics(this.topics);
323-
if (this.brokerListProperty == null) {
324-
this.brokerListProperty = System.getProperty(BROKER_LIST_PROPERTY);
325-
}
326-
if (this.brokerListProperty != null) {
327-
System.setProperty(this.brokerListProperty, getBrokersAsString());
328-
}
329-
System.setProperty(SPRING_EMBEDDED_KAFKA_BROKERS, getBrokersAsString());
330-
System.setProperty(SPRING_EMBEDDED_ZOOKEEPER_CONNECT, getZookeeperConnectionString());
331336
}
332337

333338
private void logDir(Properties brokerConfigProperties) {

spring-kafka-test/src/main/java/org/springframework/kafka/test/context/EmbeddedKafka.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.springframework.kafka.test.EmbeddedKafkaBroker;
3030
import org.springframework.kafka.test.EmbeddedKafkaZKBroker;
3131
import org.springframework.kafka.test.condition.EmbeddedKafkaCondition;
32+
import org.springframework.test.context.aot.DisabledInAotMode;
3233

3334
/**
3435
* Annotation that can be specified on a test class that runs Spring for Apache Kafka
@@ -72,6 +73,7 @@
7273
@Retention(RetentionPolicy.RUNTIME)
7374
@Documented
7475
@Inherited
76+
@DisabledInAotMode
7577
public @interface EmbeddedKafka {
7678

7779
/**

spring-kafka-test/src/main/java/org/springframework/kafka/test/context/EmbeddedKafkaContextCustomizer.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,9 @@
2424
import java.util.Properties;
2525

2626
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
27+
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
2728
import org.springframework.beans.factory.support.DefaultSingletonBeanRegistry;
29+
import org.springframework.beans.factory.support.RootBeanDefinition;
2830
import org.springframework.context.ConfigurableApplicationContext;
2931
import org.springframework.core.env.ConfigurableEnvironment;
3032
import org.springframework.core.io.Resource;
@@ -124,9 +126,11 @@ public void customizeContext(ConfigurableApplicationContext context, MergedConte
124126
embeddedKafkaBroker.brokerListProperty(this.embeddedKafka.bootstrapServersProperty());
125127
}
126128

127-
beanFactory.initializeBean(embeddedKafkaBroker, EmbeddedKafkaBroker.BEAN_NAME);
128-
beanFactory.registerSingleton(EmbeddedKafkaBroker.BEAN_NAME, embeddedKafkaBroker);
129-
((DefaultSingletonBeanRegistry) beanFactory).registerDisposableBean(EmbeddedKafkaBroker.BEAN_NAME, embeddedKafkaBroker);
129+
// Safe to start an embedded broker eagerly before context refresh
130+
embeddedKafkaBroker.afterPropertiesSet();
131+
132+
((BeanDefinitionRegistry) beanFactory).registerBeanDefinition(EmbeddedKafkaBroker.BEAN_NAME,
133+
new RootBeanDefinition(EmbeddedKafkaBroker.class, () -> embeddedKafkaBroker));
130134
}
131135

132136
private int[] setupPorts() {

spring-kafka-test/src/test/java/org/springframework/kafka/test/context/EmbeddedKafkaContextCustomizerTests.java

Lines changed: 10 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,13 @@
1717
package org.springframework.kafka.test.context;
1818

1919
import static org.assertj.core.api.Assertions.assertThat;
20-
import static org.mockito.BDDMockito.given;
21-
import static org.mockito.Mockito.mock;
2220

2321
import org.junit.jupiter.api.BeforeEach;
2422
import org.junit.jupiter.api.Test;
2523

26-
import org.springframework.beans.BeansException;
27-
import org.springframework.beans.factory.DisposableBean;
28-
import org.springframework.beans.factory.support.DefaultListableBeanFactory;
2924
import org.springframework.context.ConfigurableApplicationContext;
25+
import org.springframework.context.support.GenericApplicationContext;
3026
import org.springframework.core.annotation.AnnotationUtils;
31-
import org.springframework.core.env.ConfigurableEnvironment;
3227
import org.springframework.kafka.test.EmbeddedKafkaBroker;
3328
import org.springframework.kafka.test.utils.KafkaTestUtils;
3429

@@ -75,15 +70,14 @@ void testPorts() {
7570
EmbeddedKafka annotationWithPorts =
7671
AnnotationUtils.findAnnotation(TestWithEmbeddedKafkaPorts.class, EmbeddedKafka.class);
7772
EmbeddedKafkaContextCustomizer customizer = new EmbeddedKafkaContextCustomizer(annotationWithPorts);
78-
ConfigurableApplicationContext context = mock(ConfigurableApplicationContext.class);
79-
BeanFactoryStub factoryStub = new BeanFactoryStub();
80-
given(context.getBeanFactory()).willReturn(factoryStub);
81-
given(context.getEnvironment()).willReturn(mock(ConfigurableEnvironment.class));
73+
ConfigurableApplicationContext context = new GenericApplicationContext();
8274
customizer.customizeContext(context, null);
75+
context.refresh();
8376

84-
assertThat(factoryStub.getBroker().getBrokersAsString())
77+
EmbeddedKafkaBroker embeddedKafkaBroker = context.getBean(EmbeddedKafkaBroker.class);
78+
assertThat(embeddedKafkaBroker.getBrokersAsString())
8579
.isEqualTo("127.0.0.1:" + annotationWithPorts.ports()[0]);
86-
assertThat(KafkaTestUtils.getPropertyValue(factoryStub.getBroker(), "brokerListProperty"))
80+
assertThat(KafkaTestUtils.getPropertyValue(embeddedKafkaBroker, "brokerListProperty"))
8781
.isEqualTo("my.bss.prop");
8882
}
8983

@@ -92,14 +86,12 @@ void testMulti() {
9286
EmbeddedKafka annotationWithPorts =
9387
AnnotationUtils.findAnnotation(TestWithEmbeddedKafkaMulti.class, EmbeddedKafka.class);
9488
EmbeddedKafkaContextCustomizer customizer = new EmbeddedKafkaContextCustomizer(annotationWithPorts);
95-
ConfigurableApplicationContext context = mock(ConfigurableApplicationContext.class);
96-
BeanFactoryStub factoryStub = new BeanFactoryStub();
97-
given(context.getBeanFactory()).willReturn(factoryStub);
98-
given(context.getEnvironment()).willReturn(mock(ConfigurableEnvironment.class));
89+
ConfigurableApplicationContext context = new GenericApplicationContext();
9990
customizer.customizeContext(context, null);
91+
context.refresh();
10092

101-
assertThat(factoryStub.getBroker().getBrokersAsString())
102-
.matches("127.0.0.1:[0-9]+,127.0.0.1:[0-9]+");
93+
assertThat(context.getBean(EmbeddedKafkaBroker.class).getBrokersAsString())
94+
.matches("127.0.0.1:[0-9]+,127.0.0.1:[0-9]+");
10395
}
10496

10597

@@ -122,31 +114,5 @@ private class TestWithEmbeddedKafkaPorts {
122114
private class TestWithEmbeddedKafkaMulti {
123115

124116
}
125-
@SuppressWarnings("serial")
126-
private class BeanFactoryStub extends DefaultListableBeanFactory {
127-
128-
private Object bean;
129-
130-
public EmbeddedKafkaBroker getBroker() {
131-
return (EmbeddedKafkaBroker) bean;
132-
}
133-
134-
@Override
135-
public Object initializeBean(Object existingBean, String beanName) throws BeansException {
136-
this.bean = existingBean;
137-
return bean;
138-
}
139-
140-
@Override
141-
public void registerSingleton(String beanName, Object singletonObject) {
142-
143-
}
144-
145-
@Override
146-
public void registerDisposableBean(String beanName, DisposableBean bean) {
147-
148-
}
149-
150-
}
151117

152118
}

0 commit comments

Comments
 (0)