Skip to content

Commit 6b3fe78

Browse files
jiminhsiehmanub
authored andcommitted
Clean up (#139)
* Underscore for unused variables * IntelliJ friendly: import `scala.util.Random` * Remove unused import * Bump Akka version * For clarity
1 parent e1758f3 commit 6b3fe78

File tree

8 files changed

+19
-26
lines changed

8 files changed

+19
-26
lines changed

build.sbt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ parallelExecution in ThisBuild := false
44

55
val kafkaVersion = "1.1.0"
66
val confluentVersion = "4.1.0"
7-
val akkaVersion = "2.5.11"
7+
val akkaVersion = "2.5.12"
88

99
lazy val commonSettings = Seq(
1010
organization := "net.manub",

embedded-kafka/src/main/scala/net/manub/embeddedkafka/ConsumerExtensions.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ object ConsumerExtensions {
2929
retryConf: ConsumerRetryConfig = ConsumerRetryConfig()
3030
): Stream[T] = {
3131
val attempts = 1 to retryConf.maximumAttempts
32-
attempts.toStream.flatMap { attempt =>
32+
attempts.toStream.flatMap { _ =>
3333
val batch: Seq[T] = getNextBatch(retryConf.poll, topics)
3434
batch
3535
}

embedded-kafka/src/main/scala/net/manub/embeddedkafka/EmbeddedKafka.scala

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
package net.manub.embeddedkafka
22

33
import java.net.InetSocketAddress
4-
import java.util.Properties
5-
import java.util.concurrent.Executors
64

75
import kafka.server.{KafkaConfig, KafkaServer}
86
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewTopic}
@@ -31,12 +29,7 @@ import org.scalatest.Suite
3129
import scala.collection.JavaConverters._
3230
import scala.collection.mutable.ListBuffer
3331
import scala.concurrent.duration._
34-
import scala.concurrent.{
35-
ExecutionContext,
36-
ExecutionContextExecutorService,
37-
TimeoutException
38-
}
39-
import scala.language.{higherKinds, postfixOps}
32+
import scala.concurrent.TimeoutException
4033
import scala.reflect.io.Directory
4134
import scala.util.Try
4235

@@ -120,7 +113,7 @@ object EmbeddedKafka extends EmbeddedKafkaSupport[EmbeddedKafkaConfig] {
120113
*/
121114
def stop(server: EmbeddedServer): Unit = {
122115
server.stop(true)
123-
servers = servers.filter(x => x != server)
116+
servers = servers.filter(_ != server)
124117
}
125118

126119
/**

embedded-kafka/src/test/scala/net/manub/embeddedkafka/EmbeddedKafkaCustomConfigSpec.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig
55
import org.apache.kafka.clients.producer.ProducerConfig
66

77
import scala.language.postfixOps
8+
import scala.util.Random
89

910
class EmbeddedKafkaCustomConfigSpec
1011
extends EmbeddedKafkaSpecSupport
@@ -40,5 +41,5 @@ class EmbeddedKafkaCustomConfigSpec
4041
}
4142

4243
def generateMessageOfLength(length: Int): String =
43-
Stream.continually(util.Random.nextPrintableChar) take length mkString
44+
Stream.continually(Random.nextPrintableChar) take length mkString
4445
}

embedded-kafka/src/test/scala/net/manub/embeddedkafka/EmbeddedKafkaWithRunningKafkaSpec.scala

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,6 @@ package net.manub.embeddedkafka
22

33
import org.scalatest.exceptions.TestFailedException
44

5-
import scala.language.postfixOps
6-
75
class EmbeddedKafkaWithRunningKafkaSpec
86
extends EmbeddedKafkaSpecSupport
97
with EmbeddedKafka {

embedded-kafka/src/test/scala/net/manub/embeddedkafka/embeddedKafkaSpecSupport.scala

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -41,41 +41,41 @@ abstract class EmbeddedKafkaSpecSupport
4141
def kafkaIsAvailable(kafkaPort: Int = 6001): Unit = {
4242
system.actorOf(
4343
TcpClient.props(new InetSocketAddress("localhost", kafkaPort), testActor))
44-
expectMsg(1 second, ConnectionSuccessful)
44+
expectMsg(1 second, Connection.Success)
4545
}
4646

4747
def schemaRegistryIsAvailable(schemaRegistryPort: Int = 6002): Unit = {
4848
system.actorOf(
4949
TcpClient.props(new InetSocketAddress("localhost", schemaRegistryPort),
5050
testActor))
51-
expectMsg(1 second, ConnectionSuccessful)
51+
expectMsg(1 second, Connection.Success)
5252
}
5353

5454
def zookeeperIsAvailable(zookeeperPort: Int = 6000): Unit = {
5555
system.actorOf(
5656
TcpClient.props(new InetSocketAddress("localhost", zookeeperPort),
5757
testActor))
58-
expectMsg(1 second, ConnectionSuccessful)
58+
expectMsg(1 second, Connection.Success)
5959
}
6060

6161
def kafkaIsNotAvailable(kafkaPort: Int = 6001): Unit = {
6262
system.actorOf(
6363
TcpClient.props(new InetSocketAddress("localhost", kafkaPort), testActor))
64-
expectMsg(1 second, ConnectionFailed)
64+
expectMsg(1 second, Connection.Failure)
6565
}
6666

6767
def schemaRegistryIsNotAvailable(schemaRegistryPort: Int = 6002): Unit = {
6868
system.actorOf(
6969
TcpClient.props(new InetSocketAddress("localhost", schemaRegistryPort),
7070
testActor))
71-
expectMsg(1 second, ConnectionFailed)
71+
expectMsg(1 second, Connection.Failure)
7272
}
7373

7474
def zookeeperIsNotAvailable(zookeeperPort: Int = 6000): Unit = {
7575
system.actorOf(
7676
TcpClient.props(new InetSocketAddress("localhost", zookeeperPort),
7777
testActor))
78-
expectMsg(1 second, ConnectionFailed)
78+
expectMsg(1 second, Connection.Failure)
7979
}
8080
}
8181

@@ -84,8 +84,10 @@ object TcpClient {
8484
Props(new TcpClient(remote, replies))
8585
}
8686

87-
case object ConnectionSuccessful
88-
case object ConnectionFailed
87+
object Connection {
88+
object Success
89+
object Failure
90+
}
8991

9092
class TcpClient(remote: InetSocketAddress, listener: ActorRef) extends Actor {
9193

@@ -95,11 +97,11 @@ class TcpClient(remote: InetSocketAddress, listener: ActorRef) extends Actor {
9597

9698
def receive: Receive = {
9799
case Connected(_, _) =>
98-
listener ! ConnectionSuccessful
100+
listener ! Connection.Success
99101
context stop self
100102

101103
case _ =>
102-
listener ! ConnectionFailed
104+
listener ! Connection.Failure
103105
context stop self
104106
}
105107
}

kafka-streams/src/main/scala/net/manub/embeddedkafka/streams/TestStreamsConfig.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package net.manub.embeddedkafka.streams
22

33
import java.nio.file.Files
44

5-
import net.manub.embeddedkafka.avro
65
import net.manub.embeddedkafka.EmbeddedKafkaConfig
76
import org.apache.kafka.clients.consumer.{ConsumerConfig, OffsetResetStrategy}
87
import org.apache.kafka.streams.StreamsConfig

schema-registry/src/test/scala/net/manub/embeddedkafka/schemaregistry/EmbeddedKafkaWithSchemaRegistryObjectSpec.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package net.manub.embeddedkafka.schemaregistry
22

3-
import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaSpecSupport}
3+
import net.manub.embeddedkafka.EmbeddedKafkaSpecSupport
44

55
class EmbeddedKafkaWithSchemaRegistryObjectSpec
66
extends EmbeddedKafkaSpecSupport {

0 commit comments

Comments
 (0)