Commit 02638f4

Some refactoring
1 parent 00e2241 commit 02638f4

6 files changed (+76 -91 lines changed)


src/main/scala/net/manub/embeddedkafka/marshalling/marshalling.scala renamed to src/main/scala/net/manub/embeddedkafka/Codecs.scala

Lines changed: 2 additions & 2 deletions
@@ -3,8 +3,8 @@ package net.manub.embeddedkafka
 import kafka.serializer._
 import org.apache.kafka.common.serialization._
 
-
-package object marshalling {
+/** useful encoders/serializers and decoders/deserializers **/
+object Codecs {
   implicit val stringEncoder: Encoder[String] = new StringEncoder()
   implicit val nullEncoder: Encoder[Array[Byte]] = new DefaultEncoder()
   implicit val stringSerializer: Serializer[String] = new StringSerializer()
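
Note: renaming the marshalling package object to a plain Codecs object means call sites now bring the implicits in with an explicit import rather than picking them up from the enclosing package. A minimal sketch of a call site, under assumptions not fixed by this diff (the spec class and topic are illustrative, and withRunningKafka plus a default EmbeddedKafkaConfig() come from elsewhere in the library):

    import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
    import net.manub.embeddedkafka.Codecs._ // stringSerializer, stringEncoder, nullEncoder, ...
    import org.scalatest.WordSpec

    class CodecsUsageSpec extends WordSpec with EmbeddedKafka {
      implicit val config = EmbeddedKafkaConfig()

      "a test" should {
        "publish via the implicit string serializer" in withRunningKafka {
          publishToKafka("illustrative-topic", "hello") // Serializer[String] comes from Codecs
        }
      }
    }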

src/main/scala/net/manub/embeddedkafka/EmbeddedKafka.scala

Lines changed: 36 additions & 25 deletions
@@ -8,7 +8,7 @@ import kafka.consumer.{Consumer, ConsumerConfig, Whitelist}
 import kafka.serializer.{Decoder, StringDecoder}
 import kafka.server.{KafkaConfig, KafkaServer}
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
-import org.apache.kafka.common.serialization.{Deserializer, Serializer, StringSerializer}
+import org.apache.kafka.common.serialization.{Serializer, StringSerializer}
 import org.apache.zookeeper.server.{ServerCnxnFactory, ZooKeeperServer}
 import org.scalatest.Suite
 
@@ -17,7 +17,6 @@ import scala.concurrent._
 import scala.concurrent.duration._
 import scala.language.{higherKinds, postfixOps}
 import scala.reflect.io.Directory
-import scala.reflect.runtime.universe._
 import scala.util.Try
 
 trait EmbeddedKafka {
@@ -46,28 +45,42 @@ trait EmbeddedKafka {
     }
   }
 
+
+  /**
+   * Publishes synchronously a message of type [[String]] to the running Kafka broker.
+   *
+   * @see [[EmbeddedKafka#publishToKafka]]
+   * @param topic the topic to which publish the message (it will be auto-created)
+   * @param message the [[String]] message to publish
+   * @param config an implicit [[EmbeddedKafkaConfig]]
+   * @throws KafkaUnavailableException if unable to connect to Kafka
+   */
   def publishStringMessageToKafka(topic: String, message: String)(implicit config: EmbeddedKafkaConfig): Unit =
     publishToKafka(topic, message)(config, new StringSerializer)
 
   /**
-   * Publishes asynchronously a message to the running Kafka broker.
+   * Publishes synchronously a message to the running Kafka broker.
    *
    * @param topic the topic to which publish the message (it will be auto-created)
-   * @param message the message to publish
+   * @param message the message of type [[T]] to publish
    * @param config an implicit [[EmbeddedKafkaConfig]]
+   * @param serializer an implicit [[Serializer]] for the type [[T]]
    * @throws KafkaUnavailableException if unable to connect to Kafka
    */
   @throws(classOf[KafkaUnavailableException])
-  def publishToKafka[T](topic: String, message: T)(implicit config: EmbeddedKafkaConfig, serializer: Serializer[T]): Unit = {
+  def publishToKafka[T](topic: String, message: T)
+                       (implicit config: EmbeddedKafkaConfig, serializer: Serializer[T]): Unit = {
 
     val kafkaProducer = new KafkaProducer(Map(
-      ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> s"localhost:${config.kafkaPort}",
-      ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG -> 3000.toString,
-      ProducerConfig.RETRY_BACKOFF_MS_CONFIG -> 1000.toString
+      ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> s"localhost:${config.kafkaPort}",
+      ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG -> 3000.toString,
+      ProducerConfig.RETRY_BACKOFF_MS_CONFIG -> 1000.toString
     ), new StringSerializer, serializer)
 
     val sendFuture = kafkaProducer.send(new ProducerRecord(topic, message))
-    val sendResult = Try { sendFuture.get(3, SECONDS) }
+    val sendResult = Try {
+      sendFuture.get(3, SECONDS)
+    }
 
     kafkaProducer.close()
 
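Note: the docstring fix above (from "asynchronously" to "synchronously") now matches what the body has done all along: send returns a java.util.concurrent.Future, and blocking on it with a three-second timeout is what makes the publish synchronous. The pattern in isolation, as a sketch (the helper name is illustrative; the real method maps a failure to KafkaUnavailableException, as its @throws documents):

    import java.util.concurrent.TimeUnit.SECONDS
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
    import scala.util.Try

    // Wrap the asynchronous send in a bounded blocking wait, mirroring publishToKafka.
    def sendSynchronously[K, V](producer: KafkaProducer[K, V],
                                record: ProducerRecord[K, V]): Try[Unit] =
      Try { producer.send(record).get(3, SECONDS) }.map(_ => ())
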
@@ -83,7 +96,8 @@ trait EmbeddedKafka {
    *
    * @param topic the topic to consume a message from
    * @param config an implicit [[EmbeddedKafkaConfig]]
-   * @return the first message consumed from the given topic
+   * @param decoder an implicit [[Decoder]] for the type [[T]]
+   * @return the first message consumed from the given topic, with a type [[T]]
    * @throws TimeoutException if unable to consume a message within 3 seconds
    * @throws KafkaUnavailableException if unable to connect to Kafka
    */
@@ -100,12 +114,12 @@ trait EmbeddedKafka {
       Consumer.create(new ConsumerConfig(props))
     }.getOrElse(throw new KafkaUnavailableException)
 
-    val filter = Whitelist(topic)
     val messageStreams =
-      consumer.createMessageStreamsByFilter(filter, keyDecoder = new StringDecoder, valueDecoder = decoder)
+      consumer.createMessageStreamsByFilter(Whitelist(topic), keyDecoder = new StringDecoder, valueDecoder = decoder)
 
-    val messageFuture = Future { messageStreams.headOption
-      .getOrElse(throw new KafkaSpecException("Unable to find a message stream")).iterator().next().message()
+    val messageFuture = Future {
+      messageStreams.headOption
+        .getOrElse(throw new KafkaSpecException("Unable to find a message stream")).iterator().next().message()
     }
 
     try {
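
Note: the consume path above now builds the Whitelist inline and moves the stream read into a multi-line Future block; behaviour is unchanged, and the Future is still what enforces the three-second timeout documented by @throws. A round-trip sketch, assuming the trait's withRunningKafka and consumeFirstStringMessageFrom helpers (neither name is shown in this diff) and a default config:

    import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
    import org.scalatest.WordSpec

    class RoundTripSpec extends WordSpec with EmbeddedKafka {
      implicit val config = EmbeddedKafkaConfig()

      "the embedded broker" should {
        "round-trip a string message" in withRunningKafka {
          publishStringMessageToKafka("round-trip-topic", "ping")
          // Blocks for up to 3 seconds, then throws a TimeoutException.
          assert(consumeFirstStringMessageFrom("round-trip-topic") === "ping")
        }
      }
    }
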
@@ -117,22 +131,19 @@ trait EmbeddedKafka {
 
   object aKafkaProducer {
     def thatSerializesValuesWith[V](serializer: Class[_ <: Serializer[V]])(implicit config: EmbeddedKafkaConfig) = {
-      new KafkaProducer[String, V]( basicKafkaConfig(config) + (
+      new KafkaProducer[String, V](basicKafkaConfig(config) +(
         ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> classOf[StringSerializer].getName,
-        ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> serializer.getName
-      ))
+        ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> serializer.getName))
     }
-
+
     def apply[V](implicit valueSerializer: Serializer[V], config: EmbeddedKafkaConfig) =
       new KafkaProducer[String, V](basicKafkaConfig(config), new StringSerializer, valueSerializer)
 
-    def basicKafkaConfig[V](config: EmbeddedKafkaConfig): Map[String, String] = {
-      Map(
-        ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> s"localhost:${config.kafkaPort}",
-        ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG -> 3000.toString,
-        ProducerConfig.RETRY_BACKOFF_MS_CONFIG -> 1000.toString
-      )
-    }
+    def basicKafkaConfig[V](config: EmbeddedKafkaConfig): Map[String, String] = Map(
+      ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> s"localhost:${config.kafkaPort}",
+      ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG -> 3000.toString,
+      ProducerConfig.RETRY_BACKOFF_MS_CONFIG -> 1000.toString
+    )
   }
 
   private def startZooKeeper(zooKeeperPort: Int): ServerCnxnFactory = {
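
Note: aKafkaProducer is the test-facing builder for producers wired to the embedded broker; the refactoring above only inlines basicKafkaConfig's body. A usage sketch, assumed to run inside a class mixing in EmbeddedKafka with an implicit EmbeddedKafkaConfig and a running broker (topic, key and value are illustrative):

    import org.apache.kafka.clients.producer.ProducerRecord
    import org.apache.kafka.common.serialization.StringSerializer

    // Class-based variant: the producer instantiates the serializer from its class.
    val producer = aKafkaProducer.thatSerializesValuesWith(classOf[StringSerializer])
    producer.send(new ProducerRecord[String, String]("sketch-topic", "key", "value"))
    producer.close()

    // Implicit variant: resolves a Serializer[V], e.g. Codecs.stringSerializer.
    import net.manub.embeddedkafka.Codecs._
    val implicitProducer = aKafkaProducer[String]
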
@@ -1,4 +1,4 @@
-package net.manub.embeddedkafka.marshalling.avro
+package net.manub.embeddedkafka.avro
 
 import java.io.ByteArrayOutputStream
 
@@ -9,10 +9,10 @@ import org.apache.avro.io._
 import org.apache.avro.specific.{SpecificDatumReader, SpecificDatumWriter, SpecificRecord}
 import org.apache.kafka.common.serialization.{Deserializer, Serializer}
 
-class KafkaAvroDecoder[T <: SpecificRecord](schema: Schema, props: VerifiableProperties = null) extends Decoder[T]{
-  private[this] val NoInstanceReuse = null.asInstanceOf[T]
-  private[this] val NoDecoderReuse = null.asInstanceOf[BinaryDecoder]
-  private[this] val reader: SpecificDatumReader[T] = new SpecificDatumReader[T](schema)
+class KafkaAvroDecoder[T <: SpecificRecord](schema: Schema, props: VerifiableProperties = null) extends Decoder[T] {
+  private val NoInstanceReuse = null.asInstanceOf[T]
+  private val NoDecoderReuse = null.asInstanceOf[BinaryDecoder]
+  private val reader = new SpecificDatumReader[T](schema)
 
   override def fromBytes(bytes: Array[Byte]): T = {
     val decoder = DecoderFactory.get().binaryDecoder(bytes, NoDecoderReuse)
@@ -21,7 +21,7 @@ class KafkaAvroDecoder[T <: SpecificRecord](schema: Schema, props: VerifiablePro
 }
 
 class KafkaAvroEncoder[T <: SpecificRecord](props: VerifiableProperties = null) extends Encoder[T] {
-  private[this] val NoEncoderReuse = null.asInstanceOf[BinaryEncoder]
+  private val NoEncoderReuse = null.asInstanceOf[BinaryEncoder]
 
   override def toBytes(nullableData: T): Array[Byte] = {
     Option(nullableData).fold[Array[Byte]](null) { data =>
@@ -35,26 +35,29 @@ class KafkaAvroEncoder[T <: SpecificRecord](props: VerifiableProperties = null)
 
       out.toByteArray
     }
-
   }
 }
 
-class KafkaAvroDeserializer[T <: SpecificRecord](schema: Schema) extends Deserializer[T]{
-  private[this] val decoder = new KafkaAvroDecoder[T](schema = schema)
-
-  override def configure(configs: java.util.Map[String, _], isKey: Boolean): Unit = {}
+class KafkaAvroDeserializer[T <: SpecificRecord](schema: Schema)
+  extends Deserializer[T] with NoOpConfiguration with NoOpClose {
 
-  override def close(): Unit = {}
+  private val decoder = new KafkaAvroDecoder[T](schema = schema)
 
   override def deserialize(topic: String, data: Array[Byte]): T = decoder.fromBytes(data)
 }
 
-class KafkaAvroSerializer[T <: SpecificRecord]() extends Serializer[T] {
-  private[this] val encoder = new KafkaAvroEncoder[T]()
+class KafkaAvroSerializer[T <: SpecificRecord]()
+  extends Serializer[T] with NoOpConfiguration with NoOpClose {
 
-  override def configure(configs: java.util.Map[String, _], isKey: Boolean): Unit = {}
+  private val encoder = new KafkaAvroEncoder[T]()
 
   override def serialize(topic: String, data: T): Array[Byte] = encoder.toBytes(data)
+}
+
+sealed trait NoOpConfiguration {
+  def configure(configs: java.util.Map[String, _], isKey: Boolean): Unit = ()
+}
 
-  override def close(): Unit = {}
+sealed trait NoOpClose {
+  def close(): Unit = ()
 }
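
Note: extracting NoOpConfiguration and NoOpClose removes the duplicated empty configure/close overrides from the serializer and deserializer. Because both traits are sealed they can only be mixed in within this source file; a codec defined elsewhere needs its own stubs. The same extraction in isolation, unsealed and with a hypothetical codec (SimpleLongSerializer is not part of the commit):

    import java.nio.ByteBuffer
    import org.apache.kafka.common.serialization.Serializer

    // Unsealed variant of the commit's extraction, usable from any file.
    trait NoOpLifecycle {
      def configure(configs: java.util.Map[String, _], isKey: Boolean): Unit = ()
      def close(): Unit = ()
    }

    // Hypothetical codec: only serialize() is left to implement.
    class SimpleLongSerializer extends Serializer[Long] with NoOpLifecycle {
      override def serialize(topic: String, data: Long): Array[Byte] =
        ByteBuffer.allocate(8).putLong(data).array()
    }
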
@@ -1,19 +1,20 @@
-package net.manub.embeddedkafka.marshalling
+package net.manub.embeddedkafka
 
-import kafka.serializer.{Encoder, Decoder}
+import kafka.serializer.{Decoder, Encoder}
 import kafka.utils.VerifiableProperties
 import org.apache.avro.Schema
 import org.apache.avro.specific.SpecificRecord
 import org.apache.kafka.common.serialization.{Deserializer, Serializer}
 
 package object avro {
-  implicit def specificAvroSerializer[T <: SpecificRecord] : Serializer[T] = new KafkaAvroSerializer[T]
-  implicit def specificAvroEncoder[T <: SpecificRecord] : Encoder[T] = new KafkaAvroEncoder[T]
 
-  def specificAvroDeserializer[T <: SpecificRecord](schema: Schema) : Deserializer[T] =
+  implicit def specificAvroSerializer[T <: SpecificRecord]: Serializer[T] = new KafkaAvroSerializer[T]
+  implicit def specificAvroEncoder[T <: SpecificRecord]: Encoder[T] = new KafkaAvroEncoder[T]
+
+  def specificAvroDeserializer[T <: SpecificRecord](schema: Schema): Deserializer[T] =
     new KafkaAvroDeserializer[T](schema)
 
-  def specificAvroDecoder[T <: SpecificRecord](schema: Schema, props: VerifiableProperties = null) : Decoder[T] =
+  def specificAvroDecoder[T <: SpecificRecord](schema: Schema, props: VerifiableProperties = null): Decoder[T] =
     new KafkaAvroDecoder[T](schema, props)
 }
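
Note: after the rename, the Avro implicits come from net.manub.embeddedkafka.avro rather than net.manub.embeddedkafka.marshalling.avro. A call-site sketch under stated assumptions: TestAvroClass is a hypothetical Avro-generated SpecificRecord, and consumeFirstMessageFrom is the trait's consume helper whose implicit decoder parameter is documented in the hunks above; only the package name is fixed by this diff.

    import kafka.serializer.Decoder
    import net.manub.embeddedkafka.avro._ // specificAvroSerializer, specificAvroDecoder, ...

    // TestAvroClass stands in for any Avro-generated SpecificRecord.
    val message = new TestAvroClass("illustrative-name")

    // Serializer[TestAvroClass] is resolved implicitly via specificAvroSerializer.
    publishToKafka("avro-topic", message)

    // Decoders need the writer schema, so one is built explicitly and marked implicit.
    implicit val decoder: Decoder[TestAvroClass] =
      specificAvroDecoder[TestAvroClass](TestAvroClass.SCHEMA$)
    val roundTripped: TestAvroClass = consumeFirstMessageFrom("avro-topic")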
