7070import kafka .utils .TestUtils ;
7171import kafka .utils .ZKStringSerializer$ ;
7272import kafka .zk .ZkFourLetterWords ;
73+ import kafka .zookeeper .ZooKeeperClient ;
7374
7475/**
7576 * An embedded Kafka Broker(s) and Zookeeper manager.
@@ -104,6 +105,10 @@ public class EmbeddedKafkaBroker implements InitializingBean, DisposableBean {
104105
105106 private static final Duration DEFAULT_ADMIN_TIMEOUT = Duration .ofSeconds (10 );
106107
108+ private static final int ZK_CONNECTION_TIMEOUT = 6000 ;
109+
110+ private static final int ZK_SESSION_TIMEOUT = 6000 ;
111+
107112 private final int count ;
108113
109114 private final boolean controlledShutdown ;
@@ -118,8 +123,6 @@ public class EmbeddedKafkaBroker implements InitializingBean, DisposableBean {
118123
119124 private EmbeddedZookeeper zookeeper ;
120125
121- private ZkClient zookeeperClient ;
122-
123126 private String zkConnect ;
124127
125128 private int zkPort ;
@@ -130,6 +133,10 @@ public class EmbeddedKafkaBroker implements InitializingBean, DisposableBean {
130133
131134 private String brokerListProperty ;
132135
136+ private volatile ZooKeeperClient zooKeeperClient ;
137+
138+ private volatile ZkClient zkClient ;
139+
133140 public EmbeddedKafkaBroker (int count ) {
134141 this (count , false );
135142 }
@@ -257,12 +264,7 @@ public void afterPropertiesSet() {
257264 catch (IOException | InterruptedException e ) {
258265 throw new IllegalStateException ("Failed to create embedded Zookeeper" , e );
259266 }
260- int zkConnectionTimeout = 6000 ; // NOSONAR magic #
261- int zkSessionTimeout = 6000 ; // NOSONAR magic #
262-
263267 this .zkConnect = LOOPBACK + ":" + this .zookeeper .getPort ();
264- this .zookeeperClient = new ZkClient (this .zkConnect , zkSessionTimeout , zkConnectionTimeout ,
265- ZKStringSerializer$ .MODULE$ );
266268 this .kafkaServers .clear ();
267269 for (int i = 0 ; i < this .count ; i ++) {
268270 Properties brokerConfigProperties = createBrokerProperties (i );
@@ -396,7 +398,14 @@ public void destroy() {
396398 }
397399 }
398400 try {
399- this .zookeeperClient .close ();
401+ synchronized (this ) {
402+ if (this .zooKeeperClient != null ) {
403+ this .zooKeeperClient .close ();
404+ }
405+ if (this .zkClient != null ) {
406+ this .zkClient .close ();
407+ }
408+ }
400409 }
401410 catch (ZkInterruptedException e ) {
402411 // do nothing
@@ -426,8 +435,31 @@ public EmbeddedZookeeper getZookeeper() {
426435 return this .zookeeper ;
427436 }
428437
429- public ZkClient getZkClient () {
430- return this .zookeeperClient ;
438+ /**
439+ * Return the ZkClient.
440+ * @return the client.
441+ * @deprecated in favor of {@link #getZooKeeperClient()}.
442+ */
443+ @ Deprecated
444+ public synchronized ZkClient getZkClient () {
445+ if (this .zkClient == null ) {
446+ this .zkClient = new ZkClient (this .zkConnect , ZK_SESSION_TIMEOUT , ZK_CONNECTION_TIMEOUT ,
447+ ZKStringSerializer$ .MODULE$ );
448+ }
449+ return this .zkClient ;
450+ }
451+
452+ /**
453+ * Return the ZooKeeperClient.
454+ * @return the client.
455+ * @since 2.3.2
456+ */
457+ public synchronized ZooKeeperClient getZooKeeperClient () {
458+ if (this .zooKeeperClient == null ) {
459+ this .zooKeeperClient = new ZooKeeperClient (this .zkConnect , ZK_SESSION_TIMEOUT , ZK_CONNECTION_TIMEOUT ,
460+ 1 , Time .SYSTEM , "embeddedKafkaZK" , "embeddedKafkaZK" );
461+ }
462+ return this .zooKeeperClient ;
431463 }
432464
433465 public String getZookeeperConnectionString () {
0 commit comments