Skip to content

Commit e8a80e3

Browse files
committed
Added a first stub to generate a custom KafkaProducer
1 parent 420b943 commit e8a80e3

File tree

2 files changed

+34
-6
lines changed

2 files changed

+34
-6
lines changed

src/main/scala/net/manub/embeddedkafka/EmbeddedKafka.scala

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,19 +2,19 @@ package net.manub.embeddedkafka
22

33
import java.net.InetSocketAddress
44
import java.util.Properties
5-
import java.util.concurrent.{TimeUnit, Executors}
5+
import java.util.concurrent.Executors
66

77
import kafka.consumer.{Consumer, ConsumerConfig, Whitelist}
88
import kafka.serializer.StringDecoder
99
import kafka.server.{KafkaConfig, KafkaServer}
1010
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
11-
import org.apache.kafka.common.serialization.StringSerializer
11+
import org.apache.kafka.common.serialization.{Serializer, StringSerializer}
1212
import org.apache.zookeeper.server.{ServerCnxnFactory, ZooKeeperServer}
1313
import org.scalatest.Suite
1414

1515
import scala.collection.JavaConversions.mapAsJavaMap
16-
import scala.concurrent.duration._
1716
import scala.concurrent._
17+
import scala.concurrent.duration._
1818
import scala.language.postfixOps
1919
import scala.reflect.io.Directory
2020
import scala.util.Try
@@ -99,8 +99,8 @@ trait EmbeddedKafka {
9999
val messageStreams =
100100
consumer.createMessageStreamsByFilter(filter, keyDecoder = new StringDecoder, valueDecoder = new StringDecoder)
101101

102-
val messageFuture = Future {
103-
messageStreams.headOption.getOrElse(throw new KafkaSpecException("Unable to find a message stream")).iterator().next().message()
102+
val messageFuture = Future { messageStreams.headOption
103+
.getOrElse(throw new KafkaSpecException("Unable to find a message stream")).iterator().next().message()
104104
}
105105

106106
try {
@@ -110,6 +110,23 @@ trait EmbeddedKafka {
110110
}
111111
}
112112

113+
def aKafkaProducerThat(): KafkaProducerConfiguration = {
114+
new KafkaProducerConfiguration()
115+
}
116+
117+
sealed class KafkaProducerConfiguration {
118+
119+
def serializesValuesWith[T <: Serializer[_]](serializer: Class[T])(implicit config: EmbeddedKafkaConfig) = {
120+
new KafkaProducer[String, T](Map(
121+
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> s"localhost:${config.kafkaPort}",
122+
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> classOf[StringSerializer].getName,
123+
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> serializer.getName,
124+
ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG -> 3000.toString,
125+
ProducerConfig.RETRY_BACKOFF_MS_CONFIG -> 1000.toString
126+
))
127+
}
128+
}
129+
113130

114131
private def startZooKeeper(zooKeeperPort: Int): ServerCnxnFactory = {
115132
val zkLogsDir = Directory.makeTemp("zookeeper-logs")

src/test/scala/net/manub/embeddedkafka/EmbeddedKafkaSpec.scala

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package net.manub.embeddedkafka
22

3+
import java.lang.Class
34
import java.net.InetSocketAddress
45
import java.util.Properties
56
import java.util.concurrent.TimeoutException
@@ -11,7 +12,7 @@ import akka.testkit.{ImplicitSender, TestKit}
1112
import kafka.consumer.{Consumer, ConsumerConfig, Whitelist}
1213
import kafka.serializer.StringDecoder
1314
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
14-
import org.apache.kafka.common.serialization.StringSerializer
15+
import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}
1516
import org.scalatest.concurrent.{JavaFutures, ScalaFutures}
1617
import org.scalatest.exceptions.TestFailedException
1718
import org.scalatest.time.{Milliseconds, Seconds, Span}
@@ -22,6 +23,7 @@ import scala.concurrent.ExecutionContext.Implicits.global
2223
import scala.concurrent.Future
2324
import scala.concurrent.duration._
2425
import scala.language.postfixOps
26+
import scala.reflect.ClassTag
2527

2628
class EmbeddedKafkaSpec
2729
extends TestKit(ActorSystem("embedded-kafka-spec")) with WordSpecLike with EmbeddedKafka with Matchers
@@ -189,6 +191,15 @@ class EmbeddedKafkaSpec
189191
}
190192
}
191193

194+
"the aKafkaProducerThat method" should {
195+
196+
"return a producer that encodes messages for the given encoder" in {
197+
val producer = aKafkaProducerThat serializesValuesWith classOf[ByteArraySerializer]
198+
199+
producer.isInstanceOf[KafkaProducer[String, ByteArraySerializer]] shouldBe true
200+
}
201+
}
202+
192203
lazy val consumerConfigForEmbeddedKafka: ConsumerConfig = {
193204
val props = new Properties()
194205
props.put("group.id", "test")

0 commit comments

Comments
 (0)