Skip to content

Commit 92435b3

Browse files
crankydillomanub
authored andcommitted
143 Kafka should be configured to point to the launched zookeeper (#145)
instance. Previously, if 0 was passed for both kafka and zookeeper ports, zookeeper would start on an available port; however, kafka was told that zookeeper was listening on port 0. I also removed the implicit config from `EmbeddedK`. It didn't look like it was being used anywhere... However, I added it as a required argument. This was done to allow Java code like this: ```java int kafkaPort = 0; int zookeeperPort = 0; EmbeddedKafka$ kafka = EmbeddedKafka$.MODULE$; EmbeddedKafkaConfig cfg = new EmbeddedKafkaConfigImpl( kafkaPort, zookeeperPort, Map$.MODULE$.empty(), Map$.MODULE$.empty(), Map$.MODULE$.empty() ); EmbeddedK serverInfo = kafka.start(cfg); EmbeddedKafkaConfig actualCfg = serverInfo.config(); kafka.publishStringMessageToKafka("topic", "message", actualCfg); ```
1 parent 64d4eea commit 92435b3

File tree

3 files changed

+58
-10
lines changed

3 files changed

+58
-10
lines changed

embedded-kafka/src/main/scala/net/manub/embeddedkafka/EmbeddedKafka.scala

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -61,9 +61,25 @@ object EmbeddedKafka extends EmbeddedKafkaSupport[EmbeddedKafkaConfig] {
6161
val factory =
6262
EmbeddedZ(startZooKeeper(config.zooKeeperPort, zkLogsDir), zkLogsDir)
6363

64-
val kafkaServer = startKafka(config, kafkaLogsDir)
64+
val zkPort = zookeeperPort(factory)
65+
66+
val configWithUsedZookeeperPort = EmbeddedKafkaConfig(
67+
config.kafkaPort,
68+
zkPort,
69+
config.customBrokerProperties,
70+
config.customProducerProperties,
71+
config.customConsumerProperties
72+
)
73+
val kafkaServer = startKafka(configWithUsedZookeeperPort, kafkaLogsDir)
74+
75+
val actualConfig = EmbeddedKafkaConfigImpl(kafkaPort(kafkaServer),
76+
zkPort,
77+
config.customBrokerProperties,
78+
config.customProducerProperties,
79+
config.customConsumerProperties)
80+
6581
val broker =
66-
EmbeddedK(Option(factory), kafkaServer, kafkaLogsDir)
82+
EmbeddedK(Option(factory), kafkaServer, kafkaLogsDir, actualConfig)
6783

6884
servers :+= broker
6985
broker
@@ -93,7 +109,8 @@ object EmbeddedKafka extends EmbeddedKafkaSupport[EmbeddedKafkaConfig] {
93109
*/
94110
def startKafka(kafkaLogsDir: Directory)(
95111
implicit config: EmbeddedKafkaConfig): EmbeddedK = {
96-
val broker = EmbeddedK(startKafka(config, kafkaLogsDir), kafkaLogsDir)
112+
val broker =
113+
EmbeddedK(startKafka(config, kafkaLogsDir), kafkaLogsDir, config)
97114
servers :+= broker
98115
broker
99116
}
@@ -168,6 +185,13 @@ object EmbeddedKafka extends EmbeddedKafkaSupport[EmbeddedKafkaConfig] {
168185
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> false.toString
169186
) ++ config.customConsumerProperties
170187

188+
private[embeddedkafka] def kafkaPort(kafkaServer: KafkaServer): Int =
189+
kafkaServer.boundPort(kafkaServer.config.listeners.head.listenerName)
190+
191+
private[embeddedkafka] def zookeeperPort(zk: EmbeddedZ): Int =
192+
zookeeperPort(zk.factory)
193+
private[embeddedkafka] def zookeeperPort(fac: ServerCnxnFactory): Int =
194+
fac.getLocalPort
171195
}
172196

173197
private[embeddedkafka] trait EmbeddedKafkaSupport[C <: EmbeddedKafkaConfig] {
@@ -221,10 +245,8 @@ private[embeddedkafka] trait EmbeddedKafkaSupport[C <: EmbeddedKafkaConfig] {
221245
zkPort,
222246
config.customBrokerProperties,
223247
kafkaLogsDir)
224-
val kafkaPort =
225-
broker.boundPort(broker.config.listeners.head.listenerName)
226248
val actualConfig =
227-
EmbeddedKafkaConfigImpl(kafkaPort,
249+
EmbeddedKafkaConfigImpl(EmbeddedKafka.kafkaPort(broker),
228250
zkPort,
229251
config.customBrokerProperties,
230252
config.customProducerProperties,

embedded-kafka/src/main/scala/net/manub/embeddedkafka/EmbeddedServer.scala

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,8 @@ private[embeddedkafka] trait EmbeddedServerWithKafka extends EmbeddedServer {
5151
*/
5252
case class EmbeddedK(factory: Option[EmbeddedZ],
5353
broker: KafkaServer,
54-
logsDirs: Directory)(implicit config: EmbeddedKafkaConfig)
54+
logsDirs: Directory,
55+
config: EmbeddedKafkaConfig)
5556
extends EmbeddedServerWithKafka {
5657

5758
/**
@@ -72,7 +73,8 @@ case class EmbeddedK(factory: Option[EmbeddedZ],
7273
}
7374

7475
object EmbeddedK {
75-
def apply(broker: KafkaServer, logsDirs: Directory)(
76-
implicit config: EmbeddedKafkaConfig): EmbeddedK =
77-
EmbeddedK(factory = None, broker, logsDirs)
76+
def apply(broker: KafkaServer,
77+
logsDirs: Directory,
78+
config: EmbeddedKafkaConfig): EmbeddedK =
79+
EmbeddedK(factory = None, broker, logsDirs, config)
7880
}

embedded-kafka/src/test/scala/net/manub/embeddedkafka/EmbeddedKafkaObjectSpec.scala

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ import org.apache.kafka.common.serialization.{
55
StringSerializer
66
}
77
import net.manub.embeddedkafka.EmbeddedKafka._
8+
import org.apache.kafka.common.network.ListenerName
9+
import org.apache.kafka.common.security.auth.SecurityProtocol
810

911
import scala.collection.JavaConverters._
1012
import scala.reflect.io.Directory
@@ -61,6 +63,28 @@ class EmbeddedKafkaObjectSpec extends EmbeddedKafkaSpecSupport {
6163
EmbeddedKafka.stop()
6264
}
6365

66+
"start and stop Kafka and Zookeeper successfully on arbitrary available ports" in {
67+
val someConfig =
68+
EmbeddedKafkaConfig(kafkaPort = 0, zooKeeperPort = 0)
69+
val kafka = EmbeddedKafka.start()(someConfig)
70+
71+
kafka.factory shouldBe defined
72+
73+
val usedZookeeperPort = EmbeddedKafka.zookeeperPort(kafka.factory.get)
74+
val usedKafkaPort = EmbeddedKafka.kafkaPort(kafka.broker)
75+
76+
kafkaIsAvailable(usedKafkaPort)
77+
zookeeperIsAvailable(usedZookeeperPort)
78+
79+
kafka.config.kafkaPort should be(usedKafkaPort)
80+
kafka.config.zooKeeperPort should be(usedZookeeperPort)
81+
82+
EmbeddedKafka.stop()
83+
84+
kafkaIsNotAvailable(usedKafkaPort)
85+
zookeeperIsNotAvailable(usedZookeeperPort)
86+
}
87+
6488
"start and stop multiple Kafka instances on specified ports" in {
6589
val someConfig =
6690
EmbeddedKafkaConfig(kafkaPort = 12345, zooKeeperPort = 32111)

0 commit comments

Comments
 (0)