Skip to content

Commit d1127eb

Browse files
committed
Formatted with scalafmt.
1 parent 0057f82 commit d1127eb

File tree

17 files changed

+255
-139
lines changed

17 files changed

+255
-139
lines changed

.scalafmt.conf

Whitespace-only changes.

build.sbt

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,14 @@ lazy val commonSettings = Seq(
1919
javaOptions += "-Xmx1G"
2020
)
2121

22-
2322
lazy val commonLibrarySettings = libraryDependencies ++= Seq(
24-
"org.scalatest" %% "scalatest" % "3.0.1",
25-
"org.apache.kafka" %% "kafka" % kafkaVersion exclude(slf4jLog4jOrg, slf4jLog4jArtifact),
26-
"org.apache.zookeeper" % "zookeeper" % "3.4.8" exclude(slf4jLog4jOrg, slf4jLog4jArtifact),
27-
"org.apache.avro" % "avro" % "1.8.1" exclude(slf4jLog4jOrg, slf4jLog4jArtifact),
28-
"com.typesafe.akka" %% "akka-actor" % akkaVersion % Test,
29-
"com.typesafe.akka" %% "akka-testkit" % akkaVersion % Test
30-
)
23+
"org.scalatest" %% "scalatest" % "3.0.1",
24+
"org.apache.kafka" %% "kafka" % kafkaVersion exclude (slf4jLog4jOrg, slf4jLog4jArtifact),
25+
"org.apache.zookeeper" % "zookeeper" % "3.4.8" exclude (slf4jLog4jOrg, slf4jLog4jArtifact),
26+
"org.apache.avro" % "avro" % "1.8.1" exclude (slf4jLog4jOrg, slf4jLog4jArtifact),
27+
"com.typesafe.akka" %% "akka-actor" % akkaVersion % Test,
28+
"com.typesafe.akka" %% "akka-testkit" % akkaVersion % Test
29+
)
3130

3231
lazy val publishSettings = Seq(
3332
licenses += ("MIT", url("http://opensource.org/licenses/MIT")),
@@ -65,7 +64,6 @@ lazy val root = (project in file("."))
6564
.settings(publishTo := Some(Resolver.defaultLocal))
6665
.aggregate(embeddedKafka, kafkaStreams)
6766

68-
6967
lazy val embeddedKafka = (project in file("embedded-kafka"))
7068
.settings(name := "scalatest-embedded-kafka")
7169
.settings(publishSettings: _*)
@@ -80,6 +78,6 @@ lazy val kafkaStreams = (project in file("kafka-streams"))
8078
.settings(commonLibrarySettings)
8179
.settings(releaseSettings: _*)
8280
.settings(libraryDependencies ++= Seq(
83-
"org.apache.kafka" % "kafka-streams" % kafkaVersion exclude(slf4jLog4jOrg, slf4jLog4jArtifact)
81+
"org.apache.kafka" % "kafka-streams" % kafkaVersion exclude (slf4jLog4jOrg, slf4jLog4jArtifact)
8482
))
8583
.dependsOn(embeddedKafka)

embedded-kafka/src/main/scala/net/manub/embeddedkafka/ConsumerExtensions.scala

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -36,16 +36,16 @@ object ConsumerExtensions {
3636
* @param topic the topic to consume
3737
* @return the next batch of messages
3838
*/
39-
def getNextBatch(topic: String): Seq[(K, V)] = Try {
40-
import scala.collection.JavaConversions._
41-
consumer.subscribe(List(topic))
42-
consumer.partitionsFor(topic)
43-
val records = consumer.poll(2000)
44-
// use toList to force eager evaluation. toSeq is lazy
45-
records.iterator().toList.map(r => r.key -> r.value)
46-
}.recover {
47-
case ex: KafkaException => throw new KafkaUnavailableException(ex)
48-
}.get
39+
def getNextBatch(topic: String): Seq[(K, V)] =
40+
Try {
41+
import scala.collection.JavaConversions._
42+
consumer.subscribe(List(topic))
43+
consumer.partitionsFor(topic)
44+
val records = consumer.poll(2000)
45+
// use toList to force eager evaluation. toSeq is lazy
46+
records.iterator().toList.map(r => r.key -> r.value)
47+
}.recover {
48+
case ex: KafkaException => throw new KafkaUnavailableException(ex)
49+
}.get
4950
}
5051
}
51-

