@@ -4,112 +4,117 @@ import io.github.adven27.concordion.extensions.exam.mq.MqTester
44import mu.KLogging
55import org.apache.kafka.clients.admin.AdminClient
66import org.apache.kafka.clients.admin.OffsetSpec
7+ import org.apache.kafka.clients.admin.RecordsToDelete.beforeOffset
78import org.apache.kafka.clients.consumer.ConsumerConfig
89import org.apache.kafka.clients.consumer.KafkaConsumer
910import org.apache.kafka.clients.consumer.OffsetAndMetadata
1011import org.apache.kafka.clients.producer.KafkaProducer
1112import org.apache.kafka.clients.producer.ProducerConfig
1213import org.apache.kafka.clients.producer.ProducerRecord
1314import org.apache.kafka.common.TopicPartition
14- import org.apache.kafka.common.serialization.LongDeserializer
15- import org.apache.kafka.common.serialization.LongSerializer
15+ import org.apache.kafka.common.header.internals.RecordHeader
1616import org.apache.kafka.common.serialization.StringDeserializer
1717import org.apache.kafka.common.serialization.StringSerializer
1818import java.time.Duration
1919import java.time.Duration.ofMillis
2020import java.time.Duration.ofSeconds
21- import java.util.Properties
2221import java.util.concurrent.TimeUnit
2322
2423@Suppress(" unused" )
25- open class KafkaTester @JvmOverloads constructor(
26- protected val bootstrapServers : String ,
27- protected val topic : String ,
28- protected val properties : Properties = DEFAULT_PROPERTIES ,
29- protected val pollTimeout : Duration = ofMillis(POLL_MILLIS ),
30- protected val partitionHeader : String = " partition"
31- ) : MqTester {
32- protected lateinit var producer: KafkaProducer <Long , String >
33- protected lateinit var consumer: KafkaConsumer <Long , String >
34-
35- override fun purge () = logger.info(" Purging topic {}..." , topic).also {
36- consumer.poll(ofMillis(POLL_MILLIS ))
37- logger.info(" Topic {} is purged" , topic)
38- }
39-
40- override fun receive (): List <MqTester .Message > = logger.info(" Reading from {}" , topic).let {
41- consumer.poll(pollTimeout).apply { consumer.commitAsync() }.map { MqTester .Message (it.value()) }
42- }
43-
44- override fun send (message : String , headers : Map <String , String >) = logger.info(" Sending to {}..." , topic).also {
45- producer.send(record(message, partitionFrom(headers))).get().apply {
46- logger.info(
47- " Sent to topic {} and partition {} with offset {}:\n {}" , topic(), partition(), offset(), message
48- )
49- }
50- }
51-
52- private fun partitionFrom (headers : Map <String , String >) = headers[partitionHeader]?.toInt()
53-
54- private fun record (value : String , partition : Int? ): ProducerRecord <Long , String > =
55- ProducerRecord (topic, partition, null , value)
24+ open class KafkaConsumeAndSendTester @JvmOverloads constructor(
25+ sutConsumerGroup : String ,
26+ bootstrapServers : String ,
27+ topic : String ,
28+ properties : MutableMap <String , Any ?> = (DEFAULT_PRODUCER_CONFIG + DEFAULT_CONSUMER_CONFIG ).toMutableMap(),
29+ pollTimeout : Duration = ofMillis(POLL_MILLIS ),
30+ accumulateOnRetries : Boolean = false
31+ ) : KafkaConsumeOnlyTester(sutConsumerGroup, bootstrapServers, topic, properties, pollTimeout, accumulateOnRetries) {
32+ protected lateinit var producer: KafkaProducer <String , String >
5633
5734 override fun start () {
35+ properties[ProducerConfig .CLIENT_ID_CONFIG ] = " kafka-tester-$topic "
5836 properties[ProducerConfig .BOOTSTRAP_SERVERS_CONFIG ] = bootstrapServers
59- producer = KafkaProducer <Long , String >(properties)
60- consumer = KafkaConsumer <Long , String >(properties).apply { subscribe(listOf (topic)) }
61- logger.info(" KafkaTester started with properties:\n {}" , properties)
37+ producer = KafkaProducer <String , String >(properties)
38+ super .start()
6239 }
6340
6441 override fun stop () {
6542 producer.close(ofSeconds(4 ))
66- consumer.close(ofSeconds( 4 ) )
43+ super .stop( )
6744 }
6845
46+ override fun send (message : MqTester .Message , params : Map <String , String >) =
47+ logger.info(" Sending to {}..." , topic).also {
48+ producer.send(record(message, partitionFrom(params), keyFrom(params))).get().apply {
49+ logger.info(
50+ " Sent to topic {} and partition {} with offset {}:\n {}" , topic(), partition(), offset(), message
51+ )
52+ }
53+ }
54+
55+ private fun record (message : MqTester .Message , partition : Int? , key : String? ) = ProducerRecord (
56+ topic, partition, key, message.body, message.headers.map { RecordHeader (it.key, it.value.toByteArray()) }
57+ )
58+
6959 companion object : KLogging () {
60+ private const val POLL_MILLIS : Long = 1500
61+ private const val FETCH_CONSUMER_GROUP_OFFSETS_TIMEOUT : Long = 10
62+ private const val PARAM_PARTITION = " partition"
63+ private const val PARAM_KEY = " key"
7064
7165 @JvmField
72- val DEFAULT_PROPERTIES = Properties ().apply {
73- put(ProducerConfig .CLIENT_ID_CONFIG , " kafka-tester" )
74- put(ProducerConfig .KEY_SERIALIZER_CLASS_CONFIG , LongSerializer ::class .java.name)
75- put(ProducerConfig .VALUE_SERIALIZER_CLASS_CONFIG , StringSerializer ::class .java.name)
76- put(ConsumerConfig .GROUP_ID_CONFIG , " kafka-tester" )
77- put(ConsumerConfig .KEY_DESERIALIZER_CLASS_CONFIG , LongDeserializer ::class .java.name)
78- put(ConsumerConfig .VALUE_DESERIALIZER_CLASS_CONFIG , StringDeserializer ::class .java.name)
79- put(ConsumerConfig .ENABLE_AUTO_COMMIT_CONFIG , " false" )
80- put(ConsumerConfig .AUTO_OFFSET_RESET_CONFIG , " earliest" )
81- }
66+ val DEFAULT_PRODUCER_CONFIG : Map <String , String ?> = mapOf (
67+ ProducerConfig .KEY_SERIALIZER_CLASS_CONFIG to StringSerializer ::class .java.name,
68+ ProducerConfig .VALUE_SERIALIZER_CLASS_CONFIG to StringSerializer ::class .java.name,
69+ )
8270
83- protected const val POLL_MILLIS : Long = 1500
71+ private fun partitionFrom (headers : Map <String , String >) = headers[PARAM_PARTITION ]?.toInt()
72+ private fun keyFrom (headers : Map <String , String >) = headers[PARAM_KEY ]
8473 }
8574}
8675
87- @Suppress(" unused" )
88- open class KafkaMultiPartitionTester @JvmOverloads constructor(
89- private val sutConsumerGroup : String ,
90- bootstrapServers : String ,
91- topic : String ,
92- properties : Properties = DEFAULT_PROPERTIES ,
93- pollTimeout : Duration = ofMillis(POLL_MILLIS ),
94- partitionHeader : String = " partition"
95- ) : KafkaTester(bootstrapServers, topic, properties, pollTimeout, partitionHeader) {
96- private var adminClient: AdminClient ? = null
76+ @Suppress(" unused" , " TooManyFunctions" )
77+ open class KafkaConsumeOnlyTester @JvmOverloads constructor(
78+ protected val sutConsumerGroup : String ,
79+ protected val bootstrapServers : String ,
80+ protected val topic : String ,
81+ protected val properties : MutableMap <String , Any ?> = DEFAULT_CONSUMER_CONFIG .toMutableMap(),
82+ protected val pollTimeout : Duration = ofMillis(POLL_MILLIS ),
83+ protected val accumulateOnRetries : Boolean = false
84+ ) : MqTester {
85+ protected lateinit var consumer: KafkaConsumer <String , String >
86+ protected lateinit var adminClient: AdminClient
87+
88+ override fun accumulateOnRetries () = accumulateOnRetries
9789
9890 override fun start () {
99- super .start()
91+ properties[ConsumerConfig .GROUP_ID_CONFIG ] = " kafka-tester-$topic "
92+ consumer = KafkaConsumer <String , String >(properties).apply { subscribe(listOf (topic)) }
10093 adminClient = AdminClient .create(properties)
94+ logger.info(" KafkaTester started with properties:\n {}" , properties)
10195 }
10296
103- override fun accumulateOnRetries () = false
97+ override fun stop () {
98+ consumer.close(ofSeconds(4 ))
99+ }
100+
101+ override fun purge () = logger.info(" Purging topic {}..." , topic).also {
102+ adminClient.deleteRecords(sutOffsets().map { it.key to beforeOffset(it.value.offset()) }.toMap())
103+ logger.info(" Topic {} is purged" , topic)
104+ }
104105
105106 override fun receive (): List <MqTester .Message > = consumer.apply { seekTo(sutOffsets()) }.consume()
106107
108+ override fun send (message : MqTester .Message , params : Map <String , String >) {
109+ throw UnsupportedOperationException (" $javaClass doesn't support sending messages" )
110+ }
111+
107112 private fun sutOffsets (): Map <TopicPartition , OffsetAndMetadata > =
108- adminClient!! .listConsumerGroupOffsets(sutConsumerGroup)
113+ adminClient.listConsumerGroupOffsets(sutConsumerGroup)
109114 .partitionsToOffsetAndMetadata()[FETCH_CONSUMER_GROUP_OFFSETS_TIMEOUT , TimeUnit .SECONDS ]
110115 .apply { logger.info(" SUT offsets: {}" , this ) }
111116
112- private fun KafkaConsumer <Long , String >.seekTo (offsets : Map <TopicPartition , OffsetAndMetadata >) {
117+ private fun KafkaConsumer <String , String >.seekTo (offsets : Map <TopicPartition , OffsetAndMetadata >) {
113118 if (offsets.isEmpty()) {
114119 seekToBeginning()
115120 } else {
@@ -122,26 +127,35 @@ open class KafkaMultiPartitionTester @JvmOverloads constructor(
122127 }
123128
124129 private fun endOf (p : TopicPartition ): Long =
125- adminClient!! .listOffsets(mapOf (p to OffsetSpec .latest())).all().get()[p]?.offset() ? : 0L
130+ adminClient.listOffsets(mapOf (p to OffsetSpec .latest())).all().get()[p]?.offset() ? : 0L
126131
127- private fun KafkaConsumer <Long , String >.seekToBeginning () {
132+ private fun KafkaConsumer <String , String >.seekToBeginning () {
128133 // At this point, there is no heartbeat from consumer and seek() wont work... So call poll() first
129134 poll(pollTimeout)
130135 seekToBeginning(assignment())
131136 }
132137
133- private fun KafkaConsumer <Long , String >.seekTo (pointer : Long , p : TopicPartition ) {
138+ private fun KafkaConsumer <String , String >.seekTo (pointer : Long , p : TopicPartition ) {
134139 // At this point, there is no heartbeat from consumer and seek() wont work... So call poll() first
135140 poll(pollTimeout)
136141 seek(p, pointer)
137142 }
138143
139- private fun KafkaConsumer <Long , String >.consume (): List <MqTester .Message > = logger.info(" Consuming events..." ).let {
140- poll(pollTimeout).apply { commitAsync() }.map { MqTester .Message (it.value()) }
141- }
144+ private fun KafkaConsumer <String , String >.consume (): List <MqTester .Message > =
145+ logger.info(" Consuming events..." ).let {
146+ poll(pollTimeout).apply { commitAsync() }.map { MqTester .Message (it.value()) }
147+ }
142148
143149 companion object : KLogging () {
144150 private const val POLL_MILLIS : Long = 1500
145151 private const val FETCH_CONSUMER_GROUP_OFFSETS_TIMEOUT : Long = 10
152+
153+ @JvmField
154+ val DEFAULT_CONSUMER_CONFIG : Map <String , String ?> = mapOf (
155+ ConsumerConfig .KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer ::class .java.name,
156+ ConsumerConfig .VALUE_DESERIALIZER_CLASS_CONFIG to StringDeserializer ::class .java.name,
157+ ConsumerConfig .ENABLE_AUTO_COMMIT_CONFIG to " false" ,
158+ ConsumerConfig .AUTO_OFFSET_RESET_CONFIG to " earliest" ,
159+ )
146160 }
147161}
0 commit comments