Skip to content

Commit f6b1ac7

Browse files
committed
Refactoring for aKafkaProducerThat object
1 parent e8a80e3 commit f6b1ac7

File tree

3 files changed

+18
-20
lines changed

3 files changed

+18
-20
lines changed

build.sbt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import sbtrelease.Version
2+
13
lazy val commonSettings = Seq(
24
name := "scalatest-embedded-kafka",
35
organization := "net.manub",
@@ -34,7 +36,7 @@ lazy val publishSettings = Seq(
3436
)
3537

3638
lazy val releaseSettings = Seq(
37-
releaseVersionBump := sbtrelease.Version.Bump.Minor,
39+
releaseVersionBump := Version.Bump.Minor,
3840
releaseCrossBuild := true
3941
)
4042

src/main/scala/net/manub/embeddedkafka/EmbeddedKafka.scala

Lines changed: 10 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,9 @@ import org.scalatest.Suite
1515
import scala.collection.JavaConversions.mapAsJavaMap
1616
import scala.concurrent._
1717
import scala.concurrent.duration._
18-
import scala.language.postfixOps
18+
import scala.language.{higherKinds, postfixOps}
1919
import scala.reflect.io.Directory
20+
import scala.reflect.runtime.universe._
2021
import scala.util.Try
2122

2223
trait EmbeddedKafka {
@@ -110,24 +111,18 @@ trait EmbeddedKafka {
110111
}
111112
}
112113

113-
def aKafkaProducerThat(): KafkaProducerConfiguration = {
114-
new KafkaProducerConfiguration()
115-
}
116-
117-
sealed class KafkaProducerConfiguration {
118-
119-
def serializesValuesWith[T <: Serializer[_]](serializer: Class[T])(implicit config: EmbeddedKafkaConfig) = {
120-
new KafkaProducer[String, T](Map(
121-
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> s"localhost:${config.kafkaPort}",
122-
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> classOf[StringSerializer].getName,
123-
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> serializer.getName,
124-
ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG -> 3000.toString,
125-
ProducerConfig.RETRY_BACKOFF_MS_CONFIG -> 1000.toString
114+
object aKafkaProducer {
115+
def thatSerializesValuesWith[V](serializer: Class[_ <: Serializer[V]])(implicit config: EmbeddedKafkaConfig) = {
116+
new KafkaProducer[String, V](Map(
117+
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> s"localhost:${config.kafkaPort}",
118+
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> classOf[StringSerializer].getName,
119+
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> serializer.getName,
120+
ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG -> 3000.toString,
121+
ProducerConfig.RETRY_BACKOFF_MS_CONFIG -> 1000.toString
126122
))
127123
}
128124
}
129125

130-
131126
private def startZooKeeper(zooKeeperPort: Int): ServerCnxnFactory = {
132127
val zkLogsDir = Directory.makeTemp("zookeeper-logs")
133128
val tickTime = 2000

src/test/scala/net/manub/embeddedkafka/EmbeddedKafkaSpec.scala

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package net.manub.embeddedkafka
22

3-
import java.lang.Class
43
import java.net.InetSocketAddress
54
import java.util.Properties
65
import java.util.concurrent.TimeoutException
@@ -23,7 +22,6 @@ import scala.concurrent.ExecutionContext.Implicits.global
2322
import scala.concurrent.Future
2423
import scala.concurrent.duration._
2524
import scala.language.postfixOps
26-
import scala.reflect.ClassTag
2725

2826
class EmbeddedKafkaSpec
2927
extends TestKit(ActorSystem("embedded-kafka-spec")) with WordSpecLike with EmbeddedKafka with Matchers
@@ -194,9 +192,12 @@ class EmbeddedKafkaSpec
194192
"the aKafkaProducerThat method" should {
195193

196194
"return a producer that encodes messages for the given encoder" in {
197-
val producer = aKafkaProducerThat serializesValuesWith classOf[ByteArraySerializer]
198195

199-
producer.isInstanceOf[KafkaProducer[String, ByteArraySerializer]] shouldBe true
196+
withRunningKafka {
197+
val producer = aKafkaProducer thatSerializesValuesWith classOf[ByteArraySerializer]
198+
producer.send(new ProducerRecord[String, Array[Byte]]("a topic", "a message".getBytes))
199+
}
200+
200201
}
201202
}
202203

0 commit comments

Comments
 (0)