Commit a5a9afd

blissdmanub authored and committed
Add a batch version of publishToKafka (#88)
* Add a batch version of publishToKafka

  It is very expensive to create a `KafkaProducer`. For tests that send a lot of messages, it is very slow to use the original `publishToKafka` method, which created a new `KafkaProducer` for every message sent. There is now a batch version of `publishToKafka` that accepts a sequence of key/value tuples and sends them all with the same `KafkaProducer` instance.

* Replace `filter` with `find`
1 parent a2f5663 commit a5a9afd
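To make the commit message concrete, here is a minimal sketch (not part of the commit) of how test code changes with the new overload. It assumes the `EmbeddedKafka` trait is mixed in, an embedded broker is already running, and that the per-message keyed `publishToKafka` overload visible in the diff below is available; `demo_topic` and the message count are purely illustrative.

```scala
import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
import org.apache.kafka.common.serialization.StringSerializer

// Sketch only: contrasts the per-message overload with the new batch one.
// Assumes an embedded broker is already running on the configured ports.
trait BatchPublishMotivation extends EmbeddedKafka {
  implicit val config: EmbeddedKafkaConfig = EmbeddedKafkaConfig()
  implicit val serializer: StringSerializer = new StringSerializer()

  val messages: Seq[(String, String)] =
    (1 to 100).map(i => s"key-$i" -> s"value-$i")

  def slowPublish(): Unit =
    // Before: every call builds and tears down its own KafkaProducer.
    messages.foreach { case (k, v) => publishToKafka("demo_topic", k, v) }

  def fastPublish(): Unit =
    // After: one KafkaProducer is shared across the whole sequence.
    publishToKafka("demo_topic", messages)
}
```

The speed-up comes entirely from producer reuse; the send semantics (synchronous, with a 10-second timeout per record) stay the same.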

File tree

2 files changed: 65 additions & 1 deletion

embedded-kafka/src/main/scala/net/manub/embeddedkafka/EmbeddedKafka.scala

Lines changed: 33 additions & 1 deletion
@@ -247,8 +247,40 @@ sealed trait EmbeddedKafkaSupport {
       new KafkaProducer(baseProducerConfig.asJava, keySerializer, serializer),
       new ProducerRecord(topic, key, message))
 
+
+  /**
+    * Publishes synchronously a batch of message to the running Kafka broker.
+    *
+    * @param topic the topic to which publish the message (it will be auto-created)
+    * @param messages the keys and messages of type [[(K, T)]] to publish
+    * @param config an implicit [[EmbeddedKafkaConfig]]
+    * @param keySerializer an implicit [[Serializer]] for the type [[K]]
+    * @param serializer an implicit [[Serializer]] for the type [[T]]
+    * @throws KafkaUnavailableException if unable to connect to Kafka
+    */
+  @throws(classOf[KafkaUnavailableException])
+  def publishToKafka[K, T](topic: String, messages: Seq[(K, T)])(
+      implicit config: EmbeddedKafkaConfig,
+      keySerializer: Serializer[K],
+      serializer: Serializer[T]): Unit = {
+
+    val producer = new KafkaProducer(baseProducerConfig.asJava, keySerializer, serializer)
+
+    val tupleToRecord = (new ProducerRecord(topic, _: K, _: T)).tupled
+
+    val futureSend = tupleToRecord andThen producer.send
+
+    val futures = messages.map(futureSend)
+
+    // Assure all messages sent before returning, and fail on first send error
+    val records = futures.map(f => Try(f.get(10, SECONDS)))
+    records.find(_.isFailure).foreach(record => throw new KafkaUnavailableException(record.failed.get))
+
+    producer.close()
+  }
+
   private def publishToKafka[K, T](kafkaProducer: KafkaProducer[K, T],
-                                   record: ProducerRecord[K, T]) = {
+                                   record: ProducerRecord[K, T]): Unit = {
     val sendFuture = kafkaProducer.send(record)
     val sendResult = Try {
       sendFuture.get(10, SECONDS)
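The heart of the new method is a small function composition: `.tupled` turns the two-argument `ProducerRecord` construction into a function over a `(K, T)` pair, and `andThen producer.send` chains it into the producer, so mapping over `messages` yields one send future per tuple. A broker-free sketch of the same pattern, in which `Record` and `fakeSend` are stand-ins for `ProducerRecord` and `producer.send`:

```scala
// Illustrative only: mirrors the tupled/andThen composition from the diff
// without needing a Kafka broker. `Record` and `fakeSend` stand in for
// ProducerRecord and KafkaProducer.send.
object TupledComposition extends App {
  final case class Record(topic: String, key: String, value: String)

  val topic = "demo-topic"

  // (key, value) tuple => Record, obtained by tupling a two-argument function
  val tupleToRecord: ((String, String)) => Record =
    (Record(topic, _: String, _: String)).tupled

  // Record => String stands in for producer.send returning a Future
  val fakeSend: Record => String = r => s"sent ${r.key}:${r.value} to ${r.topic}"

  // Compose the two steps, exactly like `tupleToRecord andThen producer.send`
  val sendTuple: ((String, String)) => String = tupleToRecord andThen fakeSend

  val messages = Seq("k1" -> "v1", "k2" -> "v2")
  messages.map(sendTuple).foreach(println)
  // sent k1:v1 to demo-topic
  // sent k2:v2 to demo-topic
}
```

Collecting the futures first and waiting on them afterwards lets the producer issue all sends before any blocking `get`, rather than blocking after each message.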

embedded-kafka/src/test/scala/net/manub/embeddedkafka/EmbeddedKafkaMethodsSpec.scala

Lines changed: 32 additions & 0 deletions
@@ -79,6 +79,38 @@ class EmbeddedKafkaMethodsSpec
 
       consumer.close()
     }
+
+    "publish synchronously a batch of String messages with String keys to Kafka" in {
+      implicit val serializer = new StringSerializer()
+      implicit val deserializer = new StringDeserializer()
+      val key1 = "key1"
+      val message1 = "hello world!"
+      val key2 = "key2"
+      val message2 = "goodbye world!"
+      val topic = "publish_test_topic_batch_string_key"
+
+      val messages = List((key1, message1), (key2, message2))
+
+      publishToKafka(topic, messages)
+
+      val consumer = kafkaConsumer
+      consumer.subscribe(List(topic).asJava)
+
+      val records = consumer.poll(consumerPollTimeout).iterator()
+
+      records.hasNext shouldBe true
+
+      val record1 = records.next()
+      record1.key() shouldBe key1
+      record1.value() shouldBe message1
+
+      records.hasNext shouldBe true
+      val record2 = records.next()
+      record2.key() shouldBe key2
+      record2.value() shouldBe message2
+
+      consumer.close()
+    }
   }
 
   "the createCustomTopic method" should {
