Skip to content

Commit 4143cd6

Browse files
committed
Added xan EmbeddedKafka object.
1 parent 4baa3d5 commit 4143cd6

File tree

5 files changed

+204
-94
lines changed

5 files changed

+204
-94
lines changed

README.md

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,20 @@ scalatest-embedded-kafka is available on Bintray and Maven Central, compiled for
2323

2424
* In-memory Zookeeper and Kafka will be instantiated respectively on port 6000 and 6001 and automatically shutdown at the end of the test.
2525

26+
### Use without the `withRunningKafka` method
27+
28+
A `EmbeddedKafka` companion object is provided for usage without the `EmbeddedKafka` trait. Zookeeper and Kafka can be started an stopped in a programmatic way.
29+
30+
class MySpec extends WordSpec {
31+
32+
"runs with embedded kafka" should {
33+
34+
withRunningKafka {
35+
// ... code goes here
36+
}
37+
38+
}
39+
2640
## Configuration
2741

2842
It's possible to change the ports on which Zookeeper and Kafka are started by providing an implicit `EmbeddedKafkaConfig`
@@ -39,6 +53,8 @@ It's possible to change the ports on which Zookeeper and Kafka are started by pr
3953
4054
}
4155
56+
This works for both `withRunningKafka` and `EmbeddedKafka.start()`
57+
4258
## Utility methods
4359

4460
The `EmbeddedKafka` trait provides also some utility methods to interact with the embedded kafka, in order to set preconditions or verifications in your specs:
@@ -47,8 +63,14 @@ The `EmbeddedKafka` trait provides also some utility methods to interact with th
4763
4864
def consumeFirstMessageFrom(topic: String): String
4965
66+
## Custom producers
67+
68+
It is possible to create producers for custom types in two ways:
69+
70+
* Using the syntax `aKafkaProducer thatSerializesValuesWith classOf[Serializer[V]]`. This will return a `KafkaProducer[String, V]`
71+
* Using the syntax `aKafkaProducer[V]`. This will return a `KafkaProducer[String, V]`, using an implicit `Serializer[V]`.
5072

51-
For more information about how to use those method, you can either look at the Scaladocs or at the tests of this project.
73+
For more information about how to use the utility methods, you can either look at the Scaladocs or at the tests of this project.
5274

5375
## Badges
5476

src/main/scala/net/manub/embeddedkafka/EmbeddedKafka.scala

Lines changed: 68 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -19,19 +19,51 @@ import scala.language.{higherKinds, postfixOps}
1919
import scala.reflect.io.Directory
2020
import scala.util.Try
2121

