@@ -4,13 +4,15 @@ import io.github.adven27.concordion.extensions.exam.mq.MqTester
44import mu.KLogging
55import org.apache.kafka.clients.admin.AdminClient
66import org.apache.kafka.clients.admin.OffsetSpec
7- import org.apache.kafka.clients.admin.RecordsToDelete.beforeOffset
7+ import org.apache.kafka.clients.admin.RecordsToDelete
8+ import org.apache.kafka.clients.admin.TopicDescription
89import org.apache.kafka.clients.consumer.ConsumerConfig
910import org.apache.kafka.clients.consumer.KafkaConsumer
1011import org.apache.kafka.clients.consumer.OffsetAndMetadata
1112import org.apache.kafka.clients.producer.KafkaProducer
1213import org.apache.kafka.clients.producer.ProducerConfig
1314import org.apache.kafka.clients.producer.ProducerRecord
15+ import org.apache.kafka.common.KafkaFuture
1416import org.apache.kafka.common.TopicPartition
1517import org.apache.kafka.common.header.internals.RecordHeader
1618import org.apache.kafka.common.serialization.StringDeserializer
@@ -99,10 +101,24 @@ open class KafkaConsumeOnlyTester @JvmOverloads constructor(
99101 }
100102
101103 override fun purge () = logger.info(" Purging topic {}..." , topic).also {
102- adminClient.deleteRecords(sutOffsets().map { it.key to beforeOffset(it.value.offset()) }.toMap())
104+ adminClient.deleteRecords(
105+ listOffsets()
106+ .map { it.key to RecordsToDelete .beforeOffset(it.value.offset()) }
107+ .associate { it.apply { logger.info(" Purging partition {}" , this ) } }
108+ )
103109 logger.info(" Topic {} is purged" , topic)
104110 }
105111
112+ private fun listOffsets () = adminClient.listOffsets(
113+ adminClient.describeTopics(listOf (topic))
114+ .values().values
115+ .flatMap { topicDesc -> topicDesc.toPartitions() }
116+ .associate { TopicPartition (topic, it) to OffsetSpec .latest() }
117+ ).all()[KAFKA_FETCHING_TIMEOUT , TimeUnit .SECONDS ]
118+
119+ private fun KafkaFuture<TopicDescription>.toPartitions () =
120+ this [KAFKA_FETCHING_TIMEOUT , TimeUnit .SECONDS ].partitions().map { it.partition() }
121+
106122 override fun receive (): List <MqTester .Message > = consumer.apply { seekTo(sutOffsets()) }.consume()
107123
108124 override fun send (message : MqTester .Message , params : Map <String , String >) {
@@ -111,11 +127,13 @@ open class KafkaConsumeOnlyTester @JvmOverloads constructor(
111127
112128 private fun sutOffsets (): Map <TopicPartition , OffsetAndMetadata > =
113129 adminClient.listConsumerGroupOffsets(sutConsumerGroup)
114- .partitionsToOffsetAndMetadata()[FETCH_CONSUMER_GROUP_OFFSETS_TIMEOUT , TimeUnit .SECONDS ]
115- .apply { logger.info(" SUT offsets: {}" , this ) }
130+ .partitionsToOffsetAndMetadata()[KAFKA_FETCHING_TIMEOUT , TimeUnit .SECONDS ]
131+ .filterKeys { it.topic() == topic }
132+ .apply { logger.info(" SUT [consumerGroup: {}] offsets: {}" , sutConsumerGroup, this ) }
116133
117134 private fun KafkaConsumer <String , String >.seekTo (offsets : Map <TopicPartition , OffsetAndMetadata >) {
118135 if (offsets.isEmpty()) {
136+ logger.info(" Offsets are empty - seek from beginning..." )
119137 seekToBeginning()
120138 } else {
121139 offsets.entries.map { it.key to it.value.offset() }.forEach { (partition, committed) ->
@@ -143,12 +161,16 @@ open class KafkaConsumeOnlyTester @JvmOverloads constructor(
143161
144162 private fun KafkaConsumer <String , String >.consume (): List <MqTester .Message > =
145163 logger.info(" Consuming events..." ).let {
146- poll(pollTimeout).apply { commitAsync() }.map { MqTester .Message (it.value()) }
164+ poll(pollTimeout).apply { commitAsync() }.map {
165+ MqTester .Message (it.value()).apply {
166+ logger.info(" Event consumed:\n {}" , this )
167+ }
168+ }
147169 }
148170
149171 companion object : KLogging () {
150172 private const val POLL_MILLIS : Long = 1500
151- private const val FETCH_CONSUMER_GROUP_OFFSETS_TIMEOUT : Long = 10
173+ private const val KAFKA_FETCHING_TIMEOUT : Long = 10
152174
153175 @JvmField
154176 val DEFAULT_CONSUMER_CONFIG : Map <String , String ?> = mapOf (
0 commit comments