@@ -20,6 +20,7 @@ package io.streamthoughts.kafka.clients.consumer
 
 import ch.qos.logback.classic.Level
 import io.streamthoughts.kafka.clients.consumer.ConsumerTask.State
+import io.streamthoughts.kafka.clients.consumer.error.ConsumedErrorHandler
 import io.streamthoughts.kafka.clients.consumer.error.serialization.DeserializationErrorHandler
 import io.streamthoughts.kafka.clients.consumer.listener.ConsumerBatchRecordsListener
 import io.streamthoughts.kafka.clients.loggerFor
@@ -38,8 +39,9 @@ import org.apache.kafka.common.errors.WakeupException
 import org.apache.kafka.common.serialization.ByteArrayDeserializer
 import org.apache.kafka.common.serialization.Deserializer
 import java.time.Duration
-import java.util.*
+import java.util.LinkedList
 import java.util.concurrent.CountDownLatch
+import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.AtomicBoolean
 import kotlin.collections.HashMap
 import kotlin.math.max
@@ -53,11 +55,16 @@ class KafkaConsumerTask<K, V>(
     private val listener: ConsumerBatchRecordsListener<K, V>,
     private var clientId: String = "",
     private val deserializationErrorHandler: DeserializationErrorHandler<K, V>,
+    private val consumedErrorHandler: ConsumedErrorHandler? = null,
     private val consumerAwareRebalanceListener: ConsumerAwareRebalanceListener? = null
 ) : ConsumerTask {
 
     companion object {
         private val Log = loggerFor(KafkaConsumerTask::class.java)
+
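+        // flattens the per-partition record map into a single list, used when invoking the ConsumedErrorHandler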
+        private fun <K, V> flatten(records: Map<TopicPartition, List<ConsumerRecord<K?, V?>>>): List<ConsumerRecord<K?, V?>> {
+            return records.flatMap { (_, v) -> v }.toList()
+        }
     }
 
     @Volatile
@@ -134,10 +141,10 @@ class KafkaConsumerTask<K, V>(
         } catch (e: WakeupException) {
             if (!isShutdown.get()) throw e
             else {
-                logWithConsumerInfo(Level.INFO, "Stop polling due to the io.streamthoughts.kafka.clients. consumer-task is being closed")
+                logWithConsumerInfo(Level.INFO, "Stop polling because the consumer-task is being closed")
             }
         } catch (e: CancellationException) {
-            logWithConsumerInfo(Level.INFO, "Stop polling due to the io.streamthoughts.kafka.clients. consumer-task has been canceled")
+            logWithConsumerInfo(Level.INFO, "Stop polling because the consumer-task has been canceled")
             throw e
         } finally {
             state = State.PENDING_SHUTDOWN
@@ -160,21 +167,36 @@ class KafkaConsumerTask<K, V>(
     }
 
     private fun pollOnce() {
-        val records: ConsumerRecords<ByteArray, ByteArray> = consumer.poll(pollTime)
+        val rawRecords: ConsumerRecords<ByteArray, ByteArray> = consumer.poll(pollTime)
 
         if (state == State.PARTITIONS_ASSIGNED) {
             state = State.RUNNING
         }
 
-        // deserialize all records using user-provided Deserializer
-        val deserialized: Map<TopicPartition, List<ConsumerRecord<K?, V?>>> =
-            records.partitions()
-                .map { Pair(it, deserialize(records.records(it))) }
-                .toMap()
+        if (!rawRecords.isEmpty) {
+            // deserialize all records using the user-provided Deserializer
+            val recordsPerPartitions: Map<TopicPartition, List<ConsumerRecord<K?, V?>>> =
+                rawRecords.partitions()
+                    .map { Pair(it, deserialize(rawRecords.records(it))) }
+                    .toMap()
+            try {
+                processBatchRecords(ConsumerRecords(recordsPerPartitions))
+                updateConsumedOffsets(rawRecords) // only update once all records from the batch have been processed
+                mayCommitAfterBatch()
+            } catch (e: Exception) {
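+                // consumed offsets were not updated for this batch, so the failed records may be reprocessed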
+                mayHandleConsumedError(recordsPerPartitions, e)
+            }
+        }
+    }
 
-        processBatchRecords(ConsumerRecords(deserialized))
-        updateConsumedOffsets(records) // only update once all records from batch have been processed.
-        mayCommitAfterBatch()
+    private fun mayHandleConsumedError(recordsPerPartitions: Map<TopicPartition, List<ConsumerRecord<K?, V?>>>,
+                                       thrownException: Exception
+    ) {
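+        // no-op when no ConsumedErrorHandler is configured: the exception is dropped and polling continues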
+        consumedErrorHandler?.handle(
+            this,
+            flatten(recordsPerPartitions),
+            thrownException
+        )
     }
 
     private fun processBatchRecords(records: ConsumerRecords<K?, V?>) {
@@ -200,6 +222,19 @@ class KafkaConsumerTask<K, V>(
         shutdownLatch.await()
     }
 
+    override fun shutdown(timeout: Duration) {
+        logWithConsumerInfo(Level.INFO, "Closing")
+        isShutdown.set(true)
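+        // wakeup() forces a blocking poll() to throw WakeupException so that the polling loop can exit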
+        consumer.wakeup()
+        if (timeout != Duration.ZERO) {
+            try {
+                shutdownLatch.await(timeout.toMillis(), TimeUnit.MILLISECONDS)
+            } catch (e: InterruptedException) {
+                logWithConsumerInfo(Level.WARN, "Failed to close consumer before timeout")
+            }
+        }
+    }
+
     private fun deserialize(records: List<ConsumerRecord<ByteArray, ByteArray>>): List<ConsumerRecord<K?, V?>> {
         val deserialized = LinkedList<ConsumerRecord<K?, V?>>()
         for (record: ConsumerRecord<ByteArray, ByteArray> in records) {
@@ -253,7 +288,7 @@ class KafkaConsumerTask<K, V>(
         state = State.PARTITIONS_REVOKED
         consumerAwareRebalanceListener?.onPartitionsRevokedBeforeCommit(consumer, partitions)
 
-        doCommitSync(offsetAndMetadataToCommit())
+        commitSync(offsetAndMetadataToCommit())
 
         consumerAwareRebalanceListener?.onPartitionsRevokedAfterCommit(consumer, partitions)
         assignedPartitions.clear()
@@ -267,11 +302,11 @@ class KafkaConsumerTask<K, V>(
         }
     }
 
-    private fun offsetAndMetadataToCommit() = consumedOffsets.map { Pair(it.key, OffsetAndMetadata(it.value)) }.toMap()
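+    // commit the position of the next record to consume (last consumed offset + 1), per Kafka's committed-offset semantics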
+    private fun offsetAndMetadataToCommit() = consumedOffsets.map { Pair(it.key, OffsetAndMetadata(it.value + 1)) }.toMap()
 
     private fun mayCommitAfterBatch() {
         if (!isAutoCommitEnabled && consumedOffsets.isNotEmpty()) {
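+            // commit manually only when enable.auto.commit is disabled; buffered offsets are then cleared to avoid re-committing them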
-            doCommitAsync(offsetAndMetadataToCommit())
+            commitAsync(offsetAndMetadataToCommit())
             consumedOffsets.clear()
         }
     }
@@ -282,11 +317,11 @@ class KafkaConsumerTask<K, V>(
                 val offset = consumer.position(topicPartition)
                 Pair(topicPartition, OffsetAndMetadata(offset))
             }.toMap()
-            doCommitSync(positionsToCommit)
+            commitSync(positionsToCommit)
         }
     }
 
-    private fun doCommitAsync(offsets: Map<TopicPartition, OffsetAndMetadata>? = null) {
+    override fun commitAsync(offsets: Map<TopicPartition, OffsetAndMetadata>?) {
         logWithConsumerInfo(Level.INFO, "Committing offsets asynchronously for positions: $offsets")
         consumer.commitAsync(offsets) {
             _, exception -> if (exception != null) {
@@ -295,8 +330,8 @@ class KafkaConsumerTask<K, V>(
         }
     }
 
-    private fun doCommitSync(offsets: Map<TopicPartition, OffsetAndMetadata>? = null) {
-        if (consumer.assignment().isEmpty()) return // no need to commit if no partition is assign to this io.streamthoughts.kafka.clients. consumer
+    override fun commitSync(offsets: Map<TopicPartition, OffsetAndMetadata>?) {
+        if (consumer.assignment().isEmpty()) return // no need to commit if no partition is assigned to this consumer
         try {
             if (offsets == null) {
                 logWithConsumerInfo(Level.WARN, "Committing offsets synchronously for consumed records")
@@ -307,7 +342,7 @@ class KafkaConsumerTask<K, V>(
             }
             logWithConsumerInfo(Level.WARN, "Offsets committed for partitions: $assignedPartitions")
         } catch (e: RetriableCommitFailedException) {
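+            // RetriableCommitFailedException signals a transient failure: retry the commit synchronously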
-            doCommitSync(offsets)
+            commitSync(offsets)
         } catch (e: RebalanceInProgressException) {
             logWithConsumerInfo(Level.WARN, "Error while committing offsets due to a rebalance in progress. Ignored")
         }