@@ -46,9 +46,10 @@ open class KafkaConsumeAndSendTester @JvmOverloads constructor(
4646 protected lateinit var producer: KafkaProducer <String , String >
4747
4848 override fun start () {
49- properties [ProducerConfig .CLIENT_ID_CONFIG ] = " kafka-tester-$topic "
50- properties [ProducerConfig .BOOTSTRAP_SERVERS_CONFIG ] = bootstrapServers
49+ producerProperties [ProducerConfig .CLIENT_ID_CONFIG ] = " kafka-tester-$topic "
50+ producerProperties [ProducerConfig .BOOTSTRAP_SERVERS_CONFIG ] = bootstrapServers
5151 producer = KafkaProducer <String , String >(producerProperties)
52+ logger.info(" Producer started with properties:\n {}" , producerProperties)
5253 super .start()
5354 }
5455
@@ -60,9 +61,7 @@ open class KafkaConsumeAndSendTester @JvmOverloads constructor(
6061 override fun send (message : Message , params : Map <String , String >) =
6162 logger.debug(" Sending to {}..." , topic).also {
6263 producer.send(record(message, partitionFrom(params), keyFrom(params))).get().apply {
63- logger.info(
64- " Sent to topic {} and partition {} with offset {}:\n {}" , topic(), partition(), offset(), message
65- )
64+ logger.info(" Message sent to {}-{} {}: {}" , topic(), partition(), offset(), message)
6665 }
6766 }
6867
@@ -92,7 +91,7 @@ open class KafkaConsumeOnlyTester @JvmOverloads constructor(
9291 protected val bootstrapServers : String ,
9392 protected val topic : String ,
9493 protected val sutConsumerGroup : String? ,
95- protected val properties : MutableMap <String , Any ?> = DEFAULT_CONSUMER_CONFIG .toMutableMap(),
94+ protected val consumerProperties : MutableMap <String , Any ?> = DEFAULT_CONSUMER_CONFIG .toMutableMap(),
9695 protected val pollTimeout : Duration = ofMillis(POLL_MILLIS ),
9796 protected val accumulateOnRetries : Boolean = false ,
9897 protected val recordMapper : (ConsumerRecord <String , String >) -> Message = DEFAULT_RECORD_MAPPER
@@ -103,12 +102,13 @@ open class KafkaConsumeOnlyTester @JvmOverloads constructor(
103102 override fun accumulateOnRetries () = accumulateOnRetries
104103
105104 override fun start () {
106- properties[ConsumerConfig .GROUP_ID_CONFIG ] = " kafka-tester-$topic "
107- consumer = KafkaConsumer <String , String >(properties).apply {
105+ consumerProperties[ConsumerConfig .GROUP_ID_CONFIG ] = " kafka-tester-$topic "
106+ consumerProperties[ConsumerConfig .BOOTSTRAP_SERVERS_CONFIG ] = bootstrapServers
107+ consumer = KafkaConsumer <String , String >(consumerProperties).apply {
108108 assign(partitionsFor(topic).map { TopicPartition (it.topic(), it.partition()) })
109109 }
110- adminClient = AdminClient .create(properties )
111- logger.info(" KafkaTester started with properties:\n {}" , properties )
110+ adminClient = AdminClient .create(consumerProperties )
111+ logger.info(" Consumer started with properties:\n {}" , consumerProperties )
112112 }
113113
114114 override fun stop () {
@@ -121,7 +121,7 @@ open class KafkaConsumeOnlyTester @JvmOverloads constructor(
121121 .map { it.key to RecordsToDelete .beforeOffset(it.value.offset()) }
122122 .associate { it.apply { logger.debug(" Purging partition {}" , this ) } }
123123 )
124- logger.info(" Topic {} is purged " , topic)
124+ logger.info(" Topic purged: {} " , topic)
125125 }
126126
127127 private fun listLatestOffsets () = adminClient.listOffsets(latestOffsetsSpec())
0 commit comments