Skip to content

Commit 3fad193

Browse files
garyrussellartembilan
authored andcommitted
GH-1347: Fix @embeddedkafka with count > 1
Resolves #1347 When count > 1 with no port specification (default random port) tests failed because not enough ports were provided,
1 parent aec7807 commit 3fad193

File tree

5 files changed

+40
-13
lines changed

5 files changed

+40
-13
lines changed

spring-kafka-test/src/main/java/org/springframework/kafka/test/condition/EmbeddedKafkaCondition.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2019 the original author or authors.
2+
* Copyright 2019-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -115,9 +115,13 @@ private boolean springTestContext(AnnotatedElement annotatedElement) {
115115
@SuppressWarnings("unchecked")
116116
private EmbeddedKafkaBroker createBroker(EmbeddedKafka embedded) {
117117
EmbeddedKafkaBroker broker;
118+
int[] ports = embedded.ports();
119+
if (embedded.count() > 1 && ports.length == 1 && ports[0] == 0) {
120+
ports = new int[embedded.count()];
121+
}
118122
broker = new EmbeddedKafkaBroker(embedded.count(), embedded.controlledShutdown(), embedded.topics())
119123
.zkPort(embedded.zookeeperPort())
120-
.kafkaPorts(embedded.ports());
124+
.kafkaPorts(ports);
121125
Properties properties = new Properties();
122126

123127
for (String pair : embedded.brokerProperties()) {

spring-kafka-test/src/main/java/org/springframework/kafka/test/context/EmbeddedKafka.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2019 the original author or authors.
2+
* Copyright 2017-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -93,7 +93,7 @@
9393
* @return ports for brokers.
9494
* @since 2.2.4
9595
*/
96-
int[] ports() default {0};
96+
int[] ports() default { 0 };
9797

9898
/**
9999
* Set the port on which the embedded Zookeeper should listen;

spring-kafka-test/src/main/java/org/springframework/kafka/test/context/EmbeddedKafkaContextCustomizer.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2019 the original author or authors.
2+
* Copyright 2017-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -66,11 +66,15 @@ public void customizeContext(ConfigurableApplicationContext context, MergedConte
6666
.map(environment::resolvePlaceholders)
6767
.toArray(String[]::new);
6868

69+
int[] ports = this.embeddedKafka.ports();
70+
if (this.embeddedKafka.count() > 1 && ports.length == 1 && ports[0] == 0) {
71+
ports = new int[this.embeddedKafka.count()];
72+
}
6973
EmbeddedKafkaBroker embeddedKafkaBroker = new EmbeddedKafkaBroker(this.embeddedKafka.count(),
7074
this.embeddedKafka.controlledShutdown(),
7175
this.embeddedKafka.partitions(),
7276
topics)
73-
.kafkaPorts(this.embeddedKafka.ports())
77+
.kafkaPorts(ports)
7478
.zkPort(this.embeddedKafka.zookeeperPort());
7579

7680
Properties properties = new Properties();

spring-kafka-test/src/test/java/org/springframework/kafka/test/condition/EmbeddedKafkaConditionTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2019 the original author or authors.
2+
* Copyright 2019-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -29,7 +29,7 @@
2929
* @since 2.3
3030
*
3131
*/
32-
@EmbeddedKafka(bootstrapServersProperty = "my.bss.property")
32+
@EmbeddedKafka(bootstrapServersProperty = "my.bss.property", count = 2)
3333
public class EmbeddedKafkaConditionTests {
3434

3535
@Test

spring-kafka-test/src/test/java/org/springframework/kafka/test/context/EmbeddedKafkaContextCustomizerTests.java

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2019 the original author or authors.
2+
* Copyright 2017-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -48,30 +48,30 @@ public class EmbeddedKafkaContextCustomizerTests {
4848
private EmbeddedKafka annotationFromSecondClass;
4949

5050
@BeforeEach
51-
public void beforeEachTest() {
51+
void beforeEachTest() {
5252
annotationFromFirstClass = AnnotationUtils.findAnnotation(TestWithEmbeddedKafka.class, EmbeddedKafka.class);
5353
annotationFromSecondClass =
5454
AnnotationUtils.findAnnotation(SecondTestWithEmbeddedKafka.class, EmbeddedKafka.class);
5555
}
5656

5757

5858
@Test
59-
public void testHashCode() {
59+
void testHashCode() {
6060
assertThat(new EmbeddedKafkaContextCustomizer(annotationFromFirstClass).hashCode()).isNotEqualTo(0);
6161
assertThat(new EmbeddedKafkaContextCustomizer(annotationFromFirstClass).hashCode())
6262
.isEqualTo(new EmbeddedKafkaContextCustomizer(annotationFromSecondClass).hashCode());
6363
}
6464

6565

6666
@Test
67-
public void testEquals() {
67+
void testEquals() {
6868
assertThat(new EmbeddedKafkaContextCustomizer(annotationFromFirstClass))
6969
.isEqualTo(new EmbeddedKafkaContextCustomizer(annotationFromSecondClass));
7070
assertThat(new EmbeddedKafkaContextCustomizer(annotationFromFirstClass)).isNotEqualTo(new Object());
7171
}
7272

7373
@Test
74-
public void testPorts() {
74+
void testPorts() {
7575
EmbeddedKafka annotationWithPorts =
7676
AnnotationUtils.findAnnotation(TestWithEmbeddedKafkaPorts.class, EmbeddedKafka.class);
7777
EmbeddedKafkaContextCustomizer customizer = new EmbeddedKafkaContextCustomizer(annotationWithPorts);
@@ -87,6 +87,21 @@ public void testPorts() {
8787
.isEqualTo("my.bss.prop");
8888
}
8989

90+
@Test
91+
void testMulti() {
92+
EmbeddedKafka annotationWithPorts =
93+
AnnotationUtils.findAnnotation(TestWithEmbeddedKafkaMulti.class, EmbeddedKafka.class);
94+
EmbeddedKafkaContextCustomizer customizer = new EmbeddedKafkaContextCustomizer(annotationWithPorts);
95+
ConfigurableApplicationContext context = mock(ConfigurableApplicationContext.class);
96+
BeanFactoryStub factoryStub = new BeanFactoryStub();
97+
given(context.getBeanFactory()).willReturn(factoryStub);
98+
given(context.getEnvironment()).willReturn(mock(ConfigurableEnvironment.class));
99+
customizer.customizeContext(context, null);
100+
101+
assertThat(factoryStub.getBroker().getBrokersAsString())
102+
.matches("127.0.0.1:[0-9]+,127.0.0.1:[0-9]+");
103+
}
104+
90105

91106
@EmbeddedKafka
92107
private class TestWithEmbeddedKafka {
@@ -103,6 +118,10 @@ private class TestWithEmbeddedKafkaPorts {
103118

104119
}
105120

121+
@EmbeddedKafka(count = 2)
122+
private class TestWithEmbeddedKafkaMulti {
123+
124+
}
106125
@SuppressWarnings("serial")
107126
private class BeanFactoryStub extends DefaultListableBeanFactory {
108127

0 commit comments

Comments
 (0)