Skip to content

Commit 5c15b5d

Browse files
committed
Made some methods some robust and added a couple of exceptions for exceptional cases.
1 parent 9eded9f commit 5c15b5d

File tree

3 files changed

+42
-15
lines changed

3 files changed

+42
-15
lines changed

src/main/scala/net/manub/embeddedkafka/EmbeddedKafka.scala

Lines changed: 25 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,21 +2,22 @@ package net.manub.embeddedkafka
22

33
import java.net.InetSocketAddress
44
import java.util.Properties
5-
import java.util.concurrent.Executors
5+
import java.util.concurrent.{TimeUnit, Executors}
66

77
import kafka.consumer.{Consumer, ConsumerConfig, Whitelist}
88
import kafka.serializer.StringDecoder
99
import kafka.server.{KafkaConfig, KafkaServer}
10-
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
10+
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
1111
import org.apache.kafka.common.serialization.StringSerializer
1212
import org.apache.zookeeper.server.{ServerCnxnFactory, ZooKeeperServer}
1313
import org.scalatest.Suite
1414

1515
import scala.collection.JavaConversions.mapAsJavaMap
1616
import scala.concurrent.duration._
17-
import scala.concurrent.{Await, ExecutionContext, Future}
17+
import scala.concurrent._
1818
import scala.language.postfixOps
1919
import scala.reflect.io.Directory
20+
import scala.util.Try
2021

2122
trait EmbeddedKafka {
2223

@@ -50,37 +51,49 @@ trait EmbeddedKafka {
5051
* @param topic the topic to which publish the message (it will be auto-created)
5152
* @param message the message to publish
5253
* @param config an implicit [[EmbeddedKafkaConfig]]
54+
* @throws KafkaUnavailableException if unable to connect to Kafka
5355
*/
56+
@throws(classOf[KafkaUnavailableException])
5457
def publishToKafka(topic: String, message: String)(implicit config: EmbeddedKafkaConfig): Unit = {
5558

56-
val producerProps = Map(
57-
"bootstrap.servers" -> s"localhost:${config.kafkaPort}",
58-
"key.serializer" -> classOf[StringSerializer].getName,
59-
"value.serializer" -> classOf[StringSerializer].getName
60-
)
59+
val kafkaProducer = new KafkaProducer[String, String](Map[String, String](
60+
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> s"localhost:${config.kafkaPort}",
61+
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> classOf[StringSerializer].getName,
62+
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> classOf[StringSerializer].getName,
63+
ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG -> 3000.toString,
64+
ProducerConfig.RETRY_BACKOFF_MS_CONFIG -> 1000.toString
65+
))
6166

62-
val kafkaProducer = new KafkaProducer[String, String](producerProps)
67+
val sendFuture = kafkaProducer.send(new ProducerRecord[String, String](topic, message))
68+
val sendResult = Try { sendFuture.get(3, SECONDS) }
6369

64-
kafkaProducer.send(new ProducerRecord[String, String](topic, message))
6570
kafkaProducer.close()
71+
72+
if (sendResult.isFailure) throw new KafkaUnavailableException
6673
}
6774

6875

6976
/**
7077
* Consumes the first message available in a given topic, deserializing it as a String.
71-
* Throws a [[java.util.concurrent.TimeoutException]] if a message is not available in 3 seconds.
7278
*
7379
* @param topic the topic to consume a message from
7480
* @param config an implicit [[EmbeddedKafkaConfig]]
7581
* @return the first message consumed from the given topic
82+
* @throws TimeoutException if unable to consume a message within 3 seconds
83+
* @throws KafkaUnavailableException if unable to connect to Kafka
7684
*/
85+
@throws(classOf[TimeoutException])
86+
@throws(classOf[KafkaUnavailableException])
7787
def consumeFirstMessageFrom(topic: String)(implicit config: EmbeddedKafkaConfig): String = {
7888
val props = new Properties()
7989
props.put("group.id", "scalatest-embedded-kafka-spec")
8090
props.put("zookeeper.connect", s"localhost:${config.zooKeeperPort}")
8191
props.put("auto.offset.reset", "smallest")
92+
props.put("zookeeper.connection.timeout.ms", "6000")
8293

83-
val consumer = Consumer.create(new ConsumerConfig(props))
94+
val consumer = Try {
95+
Consumer.create(new ConsumerConfig(props))
96+
}.getOrElse(throw new KafkaUnavailableException)
8497

8598
val filter = Whitelist(topic)
8699
val messageStreams =
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
package net.manub.embeddedkafka
2+
3+
class KafkaUnavailableException extends RuntimeException

src/test/scala/net/manub/embeddedkafka/EmbeddedKafkaSpec.scala

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -132,10 +132,16 @@ class EmbeddedKafkaSpec
132132
}
133133

134134
consumer.shutdown()
135-
136135
}
137136

138137
}
138+
139+
"throws a KafkaUnavailableException when Kafka is unavailable when trying to publish" in {
140+
141+
a[KafkaUnavailableException] shouldBe thrownBy {
142+
publishToKafka("non_existing_topic", "a message")
143+
}
144+
}
139145
}
140146

141147
"the consumeFirstMessageFrom method" should {
@@ -149,7 +155,7 @@ class EmbeddedKafkaSpec
149155

150156
val producer = new KafkaProducer[String, String](Map(
151157
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> s"localhost:6001",
152-
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> classOf[StringSerializer].getName,
158+
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> classOf[StringSerializer].getName,
153159
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> classOf[StringSerializer].getName
154160
))
155161

@@ -164,12 +170,17 @@ class EmbeddedKafkaSpec
164170
"throws a TimeoutExeption when a message is not available" in {
165171

166172
withRunningKafka {
167-
168173
a[TimeoutException] shouldBe thrownBy {
169174
consumeFirstMessageFrom("non_existing_topic")
170175
}
171176
}
177+
}
172178

179+
"throws a KafkaUnavailableException when there's no running instance of Kafka" in {
180+
181+
a[KafkaUnavailableException] shouldBe thrownBy {
182+
consumeFirstMessageFrom("non_existing_topic")
183+
}
173184
}
174185
}
175186

0 commit comments

Comments
 (0)