Skip to content

Commit 910233e

Browse files
garyrussellartembilan
authored andcommitted
GH-1254: Configurable Embedded Zookeeper Port
Resolves #1254 - port `EmbeddedKafkaBroker` from scala and add ZK port configuration - add `zookeeperPort` to `@EmbeddedKafka`
1 parent 2115cf6 commit 910233e

File tree

7 files changed

+198
-16
lines changed

7 files changed

+198
-16
lines changed

spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaBroker.java

Lines changed: 127 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@
1818

1919
import static org.assertj.core.api.Assertions.assertThat;
2020

21+
import java.io.File;
22+
import java.io.IOException;
23+
import java.net.InetSocketAddress;
2124
import java.time.Duration;
2225
import java.util.ArrayList;
2326
import java.util.Arrays;
@@ -46,6 +49,9 @@
4649
import org.apache.kafka.common.TopicPartition;
4750
import org.apache.kafka.common.security.auth.SecurityProtocol;
4851
import org.apache.kafka.common.utils.Time;
52+
import org.apache.kafka.common.utils.Utils;
53+
import org.apache.zookeeper.server.NIOServerCnxnFactory;
54+
import org.apache.zookeeper.server.ZooKeeperServer;
4955

5056
import org.springframework.beans.factory.DisposableBean;
5157
import org.springframework.beans.factory.InitializingBean;
@@ -63,7 +69,7 @@
6369
import kafka.utils.CoreUtils;
6470
import kafka.utils.TestUtils;
6571
import kafka.utils.ZKStringSerializer$;
66-
import kafka.zk.EmbeddedZookeeper;
72+
import kafka.zk.ZkFourLetterWords;
6773

