Skip to content

Commit abf2bfe

Browse files
pawellozinskigaryrussell
authored andcommitted
Make ZK client timeouts configurable (#1380)
* Make ZK client timeouts configurable * Wire ZK clients timeouts to @embeddedkafka properties * Splitting the ZK client timeout properties, @author * @SInCE on new API
1 parent 0001d20 commit abf2bfe

File tree

4 files changed

+58
-9
lines changed

4 files changed

+58
-9
lines changed

spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaBroker.java

Lines changed: 34 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2019 the original author or authors.
2+
* Copyright 2018-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -80,6 +80,7 @@
8080
* @author Kamill Sokol
8181
* @author Elliot Kennedy
8282
* @author Nakul Mishra
83+
* @author Pawel Lozinski
8384
*
8485
* @since 2.2
8586
*/
@@ -103,9 +104,9 @@ public class EmbeddedKafkaBroker implements InitializingBean, DisposableBean {
103104

104105
private static final Duration DEFAULT_ADMIN_TIMEOUT = Duration.ofSeconds(10);
105106

106-
private static final int ZK_CONNECTION_TIMEOUT = 6000;
107+
public static final int DEFAULT_ZK_CONNECTION_TIMEOUT = 6000;
107108

108-
private static final int ZK_SESSION_TIMEOUT = 6000;
109+
public static final int DEFAULT_ZK_SESSION_TIMEOUT = 6000;
109110

110111
private final int count;
111112

@@ -129,6 +130,10 @@ public class EmbeddedKafkaBroker implements InitializingBean, DisposableBean {
129130

130131
private Duration adminTimeout = DEFAULT_ADMIN_TIMEOUT;
131132

133+
private int zkConnectionTimeout = DEFAULT_ZK_CONNECTION_TIMEOUT;
134+
135+
private int zkSessionTimeout = DEFAULT_ZK_SESSION_TIMEOUT;
136+
132137
private String brokerListProperty;
133138

134139
private volatile ZooKeeperClient zooKeeperClient;
@@ -254,6 +259,28 @@ public void setZkPort(int zkPort) {
254259
this.zkPort = zkPort;
255260
}
256261

262+
/**
263+
* Set connection timeout for the client to the embedded Zookeeper.
264+
* @param zkConnectionTimeout the connection timeout,
265+
* @return the {@link EmbeddedKafkaBroker}.
266+
* @since 2.4
267+
*/
268+
public EmbeddedKafkaBroker zkConnectionTimeout(int zkConnectionTimeout) {
269+
this.zkConnectionTimeout = zkConnectionTimeout;
270+
return this;
271+
}
272+
273+
/**
274+
* Set session timeout for the client to the embedded Zookeeper.
275+
* @param zkSessionTimeout the session timeout.
276+
* @return the {@link EmbeddedKafkaBroker}.
277+
* @since 2.4
278+
*/
279+
public EmbeddedKafkaBroker zkSessionTimeout(int zkSessionTimeout) {
280+
this.zkSessionTimeout = zkSessionTimeout;
281+
return this;
282+
}
283+
257284
@Override
258285
public void afterPropertiesSet() {
259286
try {
@@ -340,8 +367,8 @@ private void createKafkaTopics(Set<String> topicsToCreate) {
340367
doWithAdmin(admin -> {
341368
createTopics(admin,
342369
topicsToCreate.stream()
343-
.map(t -> new NewTopic(t, this.partitionsPerTopic, (short) this.count))
344-
.collect(Collectors.toList()));
370+
.map(t -> new NewTopic(t, this.partitionsPerTopic, (short) this.count))
371+
.collect(Collectors.toList()));
345372
});
346373
}
347374

@@ -441,7 +468,7 @@ public EmbeddedZookeeper getZookeeper() {
441468
@Deprecated
442469
public synchronized ZkClient getZkClient() {
443470
if (this.zkClient == null) {
444-
this.zkClient = new ZkClient(this.zkConnect, ZK_SESSION_TIMEOUT, ZK_CONNECTION_TIMEOUT,
471+
this.zkClient = new ZkClient(this.zkConnect, this.zkSessionTimeout, this.zkConnectionTimeout,
445472
ZKStringSerializer$.MODULE$);
446473
}
447474
return this.zkClient;
@@ -454,7 +481,7 @@ public synchronized ZkClient getZkClient() {
454481
*/
455482
public synchronized ZooKeeperClient getZooKeeperClient() {
456483
if (this.zooKeeperClient == null) {
457-
this.zooKeeperClient = new ZooKeeperClient(this.zkConnect, ZK_SESSION_TIMEOUT, ZK_CONNECTION_TIMEOUT,
484+
this.zooKeeperClient = new ZooKeeperClient(this.zkConnect, zkSessionTimeout, zkConnectionTimeout,
458485
1, Time.SYSTEM, "embeddedKafkaZK", "embeddedKafkaZK");
459486
}
460487
return this.zooKeeperClient;

spring-kafka-test/src/main/java/org/springframework/kafka/test/condition/EmbeddedKafkaCondition.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
*
5151
* @author Gary Russell
5252
* @author Artem Bilan
53+
* @author Pawel Lozinski
5354
*
5455
* @since 2.3
5556
*
@@ -121,7 +122,9 @@ private EmbeddedKafkaBroker createBroker(EmbeddedKafka embedded) {
121122
}
122123
broker = new EmbeddedKafkaBroker(embedded.count(), embedded.controlledShutdown(), embedded.topics())
123124
.zkPort(embedded.zookeeperPort())
124-
.kafkaPorts(ports);
125+
.kafkaPorts(ports)
126+
.zkConnectionTimeout(embedded.zkConnectionTimeout())
127+
.zkSessionTimeout(embedded.zkSessionTimeout());
125128
Properties properties = new Properties();
126129

127130
for (String pair : embedded.brokerProperties()) {

spring-kafka-test/src/main/java/org/springframework/kafka/test/context/EmbeddedKafka.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.junit.jupiter.api.extension.ExtendWith;
2727

2828
import org.springframework.core.annotation.AliasFor;
29+
import org.springframework.kafka.test.EmbeddedKafkaBroker;
2930
import org.springframework.kafka.test.condition.EmbeddedKafkaCondition;
3031

3132
/**
@@ -57,6 +58,7 @@
5758
* @author Zach Olauson
5859
* @author Gary Russell
5960
* @author Sergio Lourenco
61+
* @author Pawel Lozinski
6062
*
6163
* @since 1.3
6264
*
@@ -155,5 +157,19 @@
155157
*/
156158
String bootstrapServersProperty() default "";
157159

160+
/**
161+
* Timeout for internal ZK client connection.
162+
* @return default {@link EmbeddedKafkaBroker#DEFAULT_ZK_CONNECTION_TIMEOUT}.
163+
* @since 2.4
164+
*/
165+
int zkConnectionTimeout() default EmbeddedKafkaBroker.DEFAULT_ZK_CONNECTION_TIMEOUT;
166+
167+
/**
168+
* Timeout for internal ZK client session.
169+
* @return default {@link EmbeddedKafkaBroker#DEFAULT_ZK_SESSION_TIMEOUT}.
170+
* @since 2.4
171+
*/
172+
int zkSessionTimeout() default EmbeddedKafkaBroker.DEFAULT_ZK_SESSION_TIMEOUT;
173+
158174
}
159175

spring-kafka-test/src/main/java/org/springframework/kafka/test/context/EmbeddedKafkaContextCustomizer.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
* @author Zach Olauson
4343
* @author Oleg Artyomov
4444
* @author Sergio Lourenco
45+
* @author Pawel Lozinski
4546
*
4647
* @since 1.3
4748
*/
@@ -75,7 +76,9 @@ public void customizeContext(ConfigurableApplicationContext context, MergedConte
7576
this.embeddedKafka.partitions(),
7677
topics)
7778
.kafkaPorts(ports)
78-
.zkPort(this.embeddedKafka.zookeeperPort());
79+
.zkPort(this.embeddedKafka.zookeeperPort())
80+
.zkConnectionTimeout(this.embeddedKafka.zkConnectionTimeout())
81+
.zkSessionTimeout(this.embeddedKafka.zkSessionTimeout());
7982

8083
Properties properties = new Properties();
8184

0 commit comments

Comments
 (0)