22-
trait EmbeddedKafka {
23-
22+
trait EmbeddedKafka extends EmbeddedKafkaSupport {
2423
this: Suite =>
24+
}
25+
26+
object EmbeddedKafka extends EmbeddedKafkaSupport {
27+
28+
private[this] var factory: Option[ServerCnxnFactory] = None
29+
private[this] var broker: Option[KafkaServer] = None
2530

31+
/**
32+
* Starts a ZooKeeper instance and a Kafka broker in memory.
33+
*
34+
* @param config an implicit [[EmbeddedKafkaConfig]]
35+
*/
36+
def start()(implicit config: EmbeddedKafkaConfig) = {
37+
factory = Option(startZooKeeper(config.zooKeeperPort))
38+
broker = Option(startKafka(config))
39+
}
40+
41+
/**
42+
* Stops the in memory ZooKeeper instance and Kafka broker.
43+
*/
44+
def stop(): Unit = {
45+
broker.foreach(_.shutdown)
46+
factory.foreach(_.shutdown())
47+
broker = None
48+
factory = None
49+
}
50+
51+
/**
52+
* Returns whether the in memory Kafka and Zookeeper are running.
53+
*/
54+
def isRunning: Boolean = factory.nonEmpty && broker.nonEmpty
55+
}
56+
57+
sealed trait EmbeddedKafkaSupport {
2658
val executorService = Executors.newFixedThreadPool(2)
2759
implicit val executionContext = ExecutionContext.fromExecutorService(executorService)
2860

2961
/**
30-
* Starts a ZooKeeper instance and a Kafka broker, then executes the body passed as a parameter.
31-
*
32-
* @param body the function to execute
33-
* @param config an implicit [[EmbeddedKafkaConfig]]
34-
*/
62+
* Starts a ZooKeeper instance and a Kafka broker, then executes the body passed as a parameter.
63+
*
64+
* @param body the function to execute
65+
* @param config an implicit [[EmbeddedKafkaConfig]]
66+
*/
3567
def withRunningKafka(body: => Unit)(implicit config: EmbeddedKafkaConfig) = {
3668

3769
val factory = startZooKeeper(config.zooKeeperPort)
@@ -45,28 +77,27 @@ trait EmbeddedKafka {
4577
}
4678
}
4779

48-
4980
/**
50-
* Publishes synchronously a message of type [[String]] to the running Kafka broker.
51-
*
52-
* @see [[EmbeddedKafka#publishToKafka]]
53-
* @param topic the topic to which publish the message (it will be auto-created)
54-
* @param message the [[String]] message to publish
55-
* @param config an implicit [[EmbeddedKafkaConfig]]
56-
* @throws KafkaUnavailableException if unable to connect to Kafka
57-
*/
81+
* Publishes synchronously a message of type [[String]] to the running Kafka broker.
82+
*
83+
* @see [[EmbeddedKafka#publishToKafka]]
84+
* @param topic the topic to which publish the message (it will be auto-created)
85+
* @param message the [[String]] message to publish
86+
* @param config an implicit [[EmbeddedKafkaConfig]]
87+
* @throws KafkaUnavailableException if unable to connect to Kafka
88+
*/
5889
def publishStringMessageToKafka(topic: String, message: String)(implicit config: EmbeddedKafkaConfig): Unit =
5990
publishToKafka(topic, message)(config, new StringSerializer)
6091

6192
/**
62-
* Publishes synchronously a message to the running Kafka broker.
63-
*
64-
* @param topic the topic to which publish the message (it will be auto-created)
65-
* @param message the message of type [[T]] to publish
66-
* @param config an implicit [[EmbeddedKafkaConfig]]
67-
* @param serializer an implicit [[Serializer]] for the type [[T]]
68-
* @throws KafkaUnavailableException if unable to connect to Kafka
69-
*/
93+
* Publishes synchronously a message to the running Kafka broker.
94+
*
95+
* @param topic the topic to which publish the message (it will be auto-created)
96+
* @param message the message of type [[T]] to publish
97+
* @param config an implicit [[EmbeddedKafkaConfig]]
98+
* @param serializer an implicit [[Serializer]] for the type [[T]]
99+
* @throws KafkaUnavailableException if unable to connect to Kafka
100+
*/
70101
@throws(classOf[KafkaUnavailableException])
71102
def publishToKafka[T](topic: String, message: T)
72103
(implicit config: EmbeddedKafkaConfig, serializer: Serializer[T]): Unit = {
@@ -92,20 +123,20 @@ trait EmbeddedKafka {
92123

93124

94125
/**
95-
* Consumes the first message available in a given topic, deserializing it as a String.
96-
*
97-
* @param topic the topic to consume a message from
98-
* @param config an implicit [[EmbeddedKafkaConfig]]
99-
* @param decoder an implicit [[Decoder]] for the type [[T]]
100-
* @return the first message consumed from the given topic, with a type [[T]]
101-
* @throws TimeoutException if unable to consume a message within 3 seconds
102-
* @throws KafkaUnavailableException if unable to connect to Kafka
103-
*/
126+
* Consumes the first message available in a given topic, deserializing it as a String.
127+
*
128+
* @param topic the topic to consume a message from
129+
* @param config an implicit [[EmbeddedKafkaConfig]]
130+
* @param decoder an implicit [[Decoder]] for the type [[T]]
131+
* @return the first message consumed from the given topic, with a type [[T]]
132+
* @throws TimeoutException if unable to consume a message within 3 seconds
133+
* @throws KafkaUnavailableException if unable to connect to Kafka
134+
*/
104135
@throws(classOf[TimeoutException])
105136
@throws(classOf[KafkaUnavailableException])
106137
def consumeFirstMessageFrom[T](topic: String)(implicit config: EmbeddedKafkaConfig, decoder: Decoder[T]): T = {
107138
val props = new Properties()
108-
props.put("group.id", s"embedded-kafka-spec-$suiteId")
139+
props.put("group.id", s"embedded-kafka-spec")
109140
props.put("zookeeper.connect", s"localhost:${config.zooKeeperPort}")
110141
props.put("auto.offset.reset", "smallest")
111142
props.put("zookeeper.connection.timeout.ms", "6000")
@@ -146,7 +177,7 @@ trait EmbeddedKafka {
146177
)
147178
}
148179

149-
private def startZooKeeper(zooKeeperPort: Int): ServerCnxnFactory = {
180+
def startZooKeeper(zooKeeperPort: Int): ServerCnxnFactory = {
150181
val zkLogsDir = Directory.makeTemp("zookeeper-logs")
151182
val tickTime = 2000
152183

@@ -158,7 +189,7 @@ trait EmbeddedKafka {
158189
factory
159190
}
160191

161-
private def startKafka(config: EmbeddedKafkaConfig): KafkaServer = {
192+
def startKafka(config: EmbeddedKafkaConfig): KafkaServer = {
162193
val kafkaLogDir = Directory.makeTemp("kafka")
163194

164195
val zkAddress = s"localhost:${config.zooKeeperPort}"
@@ -176,4 +207,4 @@ trait EmbeddedKafka {
176207
broker.startup()
177208
broker
178209
}
179-
}
210+
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package net.manub.embeddedkafka
2+
3+
class EmbeddedKafkaObjectSpec extends EmbeddedKafkaSpecSupport {
4+
5+
"the EmbeddedKafka object" when {
6+
"invoking the start and stop methods" should {
7+
"start and stop Kafka and Zookeeper on the default ports" in {
8+
EmbeddedKafka.start()
9+
10+
kafkaIsAvailable()
11+
zookeeperIsAvailable()
12+
13+
EmbeddedKafka.stop()
14+
15+
kafkaIsNotAvailable()
16+
zookeeperIsNotAvailable()
17+
}
18+
19+
"start and stop Kafka and Zookeeper on different specified ports using an implicit configuration" in {
20+
implicit val config = EmbeddedKafkaConfig(kafkaPort = 12345, zooKeeperPort = 54321)
21+
EmbeddedKafka.start()
22+
23+
kafkaIsAvailable(12345)
24+
zookeeperIsAvailable(54321)
25+
26+
EmbeddedKafka.stop()
27+
}
28+
}
29+
30+
"invoking the isRunnning method" should {
31+
"return whether both Kafka and Zookeeper are running" in {
32+
EmbeddedKafka.start()
33+
EmbeddedKafka.isRunning shouldBe true
34+
EmbeddedKafka.stop()
35+
EmbeddedKafka.isRunning shouldBe false
36+
}
37+
}
38+
}
39+
}

0 commit comments

Comments
 (0)