@@ -5,30 +5,20 @@ import java.util.Properties
55import java .util .concurrent .Executors
66
77import kafka .admin .AdminUtils
8- import kafka .server .KafkaConfig ._
98import kafka .server .{KafkaConfig , KafkaServer }
109import kafka .utils .ZkUtils
1110import org .apache .kafka .clients .consumer .{KafkaConsumer , OffsetAndMetadata }
12- import org .apache .kafka .clients .producer .{
13- KafkaProducer ,
14- ProducerConfig ,
15- ProducerRecord
16- }
11+ import org .apache .kafka .clients .producer .{KafkaProducer , ProducerConfig , ProducerRecord }
12+ import org .apache .kafka .common .serialization .{Deserializer , Serializer , StringDeserializer , StringSerializer }
1713import org .apache .kafka .common .{KafkaException , TopicPartition }
18- import org .apache .kafka .common .serialization .{
19- Deserializer ,
20- Serializer ,
21- StringDeserializer ,
22- StringSerializer
23- }
2414import org .apache .zookeeper .server .{ServerCnxnFactory , ZooKeeperServer }
2515import org .scalatest .Suite
2616
2717import scala .collection .JavaConverters ._
2818import scala .collection .mutable
2919import scala .collection .mutable .ListBuffer
3020import scala .concurrent .duration ._
31- import scala .concurrent .{ExecutionContext , TimeoutException }
21+ import scala .concurrent .{ExecutionContext , ExecutionContextExecutorService , TimeoutException }
3222import scala .language .{higherKinds , postfixOps }
3323import scala .reflect .io .Directory
3424import scala .util .Try
@@ -118,7 +108,7 @@ object EmbeddedKafka extends EmbeddedKafkaSupport {
118108
119109sealed trait EmbeddedKafkaSupport {
120110 private val executorService = Executors .newFixedThreadPool(2 )
121- implicit private val executionContext =
111+ implicit private val executionContext : ExecutionContextExecutorService =
122112 ExecutionContext .fromExecutorService(executorService)
123113
124114 val zkSessionTimeoutMs = 10000
@@ -136,7 +126,7 @@ sealed trait EmbeddedKafkaSupport {
136126 withRunningZooKeeper(config.zooKeeperPort) { zkPort =>
137127 withTempDir(" kafka" ) { kafkaLogsDir =>
138128 val broker =
139- startKafka(config.copy(zooKeeperPort = zkPort) , kafkaLogsDir)
129+ startKafka(config.kafkaPort, zkPort, config.customBrokerProperties , kafkaLogsDir)
140130 try {
141131 body
142132 } finally {
@@ -162,11 +152,11 @@ sealed trait EmbeddedKafkaSupport {
162152 withRunningZooKeeper(config.zooKeeperPort) { zkPort =>
163153 withTempDir(" kafka" ) { kafkaLogsDir =>
164154 val broker : KafkaServer =
165- startKafka(config.copy(zooKeeperPort = zkPort) , kafkaLogsDir)
155+ startKafka(config.kafkaPort, zkPort, config.customBrokerProperties , kafkaLogsDir)
166156 val kafkaPort =
167157 broker.boundPort(broker.config.listeners.head.listenerName)
168158 val actualConfig =
169- config.copy (kafkaPort = kafkaPort, zooKeeperPort = zkPort )
159+ EmbeddedKafkaConfigImpl (kafkaPort, zkPort, config.customBrokerProperties, config.customProducerProperties, config.customConsumerProperties )
170160 try {
171161 body(actualConfig)
172162 } finally {
@@ -556,10 +546,12 @@ sealed trait EmbeddedKafkaSupport {
556546 factory
557547 }
558548
559- def startKafka (config : EmbeddedKafkaConfig ,
560- kafkaLogDir : Directory ): KafkaServer = {
561- val zkAddress = s " localhost: ${config.zooKeeperPort}"
562- val listener = s " PLAINTEXT://localhost: ${config.kafkaPort}"
549+ private def startKafka (kafkaPort : Int ,
550+ zooKeeperPort : Int ,
551+ customBrokerProperties : Map [String , String ],
552+ kafkaLogDir : Directory ) = {
553+ val zkAddress = s " localhost: $zooKeeperPort"
554+ val listener = s " PLAINTEXT://localhost: $kafkaPort"
563555
564556 val properties = new Properties
565557 properties.setProperty(" zookeeper.connect" , zkAddress)
@@ -577,7 +569,7 @@ sealed trait EmbeddedKafkaSupport {
577569 // The total memory used for log deduplication across all cleaner threads, keep it small to not exhaust suite memory
578570 properties.setProperty(" log.cleaner.dedupe.buffer.size" , " 1048577" )
579571
580- config. customBrokerProperties.foreach {
572+ customBrokerProperties.foreach {
581573 case (key, value) => properties.setProperty(key, value)
582574 }
583575
@@ -586,6 +578,11 @@ sealed trait EmbeddedKafkaSupport {
586578 broker
587579 }
588580
581+ def startKafka (config : EmbeddedKafkaConfig ,
582+ kafkaLogDir : Directory ): KafkaServer = {
583+ startKafka(config.kafkaPort, config.zooKeeperPort, config.customBrokerProperties, kafkaLogDir)
584+ }
585+
589586 /**
590587 * Creates a topic with a custom configuration
591588 *
0 commit comments