
Commit 0fa5cf3

Some refactoring to support:
- Definition of a publisher using just the class type (and implicits)
- Type-specific publish and receive methods
- Avro support

1 parent f6b1ac7 · commit 0fa5cf3
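Taken together, the new surface reads roughly as follows. This is a minimal sketch, assuming the calls run inside a suite mixing in the EmbeddedKafka trait with an implicit EmbeddedKafkaConfig in scope; topic and message values are illustrative:

    import net.manub.embeddedkafka.marshalling._

    // String-specific helpers keep the old one-liner ergonomics
    publishStringMessageToKafka("test_topic", "hello world!")
    consumeFirstStringMessageFrom("test_topic")           // "hello world!"

    // Generic variants resolve a Serializer[T] / Decoder[T] from implicit scope
    publishToKafka("test_topic", "hello world!".getBytes)
    consumeFirstMessageFrom[Array[Byte]]("test_topic")

    // A producer defined by just its value type
    val producer = aKafkaProducer[String]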

7 files changed: +232 -21 lines changed

build.sbt

Lines changed: 1 addition & 0 deletions

@@ -10,6 +10,7 @@ lazy val commonSettings = Seq(
     "org.scalatest" %% "scalatest" % "2.2.5",
     "org.apache.kafka" %% "kafka" % "0.8.2.1",
     "org.apache.zookeeper" % "zookeeper" % "3.4.6",
+    "org.apache.avro" % "avro" % "1.7.7",

     "com.typesafe.akka" %% "akka-actor" % "2.3.11" % "test",
     "com.typesafe.akka" %% "akka-testkit" % "2.3.11" % "test"

src/main/scala/net/manub/embeddedkafka/EmbeddedKafka.scala

Lines changed: 26 additions & 14 deletions

@@ -5,10 +5,10 @@ import java.util.Properties
 import java.util.concurrent.Executors

 import kafka.consumer.{Consumer, ConsumerConfig, Whitelist}
-import kafka.serializer.StringDecoder
+import kafka.serializer.{Decoder, StringDecoder}
 import kafka.server.{KafkaConfig, KafkaServer}
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
-import org.apache.kafka.common.serialization.{Serializer, StringSerializer}
+import org.apache.kafka.common.serialization.{Deserializer, Serializer, StringSerializer}
 import org.apache.zookeeper.server.{ServerCnxnFactory, ZooKeeperServer}
 import org.scalatest.Suite

@@ -46,6 +46,9 @@ trait EmbeddedKafka {
     }
   }

+  def publishStringMessageToKafka(topic: String, message: String)(implicit config: EmbeddedKafkaConfig): Unit =
+    publishToKafka(topic, message)(config, new StringSerializer)
+
   /**
    * Publishes asynchronously a message to the running Kafka broker.
    *
@@ -55,24 +58,25 @@ trait EmbeddedKafka {
    * @throws KafkaUnavailableException if unable to connect to Kafka
    */
   @throws(classOf[KafkaUnavailableException])
-  def publishToKafka(topic: String, message: String)(implicit config: EmbeddedKafkaConfig): Unit = {
+  def publishToKafka[T](topic: String, message: T)(implicit config: EmbeddedKafkaConfig, serializer: Serializer[T]): Unit = {

-    val kafkaProducer = new KafkaProducer[String, String](Map[String, String](
+    val kafkaProducer = new KafkaProducer(Map(
       ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> s"localhost:${config.kafkaPort}",
-      ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> classOf[StringSerializer].getName,
-      ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> classOf[StringSerializer].getName,
       ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG -> 3000.toString,
       ProducerConfig.RETRY_BACKOFF_MS_CONFIG -> 1000.toString
-    ))
+    ), new StringSerializer, serializer)

-    val sendFuture = kafkaProducer.send(new ProducerRecord[String, String](topic, message))
+    val sendFuture = kafkaProducer.send(new ProducerRecord(topic, message))
     val sendResult = Try { sendFuture.get(3, SECONDS) }

     kafkaProducer.close()

     if (sendResult.isFailure) throw new KafkaUnavailableException
   }

+  def consumeFirstStringMessageFrom(topic: String)(implicit config: EmbeddedKafkaConfig): String =
+    consumeFirstMessageFrom(topic)(config, new StringDecoder())
+

   /**
    * Consumes the first message available in a given topic, deserializing it as a String.
@@ -85,7 +89,7 @@ trait EmbeddedKafka {
    */
   @throws(classOf[TimeoutException])
   @throws(classOf[KafkaUnavailableException])
-  def consumeFirstMessageFrom(topic: String)(implicit config: EmbeddedKafkaConfig): String = {
+  def consumeFirstMessageFrom[T](topic: String)(implicit config: EmbeddedKafkaConfig, decoder: Decoder[T]): T = {
     val props = new Properties()
     props.put("group.id", s"embedded-kafka-spec-$suiteId")
     props.put("zookeeper.connect", s"localhost:${config.zooKeeperPort}")
@@ -98,7 +102,7 @@ trait EmbeddedKafka {

     val filter = Whitelist(topic)
     val messageStreams =
-      consumer.createMessageStreamsByFilter(filter, keyDecoder = new StringDecoder, valueDecoder = new StringDecoder)
+      consumer.createMessageStreamsByFilter(filter, keyDecoder = new StringDecoder, valueDecoder = decoder)

     val messageFuture = Future { messageStreams.headOption
       .getOrElse(throw new KafkaSpecException("Unable to find a message stream")).iterator().next().message()
@@ -113,13 +117,21 @@ trait EmbeddedKafka {

 object aKafkaProducer {
   def thatSerializesValuesWith[V](serializer: Class[_ <: Serializer[V]])(implicit config: EmbeddedKafkaConfig) = {
-    new KafkaProducer[String, V](Map(
-      ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> s"localhost:${config.kafkaPort}",
+    new KafkaProducer[String, V](basicKafkaConfig(config) + (
       ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> classOf[StringSerializer].getName,
-      ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> serializer.getName,
+      ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> serializer.getName
+    ))
+  }
+
+  def apply[V](implicit valueSerializer: Serializer[V], config: EmbeddedKafkaConfig) =
+    new KafkaProducer[String, V](basicKafkaConfig(config), new StringSerializer, valueSerializer)
+
+  def basicKafkaConfig[V](config: EmbeddedKafkaConfig): Map[String, String] = {
+    Map(
+      ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> s"localhost:${config.kafkaPort}",
       ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG -> 3000.toString,
       ProducerConfig.RETRY_BACKOFF_MS_CONFIG -> 1000.toString
-    ))
+    )
   }
 }
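The String helpers above are thin wrappers that pass the marshaller explicitly, which also doubles as the escape hatch when implicit resolution is unwanted. A sketch, with hypothetical topic, message, and config values:

    // What the String helpers do internally, and how any custom
    // Serializer/Decoder can be supplied by hand:
    publishToKafka(topic, message)(config, new StringSerializer)
    val first = consumeFirstMessageFrom(topic)(config, new StringDecoder())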

(new file)

Lines changed: 60 additions & 0 deletions

@@ -0,0 +1,60 @@
+package net.manub.embeddedkafka.marshalling.avro
+
+import java.io.ByteArrayOutputStream
+
+import kafka.serializer.{Decoder, Encoder}
+import kafka.utils.VerifiableProperties
+import org.apache.avro.Schema
+import org.apache.avro.io._
+import org.apache.avro.specific.{SpecificDatumReader, SpecificDatumWriter, SpecificRecord}
+import org.apache.kafka.common.serialization.{Deserializer, Serializer}
+
+class KafkaAvroDecoder[T <: SpecificRecord](schema: Schema, props: VerifiableProperties = null) extends Decoder[T] {
+  private[this] val NoInstanceReuse = null.asInstanceOf[T]
+  private[this] val NoDecoderReuse = null.asInstanceOf[BinaryDecoder]
+  private[this] val reader: SpecificDatumReader[T] = new SpecificDatumReader[T](schema)
+
+  override def fromBytes(bytes: Array[Byte]): T = {
+    val decoder = DecoderFactory.get().binaryDecoder(bytes, NoDecoderReuse)
+    reader.read(NoInstanceReuse, decoder)
+  }
+}
+
+class KafkaAvroEncoder[T <: SpecificRecord](props: VerifiableProperties = null) extends Encoder[T] {
+  private[this] val NoEncoderReuse = null.asInstanceOf[BinaryEncoder]
+
+  override def toBytes(nullableData: T): Array[Byte] = {
+    Option(nullableData).fold[Array[Byte]](null) { data =>
+      val writer: DatumWriter[T] = new SpecificDatumWriter[T](data.getSchema)
+      val out = new ByteArrayOutputStream()
+      val encoder = EncoderFactory.get.binaryEncoder(out, NoEncoderReuse)
+
+      writer.write(data, encoder)
+      encoder.flush()
+      out.close()
+
+      out.toByteArray
+    }
+  }
+}
+
+class KafkaAvroDeserializer[T <: SpecificRecord](schema: Schema) extends Deserializer[T] {
+  private[this] val decoder = new KafkaAvroDecoder[T](schema = schema)
+
+  override def configure(configs: java.util.Map[String, _], isKey: Boolean): Unit = {}
+
+  override def close(): Unit = {}
+
+  override def deserialize(topic: String, data: Array[Byte]): T = decoder.fromBytes(data)
+}
+
+class KafkaAvroSerializer[T <: SpecificRecord]() extends Serializer[T] {
+  private[this] val encoder = new KafkaAvroEncoder[T]()
+
+  override def configure(configs: java.util.Map[String, _], isKey: Boolean): Unit = {}
+
+  override def serialize(topic: String, data: T): Array[Byte] = encoder.toBytes(data)
+
+  override def close(): Unit = {}
+}
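A round-trip sketch of the two Kafka codecs above, using the TestAvroClass fixture added later in this commit. Note the asymmetry: toBytes reads the schema off each record via data.getSchema, while fromBytes needs the writer schema up front:

    val encoder = new KafkaAvroEncoder[TestAvroClass]
    val decoder = new KafkaAvroDecoder[TestAvroClass](TestAvroClass.SCHEMA$)

    val bytes  = encoder.toBytes(TestAvroClass("name")) // raw Avro binary, no schema header
    val record = decoder.fromBytes(bytes)               // == TestAvroClass("name")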
(new file)

Lines changed: 19 additions & 0 deletions

@@ -0,0 +1,19 @@
+package net.manub.embeddedkafka.marshalling
+
+import kafka.serializer.{Encoder, Decoder}
+import kafka.utils.VerifiableProperties
+import org.apache.avro.Schema
+import org.apache.avro.specific.SpecificRecord
+import org.apache.kafka.common.serialization.{Deserializer, Serializer}
+
+package object avro {
+  implicit def specificAvroSerializer[T <: SpecificRecord]: Serializer[T] = new KafkaAvroSerializer[T]
+  implicit def specificAvroEncoder[T <: SpecificRecord]: Encoder[T] = new KafkaAvroEncoder[T]
+
+  def specificAvroDeserializer[T <: SpecificRecord](schema: Schema): Deserializer[T] =
+    new KafkaAvroDeserializer[T](schema)
+
+  def specificAvroDecoder[T <: SpecificRecord](schema: Schema, props: VerifiableProperties = null): Decoder[T] =
+    new KafkaAvroDecoder[T](schema, props)
+}
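That asymmetry explains the split here: the serializer and encoder can be implicit for any SpecificRecord because the writer takes the schema from each record, while decoding needs a Schema before any bytes arrive, so the decoder and deserializer remain explicit factories. A sketch of the wiring a test would do:

    import net.manub.embeddedkafka.marshalling.avro._
    import org.apache.kafka.common.serialization.Serializer

    val serializer = implicitly[Serializer[TestAvroClass]]  // summoned implicitly
    implicit val decoder = specificAvroDecoder[TestAvroClass](TestAvroClass.SCHEMA$)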
(new file)

Lines changed: 17 additions & 0 deletions

@@ -0,0 +1,17 @@
+package net.manub.embeddedkafka
+
+import kafka.serializer._
+import org.apache.kafka.common.serialization._
+
+
+package object marshalling {
+  implicit val stringEncoder: Encoder[String] = new StringEncoder()
+  implicit val nullEncoder: Encoder[Array[Byte]] = new DefaultEncoder()
+  implicit val stringSerializer: Serializer[String] = new StringSerializer()
+  implicit val nullSerializer: Serializer[Array[Byte]] = new ByteArraySerializer()
+
+  implicit val stringDecoder: Decoder[String] = new StringDecoder()
+  implicit val nullDecoder: Decoder[Array[Byte]] = new DefaultDecoder()
+  implicit val stringDeserializer: Deserializer[String] = new StringDeserializer()
+  implicit val nullDeserializer: Deserializer[Array[Byte]] = new ByteArrayDeserializer()
+}
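Importing this package object is what lets the type-specific publish and consume methods compile out of the box for String and Array[Byte]. A quick sketch:

    import net.manub.embeddedkafka.marshalling._
    import kafka.serializer.Decoder
    import org.apache.kafka.common.serialization.Serializer

    implicitly[Serializer[String]]    // resolves to stringSerializer
    implicitly[Decoder[Array[Byte]]]  // resolves to nullDecoder (bytes pass through untouched)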

src/test/scala/net/manub/embeddedkafka/EmbeddedKafkaSpec.scala

Lines changed: 72 additions & 7 deletions

@@ -107,14 +107,14 @@ class EmbeddedKafkaSpec

   "the publishToKafka method" should {

-    "publishes asynchronously a message to Kafka as String" in {
+    "publishes asynchronously a message to Kafka" in {

       withRunningKafka {

         val message = "hello world!"
         val topic = "test_topic"

-        publishToKafka(topic, message)
+        publishStringMessageToKafka(topic, message)

         val consumer = Consumer.create(consumerConfigForEmbeddedKafka)

@@ -142,9 +142,8 @@ class EmbeddedKafkaSpec
     }

     "throws a KafkaUnavailableException when Kafka is unavailable when trying to publish" in {
-
       a[KafkaUnavailableException] shouldBe thrownBy {
-        publishToKafka("non_existing_topic", "a message")
+        publishStringMessageToKafka("non_existing_topic", "a message")
       }
     }
   }
@@ -165,7 +164,50 @@ class EmbeddedKafkaSpec
       ))

       whenReady(producer.send(new ProducerRecord[String, String](topic, message))) { _ =>
-        consumeFirstMessageFrom(topic) shouldBe message
+        consumeFirstStringMessageFrom(topic) shouldBe message
+      }
+
+      producer.close()
+    }
+  }
+
+  "returns a message published to a topic with implicit decoder" in {
+
+    withRunningKafka {
+
+      val message = "hello world!"
+      val topic = "test_topic"
+
+      val producer = new KafkaProducer[String, String](Map(
+        ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> s"localhost:6001",
+        ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> classOf[StringSerializer].getName,
+        ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> classOf[StringSerializer].getName
+      ))
+
+      import marshalling._
+      whenReady(producer.send(new ProducerRecord[String, String](topic, message))) { _ =>
+        consumeFirstMessageFrom[Array[Byte]](topic) shouldBe message.getBytes
+      }
+
+      producer.close()
+    }
+  }
+
+  "return a message published to a topic with custom decoder" in {
+
+    import marshalling.avro._
+    withRunningKafka {
+
+      val message = TestAvroClass("name")
+      val topic = "test_topic"
+      implicit val testAvroClassDecoder = specificAvroDecoder[TestAvroClass](TestAvroClass.SCHEMA$)
+
+      val producer = new KafkaProducer[String, TestAvroClass](Map(
+        ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> s"localhost:6001"
+      ), new StringSerializer, specificAvroSerializer[TestAvroClass])
+
+      whenReady(producer.send(new ProducerRecord(topic, message))) { _ =>
+        consumeFirstMessageFrom[TestAvroClass](topic) shouldBe message
       }

       producer.close()
@@ -176,15 +218,15 @@ class EmbeddedKafkaSpec

     withRunningKafka {
       a[TimeoutException] shouldBe thrownBy {
-        consumeFirstMessageFrom("non_existing_topic")
+        consumeFirstStringMessageFrom("non_existing_topic")
       }
     }
   }

   "throws a KafkaUnavailableException when there's no running instance of Kafka" in {

     a[KafkaUnavailableException] shouldBe thrownBy {
-      consumeFirstMessageFrom("non_existing_topic")
+      consumeFirstStringMessageFrom("non_existing_topic")
     }
   }
 }
@@ -201,6 +243,27 @@ class EmbeddedKafkaSpec
     }
   }

+  "the aKafkaProducer object" should {
+
+    "return a producer that encodes messages for the given type" in {
+      import marshalling._
+      withRunningKafka {
+        val producer = aKafkaProducer[String]
+        producer.send(new ProducerRecord[String, String]("a topic", "a message"))
+      }
+    }
+
+    "return a producer that encodes messages for a custom type" in {
+      import marshalling.avro._
+
+      withRunningKafka {
+        val producer = aKafkaProducer[TestAvroClass]
+        producer.send(new ProducerRecord[String, TestAvroClass]("a topic", TestAvroClass("name")))
+      }
+    }
+  }
+
   lazy val consumerConfigForEmbeddedKafka: ConsumerConfig = {
     val props = new Properties()
     props.put("group.id", "test")
@@ -211,6 +274,8 @@ class EmbeddedKafkaSpec
     }
   }

+
+
 object TcpClient {
   def props(remote: InetSocketAddress, replies: ActorRef) = Props(classOf[TcpClient], remote, replies)
 }
(new file)

Lines changed: 37 additions & 0 deletions

@@ -0,0 +1,37 @@
+package net.manub.embeddedkafka
+
+import org.apache.avro.specific.SpecificRecordBase
+import org.apache.avro.{AvroRuntimeException, Schema}
+
+case class TestAvroClass(var name: String) extends SpecificRecordBase {
+  def this() = this("")
+
+  override def get(i: Int): AnyRef = i match {
+    case 0 => name
+    case _ => throw new AvroRuntimeException("Bad index")
+  }
+
+  override def put(i: Int, v: scala.Any): Unit = i match {
+    case 0 => name = v match {
+      case (utf8: org.apache.avro.util.Utf8) => utf8.toString
+      case _ => v.asInstanceOf[String]
+    }
+    case _ => throw new AvroRuntimeException("Bad index")
+  }
+
+  override def getSchema: Schema = TestAvroClass.SCHEMA$
+}
+
+object TestAvroClass {
+  val SCHEMA$ = (new Schema.Parser).parse(
+    """
+      |{"type": "record",
+      | "namespace": "net.manub.embeddedkafka",
+      | "name": "TestAvroClass",
+      | "fields": [
+      |   {"name": "name", "type": "string"}
+      | ]
+      |}
+    """.stripMargin)
+}
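TestAvroClass is a hand-rolled SpecificRecordBase, standing in for avro-tools code generation, so the tests can exercise the codecs: get and put are Avro's positional accessors, and the Utf8 case handles the runtime's string representation. Illustrative use, with hypothetical values:

    val record = new TestAvroClass()                     // no-arg constructor required by Avro
    record.put(0, new org.apache.avro.util.Utf8("name")) // what the Avro runtime passes in
    record.get(0)                                        // "name"
    record.getSchema.getFullName                         // "net.manub.embeddedkafka.TestAvroClass"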
