@@ -15,7 +15,6 @@ import org.apache.zookeeper.server.{ServerCnxnFactory, ZooKeeperServer}
1515import org .scalatest .Suite
1616
1717import scala .collection .JavaConverters ._
18- import scala .collection .mutable
1918import scala .collection .mutable .ListBuffer
2019import scala .concurrent .duration ._
2120import scala .concurrent .{ExecutionContext , ExecutionContextExecutorService , TimeoutException }
@@ -29,81 +28,107 @@ trait EmbeddedKafka extends EmbeddedKafkaSupport {
2928
3029object EmbeddedKafka extends EmbeddedKafkaSupport {
3130
32- private [this ] var factory : Option [ServerCnxnFactory ] = None
33- private [this ] var broker : Option [KafkaServer ] = None
34- private [this ] val logsDirs = mutable.Buffer .empty[Directory ]
31+ private [this ] var servers : Seq [EmbeddedServer ] = Seq .empty
3532
3633 /**
3734 * Starts a ZooKeeper instance and a Kafka broker in memory, using temporary directories for storing logs.
3835 * The log directories will be cleaned after calling the [[stop() ]] method or on JVM exit, whichever happens earlier.
3936 *
4037 * @param config an implicit [[EmbeddedKafkaConfig ]]
4138 */
42- def start ()(implicit config : EmbeddedKafkaConfig ): Unit = {
39+ def start ()(implicit config : EmbeddedKafkaConfig ): EmbeddedK = {
4340 val zkLogsDir = Directory .makeTemp(" zookeeper-logs" )
4441 val kafkaLogsDir = Directory .makeTemp(" kafka-logs" )
4542
46- factory = Option (startZooKeeper(config.zooKeeperPort, zkLogsDir))
47- broker = Option (startKafka(config, kafkaLogsDir))
43+ val factory = EmbeddedZ (startZooKeeper(config.zooKeeperPort, zkLogsDir), zkLogsDir )
44+ val broker = EmbeddedK ( Option (factory), startKafka(config, kafkaLogsDir), kafkaLogsDir )
4845
49- logsDirs ++= Seq (zkLogsDir, kafkaLogsDir)
46+ servers :+= broker
47+ broker
5048 }
5149
5250 /**
5351 * Starts a Zookeeper instance in memory, storing logs in a specific location.
5452 *
5553 * @param zkLogsDir the path for the Zookeeper logs
5654 * @param config an implicit [[EmbeddedKafkaConfig ]]
55+ * @return an [[EmbeddedZ ]] server
5756 */
5857 def startZooKeeper (zkLogsDir : Directory )(
59- implicit config : EmbeddedKafkaConfig ): Unit = {
60- factory = Option (startZooKeeper(config.zooKeeperPort, zkLogsDir))
58+ implicit config : EmbeddedKafkaConfig ): EmbeddedZ = {
59+ val factory = EmbeddedZ (startZooKeeper(config.zooKeeperPort, zkLogsDir), zkLogsDir)
60+ servers :+= factory
61+ factory
6162 }
6263
6364 /**
6465 * Starts a Kafka broker in memory, storing logs in a specific location.
6566 *
66- * @param kafkaLogDir the path for the Kafka logs
67- * @param config an implicit [[EmbeddedKafkaConfig ]]
67+ * @param kafkaLogsDir the path for the Kafka logs
68+ * @param config an implicit [[EmbeddedKafkaConfig ]]
69+ * @return an [[EmbeddedK ]] server
6870 */
69- def startKafka (kafkaLogDir : Directory )(
70- implicit config : EmbeddedKafkaConfig ): Unit = {
71- broker = Option (startKafka(config, kafkaLogDir))
71+ def startKafka (kafkaLogsDir : Directory )(
72+ implicit config : EmbeddedKafkaConfig ): EmbeddedK = {
73+ val broker = EmbeddedK (startKafka(config, kafkaLogsDir), kafkaLogsDir)
74+ servers :+= broker
75+ broker
7276 }
7377
7478 /**
75- * Stops the in memory ZooKeeper instance and Kafka broker , and deletes the log directories.
79+ * Stops all in memory ZooKeeper instances and Kafka brokers , and deletes the log directories.
7680 */
7781 def stop (): Unit = {
78- stopKafka()
79- stopZooKeeper()
80- logsDirs.foreach(_.deleteRecursively())
81- logsDirs.clear()
82+ servers.foreach(_.stop(true ))
83+ servers = Seq .empty
8284 }
8385
8486 /**
85- * Stops the in memory Zookeeper instance, preserving the logs directory.
87+ * Stops a specific [[EmbeddedServer ]] instance, and deletes the log directory.
88+ *
89+ * @param server the [[EmbeddedServer ]] to be stopped.
90+ */
91+ def stop (server : EmbeddedServer ): Unit = {
92+ server.stop(true )
93+ servers = servers.filter(x => x != server)
94+ }
95+
96+ /**
97+ * Stops all in memory Zookeeper instances, preserving the logs directories.
8698 */
8799 def stopZooKeeper (): Unit = {
88- factory.foreach(_.shutdown())
89- factory = None
100+ val factories = servers.toFilteredSeq[EmbeddedZ ](isEmbeddedZ)
101+
102+ factories
103+ .foreach(_.stop(false ))
104+
105+ servers = servers.filter(! factories.contains(_))
90106 }
91107
92108 /**
93- * Stops the in memory Kafka instance , preserving the logs directory .
109+ * Stops all in memory Kafka instances , preserving the logs directories .
94110 */
95111 def stopKafka (): Unit = {
96- broker.foreach { b =>
97- b.shutdown()
98- b.awaitShutdown()
99- }
100- broker = None
112+ val brokers = servers.toFilteredSeq[EmbeddedK ](isEmbeddedK)
113+
114+ brokers
115+ .foreach(_.stop(false ))
116+
117+ servers = servers.filter(! brokers.contains(_))
101118 }
102119
103120 /**
104- * Returns whether the in memory Kafka and Zookeeper are running.
121+ * Returns whether the in memory Kafka and Zookeeper are both running.
105122 */
106- def isRunning : Boolean = factory.nonEmpty && broker.nonEmpty
123+ def isRunning : Boolean = servers.toFilteredSeq[EmbeddedK ](isEmbeddedK).exists(_.factory.isDefined)
124+
125+ private def isEmbeddedK (server : EmbeddedServer ): Boolean = server.isInstanceOf [EmbeddedK ]
126+ private def isEmbeddedZ (server : EmbeddedServer ): Boolean = server.isInstanceOf [EmbeddedZ ]
127+
128+ implicit class ServerOps (servers : Seq [EmbeddedServer ]) {
129+ def toFilteredSeq [T <: EmbeddedServer ](filter : EmbeddedServer => Boolean ): Seq [T ] =
130+ servers.filter(filter).asInstanceOf [Seq [T ]]
131+ }
107132}
108133
109134sealed trait EmbeddedKafkaSupport {
0 commit comments