6874
/**
6975
* An embedded Kafka Broker(s) and Zookeeper manager.
@@ -114,6 +120,8 @@ public class EmbeddedKafkaBroker implements InitializingBean, DisposableBean {
114120

115121
private String zkConnect;
116122

123+
private int zkPort;
124+
117125
private int[] kafkaPorts;
118126

119127
private Duration adminTimeout = DEFAULT_ADMIN_TIMEOUT;
@@ -190,6 +198,16 @@ public EmbeddedKafkaBroker kafkaPorts(int... ports) {
190198
return this;
191199
}
192200

201+
/**
202+
* Set an explicit port for the embedded Zookeeper.
203+
* @param port the port.
204+
* @return the {@link EmbeddedKafkaBroker}.
205+
* @since 2.3
206+
*/
207+
public EmbeddedKafkaBroker zkPort(int port) {
208+
this.zkPort = port;
209+
return this;
210+
}
193211
/**
194212
* Set the timeout in seconds for admin operations (e.g. topic creation, close).
195213
* Default 30 seconds.
@@ -211,13 +229,36 @@ public EmbeddedKafkaBroker brokerListProperty(String brokerListProperty) {
211229
return this;
212230
}
213231

232+
/**
233+
* Get the port that the embedded Zookeeper is running on or will run on.
234+
* @param zkPort the port.
235+
* @since 2.3
236+
*/
237+
public int getZkPort() {
238+
return this.zookeeper != null ? this.zookeeper.getPort() : this.zkPort;
239+
}
240+
241+
/**
242+
* Set the port to run the embedded Zookeeper on (default random).
243+
* @param zkPort the port.
244+
* @since 2.3
245+
*/
246+
public void setZkPort(int zkPort) {
247+
this.zkPort = zkPort;
248+
}
249+
214250
@Override
215251
public void afterPropertiesSet() {
216-
this.zookeeper = new EmbeddedZookeeper();
252+
try {
253+
this.zookeeper = new EmbeddedZookeeper(this.zkPort);
254+
}
255+
catch (IOException | InterruptedException e) {
256+
throw new IllegalStateException("Failed to create embedded Zookeeper", e);
257+
}
217258
int zkConnectionTimeout = 6000; // NOSONAR magic #
218259
int zkSessionTimeout = 6000; // NOSONAR magic #
219260

220-
this.zkConnect = "127.0.0.1:" + this.zookeeper.port();
261+
this.zkConnect = "127.0.0.1:" + this.zookeeper.getPort();
221262
this.zookeeperClient = new ZkClient(this.zkConnect, zkSessionTimeout, zkConnectionTimeout,
222263
ZKStringSerializer$.MODULE$);
223264
this.kafkaServers.clear();
@@ -360,6 +401,7 @@ public void destroy() {
360401
}
361402
try {
362403
this.zookeeper.shutdown();
404+
this.zkConnect = null;
363405
}
364406
catch (Exception e) {
365407
// do nothing
@@ -510,4 +552,86 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
510552
logger.debug("Subscription Initiated");
511553
}
512554

555+
/**
556+
* Ported from scala to allow setting the port.
557+
*
558+
* @author Gary Russell
559+
* @since 2.3
560+
*/
561+
public static final class EmbeddedZookeeper {
562+
563+
private final NIOServerCnxnFactory factory;
564+
565+
private final ZooKeeperServer zookeeper;
566+
567+
private final int port;
568+
569+
private final File snapshotDir;
570+
571+
private final File logDir;
572+
573+
public EmbeddedZookeeper(int zkPort) throws IOException, InterruptedException {
574+
this.snapshotDir = TestUtils.tempDir();
575+
this.logDir = TestUtils.tempDir();
576+
int tickTime = 800; // allow a maxSessionTimeout of 20 * 800ms = 16 secs
577+
578+
System.setProperty("zookeeper.forceSync", "no"); // disable fsync to ZK txn
579+
// log in tests to avoid
580+
// timeout
581+
this.zookeeper = new ZooKeeperServer(this.snapshotDir, this.logDir, tickTime);
582+
this.factory = new NIOServerCnxnFactory();
583+
InetSocketAddress addr = new InetSocketAddress("127.0.0.1", zkPort == 0 ? TestUtils.RandomPort() : zkPort);
584+
this.factory.configure(addr, 0);
585+
this.factory.startup(zookeeper);
586+
this.port = zookeeper.getClientPort();
587+
}
588+
589+
public int getPort() {
590+
return this.port;
591+
}
592+
593+
public File getSnapshotDir() {
594+
return this.snapshotDir;
595+
}
596+
597+
public File getLogDir() {
598+
return this.logDir;
599+
}
600+
601+
public void shutdown() throws IOException {
602+
// Also shuts down ZooKeeperServer
603+
try {
604+
this.factory.shutdown();
605+
}
606+
catch (Exception e) {
607+
logger.error(e, "ZK shutdown failed");
608+
}
609+
610+
int n = 0;
611+
while (n++ < 100) {
612+
try {
613+
ZkFourLetterWords.sendStat("127.0.0.1", port, 3000);
614+
Thread.sleep(100);
615+
}
616+
catch (@SuppressWarnings("unused") Exception e) {
617+
break;
618+
}
619+
}
620+
if (n == 100) {
621+
logger.debug("Zookeeper failed to stop");
622+
}
623+
624+
try {
625+
this.zookeeper.getZKDatabase().close();
626+
}
627+
catch (Exception e) {
628+
logger.error(e, "ZK db close failed");
629+
}
630+
631+
Utils.delete(this.logDir);
632+
Utils.delete(this.snapshotDir);
633+
}
634+
635+
}
636+
513637
}

