11package net .manub .embeddedkafka
22
3- import org .apache .kafka .clients .consumer .KafkaConsumer
3+ import org .apache .kafka .clients .consumer .{ ConsumerRecord , KafkaConsumer }
44import org .apache .kafka .common .KafkaException
55import org .apache .log4j .Logger
66
@@ -9,59 +9,52 @@ import scala.util.Try
/** Method extensions for Kafka's [[KafkaConsumer]] API allowing easy testing. */
object ConsumerExtensions {

  /** Retry/polling configuration for lazy consumption.
    *
    * @param maximumAttempts the maximum number of attempts to try and get the next batch (defaults to 3)
    * @param poll            the amount of time, in milliseconds, to wait in the buffer for any
    *                        messages to be available (defaults to 2000)
    */
  case class ConsumerRetryConfig(maximumAttempts: Int = 3, poll: Long = 2000)

  implicit class ConsumerOps[K, V](val consumer: KafkaConsumer[K, V]) {

    private val logger = Logger.getLogger(getClass)

    /** Consume messages from one or many topics and return them as a lazily evaluated Scala Stream.
      * Depending on how many messages are taken from the Scala Stream it will try up to
      * retryConf.maximumAttempts times to consume batches from the given topic, until it reaches
      * the number of desired messages or return otherwise.
      *
      * @param topics    the topics from which to consume messages
      * @param decoder   the function to use for decoding all [[ConsumerRecord]]
      * @param retryConf contains the maximum number of attempts to try and get the next batch and
      *                  the amount of time, in milliseconds, to wait in the buffer for any messages
      *                  to be available
      * @return the stream of consumed messages that you can do `.take(n: Int).toList`
      *         to evaluate the requested number of messages.
      */
    def consumeLazily[T](topics: String*)(
        implicit decoder: ConsumerRecord[K, V] => T,
        retryConf: ConsumerRetryConfig = ConsumerRetryConfig()
    ): Stream[T] = {
      val attempts = 1 to retryConf.maximumAttempts
      attempts.toStream.flatMap { attempt =>
        val batch: Seq[T] = getNextBatch(retryConf.poll, topics)
        logger.debug(s"----> Batch $attempt ($topics) | ${batch.mkString("|")}")
        batch
      }
    }

    /** Get the next batch of messages from Kafka.
      *
      * @param poll    the amount of time, in milliseconds, to wait in the buffer for any messages
      *                to be available
      * @param topics  the topics to consume
      * @param decoder the function to use for decoding all [[ConsumerRecord]]
      * @return the next batch of messages
      */
    private def getNextBatch[T](poll: Long, topics: Seq[String])(
        implicit decoder: ConsumerRecord[K, V] => T): Seq[T] =
      Try {
        import scala.collection.JavaConverters._
        consumer.subscribe(topics.asJava)
        topics.foreach(consumer.partitionsFor)
        val records = consumer.poll(poll)
        // use toList to force eager evaluation. toSeq is lazy
        records.iterator().asScala.toList.map(decoder(_))
      }.recover {
        // surface broker unavailability as the library's dedicated exception type
        case ex: KafkaException => throw new KafkaUnavailableException(ex)
      }.get
  }
}