Skip to content

Commit db1ca60

Browse files
Make ZK client timeouts configurable (#1380)
* Make ZK client timeouts configurable * Wire ZK clients timeouts to @embeddedkafka properties * Splitting the ZK client timeout properties, @author * @SInCE on new API
1 parent f44193e commit db1ca60

File tree

4 files changed

+57
-8
lines changed

4 files changed

+57
-8
lines changed

spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaBroker.java

Lines changed: 33 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2019 the original author or authors.
2+
* Copyright 2018-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -77,6 +77,7 @@
7777
* @author Kamill Sokol
7878
* @author Elliot Kennedy
7979
* @author Nakul Mishra
80+
* @author Pawel Lozinski
8081
*
8182
* @since 2.2
8283
*/
@@ -100,9 +101,9 @@ public class EmbeddedKafkaBroker implements InitializingBean, DisposableBean {
100101

101102
private static final Duration DEFAULT_ADMIN_TIMEOUT = Duration.ofSeconds(10);
102103

103-
private static final int ZK_CONNECTION_TIMEOUT = 6000;
104+
public static final int DEFAULT_ZK_CONNECTION_TIMEOUT = 6000;
104105

105-
private static final int ZK_SESSION_TIMEOUT = 6000;
106+
public static final int DEFAULT_ZK_SESSION_TIMEOUT = 6000;
106107

107108
private final int count;
108109

@@ -126,6 +127,10 @@ public class EmbeddedKafkaBroker implements InitializingBean, DisposableBean {
126127

127128
private Duration adminTimeout = DEFAULT_ADMIN_TIMEOUT;
128129

130+
private int zkConnectionTimeout = DEFAULT_ZK_CONNECTION_TIMEOUT;
131+
132+
private int zkSessionTimeout = DEFAULT_ZK_SESSION_TIMEOUT;
133+
129134
private String brokerListProperty;
130135

131136
private volatile ZooKeeperClient zooKeeperClient;
@@ -249,6 +254,28 @@ public void setZkPort(int zkPort) {
249254
this.zkPort = zkPort;
250255
}
251256

257+
/**
258+
* Set connection timeout for the client to the embedded Zookeeper.
259+
* @param zkConnectionTimeout the connection timeout,
260+
* @return the {@link EmbeddedKafkaBroker}.
261+
* @since 2.4
262+
*/
263+
public EmbeddedKafkaBroker zkConnectionTimeout(int zkConnectionTimeout) {
264+
this.zkConnectionTimeout = zkConnectionTimeout;
265+
return this;
266+
}
267+
268+
/**
269+
* Set session timeout for the client to the embedded Zookeeper.
270+
* @param zkSessionTimeout the session timeout.
271+
* @return the {@link EmbeddedKafkaBroker}.
272+
* @since 2.4
273+
*/
274+
public EmbeddedKafkaBroker zkSessionTimeout(int zkSessionTimeout) {
275+
this.zkSessionTimeout = zkSessionTimeout;
276+
return this;
277+
}
278+
252279
@Override
253280
public void afterPropertiesSet() {
254281
try {
@@ -336,8 +363,8 @@ private void createKafkaTopics(Set<String> topicsToCreate) {
336363
doWithAdmin(admin -> {
337364
createTopics(admin,
338365
topicsToCreate.stream()
339-
.map(t -> new NewTopic(t, this.partitionsPerTopic, (short) this.count))
340-
.collect(Collectors.toList()));
366+
.map(t -> new NewTopic(t, this.partitionsPerTopic, (short) this.count))
367+
.collect(Collectors.toList()));
341368
});
342369
}
343370

@@ -428,7 +455,7 @@ public EmbeddedZookeeper getZookeeper() {
428455
*/
429456
public synchronized ZooKeeperClient getZooKeeperClient() {
430457
if (this.zooKeeperClient == null) {
431-
this.zooKeeperClient = new ZooKeeperClient(this.zkConnect, ZK_SESSION_TIMEOUT, ZK_CONNECTION_TIMEOUT,
458+
this.zooKeeperClient = new ZooKeeperClient(this.zkConnect, zkSessionTimeout, zkConnectionTimeout,
432459
1, Time.SYSTEM, "embeddedKafkaZK", "embeddedKafkaZK");
433460
}
434461
return this.zooKeeperClient;

spring-kafka-test/src/main/java/org/springframework/kafka/test/condition/EmbeddedKafkaCondition.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
*
5151
* @author Gary Russell
5252
* @author Artem Bilan
53+
* @author Pawel Lozinski
5354
*
5455
* @since 2.3
5556
*
@@ -118,7 +119,9 @@ private EmbeddedKafkaBroker createBroker(EmbeddedKafka embedded) {
118119
int[] ports = setupPorts(embedded);
119120
broker = new EmbeddedKafkaBroker(embedded.count(), embedded.controlledShutdown(), embedded.topics())
120121
.zkPort(embedded.zookeeperPort())
121-
.kafkaPorts(ports);
122+
.kafkaPorts(ports)
123+
.zkConnectionTimeout(embedded.zkConnectionTimeout())
124+
.zkSessionTimeout(embedded.zkSessionTimeout());
122125
Properties properties = new Properties();
123126

124127
for (String pair : embedded.brokerProperties()) {

spring-kafka-test/src/main/java/org/springframework/kafka/test/context/EmbeddedKafka.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.junit.jupiter.api.extension.ExtendWith;
2727

2828
import org.springframework.core.annotation.AliasFor;
29+
import org.springframework.kafka.test.EmbeddedKafkaBroker;
2930
import org.springframework.kafka.test.condition.EmbeddedKafkaCondition;
3031

3132
/**
@@ -57,6 +58,7 @@
5758
* @author Zach Olauson
5859
* @author Gary Russell
5960
* @author Sergio Lourenco
61+
* @author Pawel Lozinski
6062
*
6163
* @since 1.3
6264
*
@@ -155,5 +157,19 @@
155157
*/
156158
String bootstrapServersProperty() default "";
157159

160+
/**
161+
* Timeout for internal ZK client connection.
162+
* @return default {@link EmbeddedKafkaBroker#DEFAULT_ZK_CONNECTION_TIMEOUT}.
163+
* @since 2.4
164+
*/
165+
int zkConnectionTimeout() default EmbeddedKafkaBroker.DEFAULT_ZK_CONNECTION_TIMEOUT;
166+
167+
/**
168+
* Timeout for internal ZK client session.
169+
* @return default {@link EmbeddedKafkaBroker#DEFAULT_ZK_SESSION_TIMEOUT}.
170+
* @since 2.4
171+
*/
172+
int zkSessionTimeout() default EmbeddedKafkaBroker.DEFAULT_ZK_SESSION_TIMEOUT;
173+
158174
}
159175

spring-kafka-test/src/main/java/org/springframework/kafka/test/context/EmbeddedKafkaContextCustomizer.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
* @author Zach Olauson
4343
* @author Oleg Artyomov
4444
* @author Sergio Lourenco
45+
* @author Pawel Lozinski
4546
*
4647
* @since 1.3
4748
*/
@@ -72,7 +73,9 @@ public void customizeContext(ConfigurableApplicationContext context, MergedConte
7273
this.embeddedKafka.partitions(),
7374
topics)
7475
.kafkaPorts(ports)
75-
.zkPort(this.embeddedKafka.zookeeperPort());
76+
.zkPort(this.embeddedKafka.zookeeperPort())
77+
.zkConnectionTimeout(this.embeddedKafka.zkConnectionTimeout())
78+
.zkSessionTimeout(this.embeddedKafka.zkSessionTimeout());
7679

7780
Properties properties = new Properties();
7881

0 commit comments

Comments
 (0)