@@ -5,6 +5,7 @@ import java.util.Properties
 import java.util.concurrent.Executors

 import kafka.admin.AdminUtils
+import kafka.server.KafkaConfig._
 import kafka.server.{KafkaConfig, KafkaServer}
 import kafka.utils.ZkUtils
 import org.apache.kafka.clients.consumer.{KafkaConsumer, OffsetAndMetadata}
@@ -32,7 +33,8 @@ import scala.language.{higherKinds, postfixOps}
 import scala.reflect.io.Directory
 import scala.util.Try

-trait EmbeddedKafka extends EmbeddedKafkaSupport { this: Suite =>
+trait EmbeddedKafka extends EmbeddedKafkaSupport {
+  this: Suite =>
 }

 object EmbeddedKafka extends EmbeddedKafkaSupport {
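The reformatted trait makes the self-type requirement easier to spot: EmbeddedKafka can only be mixed into a ScalaTest Suite. For orientation, a minimal round-trip sketch, assuming the library's published package net.manub.embeddedkafka and its publishStringMessageToKafka helper (neither appears in this hunk):

import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
import org.scalatest.WordSpec

class KafkaRoundTripSpec extends WordSpec with EmbeddedKafka {

  // 6001/6000 are the library's default Kafka/ZooKeeper ports
  implicit val config = EmbeddedKafkaConfig()

  "the embedded broker" should {
    "accept a published message" in {
      withRunningKafka {
        publishStringMessageToKafka("topic", "message")
      }
    }
  }
}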
@@ -61,7 +63,7 @@ object EmbeddedKafka extends EmbeddedKafkaSupport {
    * Starts a Zookeeper instance in memory, storing logs in a specific location.
    *
    * @param zkLogsDir the path for the Zookeeper logs
-   * @param config an implicit [[EmbeddedKafkaConfig]]
+   * @param config    an implicit [[EmbeddedKafkaConfig]]
    */
   def startZooKeeper(zkLogsDir: Directory)(
       implicit config: EmbeddedKafkaConfig): Unit = {
@@ -72,7 +74,7 @@ object EmbeddedKafka extends EmbeddedKafkaSupport {
    * Starts a Kafka broker in memory, storing logs in a specific location.
    *
    * @param kafkaLogDir the path for the Kafka logs
-   * @param config an implicit [[EmbeddedKafkaConfig]]
+   * @param config      an implicit [[EmbeddedKafkaConfig]]
    */
   def startKafka(kafkaLogDir: Directory)(
       implicit config: EmbeddedKafkaConfig): Unit = {
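startZooKeeper and startKafka also allow a suite-wide fixture instead of the per-test loan pattern. A sketch of that style, assuming the companion object also exposes stopKafka and stopZooKeeper (both outside this diff):

import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
import org.scalatest.{BeforeAndAfterAll, Suite}
import scala.reflect.io.Directory

trait PerSuiteKafka extends BeforeAndAfterAll { this: Suite =>
  implicit val config = EmbeddedKafkaConfig()

  override def beforeAll(): Unit = {
    super.beforeAll()
    // Directory.makeTemp comes from scala.reflect.io, imported at the top of this file
    EmbeddedKafka.startZooKeeper(Directory.makeTemp("zookeeper-logs"))
    EmbeddedKafka.startKafka(Directory.makeTemp("kafka-logs"))
  }

  override def afterAll(): Unit = {
    EmbeddedKafka.stopKafka()     // assumed helper, not shown in this diff
    EmbeddedKafka.stopZooKeeper() // assumed helper, not shown in this diff
    super.afterAll()
  }
}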
@@ -133,7 +135,8 @@ sealed trait EmbeddedKafkaSupport {
       implicit config: EmbeddedKafkaConfig): T = {
     withRunningZooKeeper(config.zooKeeperPort) { zkPort =>
       withTempDir("kafka") { kafkaLogsDir =>
-        val broker = startKafka(config.copy(zooKeeperPort = zkPort), kafkaLogsDir)
+        val broker =
+          startKafka(config.copy(zooKeeperPort = zkPort), kafkaLogsDir)
         try {
           body
         } finally {
@@ -154,12 +157,16 @@ sealed trait EmbeddedKafkaSupport {
    * @param body the function to execute, given an [[EmbeddedKafkaConfig]] with the actual
    *             ports Kafka and ZooKeeper are running on
    */
-  def withRunningKafkaOnFoundPort[T](config: EmbeddedKafkaConfig)(body: EmbeddedKafkaConfig => T): T = {
+  def withRunningKafkaOnFoundPort[T](config: EmbeddedKafkaConfig)(
+      body: EmbeddedKafkaConfig => T): T = {
     withRunningZooKeeper(config.zooKeeperPort) { zkPort =>
       withTempDir("kafka") { kafkaLogsDir =>
-        val broker: KafkaServer = startKafka(config.copy(zooKeeperPort = zkPort), kafkaLogsDir)
-        val kafkaPort = broker.boundPort(broker.config.listeners.head.listenerName)
-        val actualConfig = config.copy(kafkaPort = kafkaPort, zooKeeperPort = zkPort)
+        val broker: KafkaServer =
+          startKafka(config.copy(zooKeeperPort = zkPort), kafkaLogsDir)
+        val kafkaPort =
+          broker.boundPort(broker.config.listeners.head.listenerName)
+        val actualConfig =
+          config.copy(kafkaPort = kafkaPort, zooKeeperPort = zkPort)
         try {
           body(actualConfig)
         } finally {
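The reflow makes the sequence readable top-down: start ZooKeeper, start the broker, ask it which port it actually bound, then hand the caller a config carrying the real ports. The intended use is passing 0 so the OS picks free ports; a sketch, inside a suite that mixes in EmbeddedKafka:

val userConfig = EmbeddedKafkaConfig(kafkaPort = 0, zooKeeperPort = 0)

withRunningKafkaOnFoundPort(userConfig) { actualConfig =>
  // actualConfig.kafkaPort and actualConfig.zooKeeperPort now hold the bound ports
  publishStringMessageToKafka("topic", "message")(actualConfig)
}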
@@ -254,20 +261,24 @@ sealed trait EmbeddedKafkaSupport {
   }

   def kafkaProducer[K, T](topic: String, key: K, message: T)(
-      implicit config: EmbeddedKafkaConfig,
-      keySerializer: Serializer[K],
-      serializer: Serializer[T]) = new KafkaProducer[K, T](baseProducerConfig.asJava, keySerializer, serializer)
-
-  def kafkaConsumer[K, T](
-      implicit config: EmbeddedKafkaConfig,
-      keyDeserializer: Deserializer[K],
-      deserializer: Deserializer[T]) = new KafkaConsumer[K, T](baseConsumerConfig, keyDeserializer, deserializer)
-
-  private def baseProducerConfig(implicit config: EmbeddedKafkaConfig) = Map[String, Object](
-    ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> s"localhost:${config.kafkaPort}",
-    ProducerConfig.MAX_BLOCK_MS_CONFIG -> 10000.toString,
-    ProducerConfig.RETRY_BACKOFF_MS_CONFIG -> 1000.toString
-  ) ++ config.customProducerProperties
+      implicit config: EmbeddedKafkaConfig,
+      keySerializer: Serializer[K],
+      serializer: Serializer[T]) =
+    new KafkaProducer[K, T](baseProducerConfig.asJava,
+                            keySerializer,
+                            serializer)
+
+  def kafkaConsumer[K, T](implicit config: EmbeddedKafkaConfig,
+                          keyDeserializer: Deserializer[K],
+                          deserializer: Deserializer[T]) =
+    new KafkaConsumer[K, T](baseConsumerConfig, keyDeserializer, deserializer)
+
+  private def baseProducerConfig(implicit config: EmbeddedKafkaConfig) =
+    Map[String, Object](
+      ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> s"localhost:${config.kafkaPort}",
+      ProducerConfig.MAX_BLOCK_MS_CONFIG -> 10000.toString,
+      ProducerConfig.RETRY_BACKOFF_MS_CONFIG -> 1000.toString
+    ) ++ config.customProducerProperties

   private def baseConsumerConfig(
       implicit config: EmbeddedKafkaConfig): Properties = {
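Since baseProducerConfig appends config.customProducerProperties with ++ after the defaults, user-supplied entries win on key collisions. For instance, this hypothetical config (assuming EmbeddedKafkaConfig exposes customProducerProperties as a constructor parameter) raises the 10000 ms blocking default:

import org.apache.kafka.clients.producer.ProducerConfig

// this entry replaces the MAX_BLOCK_MS_CONFIG default from baseProducerConfig
val config = EmbeddedKafkaConfig(
  customProducerProperties = Map(ProducerConfig.MAX_BLOCK_MS_CONFIG -> "20000"))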
@@ -280,11 +291,9 @@ sealed trait EmbeddedKafkaSupport {
     props
   }

-  def consumeFirstStringMessageFrom(topic: String,
-                                    autoCommit: Boolean = false)(
+  def consumeFirstStringMessageFrom(topic: String, autoCommit: Boolean = false)(
       implicit config: EmbeddedKafkaConfig): String =
-    consumeFirstMessageFrom(topic, autoCommit)(config,
-                                               new StringDeserializer())
+    consumeFirstMessageFrom(topic, autoCommit)(config, new StringDeserializer())

   def consumeNumberStringMessagesFrom(topic: String,
                                       number: Int,
@@ -315,14 +324,16 @@ sealed trait EmbeddedKafkaSupport {
   def consumeFirstMessageFrom[T](topic: String, autoCommit: Boolean = false)(
       implicit config: EmbeddedKafkaConfig,
       deserializer: Deserializer[T]): T =
-    consumeNumberMessagesFrom(topic, 1, autoCommit)(config, deserializer).head
+    consumeNumberMessagesFrom(topic, 1, autoCommit)(config, deserializer).head

   def consumeNumberMessagesFrom[T](topic: String,
                                    number: Int,
                                    autoCommit: Boolean = false)(
       implicit config: EmbeddedKafkaConfig,
       deserializer: Deserializer[T]): List[T] =
-    consumeNumberMessagesFromTopics(Set(topic), number, autoCommit)(config, deserializer)(topic)
+    consumeNumberMessagesFromTopics(Set(topic), number, autoCommit)(
+      config,
+      deserializer)(topic)

   /**
    * Consumes the first n messages available in given topics, deserializes them as type [[T]], and returns
@@ -331,18 +342,18 @@ sealed trait EmbeddedKafkaSupport {
    * Only the messages that are returned are committed if autoCommit is false.
    * If autoCommit is true then all messages that were polled will be committed.
    *
-   * @param topics the topics to consume messages from
-   * @param number the number of messages to consume in a batch
-   * @param autoCommit if false, only the offset for the consumed messages will be committed.
-   *                   if true, the offset for the last polled message will be committed instead.
-   *                   Defaulted to false.
-   * @param timeout the interval to wait for messages before throwing TimeoutException
+   * @param topics                    the topics to consume messages from
+   * @param number                    the number of messages to consume in a batch
+   * @param autoCommit                if false, only the offset for the consumed messages will be committed.
+   *                                  if true, the offset for the last polled message will be committed instead.
+   *                                  Defaulted to false.
+   * @param timeout                   the interval to wait for messages before throwing TimeoutException
    * @param resetTimeoutOnEachMessage when true, throw TimeoutException if we have a silent period
    *                                  (no incoming messages) for the timeout interval; when false,
    *                                  throw TimeoutException after the timeout interval if we
    *                                  haven't received all of the expected messages
-   * @param config an implicit [[EmbeddedKafkaConfig]]
-   * @param deserializer an implicit [[org.apache.kafka.common.serialization.Deserializer]] for the type [[T]]
+   * @param config                    an implicit [[EmbeddedKafkaConfig]]
+   * @param deserializer              an implicit [[org.apache.kafka.common.serialization.Deserializer]] for the type [[T]]
    * @return the List of messages consumed from the given topics, each with a type [[T]]
    * @throws TimeoutException if unable to consume messages within specified timeout
    * @throws KafkaUnavailableException if unable to connect to Kafka
@@ -351,7 +362,8 @@ sealed trait EmbeddedKafkaSupport {
                                           number: Int,
                                           autoCommit: Boolean = false,
                                           timeout: Duration = 5.seconds,
-                                          resetTimeoutOnEachMessage: Boolean = true)(
+                                          resetTimeoutOnEachMessage: Boolean =
+                                            true)(
       implicit config: EmbeddedKafkaConfig,
       deserializer: Deserializer[T]): Map[String, List[T]] = {

@@ -387,7 +399,8 @@ sealed trait EmbeddedKafkaSupport {
       }
     }
     if (messagesRead < number) {
-      throw new TimeoutException(s"Unable to retrieve $number message(s) from Kafka in $timeout")
+      throw new TimeoutException(
+        s"Unable to retrieve $number message(s) from Kafka in $timeout")
     }
     messagesBuffers.map { case (topic, messages) => topic -> messages.toList }
   }
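The timeout semantics documented above are visible here: messagesRead is compared against the requested total, and with resetTimeoutOnEachMessage = true the deadline restarts on every record, so the TimeoutException only fires after a silent period. A consuming sketch across two topics (class and topic names are illustrative):

import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
import org.apache.kafka.common.serialization.StringDeserializer
import org.scalatest.WordSpec
import scala.concurrent.duration._

class MultiTopicSpec extends WordSpec with EmbeddedKafka {
  implicit val config = EmbeddedKafkaConfig()
  implicit val deserializer = new StringDeserializer

  "consumeNumberMessagesFromTopics" should {
    "gather the requested total across topics" in {
      withRunningKafka {
        publishStringMessageToKafka("greetings", "hello")
        publishStringMessageToKafka("farewells", "bye")

        // number is the total message count across all subscribed topics
        val consumed = consumeNumberMessagesFromTopics(
          Set("greetings", "farewells"),
          number = 2,
          timeout = 10.seconds)

        assert(consumed("greetings") == List("hello"))
        assert(consumed("farewells") == List("bye"))
      }
    }
  }
}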
@@ -417,9 +430,10 @@ sealed trait EmbeddedKafkaSupport {

   def apply[V](implicit valueSerializer: Serializer[V],
                config: EmbeddedKafkaConfig): KafkaProducer[String, V] = {
-    val producer = new KafkaProducer[String, V](baseProducerConfig(config).asJava,
-                                                new StringSerializer,
-                                                valueSerializer)
+    val producer = new KafkaProducer[String, V](
+      baseProducerConfig(config).asJava,
+      new StringSerializer,
+      valueSerializer)
     producers :+= producer
     producer
   }
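Note that apply records each producer it creates (producers :+= producer), so the support trait can close them all when the broker stops. Callers go through the enclosing factory object, whose definition sits outside this diff; assuming it is the library's aKafkaProducer helper, usage looks like:

import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer

implicit val serializer = new StringSerializer
implicit val config     = EmbeddedKafkaConfig()

val producer = aKafkaProducer[String] // assumed factory name, defined outside this hunk
producer.send(new ProducerRecord("topic", "key", "value"))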
@@ -445,19 +459,20 @@ sealed trait EmbeddedKafkaSupport {
     val listener = s"PLAINTEXT://localhost:${config.kafkaPort}"

     val properties = new Properties
-    properties.setProperty("zookeeper.connect", zkAddress)
-    properties.setProperty("broker.id", "0")
-    properties.setProperty("listeners", listener)
-    properties.setProperty("advertised.listeners", listener)
-    properties.setProperty("auto.create.topics.enable", "true")
-    properties.setProperty("log.dir", kafkaLogDir.toAbsolute.path)
-    properties.setProperty("log.flush.interval.messages", 1.toString)
-    properties.setProperty("offsets.topic.replication.factor", 1.toString)
-    properties.setProperty("offsets.topic.num.partitions", 1.toString)
-    properties.setProperty("transaction.state.log.replication.factor", 1.toString)
+    properties.setProperty(ZkConnectProp, zkAddress)
+    properties.setProperty(BrokerIdProp, "0")
+    properties.setProperty(ListenersProp, listener)
+    properties.setProperty(AdvertisedListenersProp, listener)
+    properties.setProperty(AutoCreateTopicsEnableProp, "true")
+    properties.setProperty(LogDirProp, kafkaLogDir.toAbsolute.path)
+    properties.setProperty(LogFlushIntervalMessagesProp, 1.toString)
+    properties.setProperty(OffsetsTopicReplicationFactorProp, 1.toString)
+    properties.setProperty(OffsetsTopicPartitionsProp, 1.toString)
+    properties.setProperty(TransactionsTopicReplicationFactorProp, 1.toString)
+    properties.setProperty(TransactionsTopicMinISRProp, 1.toString)

     // The total memory used for log deduplication across all cleaner threads, keep it small to not exhaust suite memory
-    properties.setProperty("log.cleaner.dedupe.buffer.size ", "1048577")
+    properties.setProperty(LogCleanerDedupeBufferSizeProp, "1048577")

     config.customBrokerProperties.foreach {
       case (key, value) => properties.setProperty(key, value)
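Because customBrokerProperties is applied after the constant-keyed defaults (the foreach closing this hunk), callers can still override any of them by raw key, for example:

val config = EmbeddedKafkaConfig(
  customBrokerProperties = Map("auto.create.topics.enable" -> "false",
                               "num.partitions" -> "3"))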