Commit fb2d19c

Browse files
authored
Merge pull request #203 from conker84/issue_199
fixes #199: Refactor the KafkaConsumer record Management
2 parents: 9ad3d47 + 9a18cdc

4 files changed: +106 additions, −152 deletions


common/src/main/kotlin/streams/extensions/CommonExtensions.kt

Lines changed: 6 additions & 1 deletion
@@ -1,5 +1,8 @@
 package streams.extensions
 
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.kafka.clients.consumer.OffsetAndMetadata
+import org.apache.kafka.common.TopicPartition
 import org.neo4j.graphdb.Node
 import javax.lang.model.SourceVersion
 
@@ -26,4 +29,6 @@ fun Map<String, Any?>.flatten(map: Map<String, Any?> = this, prefix: String = ""
             listOf(newKey to value)
         }
     }.toMap()
-}
+}
+fun <K, V> ConsumerRecord<K, V>.topicPartition() = TopicPartition(this.topic(), this.partition())
+fun <K, V> ConsumerRecord<K, V>.offsetAndMetadata(metadata: String = "") = OffsetAndMetadata(this.offset() + 1, metadata)
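A minimal usage sketch (not part of the commit): the two extension functions make it easy to turn the last record consumed per partition into the Map<TopicPartition, OffsetAndMetadata> that KafkaConsumer.commitSync expects; note that offsetAndMetadata() already points at offset() + 1, i.e. the next record to read. The helper name commitMapFor and the lastRecords parameter are hypothetical.

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.TopicPartition
import streams.extensions.offsetAndMetadata
import streams.extensions.topicPartition

// Hypothetical helper: given the last record seen on each partition, build the commit map.
fun commitMapFor(lastRecords: List<ConsumerRecord<String, ByteArray>>): Map<TopicPartition, OffsetAndMetadata> =
        lastRecords
                .map { it.topicPartition() to it.offsetAndMetadata() }
                .toMap()

// Usage, assuming a KafkaConsumer<String, ByteArray> named consumer:
// consumer.commitSync(commitMapFor(lastRecords))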

consumer/src/main/kotlin/streams/StreamsEventSinkQueryExecution.kt

Lines changed: 4 additions & 8 deletions
@@ -24,15 +24,11 @@ class StreamsEventSinkQueryExecution(private val streamsTopicService: StreamsTop
 
     override fun write(query: String, params: Collection<Any>) {
         if (Neo4jUtils.isWriteableInstance(db)) {
-            try {
-                val result = db.execute(query, mapOf("events" to params))
-                if (log.isDebugEnabled) {
-                    log.debug("Query statistics:\n${result.queryStatistics}")
-                }
-                result.close()
-            } catch (e: Exception) {
-                log.error("Error while executing the query", e)
+            val result = db.execute(query, mapOf("events" to params))
+            if (log.isDebugEnabled) {
+                log.debug("Query statistics:\n${result.queryStatistics}")
             }
+            result.close()
         } else {
             if (log.isDebugEnabled) {
                 log.debug("Not writeable instance")

consumer/src/main/kotlin/streams/kafka/KafkaEventSink.kt

Lines changed: 59 additions & 111 deletions
@@ -1,16 +1,19 @@
 package streams.kafka
 
 import kotlinx.coroutines.*
-import org.apache.kafka.clients.consumer.*
+import org.apache.kafka.clients.consumer.ConsumerConfig
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.kafka.clients.consumer.KafkaConsumer
+import org.apache.kafka.clients.consumer.OffsetAndMetadata
 import org.apache.kafka.common.TopicPartition
 import org.neo4j.kernel.configuration.Config
 import org.neo4j.kernel.internal.GraphDatabaseAPI
 import org.neo4j.logging.Log
 import streams.*
+import streams.extensions.offsetAndMetadata
+import streams.extensions.topicPartition
 import streams.serialization.JSONUtils
 import streams.utils.Neo4jUtils
-import streams.utils.StreamsUtils
-import java.util.concurrent.ConcurrentHashMap
 import java.util.concurrent.TimeUnit
 
 
@@ -47,7 +50,7 @@ class KafkaEventSink(private val config: Config,
         }
     }
 
-    override fun start() {
+    override fun start() { // TODO move to the abstract class
         val streamsConfig = StreamsSinkConfiguration.from(config)
         val topics = streamsTopicService.getTopics()
         val isWriteableInstance = Neo4jUtils.isWriteableInstance(db)
@@ -74,15 +77,15 @@ class KafkaEventSink(private val config: Config,
         log.info("Kafka Sink started")
     }
 
-    override fun stop() = runBlocking {
+    override fun stop() = runBlocking { // TODO move to the abstract class
        log.info("Stopping Kafka Sink daemon Job")
        try {
            job.cancelAndJoin()
            log.info("Kafka Sink daemon Job stopped")
        } catch (e : UninitializedPropertyAccessException) { /* ignoring this one only */ }
    }
 
-    override fun getEventSinkConfigMapper(): StreamsEventSinkConfigMapper {
+    override fun getEventSinkConfigMapper(): StreamsEventSinkConfigMapper { // TODO move to the abstract class
        return object: StreamsEventSinkConfigMapper(streamsConfigMap, mappingKeys) {
            override fun convert(config: Map<String, String>): Map<String, String> {
                val props = streamsConfigMap
@@ -172,37 +175,47 @@ open class KafkaAutoCommitEventConsumer(private val config: KafkaSinkConfigurati
     }
 
     override fun stop() {
-        StreamsUtils.ignoreExceptions({ consumer.close() }, UninitializedPropertyAccessException::class.java)
+        consumer.close()
     }
 
     private fun readSimple(action: (String, List<Any>) -> Unit) {
         val records = consumer.poll(0)
-        if (!records.isEmpty) {
-            try {
-                this.topics.forEach { topic ->
-                    val topicRecords = records.records(topic)
-                    if (!topicRecords.iterator().hasNext()) {
-                        return@forEach
-                    }
-                    action(topic, topicRecords.map { JSONUtils.readValue<Any>(it.value()) })
-                }
-            } catch (e: Exception) {
-                // TODO add dead letter queue
-            }
+        this.topics
+                .filter { topic -> records.records(topic).iterator().hasNext() }
+                .map { topic -> topic to records.records(topic) }
+                .forEach { (topic, topicRecords) -> executeAction(action, topic, topicRecords) }
+    }
+
+    fun executeAction(action: (String, List<Any>) -> Unit, topic: String, topicRecords: MutableIterable<ConsumerRecord<String, ByteArray>>) {
+        try {
+            action(topic, convert(topicRecords))
+        } catch (e: Exception) {
+            // TODO send to the DLQ
         }
     }
 
+    private fun convert(topicRecords: MutableIterable<ConsumerRecord<String, ByteArray>>) = topicRecords
+            .map {
+                try {
+                    "ok" to JSONUtils.readValue<Any>(it.value())
+                } catch (e: Exception) {
+                    "error" to it
+                }
+            }
+            .groupBy({ it.first }, { it.second })
+            .let {
+                // TODO send content of the "error" key to the DLQ
+                it.getOrDefault("ok", emptyList())
+            }
+
     private fun readFromPartition(config: KafkaTopicConfig, action: (String, List<Any>) -> Unit) {
         setSeek(config.topicPartitionsMap)
         val records = consumer.poll(0)
-        val consumerRecordsMap = toConsumerRecordsMap(config.topicPartitionsMap, records)
-        if (consumerRecordsMap.isNotEmpty()) {
-            try {
-                consumerRecordsMap.forEach { action(it.key.topic(), it.value.map { JSONUtils.readValue<Any>(it.value()) }) }
-            } catch (e: Exception) {
-                // TODO add dead letter queue
-            }
-        }
+        config.topicPartitionsMap
+                .mapValues { records.records(it.key) }
+                .filterValues { it.isNotEmpty() }
+                .mapKeys { it.key.topic() }
+                .forEach { (topic, topicRecords) -> executeAction(action, topic, topicRecords) }
     }
 
     override fun read(action: (String, List<Any>) -> Unit) {
@@ -218,14 +231,6 @@ open class KafkaAutoCommitEventConsumer(private val config: KafkaSinkConfigurati
         }
     }
 
-    fun toConsumerRecordsMap(topicPartitionsMap: Map<TopicPartition, Long>,
-                             records: ConsumerRecords<String, ByteArray>)
-            : Map<TopicPartition, List<ConsumerRecord<String, ByteArray>>> = topicPartitionsMap
-            .mapValues {
-                records.records(it.key)
-            }
-            .filterValues { it.isNotEmpty() }
-
     fun setSeek(topicPartitionsMap: Map<TopicPartition, Long>) {
         if (isSeekSet) {
             return
@@ -245,65 +250,45 @@ open class KafkaAutoCommitEventConsumer(private val config: KafkaSinkConfigurati
 class KafkaManualCommitEventConsumer(private val config: KafkaSinkConfiguration,
                                      private val log: Log): KafkaAutoCommitEventConsumer(config, log) {
 
-    private val topicPartitionOffsetMap = ConcurrentHashMap<TopicPartition, OffsetAndMetadata>()
-
     override fun start() {
         if (topics.isEmpty()) {
             log.info("No topics specified Kafka Consumer will not started")
             return
         }
-        this.consumer.subscribe(topics, StreamsConsumerRebalanceListener(topicPartitionOffsetMap, consumer, config.autoOffsetReset, log))
+        this.consumer.subscribe(topics)
     }
 
     private fun readSimple(action: (String, List<Any>) -> Unit): Map<TopicPartition, OffsetAndMetadata> {
-        val topicMap = mutableMapOf<TopicPartition, OffsetAndMetadata>()
         val records = consumer.poll(0)
-        if (!records.isEmpty) {
-            this.topics.forEach { topic ->
-                val topicRecords = records.records(topic)
-                if (!topicRecords.iterator().hasNext()) {
-                    return@forEach
+        return this.topics
+                .filter { topic -> records.records(topic).iterator().hasNext() }
+                .map { topic -> topic to records.records(topic) }
+                .map { (topic, topicRecords) ->
+                    executeAction(action, topic, topicRecords)
+                    topicRecords.last()
                 }
-                val lastRecord = topicRecords.last()
-                val offsetAndMetadata = OffsetAndMetadata(lastRecord.offset(), "")
-                val topicPartition = TopicPartition(lastRecord.topic(), lastRecord.partition())
-                topicMap[topicPartition] = offsetAndMetadata
-                topicPartitionOffsetMap[topicPartition] = offsetAndMetadata
-                try {
-                    action(topic, topicRecords.map { JSONUtils.readValue<Any>(it.value()) })
-                } catch (e: Exception) {
-                    // TODO add dead letter queue
-                }
-            }
-        }
-        return topicMap
+                .map { it.topicPartition() to it.offsetAndMetadata() }
+                .toMap()
     }
 
     private fun readFromPartition(kafkaTopicConfig: KafkaTopicConfig,
                                   action: (String, List<Any>) -> Unit): Map<TopicPartition, OffsetAndMetadata> {
-        val topicMap = mutableMapOf<TopicPartition, OffsetAndMetadata>()
         setSeek(kafkaTopicConfig.topicPartitionsMap)
         val records = consumer.poll(0)
-        val consumerRecordsMap = toConsumerRecordsMap(kafkaTopicConfig.topicPartitionsMap, records)
-        if (consumerRecordsMap.isNotEmpty()) {
-            try {
-                consumerRecordsMap.forEach {
-                    val lastRecord = it.value.last()
-                    val topicPartition = TopicPartition(lastRecord.topic(), lastRecord.partition())
-                    val offsetAndMetadata = OffsetAndMetadata(lastRecord.offset(), "")
-                    topicMap[topicPartition] = offsetAndMetadata
-                    topicPartitionOffsetMap[topicPartition] = offsetAndMetadata
-                    action(it.key.topic(), it.value.map { JSONUtils.readValue<Any>(it.value()) })
+        return kafkaTopicConfig.topicPartitionsMap
+                .mapValues { records.records(it.key) }
+                .filterValues { it.isNotEmpty() }
+                .mapKeys { it.key.topic() }
+                .map { (topic, topicRecords) ->
+                    executeAction(action, topic, topicRecords)
+                    topicRecords.last()
                 }
-            } catch (e: Exception) {
-                // TODO add dead letter queue
-            }
-        }
-        return topicMap
+                .map { it.topicPartition() to it.offsetAndMetadata() }
+                .toMap()
     }
 
     private fun commitData(commit: Boolean, topicMap: Map<TopicPartition, OffsetAndMetadata>) {
-        if (commit && topicMap.isNotEmpty()) {
+        if (commit) {
             consumer.commitSync(topicMap)
         }
     }
@@ -322,41 +307,4 @@ class KafkaManualCommitEventConsumer(private val config: KafkaSinkConfiguration,
         }
         commitData(kafkaTopicConfig.commit, topicMap)
     }
-}
-
-class StreamsConsumerRebalanceListener(private val topicPartitionOffsetMap: Map<TopicPartition, OffsetAndMetadata>,
-                                       private val consumer: KafkaConsumer<String, ByteArray>,
-                                       private val autoOffsetReset: String,
-                                       private val log: Log): ConsumerRebalanceListener {
-
-    override fun onPartitionsRevoked(partitions: Collection<TopicPartition>) {
-        val offsets = partitions
-                .map {
-                    val offset = consumer.position(it)
-                    if (log.isDebugEnabled) {
-                        log.debug("onPartitionsRevoked: for topic ${it.topic()} partition ${it.partition()}, the last saved offset is: $offset")
-                    }
-                    it to OffsetAndMetadata(offset, "")
-                }
-                .toMap()
-        consumer.commitSync(offsets)
-    }
-
-    override fun onPartitionsAssigned(partitions: Collection<TopicPartition>) {
-        for (partition in partitions) {
-            val offset = (topicPartitionOffsetMap[partition] ?: consumer.committed(partition))?.offset()
-            if (log.isDebugEnabled) {
-                log.debug("onPartitionsAssigned: for ${partition.topic()} partition ${partition.partition()}, the retrieved offset is: $offset")
-            }
-            if (offset == null) {
-                when (autoOffsetReset) {
-                    "latest" -> consumer.seekToEnd(listOf(partition))
-                    "earliest" -> consumer.seekToBeginning(listOf(partition))
-                    else -> throw RuntimeException("No kafka.auto.offset.reset property specified")
-                }
-            } else {
-                consumer.seek(partition, offset + 1)
-            }
-        }
-    }
 }
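The new convert() keeps the poll loop alive when a single record carries malformed JSON: each record is tagged "ok" or "error", the pairs are grouped, and only the "ok" payloads are forwarded to the action while the "error" group is earmarked for a future dead letter queue. Below is a standalone sketch of the same partitioning idea; it substitutes jackson-module-kotlin's jacksonObjectMapper for the project's JSONUtils and uses made-up payloads, so it illustrates the pattern rather than the exact code above.

import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import com.fasterxml.jackson.module.kotlin.readValue

fun main() {
    val mapper = jacksonObjectMapper()
    val payloads = listOf(
            """{"id": 1, "name": "Alice"}""",  // parses fine
            """{"id": 2, "name": """           // truncated JSON, parsing throws
    )
    val grouped = payloads
            .map { raw ->
                try {
                    "ok" to mapper.readValue<Any>(raw)
                } catch (e: Exception) {
                    "error" to raw             // candidate for the future DLQ
                }
            }
            .groupBy({ it.first }, { it.second })
    val events = grouped.getOrDefault("ok", emptyList())    // handed to the sink action
    val failed = grouped.getOrDefault("error", emptyList()) // TODO: dead letter queue
    println("parsed=${events.size}, failed=${failed.size}") // parsed=1, failed=1
}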
