
Commit c30918a

adamosloizou authored and manub committed
Introducing support for Kafka streams (#39)
* Moved embedded kafka to its own sub module. Changed root project to aggregation project in preparation for introducing kafka-streams subproject.
* Introduced kafka-streams module for testing Kafka Streams using Embedded Kafka. Introduced EmbeddedKafkaStreams and EmbeddedKafkaStreamsAllInOne traits for easy testing. Introduced Consumers trait for easy creation and consumption of test messages.
* Disabling verbose logging for tests to make it easier to trace failed tests in build.
1 parent 0bf5226 commit c30918a

File tree

21 files changed: +435 additions, −11 deletions


README.md

Lines changed: 70 additions & 0 deletions
@@ -94,3 +94,73 @@ It is possible to create producers for custom types in two ways:
For more information about how to use the utility methods, you can either look at the Scaladocs or at the tests of this project.
## Custom consumers

Use the `Consumers` trait to easily create consumers of arbitrary key-value types and have their lifecycle managed for you (via a loaner pattern):

* For basic String consumption, use `withStringConsumer { your code here }` (see the sketch below).
* For arbitrary key and value types, expose implicit `Deserializer`s for each type and use `withConsumer { your code here }`.
* If you just want to create a consumer and manage its lifecycle yourself, use `newConsumer()`.
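A minimal sketch of the String case follows; the spec name and topic are made up for illustration, and it assumes the implicit String serializers from `net.manub.embeddedkafka.Codecs` plus the `publishToKafka`/`withRunningKafka` helpers from the `EmbeddedKafka` trait:

```scala
import net.manub.embeddedkafka.Codecs._
import net.manub.embeddedkafka.ConsumerExtensions._
import net.manub.embeddedkafka.{Consumers, EmbeddedKafka, EmbeddedKafkaConfig}
import org.scalatest.{Matchers, WordSpec}

class MyConsumersSpec extends WordSpec with Matchers with EmbeddedKafka with Consumers {

  implicit val config: EmbeddedKafkaConfig = EmbeddedKafkaConfig()

  "withStringConsumer" should {
    "hand the block a ready consumer and close it afterwards" in withRunningKafka {
      publishToKafka("my-topic", key = "hello", message = "world")

      withStringConsumer { consumer =>
        // the consumer is created before the block runs and closed once it returns
        consumer.consumeLazily("my-topic").head should be("hello" -> "world")
      }
    }
  }
}
```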
## Easy message consumption

With `ConsumerExtensions` you can turn a consumer into a lazy Scala `Stream` of key-value pairs and treat it as a collection for easy assertion:

* Just import the extensions.
* On any `KafkaConsumer` instance you can now do:
```scala
import net.manub.embeddedkafka.ConsumerExtensions._

...

consumer.consumeLazily("from-this-topic").take(3).toList should be (Seq(
  "1" -> "one",
  "2" -> "two",
  "3" -> "three"
))
```
# scalatest-embedded-kafka-streams

A library that builds on top of `scalatest-embedded-kafka` to offer easy testing of [Kafka Streams](https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams) with ScalaTest.
It uses Kafka Streams 0.10.0.1.
It takes care of instantiating and starting your streams as well as closing them after running your test-case code.
## How to use

* In your `build.sbt` file add the following dependency: `"net.manub" %% "scalatest-embedded-kafka-streams" % "0.8.1" % "test"`
* Have a look at the [example test](kafka-streams/src/test/scala/net/manub/embeddedkafka/streams/ExampleKafkaStreamsSpec.scala).
* In most cases, have your `Spec` extend the `EmbeddedKafkaStreamsAllInOne` trait. This offers both streams management and easy creation of consumers for asserting the resulting messages in output/sink topics.
* If you only want the streams management without the test consumers, have your `Spec` extend the `EmbeddedKafkaStreams` trait.
* Use `runStreamsWithStringConsumer` to:
  * Create any topics that need to exist for the streams to operate (usually sources and sinks).
  * Pass the Stream or Topology builder that will then be used to instantiate and start the Kafka Streams. This is done while using the `withRunningKafka` closure internally, so that your stream runs with an embedded Kafka and Zookeeper.
  * Pass the `{code block}` that needs a running instance of your streams. This is where your actual test code sits. You can publish messages to your source topics and consume messages from your sink topics that the Kafka Streams should have generated. This method also offers a pre-instantiated consumer that can read String keys and values.
* For more flexibility, use `runStreams` and `withConsumer`. This allows you to create your own consumers of custom types, as seen in the [example test](kafka-streams/src/test/scala/net/manub/embeddedkafka/streams/ExampleKafkaStreamsSpec.scala) and in the sketch after the example below.
```scala
import net.manub.embeddedkafka.ConsumerExtensions._
import net.manub.embeddedkafka.streams.EmbeddedKafkaStreamsAllInOne
import org.apache.kafka.streams.kstream.KStreamBuilder
import org.scalatest.{Matchers, WordSpec}

class MySpec extends WordSpec with Matchers with EmbeddedKafkaStreamsAllInOne {
  "my kafka stream" should {
    "be easy to test" in {
      val inputTopic = "input-topic"
      val outputTopic = "output-topic"

      // your code for building the stream goes here, e.g.
      val streamBuilder = new KStreamBuilder
      streamBuilder.stream(inputTopic).to(outputTopic)

      // tell the stream test harness:
      // 1. what topics need to be created before the stream starts
      // 2. the builder to be used for initializing and starting the stream
      runStreamsWithStringConsumer(
        topicsToCreate = Seq(inputTopic, outputTopic),
        builder = streamBuilder
      ) { consumer =>
        // your test code goes here
        publishToKafka(inputTopic, key = "hello", message = "world")
        consumer.consumeLazily(outputTopic).head should be("hello" -> "world")
      }
    }
  }
}
```
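For the more flexible route, a sketch combining `runStreams` with `withConsumer` might look like the following. The exact `runStreams` parameter list and the `Codecs` implicits are assumptions here, so treat the linked example test as the authoritative version:

```scala
import net.manub.embeddedkafka.Codecs._
import net.manub.embeddedkafka.ConsumerExtensions._
import net.manub.embeddedkafka.{Consumers, EmbeddedKafkaConfig}
import net.manub.embeddedkafka.streams.EmbeddedKafkaStreams
import org.apache.kafka.streams.kstream.KStreamBuilder
import org.scalatest.{Matchers, WordSpec}

class MyFlexibleSpec extends WordSpec with Matchers with EmbeddedKafkaStreams with Consumers {

  implicit val config: EmbeddedKafkaConfig = EmbeddedKafkaConfig()

  "my kafka stream" should {
    "work with a custom-typed consumer" in {
      val (inputTopic, outputTopic) = ("input-topic", "output-topic")
      val streamBuilder = new KStreamBuilder
      streamBuilder.stream(inputTopic).to(outputTopic)

      // runStreams only manages the streams' lifecycle;
      // the consumer comes separately from the Consumers trait
      runStreams(Seq(inputTopic, outputTopic), streamBuilder) {
        publishToKafka(inputTopic, key = "hello", message = "world")
        // withConsumer picks up the implicit Deserializers brought in by Codecs._
        withConsumer[String, String, Unit] { consumer =>
          consumer.consumeLazily(outputTopic).head should be("hello" -> "world")
        }
      }
    }
  }
}
```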

build.sbt

Lines changed: 32 additions & 9 deletions
```diff
@@ -1,23 +1,27 @@
 import sbtrelease.Version
 
+val kafkaVersion = "0.10.0.1"
+
 val slf4jLog4jOrg = "org.slf4j"
 val slf4jLog4jArtifact = "slf4j-log4j12"
 
 lazy val commonSettings = Seq(
-  name := "scalatest-embedded-kafka",
   organization := "net.manub",
   scalaVersion := "2.11.8",
   crossScalaVersions := Seq("2.10.6", "2.11.8"),
   homepage := Some(url("https://github.com/manub/scalatest-embedded-kafka")),
   parallelExecution in Test := false,
-  libraryDependencies ++= Seq(
-    "org.scalatest" %% "scalatest" % "3.0.0",
-    "org.apache.kafka" %% "kafka" % "0.10.0.1" exclude(slf4jLog4jOrg, slf4jLog4jArtifact),
-    "org.apache.zookeeper" % "zookeeper" % "3.4.7" exclude(slf4jLog4jOrg, slf4jLog4jArtifact),
-    "org.apache.avro" % "avro" % "1.7.7" exclude(slf4jLog4jOrg, slf4jLog4jArtifact),
-    "com.typesafe.akka" %% "akka-actor" % "2.3.14" % Test,
-    "com.typesafe.akka" %% "akka-testkit" % "2.3.14" % Test
-  )
+  logBuffered in Test := false,
+  fork in Test := true
+)
+
+lazy val commonLibrarySettings = libraryDependencies ++= Seq(
+  "org.scalatest" %% "scalatest" % "3.0.0",
+  "org.apache.kafka" %% "kafka" % kafkaVersion exclude(slf4jLog4jOrg, slf4jLog4jArtifact),
+  "org.apache.zookeeper" % "zookeeper" % "3.4.7" exclude(slf4jLog4jOrg, slf4jLog4jArtifact),
+  "org.apache.avro" % "avro" % "1.7.7" exclude(slf4jLog4jOrg, slf4jLog4jArtifact),
+  "com.typesafe.akka" %% "akka-actor" % "2.3.14" % Test,
+  "com.typesafe.akka" %% "akka-testkit" % "2.3.14" % Test
 )
 
 lazy val publishSettings = Seq(
@@ -47,6 +51,25 @@ lazy val releaseSettings = Seq(
 )
 
 lazy val root = (project in file("."))
+  .settings(name := "scalatest-embedded-kafka-root")
+  .settings(commonSettings: _*)
+  .aggregate(embeddedKafka, kafkaStreams)
+
+
+lazy val embeddedKafka = (project in file("embedded-kafka"))
+  .settings(name := "scalatest-embedded-kafka")
+  .settings(publishSettings: _*)
+  .settings(commonSettings: _*)
+  .settings(commonLibrarySettings)
+  .settings(releaseSettings: _*)
+
+lazy val kafkaStreams = (project in file("kafka-streams"))
+  .settings(name := "scalatest-embedded-kafka-streams")
   .settings(publishSettings: _*)
   .settings(commonSettings: _*)
+  .settings(commonLibrarySettings)
   .settings(releaseSettings: _*)
+  .settings(libraryDependencies ++= Seq(
+    "org.apache.kafka" % "kafka-streams" % kafkaVersion exclude(slf4jLog4jOrg, slf4jLog4jArtifact)
+  ))
+  .dependsOn(embeddedKafka)
```
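After this restructuring the build publishes two artifacts, `scalatest-embedded-kafka` and `scalatest-embedded-kafka-streams`. A downstream project would pull them in roughly like this; the base module's version is an assumption, mirroring the `0.8.1` mentioned for the streams module in the README:

```scala
// downstream build.sbt -- versions are an assumption, not confirmed by this commit
libraryDependencies ++= Seq(
  "net.manub" %% "scalatest-embedded-kafka" % "0.8.1" % "test",
  "net.manub" %% "scalatest-embedded-kafka-streams" % "0.8.1" % "test"
)
```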
ConsumerExtensions.scala

Lines changed: 51 additions & 0 deletions
@@ -0,0 +1,51 @@
```scala
package net.manub.embeddedkafka

import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.KafkaException
import org.apache.log4j.Logger

import scala.util.Try

/** Method extensions for Kafka's [[KafkaConsumer]] API allowing easy testing. */
object ConsumerExtensions {
  val MaximumAttempts = 3

  implicit class ConsumerOps[K, V](val consumer: KafkaConsumer[K, V]) {

    private val logger = Logger.getLogger(classOf[ConsumerOps[K, V]])

    /** Consume messages from a given topic and return them as a lazily evaluated Scala Stream.
      * Depending on how many messages are taken from the Stream, it will poll up to
      * [[MaximumAttempts]] batches from the given topic, stopping early once the desired
      * number of messages has been consumed.
      *
      * @param topic the topic from which to consume messages
      * @return a stream of consumed messages, on which you can call `.take(n: Int).toList`
      *         to evaluate the requested number of messages
      */
    def consumeLazily(topic: String): Stream[(K, V)] = {
      val attempts = 1 to MaximumAttempts
      attempts.toStream.flatMap { attempt =>
        val batch: Seq[(K, V)] = getNextBatch(topic)
        logger.debug(s"----> Batch $attempt ($topic) | ${batch.mkString("|")}")
        batch
      }
    }

    /** Get the next batch of messages from Kafka.
      *
      * @param topic the topic to consume
      * @return the next batch of messages
      */
    def getNextBatch(topic: String): Seq[(K, V)] = Try {
      import scala.collection.JavaConversions._
      consumer.subscribe(List(topic))
      consumer.partitionsFor(topic)
      val records = consumer.poll(2000)
      // use toList to force eager evaluation. toSeq is lazy
      records.iterator().toList.map(r => r.key -> r.value)
    }.recover {
      case ex: KafkaException => throw new KafkaUnavailableException(ex)
    }.get
  }
}
```
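Because the Stream is evaluated lazily, taking only the elements you need keeps the amount of polling bounded. A hypothetical helper (not part of the commit) to illustrate:

```scala
import net.manub.embeddedkafka.ConsumerExtensions._
import org.apache.kafka.clients.consumer.KafkaConsumer

// Hypothetical helper: .take(2).toList forces the Stream, triggering at most
// MaximumAttempts poll batches and stopping as soon as two messages are available.
def firstTwoMessages(consumer: KafkaConsumer[String, String]): List[(String, String)] =
  consumer.consumeLazily("my-topic").take(2).toList
```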
Consumers.scala

Lines changed: 59 additions & 0 deletions
@@ -0,0 +1,59 @@
```scala
package net.manub.embeddedkafka

import java.util.Properties

import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.Deserializer

/** Utility trait for easily creating Kafka consumers and accessing their consumed messages. */
trait Consumers {

  /** Loaner pattern that allows running a code block with a newly created consumer.
    * The consumer's lifecycle is handled automatically: it is closed at the end of the
    * given code block.
    *
    * @param block the code block to be executed with the instantiated consumer
    *              passed as an argument
    * @tparam K the type of the consumer's Key
    * @tparam V the type of the consumer's Value
    * @tparam T the type of the block's result
    * @return the result of the executed block
    */
  def withConsumer[K: Deserializer, V: Deserializer, T](block: KafkaConsumer[K, V] => T)
                                                       (implicit config: EmbeddedKafkaConfig): T = {
    val consumer = newConsumer[K, V]()
    try {
      val result = block(consumer)
      result
    } finally {
      consumer.close()
    }
  }

  /** Convenience alternative to `withConsumer` that offers a consumer for String keys and values.
    *
    * @param block the block to be executed with the consumer
    * @tparam T the type of the result of the code block
    * @return the code block result
    */
  def withStringConsumer[T](block: KafkaConsumer[String, String] => T)
                           (implicit config: EmbeddedKafkaConfig): T = {
    import net.manub.embeddedkafka.Codecs.stringDeserializer
    withConsumer(block)
  }

  /** Create a new Kafka consumer. The caller is responsible for closing it.
    *
    * @tparam K the type of the consumer's Key
    * @tparam V the type of the consumer's Value
    * @return the new consumer
    */
  def newConsumer[K: Deserializer, V: Deserializer]()
                                                   (implicit config: EmbeddedKafkaConfig): KafkaConsumer[K, V] = {
    val props = new Properties()
    props.put("group.id", UUIDs.newUuid().toString)
    props.put("bootstrap.servers", s"localhost:${config.kafkaPort}")
    props.put("auto.offset.reset", "earliest")

    new KafkaConsumer[K, V](props, implicitly[Deserializer[K]], implicitly[Deserializer[V]])
  }
}
```
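When you need full control, a sketch of manual lifecycle management with `newConsumer` might look like this; the object name is made up, and it assumes the String deserializer from `Codecs` and a default `EmbeddedKafkaConfig`:

```scala
import net.manub.embeddedkafka.Codecs.stringDeserializer
import net.manub.embeddedkafka.{Consumers, EmbeddedKafkaConfig}

object ManualConsumerExample extends Consumers {
  implicit val config: EmbeddedKafkaConfig = EmbeddedKafkaConfig()

  def consumeOnce(): Unit = {
    // newConsumer leaves lifecycle management to the caller...
    val consumer = newConsumer[String, String]()
    try {
      consumer.poll(1000) // ...so polling and assertions happen here
    } finally consumer.close() // ...and the caller must close the consumer
  }
}
```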
UUIDs.scala

Lines changed: 14 additions & 0 deletions
@@ -0,0 +1,14 @@
```scala
package net.manub.embeddedkafka

import java.util.UUID

/** Utility object for creating unique test IDs.
  * Useful for separating IDs and directories across test cases.
  */
object UUIDs {

  /** Create a new unique ID.
    *
    * @return the unique ID
    */
  def newUuid(): UUID = UUID.randomUUID()
}
```

0 commit comments
