
Commit 9eded9f

Added a consumeFirstMessageFrom method.
1 parent c7813e9 commit 9eded9f
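
In short, the commit adds a helper to the EmbeddedKafka trait that consumes and returns the first String message from a topic, failing with a TimeoutException if nothing arrives within 3 seconds. Sketched here from the diff below; the call site is illustrative only:

    // Signature introduced by this commit (see the EmbeddedKafka.scala diff below)
    def consumeFirstMessageFrom(topic: String)(implicit config: EmbeddedKafkaConfig): String

    // Illustrative call from a test body running inside withRunningKafka
    val first = consumeFirstMessageFrom("test_topic") // throws java.util.concurrent.TimeoutException after 3 seconds if the topic is empty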

File tree

2 files changed: +88 −9 lines


src/main/scala/net/manub/embeddedkafka/EmbeddedKafka.scala

Lines changed: 44 additions & 4 deletions
@@ -2,25 +2,34 @@ package net.manub.embeddedkafka
 
 import java.net.InetSocketAddress
 import java.util.Properties
+import java.util.concurrent.Executors
 
+import kafka.consumer.{Consumer, ConsumerConfig, Whitelist}
+import kafka.serializer.StringDecoder
 import kafka.server.{KafkaConfig, KafkaServer}
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
 import org.apache.kafka.common.serialization.StringSerializer
 import org.apache.zookeeper.server.{ServerCnxnFactory, ZooKeeperServer}
 import org.scalatest.Suite
-import scala.collection.JavaConversions.mapAsJavaMap
 
+import scala.collection.JavaConversions.mapAsJavaMap
+import scala.concurrent.duration._
+import scala.concurrent.{Await, ExecutionContext, Future}
+import scala.language.postfixOps
 import scala.reflect.io.Directory
 
 trait EmbeddedKafka {
 
   this: Suite =>
 
+  val executorService = Executors.newFixedThreadPool(2)
+  implicit val executionContext = ExecutionContext.fromExecutorService(executorService)
+
   /**
    * Starts a ZooKeeper instance and a Kafka broker, then executes the body passed as a parameter.
    *
    * @param body the function to execute
-   * @param config an implicit {@see EmbeddedKafkaConfig}
+   * @param config an implicit [[EmbeddedKafkaConfig]]
    */
   def withRunningKafka(body: => Unit)(implicit config: EmbeddedKafkaConfig) = {
 
@@ -40,9 +49,9 @@ trait EmbeddedKafka {
    *
    * @param topic the topic to which publish the message (it will be auto-created)
    * @param message the message to publish
-   * @param config an implicit {@see EmbeddedKafkaConfig}
+   * @param config an implicit [[EmbeddedKafkaConfig]]
    */
-  def publishToKafka(topic: String, message: String)(implicit config: EmbeddedKafkaConfig) = {
+  def publishToKafka(topic: String, message: String)(implicit config: EmbeddedKafkaConfig): Unit = {
 
     val producerProps = Map(
       "bootstrap.servers" -> s"localhost:${config.kafkaPort}",
@@ -56,6 +65,37 @@ trait EmbeddedKafka {
     kafkaProducer.close()
   }
 
+
+  /**
+   * Consumes the first message available in a given topic, deserializing it as a String.
+   * Throws a [[java.util.concurrent.TimeoutException]] if a message is not available in 3 seconds.
+   *
+   * @param topic the topic to consume a message from
+   * @param config an implicit [[EmbeddedKafkaConfig]]
+   * @return the first message consumed from the given topic
+   */
+  def consumeFirstMessageFrom(topic: String)(implicit config: EmbeddedKafkaConfig): String = {
+    val props = new Properties()
+    props.put("group.id", "scalatest-embedded-kafka-spec")
+    props.put("zookeeper.connect", s"localhost:${config.zooKeeperPort}")
+    props.put("auto.offset.reset", "smallest")
+
+    val consumer = Consumer.create(new ConsumerConfig(props))
+
+    val filter = Whitelist(topic)
+    val messageStreams =
+      consumer.createMessageStreamsByFilter(filter, keyDecoder = new StringDecoder, valueDecoder = new StringDecoder)
+
+    val messageFuture = Future { messageStreams.head.iterator().next().message() }
+
+    try {
+      Await.result(messageFuture, 3 seconds)
+    } finally {
+      consumer.shutdown()
+    }
+  }
+
+
   private def startZooKeeper(zooKeeperPort: Int): ServerCnxnFactory = {
     val zkLogsDir = Directory.makeTemp("zookeeper-logs")
     val tickTime = 2000
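
For orientation, here is a minimal sketch of how the new method pairs with the existing publishToKafka helper in a ScalaTest suite. The spec below is illustrative, not part of this commit; it assumes EmbeddedKafkaConfig can be constructed with the kafkaPort and zooKeeperPort values that the trait reads (the accessors visible in the diff above), and the topic name is arbitrary:

    package net.manub.embeddedkafka

    import org.scalatest.{Matchers, WordSpecLike}

    // Illustrative spec, not part of this commit. The EmbeddedKafkaConfig constructor
    // arguments are assumed; adjust to however the config is actually provided.
    class ConsumeFirstMessageExampleSpec extends WordSpecLike with Matchers with EmbeddedKafka {

      implicit val kafkaConfig = EmbeddedKafkaConfig(kafkaPort = 6001, zooKeeperPort = 6000)

      "consumeFirstMessageFrom" should {
        "read back a message written with publishToKafka" in {
          withRunningKafka {
            publishToKafka("example_topic", "some message")
            consumeFirstMessageFrom("example_topic") shouldBe "some message"
          }
        }
      }
    }

The 3-second Await.result inside consumeFirstMessageFrom bounds each call and is what produces the TimeoutException asserted on in the new spec below.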

src/test/scala/net/manub/embeddedkafka/EmbeddedKafkaSpec.scala

Lines changed: 44 additions & 5 deletions
@@ -2,26 +2,30 @@ package net.manub.embeddedkafka
 
 import java.net.InetSocketAddress
 import java.util.Properties
+import java.util.concurrent.TimeoutException
 
 import akka.actor.{Actor, ActorRef, ActorSystem, Props}
 import akka.io.Tcp._
 import akka.io.{IO, Tcp}
 import akka.testkit.{ImplicitSender, TestKit}
-import kafka.consumer.{Consumer, ConsumerConfig, Whitelist}
+import kafka.consumer.{ConsumerConfig, Consumer, Whitelist}
 import kafka.serializer.StringDecoder
-import org.scalatest.concurrent.ScalaFutures
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
+import org.apache.kafka.common.serialization.StringSerializer
+import org.scalatest.concurrent.{JavaFutures, ScalaFutures}
 import org.scalatest.exceptions.TestFailedException
 import org.scalatest.time.{Milliseconds, Seconds, Span}
 import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
-import scala.concurrent.ExecutionContext.Implicits.global
 
+import scala.collection.JavaConversions._
+import scala.concurrent.ExecutionContext.Implicits.global
 import scala.concurrent.Future
 import scala.concurrent.duration._
 import scala.language.postfixOps
 
 class EmbeddedKafkaSpec
   extends TestKit(ActorSystem("embedded-kafka-spec")) with WordSpecLike with EmbeddedKafka with Matchers
-  with ImplicitSender with BeforeAndAfterAll with ScalaFutures {
+  with ImplicitSender with BeforeAndAfterAll with ScalaFutures with JavaFutures {
 
   implicit val config = PatienceConfig(Span(2, Seconds), Span(100, Milliseconds))
 
@@ -103,7 +107,7 @@ class EmbeddedKafkaSpec
 
   "the publishToKafka method" should {
 
-    "publish synchronously a message to Kafka as String" in {
+    "publishes asynchronously a message to Kafka as String" in {
 
      withRunningKafka {
 
@@ -134,6 +138,41 @@ class EmbeddedKafkaSpec
    }
  }
 
+  "the consumeFirstMessageFrom method" should {
+
+    "returns a message published to a topic" in {
+
+      withRunningKafka {
+
+        val message = "hello world!"
+        val topic = "test_topic"
+
+        val producer = new KafkaProducer[String, String](Map(
+          ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> s"localhost:6001",
+          ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> classOf[StringSerializer].getName,
+          ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> classOf[StringSerializer].getName
+        ))
+
+        whenReady(producer.send(new ProducerRecord[String, String](topic, message))) { _ =>
+          consumeFirstMessageFrom(topic) shouldBe message
+        }
+
+        producer.close()
+      }
+    }
+
+    "throws a TimeoutException when a message is not available" in {
+
+      withRunningKafka {
+
+        a[TimeoutException] shouldBe thrownBy {
+          consumeFirstMessageFrom("non_existing_topic")
+        }
+      }
+
+    }
+  }
+
  lazy val consumerConfigForEmbeddedKafka: ConsumerConfig = {
    val props = new Properties()
    props.put("group.id", "test")
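
The timeout test passes because consumeFirstMessageFrom rethrows the java.util.concurrent.TimeoutException from Await.result when nothing arrives within 3 seconds. A test that prefers an Option over an exception could wrap the new method along these lines; this is a hypothetical helper, not part of this commit, and would live inside a suite that mixes in EmbeddedKafka:

    import scala.util.Try

    // Hypothetical convenience wrapper: None if no message arrives within the 3-second window.
    def tryConsumeFirstMessageFrom(topic: String)(implicit config: EmbeddedKafkaConfig): Option[String] =
      Try(consumeFirstMessageFrom(topic)).toOption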
