Skip to content

Commit ef1caff

Browse files
aseigneurinmanub
authored andcommitted
Add support for keyed messages (#86)
* add support for keyed messages * adding a unit test for method “consumeNumberKeyedMessagesFromTopics” * fixed unit tests grouping * adding a unit test for method “consumeFirstKeyedMessageFrom” * simpler formatting
1 parent ef2ff4d commit ef1caff

File tree

2 files changed

+207
-24
lines changed

2 files changed

+207
-24
lines changed

embedded-kafka/src/main/scala/net/manub/embeddedkafka/EmbeddedKafka.scala

Lines changed: 92 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -293,7 +293,7 @@ sealed trait EmbeddedKafkaSupport {
293293

294294
def consumeFirstStringMessageFrom(topic: String, autoCommit: Boolean = false)(
295295
implicit config: EmbeddedKafkaConfig): String =
296-
consumeFirstMessageFrom(topic, autoCommit)(config, new StringDeserializer())
296+
consumeNumberStringMessagesFrom(topic, 1, autoCommit)(config).head
297297

298298
def consumeNumberStringMessagesFrom(topic: String,
299299
number: Int,
@@ -304,7 +304,7 @@ sealed trait EmbeddedKafkaSupport {
304304
new StringDeserializer())
305305

306306
/**
307-
* Consumes the first message available in a given topic, deserializing it as type [[T]].
307+
* Consumes the first message available in a given topic, deserializing it as type [[V]].
308308
*
309309
* Only the message that is returned is committed if autoCommit is false.
310310
* If autoCommit is true then all messages that were polled will be committed.
@@ -314,30 +314,62 @@ sealed trait EmbeddedKafkaSupport {
314314
* if true, the offset for the last polled message will be committed instead.
315315
* Defaulted to false.
316316
* @param config an implicit [[EmbeddedKafkaConfig]]
317-
* @param deserializer an implicit [[org.apache.kafka.common.serialization.Deserializer]] for the type [[T]]
318-
* @return the first message consumed from the given topic, with a type [[T]]
317+
* @param valueDeserializer an implicit [[org.apache.kafka.common.serialization.Deserializer]] for the type [[V]]
318+
* @return the first message consumed from the given topic, with a type [[V]]
319319
* @throws TimeoutException if unable to consume a message within 5 seconds
320320
* @throws KafkaUnavailableException if unable to connect to Kafka
321321
*/
322322
@throws(classOf[TimeoutException])
323323
@throws(classOf[KafkaUnavailableException])
324-
def consumeFirstMessageFrom[T](topic: String, autoCommit: Boolean = false)(
324+
def consumeFirstMessageFrom[V](topic: String, autoCommit: Boolean = false)(
325+
implicit config: EmbeddedKafkaConfig,
326+
valueDeserializer: Deserializer[V]): V =
327+
consumeNumberMessagesFrom[V](topic, 1, autoCommit)(config, valueDeserializer).head
328+
329+
/**
330+
* Consumes the first message available in a given topic, deserializing it as type [[(K, V)]].
331+
*
332+
* Only the message that is returned is committed if autoCommit is false.
333+
* If autoCommit is true then all messages that were polled will be committed.
334+
*
335+
* @param topic the topic to consume a message from
336+
* @param autoCommit if false, only the offset for the consumed message will be commited.
337+
* if true, the offset for the last polled message will be committed instead.
338+
* Defaulted to false.
339+
* @param config an implicit [[EmbeddedKafkaConfig]]
340+
* @param keyDeserializer an implicit [[org.apache.kafka.common.serialization.Deserializer]] for the type [[K]]
341+
* @param valueDeserializer an implicit [[org.apache.kafka.common.serialization.Deserializer]] for the type [[V]]
342+
* @return the first message consumed from the given topic, with a type [[(K, V)]]
343+
* @throws TimeoutException if unable to consume a message within 5 seconds
344+
* @throws KafkaUnavailableException if unable to connect to Kafka
345+
*/
346+
@throws(classOf[TimeoutException])
347+
@throws(classOf[KafkaUnavailableException])
348+
def consumeFirstKeyedMessageFrom[K, V](topic: String, autoCommit: Boolean = false)(
325349
implicit config: EmbeddedKafkaConfig,
326-
deserializer: Deserializer[T]): T =
327-
consumeNumberMessagesFrom(topic, 1, autoCommit)(config, deserializer).head
350+
keyDeserializer: Deserializer[K],
351+
valueDeserializer: Deserializer[V]): (K, V) =
352+
consumeNumberKeyedMessagesFrom[K, V](topic, 1, autoCommit)(config, keyDeserializer, valueDeserializer).head
328353

329-
def consumeNumberMessagesFrom[T](topic: String,
354+
def consumeNumberMessagesFrom[V](topic: String,
330355
number: Int,
331356
autoCommit: Boolean = false)(
357+
implicit config: EmbeddedKafkaConfig,
358+
valueDeserializer: Deserializer[V]): List[V] =
359+
consumeNumberMessagesFromTopics(Set(topic), number, autoCommit)(config, valueDeserializer)(topic)
360+
361+
def consumeNumberKeyedMessagesFrom[K, V](topic: String,
362+
number: Int,
363+
autoCommit: Boolean = false)(
332364
implicit config: EmbeddedKafkaConfig,
333-
deserializer: Deserializer[T]): List[T] =
334-
consumeNumberMessagesFromTopics(Set(topic), number, autoCommit)(
335-
config,
336-
deserializer)(topic)
365+
keyDeserializer: Deserializer[K],
366+
valueDeserializer: Deserializer[V]): List[(K, V)] =
367+
consumeNumberKeyedMessagesFromTopics(Set(topic), number, autoCommit)(config, keyDeserializer,
368+
valueDeserializer)(topic)
337369

338370
/**
339-
* Consumes the first n messages available in given topics, deserializes them as type [[T]], and returns
340-
* the n messages in a Map from topic name to List[T].
371+
* Consumes the first n messages available in given topics, deserializes them as type [[V]], and returns
372+
* the n messages in a Map from topic name to List[V].
341373
*
342374
* Only the messages that are returned are committed if autoCommit is false.
343375
* If autoCommit is true then all messages that were polled will be committed.
@@ -353,19 +385,57 @@ sealed trait EmbeddedKafkaSupport {
353385
* throw TimeoutException after the timeout interval if we
354386
* haven't received all of the expected messages
355387
* @param config an implicit [[EmbeddedKafkaConfig]]
356-
* @param deserializer an implicit [[org.apache.kafka.common.serialization.Deserializer]] for the type [[T]]
357-
* @return the List of messages consumed from the given topics, each with a type [[T]]
388+
* @param valueDeserializer an implicit [[org.apache.kafka.common.serialization.Deserializer]]
389+
* for the type [[V]]
390+
* @return the List of messages consumed from the given topics, each with a type [[V]]
358391
* @throws TimeoutException if unable to consume messages within specified timeout
359392
* @throws KafkaUnavailableException if unable to connect to Kafka
360393
*/
361-
def consumeNumberMessagesFromTopics[T](topics: Set[String],
394+
def consumeNumberMessagesFromTopics[V](topics: Set[String],
362395
number: Int,
363396
autoCommit: Boolean = false,
364397
timeout: Duration = 5.seconds,
365398
resetTimeoutOnEachMessage: Boolean =
366399
true)(
367400
implicit config: EmbeddedKafkaConfig,
368-
deserializer: Deserializer[T]): Map[String, List[T]] = {
401+
valueDeserializer: Deserializer[V]): Map[String, List[V]] = {
402+
consumeNumberKeyedMessagesFromTopics(topics, number, autoCommit, timeout,
403+
resetTimeoutOnEachMessage)(config, new StringDeserializer(), valueDeserializer)
404+
.mapValues(_.map(_._2))
405+
}
406+
407+
/**
408+
* Consumes the first n messages available in given topics, deserializes them as type [[(K, V)]], and returns
409+
* the n messages in a Map from topic name to List[(K, V)].
410+
*
411+
* Only the messages that are returned are committed if autoCommit is false.
412+
* If autoCommit is true then all messages that were polled will be committed.
413+
*
414+
* @param topics the topics to consume messages from
415+
* @param number the number of messages to consume in a batch
416+
* @param autoCommit if false, only the offset for the consumed messages will be commited.
417+
* if true, the offset for the last polled message will be committed instead.
418+
* Defaulted to false.
419+
* @param timeout the interval to wait for messages before throwing TimeoutException
420+
* @param resetTimeoutOnEachMessage when true, throw TimeoutException if we have a silent period
421+
* (no incoming messages) for the timeout interval; when false,
422+
* throw TimeoutException after the timeout interval if we
423+
* haven't received all of the expected messages
424+
* @param config an implicit [[EmbeddedKafkaConfig]]
425+
* @param keyDeserializer an implicit [[org.apache.kafka.common.serialization.Deserializer]] for the type [[K]]
426+
* @param valueDeserializer an implicit [[org.apache.kafka.common.serialization.Deserializer]] for the type [[V]]
427+
* @return the List of messages consumed from the given topics, each with a type [[(K, V)]]
428+
* @throws TimeoutException if unable to consume messages within specified timeout
429+
* @throws KafkaUnavailableException if unable to connect to Kafka
430+
*/
431+
def consumeNumberKeyedMessagesFromTopics[K, V](topics: Set[String],
432+
number: Int,
433+
autoCommit: Boolean = false,
434+
timeout: Duration = 5.seconds,
435+
resetTimeoutOnEachMessage: Boolean = true)(
436+
implicit config: EmbeddedKafkaConfig,
437+
keyDeserializer: Deserializer[K],
438+
valueDeserializer: Deserializer[V]): Map[String, List[(K, V)]] = {
369439

370440
import scala.collection.JavaConverters._
371441

@@ -374,10 +444,10 @@ sealed trait EmbeddedKafkaSupport {
374444

375445
var timeoutNanoTime = System.nanoTime + timeout.toNanos
376446
val consumer =
377-
new KafkaConsumer[String, T](props, new StringDeserializer, deserializer)
447+
new KafkaConsumer[K, V](props, keyDeserializer, valueDeserializer)
378448

379449
val messages = Try {
380-
val messagesBuffers = topics.map(_ -> ListBuffer.empty[T]).toMap
450+
val messagesBuffers = topics.map(_ -> ListBuffer.empty[(K, V)]).toMap
381451
var messagesRead = 0
382452
consumer.subscribe(topics.asJava)
383453
topics.foreach(consumer.partitionsFor)
@@ -391,7 +461,8 @@ sealed trait EmbeddedKafkaSupport {
391461
while (recordIter.hasNext && messagesRead < number) {
392462
val record = recordIter.next()
393463
val topic = record.topic()
394-
messagesBuffers(topic) += record.value()
464+
val message = (record.key(), record.value())
465+
messagesBuffers(topic) += message
395466
val tp = new TopicPartition(topic, record.partition())
396467
val om = new OffsetAndMetadata(record.offset() + 1)
397468
consumer.commitSync(Map(tp -> om).asJava)

embedded-kafka/src/test/scala/net/manub/embeddedkafka/EmbeddedKafkaMethodsSpec.scala

Lines changed: 115 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,16 @@ class EmbeddedKafkaMethodsSpec
183183
producer.close()
184184
}
185185

186+
"throw a TimeoutExeption when a message is not available" in {
187+
a[TimeoutException] shouldBe thrownBy {
188+
consumeFirstStringMessageFrom("non_existing_topic")
189+
}
190+
}
191+
}
192+
193+
"the consumeFirstMessageFrom method" should {
194+
val config = EmbeddedKafkaConfig()
195+
186196
"return a message published to a topic with implicit decoder" in {
187197
val message = "hello world!"
188198
val topic = "consume_test_topic"
@@ -228,11 +238,84 @@ class EmbeddedKafkaMethodsSpec
228238

229239
producer.close()
230240
}
241+
}
231242

232-
"throw a TimeoutExeption when a message is not available" in {
233-
a[TimeoutException] shouldBe thrownBy {
234-
consumeFirstStringMessageFrom("non_existing_topic")
243+
"the consumeFirstKeyedMessageFrom method" should {
244+
val config = EmbeddedKafkaConfig()
245+
246+
"return a message published to a topic with implicit decoders" in {
247+
val key = "greeting"
248+
val message = "hello world!"
249+
val topic = "consume_test_topic"
250+
251+
val producer = new KafkaProducer[String, String](
252+
Map[String, Object](
253+
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> s"localhost:${config.kafkaPort}",
254+
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> classOf[
255+
StringSerializer].getName,
256+
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> classOf[
257+
StringSerializer].getName
258+
).asJava)
259+
260+
import Codecs._
261+
whenReady(
262+
producer.send(new ProducerRecord[String, String](topic, key, message))) {
263+
_ =>
264+
val res = consumeFirstKeyedMessageFrom[Array[Byte], Array[Byte]](topic)
265+
res._1 shouldBe key.getBytes
266+
res._2 shouldBe message.getBytes
235267
}
268+
269+
producer.close()
270+
}
271+
272+
"return a message published to a topic with custom decoders" in {
273+
274+
import avro._
275+
276+
val key = TestAvroClass("key")
277+
val message = TestAvroClass("message")
278+
val topic = "consume_test_topic"
279+
implicit val testAvroClassDecoder =
280+
specificAvroDeserializer[TestAvroClass](TestAvroClass.SCHEMA$)
281+
282+
val producer = new KafkaProducer[TestAvroClass, TestAvroClass](
283+
Map[String, Object](
284+
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> s"localhost:${config.kafkaPort}"
285+
).asJava,
286+
specificAvroSerializer[TestAvroClass],
287+
specificAvroSerializer[TestAvroClass])
288+
289+
whenReady(producer.send(new ProducerRecord(topic, key, message))) { _ =>
290+
consumeFirstKeyedMessageFrom[TestAvroClass, TestAvroClass](topic) shouldBe (key, message)
291+
}
292+
293+
producer.close()
294+
}
295+
296+
"return a message published to a topic with 2 different decoders" in {
297+
298+
import avro._
299+
300+
val key = "key"
301+
val message = TestAvroClass("message")
302+
val topic = "consume_test_topic"
303+
implicit val stringDecoder = new StringDeserializer
304+
implicit val testAvroClassDecoder =
305+
specificAvroDeserializer[TestAvroClass](TestAvroClass.SCHEMA$)
306+
307+
val producer = new KafkaProducer[String, TestAvroClass](
308+
Map[String, Object](
309+
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> s"localhost:${config.kafkaPort}"
310+
).asJava,
311+
new StringSerializer,
312+
specificAvroSerializer[TestAvroClass])
313+
314+
whenReady(producer.send(new ProducerRecord(topic, key, message))) { _ =>
315+
consumeFirstKeyedMessageFrom[String, TestAvroClass](topic) shouldBe (key, message)
316+
}
317+
318+
producer.close()
236319
}
237320
}
238321

@@ -319,6 +402,35 @@ class EmbeddedKafkaMethodsSpec
319402
}
320403
}
321404

405+
"the consumeNumberKeyedMessagesFromTopics method" should {
406+
"consume from multiple topics" in {
407+
val config = EmbeddedKafkaConfig()
408+
val topicMessagesMap = Map("topic1" -> List(("m1", "message 1")),
409+
"topic2" -> List(("m2a", "message 2a"), ("m2b", "message 2b")))
410+
val producer = new KafkaProducer[String, String](
411+
Map[String, Object](
412+
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> s"localhost:${config.kafkaPort}",
413+
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> classOf[
414+
StringSerializer].getName,
415+
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> classOf[
416+
StringSerializer].getName
417+
).asJava)
418+
for ((topic, messages) <- topicMessagesMap; message <- messages) {
419+
producer.send(new ProducerRecord[String, String](topic, message._1, message._2))
420+
}
421+
422+
producer.flush()
423+
424+
implicit val deserializer = new StringDeserializer
425+
val consumedMessages =
426+
consumeNumberKeyedMessagesFromTopics(topicMessagesMap.keySet, topicMessagesMap.values.map(_.size).sum)
427+
428+
consumedMessages.mapValues(_.sorted) shouldEqual topicMessagesMap
429+
430+
producer.close()
431+
}
432+
}
433+
322434
"the aKafkaProducerThat method" should {
323435
"return a producer that encodes messages for the given encoder" in {
324436
val producer = aKafkaProducer thatSerializesValuesWith classOf[

0 commit comments

Comments
 (0)