8686 */
8787public class EmbeddedKafkaBroker implements InitializingBean , DisposableBean {
8888
89+ private static final String LOOPBACK = "127.0.0.1" ;
90+
8991 private static final LogAccessor logger = new LogAccessor (LogFactory .getLog (EmbeddedKafkaBroker .class )); // NOSONAR
9092
9193 public static final String BEAN_NAME = "embeddedKafka" ;
@@ -258,7 +260,7 @@ public void afterPropertiesSet() {
258260 int zkConnectionTimeout = 6000 ; // NOSONAR magic #
259261 int zkSessionTimeout = 6000 ; // NOSONAR magic #
260262
261- this .zkConnect = "127.0.0.1 :" + this .zookeeper .getPort ();
263+ this .zkConnect = LOOPBACK + " :" + this .zookeeper .getPort ();
262264 this .zookeeperClient = new ZkClient (this .zkConnect , zkSessionTimeout , zkConnectionTimeout ,
263265 ZKStringSerializer$ .MODULE$ );
264266 this .kafkaServers .clear ();
@@ -434,13 +436,13 @@ public String getZookeeperConnectionString() {
434436
435437 public BrokerAddress getBrokerAddress (int i ) {
436438 KafkaServer kafkaServer = this .kafkaServers .get (i );
437- return new BrokerAddress ("127.0.0.1" , kafkaServer .config ().port ());
439+ return new BrokerAddress (LOOPBACK , kafkaServer .config ().port ());
438440 }
439441
440442 public BrokerAddress [] getBrokerAddresses () {
441443 List <BrokerAddress > addresses = new ArrayList <BrokerAddress >();
442444 for (int kafkaPort : this .kafkaPorts ) {
443- addresses .add (new BrokerAddress ("127.0.0.1" , kafkaPort ));
445+ addresses .add (new BrokerAddress (LOOPBACK , kafkaPort ));
444446 }
445447 return addresses .toArray (new BrokerAddress [0 ]);
446448 }
@@ -560,6 +562,12 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
560562 */
561563 public static final class EmbeddedZookeeper {
562564
565+ private static final int THREE_K = 3000 ;
566+
567+ private static final int HUNDRED = 100 ;
568+
569+ private static final int TICK_TIME = 800 ; // allow a maxSessionTimeout of 20 * 800ms = 16 secs
570+
563571 private final NIOServerCnxnFactory factory ;
564572
565573 private final ZooKeeperServer zookeeper ;
@@ -573,14 +581,12 @@ public static final class EmbeddedZookeeper {
573581 public EmbeddedZookeeper (int zkPort ) throws IOException , InterruptedException {
574582 this .snapshotDir = TestUtils .tempDir ();
575583 this .logDir = TestUtils .tempDir ();
576- int tickTime = 800 ; // allow a maxSessionTimeout of 20 * 800ms = 16 secs
577-
578584 System .setProperty ("zookeeper.forceSync" , "no" ); // disable fsync to ZK txn
579585 // log in tests to avoid
580586 // timeout
581- this .zookeeper = new ZooKeeperServer (this .snapshotDir , this .logDir , tickTime );
587+ this .zookeeper = new ZooKeeperServer (this .snapshotDir , this .logDir , TICK_TIME );
582588 this .factory = new NIOServerCnxnFactory ();
583- InetSocketAddress addr = new InetSocketAddress ("127.0.0.1" , zkPort == 0 ? TestUtils .RandomPort () : zkPort );
589+ InetSocketAddress addr = new InetSocketAddress (LOOPBACK , zkPort == 0 ? TestUtils .RandomPort () : zkPort );
584590 this .factory .configure (addr , 0 );
585591 this .factory .startup (zookeeper );
586592 this .port = zookeeper .getClientPort ();
@@ -608,16 +614,16 @@ public void shutdown() throws IOException {
608614 }
609615
610616 int n = 0 ;
611- while (n ++ < 100 ) {
617+ while (n ++ < HUNDRED ) {
612618 try {
613- ZkFourLetterWords .sendStat ("127.0.0.1" , this .port , 3000 );
614- Thread .sleep (100 );
619+ ZkFourLetterWords .sendStat (LOOPBACK , this .port , THREE_K );
620+ Thread .sleep (HUNDRED );
615621 }
616622 catch (@ SuppressWarnings ("unused" ) Exception e ) {
617623 break ;
618624 }
619625 }
620- if (n == 100 ) {
626+ if (n == HUNDRED ) {
621627 logger .debug ("Zookeeper failed to stop" );
622628 }
623629
0 commit comments