embedded-kafka/src/main/scala/net/manub/embeddedkafka/Consumers.scala

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import org.apache.kafka.common.serialization.Deserializer
77

88
/** Utility trait for easily creating Kafka consumers and accessing their consumed messages. */
99
trait Consumers {
10+
1011
/** Loaner pattern that allows running a code block with a newly created consumer.
1112
* The consumer's lifecycle will be automatically handled and closed at the end of the
1213
* given code block.
@@ -18,8 +19,9 @@ trait Consumers {
1819
* @tparam T the type of the block's returning result
1920
* @return the result of the executed block
2021
*/
21-
def withConsumer[K: Deserializer, V: Deserializer, T](block: KafkaConsumer[K, V] => T)
22-
(implicit config: EmbeddedKafkaConfig): T = {
22+
def withConsumer[K: Deserializer, V: Deserializer, T](
23+
block: KafkaConsumer[K, V] => T)(
24+
implicit config: EmbeddedKafkaConfig): T = {
2325
val consumer = newConsumer[K, V]()
2426
try {
2527
val result = block(consumer)
@@ -35,8 +37,8 @@ trait Consumers {
3537
* @tparam T the type of the result of the code block
3638
* @return the code block result
3739
*/
38-
def withStringConsumer[T](block: KafkaConsumer[String, String] => T)
39-
(implicit config: EmbeddedKafkaConfig): T = {
40+
def withStringConsumer[T](block: KafkaConsumer[String, String] => T)(
41+
implicit config: EmbeddedKafkaConfig): T = {
4042
import net.manub.embeddedkafka.Codecs.stringDeserializer
4143
withConsumer(block)
4244
}
@@ -47,13 +49,15 @@ trait Consumers {
4749
* @tparam V the type of the consumer's Value
4850
* @return the new consumer
4951
*/
50-
def newConsumer[K: Deserializer, V: Deserializer]()
51-
(implicit config: EmbeddedKafkaConfig): KafkaConsumer[K, V] = {
52+
def newConsumer[K: Deserializer, V: Deserializer]()(
53+
implicit config: EmbeddedKafkaConfig): KafkaConsumer[K, V] = {
5254
val props = new Properties()
5355
props.put("group.id", UUIDs.newUuid().toString)
5456
props.put("bootstrap.servers", s"localhost:${config.kafkaPort}")
5557
props.put("auto.offset.reset", "earliest")
5658

57-
new KafkaConsumer[K, V](props, implicitly[Deserializer[K]], implicitly[Deserializer[V]])
59+
new KafkaConsumer[K, V](props,
60+
implicitly[Deserializer[K]],
61+
implicitly[Deserializer[V]])
5862
}
5963
}

embedded-kafka/src/main/scala/net/manub/embeddedkafka/EmbeddedKafka.scala

Lines changed: 30 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,18 @@ import kafka.admin.AdminUtils
88
import kafka.server.{KafkaConfig, KafkaServer}
99
import kafka.utils.ZkUtils
1010
import org.apache.kafka.clients.consumer.{KafkaConsumer, OffsetAndMetadata}
11-
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
11+
import org.apache.kafka.clients.producer.{
12+
KafkaProducer,
13+
ProducerConfig,
14+
ProducerRecord
15+
}
1216
import org.apache.kafka.common.{KafkaException, TopicPartition}
13-
import org.apache.kafka.common.serialization.{Deserializer, Serializer, StringDeserializer, StringSerializer}
17+
import org.apache.kafka.common.serialization.{
18+
Deserializer,
19+
Serializer,
20+
StringDeserializer,
21+
StringSerializer
22+
}
1423
import org.apache.zookeeper.server.{ServerCnxnFactory, ZooKeeperServer}
1524
import org.scalatest.Suite
1625

@@ -120,7 +129,8 @@ sealed trait EmbeddedKafkaSupport {
120129
* @param body the function to execute
121130
* @param config an implicit [[EmbeddedKafkaConfig]]
122131
*/
123-
def withRunningKafka(body: => Any)(implicit config: EmbeddedKafkaConfig): Any = {
132+
def withRunningKafka(body: => Any)(
133+
implicit config: EmbeddedKafkaConfig): Any = {
124134

125135
def cleanLogs(directories: Directory*): Unit = {
126136
directories.foreach(_.deleteRecursively())
@@ -211,7 +221,8 @@ sealed trait EmbeddedKafkaSupport {
211221
ProducerConfig.RETRY_BACKOFF_MS_CONFIG -> 1000.toString
212222
)
213223

214-
private def baseConsumerConfig(implicit config: EmbeddedKafkaConfig) : Properties = {
224+
private def baseConsumerConfig(
225+
implicit config: EmbeddedKafkaConfig): Properties = {
215226
val props = new Properties()
216227
props.put("group.id", s"embedded-kafka-spec")
217228
props.put("bootstrap.servers", s"localhost:${config.kafkaPort}")
@@ -220,13 +231,19 @@ sealed trait EmbeddedKafkaSupport {
220231
props
221232
}
222233

223-
def consumeFirstStringMessageFrom(topic: String, autoCommit: Boolean = false)(
234+
def consumeFirstStringMessageFrom(topic: String,
235+
autoCommit: Boolean = false)(
224236
implicit config: EmbeddedKafkaConfig): String =
225-
consumeFirstMessageFrom(topic, autoCommit)(config, new StringDeserializer())
237+
consumeFirstMessageFrom(topic, autoCommit)(config,
238+
new StringDeserializer())
226239

227-
def consumeNumberStringMessagesFrom(topic: String, number: Int, autoCommit: Boolean = false)(
240+
def consumeNumberStringMessagesFrom(topic: String,
241+
number: Int,
242+
autoCommit: Boolean = false)(
228243
implicit config: EmbeddedKafkaConfig): List[String] =
229-
consumeNumberMessagesFrom(topic, number, autoCommit)(config, new StringDeserializer())
244+
consumeNumberMessagesFrom(topic, number, autoCommit)(
245+
config,
246+
new StringDeserializer())
230247

231248
/**
232249
* Consumes the first message available in a given topic, deserializing it as a String.
@@ -300,9 +317,11 @@ sealed trait EmbeddedKafkaSupport {
300317
* @throws TimeoutException if unable to consume a message within 5 seconds
301318
* @throws KafkaUnavailableException if unable to connect to Kafka
302319
*/
303-
def consumeNumberMessagesFrom[T](topic: String, number: Int, autoCommit: Boolean = false)(
304-
implicit config: EmbeddedKafkaConfig,
305-
deserializer: Deserializer[T]): List[T] = {
320+
def consumeNumberMessagesFrom[T](topic: String,
321+
number: Int,
322+
autoCommit: Boolean = false)(
323+
implicit config: EmbeddedKafkaConfig,
324+
deserializer: Deserializer[T]): List[T] = {
306325

307326
import scala.collection.JavaConverters._
308327

@@ -344,7 +363,6 @@ sealed trait EmbeddedKafkaSupport {
344363
}.get
345364
}
346365

347-
348366
object aKafkaProducer {
349367
private[this] var producers = Vector.empty[KafkaProducer[_, _]]
350368

embedded-kafka/src/main/scala/net/manub/embeddedkafka/EmbeddedKafkaConfig.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,8 @@ package net.manub.embeddedkafka
22

33
case class EmbeddedKafkaConfig(kafkaPort: Int = 6001,
44
zooKeeperPort: Int = 6000,
5-
customBrokerProperties: Map[String, String] = Map.empty)
5+
customBrokerProperties: Map[String, String] =
6+
Map.empty)
67

78
object EmbeddedKafkaConfig {
89
implicit val defaultConfig = EmbeddedKafkaConfig()

embedded-kafka/src/main/scala/net/manub/embeddedkafka/UUIDs.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import java.util.UUID
66
* Useful for separating IDs and directories across test cases.
77
*/
88
object UUIDs {
9+
910
/** Create a new unique ID.
1011
*
1112
* @return the unique ID

0 commit comments

Comments
 (0)