spring-kafka-test/src/main/java/org/springframework/kafka/test/condition/EmbeddedKafkaCondition.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -115,9 +115,9 @@ private boolean springTestContext(AnnotatedElement annotatedElement) {
115115
@SuppressWarnings("unchecked")
116116
private EmbeddedKafkaBroker createBroker(EmbeddedKafka embedded) {
117117
EmbeddedKafkaBroker broker;
118-
broker = new EmbeddedKafkaBroker(embedded.count(),
119-
embedded.controlledShutdown(), embedded.topics());
120-
broker.kafkaPorts(embedded.ports());
118+
broker = new EmbeddedKafkaBroker(embedded.count(), embedded.controlledShutdown(), embedded.topics())
119+
.zkPort(embedded.zookeeperPort())
120+
.kafkaPorts(embedded.ports());
121121
Properties properties = new Properties();
122122

123123
for (String pair : embedded.brokerProperties()) {

spring-kafka-test/src/main/java/org/springframework/kafka/test/context/EmbeddedKafka.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,13 @@
9595
*/
9696
int[] ports() default {0};
9797

98+
/**
99+
* Set the port on which the embedded Zookeeper should listen;
100+
* @return the port.
101+
* @since 2.3
102+
*/
103+
int zookeeperPort() default 0;
104+
98105
/**
99106
* @return partitions per topic
100107
*/

spring-kafka-test/src/main/java/org/springframework/kafka/test/context/EmbeddedKafkaContextCustomizer.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -67,11 +67,11 @@ public void customizeContext(ConfigurableApplicationContext context, MergedConte
6767
.toArray(String[]::new);
6868

6969
EmbeddedKafkaBroker embeddedKafkaBroker = new EmbeddedKafkaBroker(this.embeddedKafka.count(),
70-
this.embeddedKafka.controlledShutdown(),
71-
this.embeddedKafka.partitions(),
72-
topics);
73-
74-
embeddedKafkaBroker.kafkaPorts(this.embeddedKafka.ports());
70+
this.embeddedKafka.controlledShutdown(),
71+
this.embeddedKafka.partitions(),
72+
topics)
73+
.kafkaPorts(this.embeddedKafka.ports())
74+
.zkPort(this.embeddedKafka.zookeeperPort());
7575

7676
Properties properties = new Properties();
7777

spring-kafka-test/src/main/java/org/springframework/kafka/test/rule/EmbeddedKafkaRule.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,11 @@ public EmbeddedKafkaRule kafkaPorts(int... kafkaPorts) {
9696
return this;
9797
}
9898

99+
public EmbeddedKafkaRule zkPort(int port) {
100+
this.embeddedKafka.setZkPort(port);
101+
return this;
102+
}
103+
99104
/**
100105
* Return an underlying delegator {@link EmbeddedKafkaBroker} instance.
101106
* @return the {@link EmbeddedKafkaBroker} instance.
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Copyright 2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.test;
18+
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
21+
import org.junit.jupiter.api.Test;
22+
23+
/**
24+
* @author Gary Russell
25+
* @since 2.3
26+
*
27+
*/
28+
public class EmbeddedKafkaBrokerTests {
29+
30+
@Test
31+
void testUpDown() {
32+
EmbeddedKafkaBroker kafka = new EmbeddedKafkaBroker(1);
33+
kafka.afterPropertiesSet();
34+
assertThat(kafka.getZookeeperConnectionString()).startsWith("127");
35+
kafka.destroy();
36+
assertThat(kafka.getZookeeperConnectionString()).isNull();
37+
}
38+
39+
}

spring-kafka-test/src/test/java/org/springframework/kafka/test/rule/AddressableEmbeddedBrokerTests.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -61,15 +61,16 @@ public class AddressableEmbeddedBrokerTests {
6161

6262
@Test
6363
public void testKafkaEmbedded() {
64-
assertThat(broker.getBrokersAsString()).isEqualTo("127.0.0.1:" + this.config.port);
64+
assertThat(broker.getBrokersAsString()).isEqualTo("127.0.0.1:" + this.config.kafkaPort);
65+
assertThat(broker.getZkPort()).isEqualTo(this.config.zkPort);
6566
assertThat(broker.getBrokersAsString())
6667
.isEqualTo(System.getProperty(EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS));
6768
assertThat(broker.getZookeeperConnectionString())
6869
.isEqualTo(System.getProperty(EmbeddedKafkaBroker.SPRING_EMBEDDED_ZOOKEEPER_CONNECT));
6970
}
7071

7172
@Test
72-
public void testLateStartedConsumer() throws Exception {
73+
public void testLateStartedConsumer() {
7374
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(TEST_EMBEDDED, "false", this.broker);
7475
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
7576
Consumer<Integer, String> consumer = new KafkaConsumer<>(consumerProps);
@@ -93,16 +94,22 @@ public void testLateStartedConsumer() throws Exception {
9394
@Configuration
9495
public static class Config {
9596

96-
private int port;
97+
private int kafkaPort;
98+
99+
private int zkPort;
97100

98101
@Bean
99102
public EmbeddedKafkaBroker broker() throws IOException {
100103
ServerSocket ss = ServerSocketFactory.getDefault().createServerSocket(0);
101-
this.port = ss.getLocalPort();
104+
this.kafkaPort = ss.getLocalPort();
105+
ss.close();
106+
ss = ServerSocketFactory.getDefault().createServerSocket(0);
107+
this.zkPort = ss.getLocalPort();
102108
ss.close();
103109

104110
return new EmbeddedKafkaBroker(1, true, TEST_EMBEDDED)
105-
.kafkaPorts(this.port);
111+
.zkPort(this.zkPort)
112+
.kafkaPorts(this.kafkaPort);
106113
}
107114

108115
}

0 commit comments

Comments
 (0)