@@ -1,16 +1,19 @@
 package streams.kafka
 
 import kotlinx.coroutines.*
-import org.apache.kafka.clients.consumer.*
+import org.apache.kafka.clients.consumer.ConsumerConfig
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.kafka.clients.consumer.KafkaConsumer
+import org.apache.kafka.clients.consumer.OffsetAndMetadata
 import org.apache.kafka.common.TopicPartition
 import org.neo4j.kernel.configuration.Config
 import org.neo4j.kernel.internal.GraphDatabaseAPI
 import org.neo4j.logging.Log
 import streams.*
+import streams.extensions.offsetAndMetadata
+import streams.extensions.topicPartition
 import streams.serialization.JSONUtils
 import streams.utils.Neo4jUtils
-import streams.utils.StreamsUtils
-import java.util.concurrent.ConcurrentHashMap
 import java.util.concurrent.TimeUnit
 
 
@@ -47,7 +50,7 @@ class KafkaEventSink(private val config: Config,
         }
     }
 
-    override fun start() {
+    override fun start() { // TODO move to the abstract class
        val streamsConfig = StreamsSinkConfiguration.from(config)
        val topics = streamsTopicService.getTopics()
        val isWriteableInstance = Neo4jUtils.isWriteableInstance(db)
@@ -74,15 +77,15 @@ class KafkaEventSink(private val config: Config,
7477 log.info(" Kafka Sink started" )
7578 }
7679
77- override fun stop () = runBlocking {
80+ override fun stop () = runBlocking { // TODO move to the abstract class
7881 log.info(" Stopping Kafka Sink daemon Job" )
7982 try {
8083 job.cancelAndJoin()
8184 log.info(" Kafka Sink daemon Job stopped" )
8285 } catch (e : UninitializedPropertyAccessException ) { /* ignoring this one only */ }
8386 }
8487
85- override fun getEventSinkConfigMapper (): StreamsEventSinkConfigMapper {
88+ override fun getEventSinkConfigMapper (): StreamsEventSinkConfigMapper { // TODO move to the abstract class
8689 return object : StreamsEventSinkConfigMapper (streamsConfigMap, mappingKeys) {
8790 override fun convert (config : Map <String , String >): Map <String , String > {
8891 val props = streamsConfigMap
@@ -172,37 +175,47 @@ open class KafkaAutoCommitEventConsumer(private val config: KafkaSinkConfigurati
     }
 
     override fun stop() {
-        StreamsUtils.ignoreExceptions({ consumer.close() }, UninitializedPropertyAccessException::class.java)
+        consumer.close()
     }
 
     private fun readSimple(action: (String, List<Any>) -> Unit) {
         val records = consumer.poll(0)
-        if (!records.isEmpty) {
-            try {
-                this.topics.forEach { topic ->
-                    val topicRecords = records.records(topic)
-                    if (!topicRecords.iterator().hasNext()) {
-                        return@forEach
-                    }
-                    action(topic, topicRecords.map { JSONUtils.readValue<Any>(it.value()) })
-                }
-            } catch (e: Exception) {
-                // TODO add dead letter queue
-            }
+        this.topics
+                .filter { topic -> records.records(topic).iterator().hasNext() }
+                .map { topic -> topic to records.records(topic) }
+                .forEach { (topic, topicRecords) -> executeAction(action, topic, topicRecords) }
+    }
+
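+    // executeAction is the single error boundary shared by readSimple and
+    // readFromPartition: any exception thrown while converting or handling a
+    // batch is currently swallowed here, until the DLQ handling (see TODO) lands.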
+    fun executeAction(action: (String, List<Any>) -> Unit, topic: String, topicRecords: MutableIterable<ConsumerRecord<String, ByteArray>>) {
+        try {
+            action(topic, convert(topicRecords))
+        } catch (e: Exception) {
+            // TODO send to the DLQ
         }
     }
 
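+    // Tags each record as "ok" (deserialized payload) or "error" (raw record),
+    // e.g. two good records and one bad one group to
+    // {"ok" -> [v1, v2], "error" -> [rec3]}; only the "ok" values are returned,
+    // while the failed raw records are kept aside for the future DLQ.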
+    private fun convert(topicRecords: MutableIterable<ConsumerRecord<String, ByteArray>>) = topicRecords
+            .map {
+                try {
+                    "ok" to JSONUtils.readValue<Any>(it.value())
+                } catch (e: Exception) {
+                    "error" to it
+                }
+            }
+            .groupBy({ it.first }, { it.second })
+            .let {
+                // TODO send content of the "error" key to the DLQ
+                it.getOrDefault("ok", emptyList())
+            }
+
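+    // Unlike readSimple, this path reads only the explicitly configured
+    // partitions: it seeks to the requested offsets first, then groups the
+    // polled records per topic before delegating to executeAction.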
     private fun readFromPartition(config: KafkaTopicConfig, action: (String, List<Any>) -> Unit) {
         setSeek(config.topicPartitionsMap)
         val records = consumer.poll(0)
-        val consumerRecordsMap = toConsumerRecordsMap(config.topicPartitionsMap, records)
-        if (consumerRecordsMap.isNotEmpty()) {
-            try {
-                consumerRecordsMap.forEach { action(it.key.topic(), it.value.map { JSONUtils.readValue<Any>(it.value()) }) }
-            } catch (e: Exception) {
-                // TODO add dead letter queue
-            }
-        }
+        config.topicPartitionsMap
+                .mapValues { records.records(it.key) }
+                .filterValues { it.isNotEmpty() }
+                .mapKeys { it.key.topic() }
+                .forEach { (topic, topicRecords) -> executeAction(action, topic, topicRecords) }
     }
 
     override fun read(action: (String, List<Any>) -> Unit) {
@@ -218,14 +231,6 @@ open class KafkaAutoCommitEventConsumer(private val config: KafkaSinkConfigurati
         }
     }
 
-    fun toConsumerRecordsMap(topicPartitionsMap: Map<TopicPartition, Long>,
-                             records: ConsumerRecords<String, ByteArray>)
-            : Map<TopicPartition, List<ConsumerRecord<String, ByteArray>>> = topicPartitionsMap
-            .mapValues {
-                records.records(it.key)
-            }
-            .filterValues { it.isNotEmpty() }
-
     fun setSeek(topicPartitionsMap: Map<TopicPartition, Long>) {
         if (isSeekSet) {
             return
@@ -245,65 +250,45 @@ open class KafkaAutoCommitEventConsumer(private val config: KafkaSinkConfigurati
 class KafkaManualCommitEventConsumer(private val config: KafkaSinkConfiguration,
                                      private val log: Log): KafkaAutoCommitEventConsumer(config, log) {
 
-    private val topicPartitionOffsetMap = ConcurrentHashMap<TopicPartition, OffsetAndMetadata>()
-
     override fun start() {
         if (topics.isEmpty()) {
             log.info("No topics specified, the Kafka Consumer will not be started")
             return
         }
-        this.consumer.subscribe(topics, StreamsConsumerRebalanceListener(topicPartitionOffsetMap, consumer, config.autoOffsetReset, log))
+        this.consumer.subscribe(topics)
     }
 
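+    // In the manual-commit consumer the read methods also return, per
+    // TopicPartition, the OffsetAndMetadata of the last record handed to the
+    // action (built via the streams.extensions helpers), so that commitData
+    // can commit exactly what was processed.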
     private fun readSimple(action: (String, List<Any>) -> Unit): Map<TopicPartition, OffsetAndMetadata> {
-        val topicMap = mutableMapOf<TopicPartition, OffsetAndMetadata>()
         val records = consumer.poll(0)
-        if (!records.isEmpty) {
-            this.topics.forEach { topic ->
-                val topicRecords = records.records(topic)
-                if (!topicRecords.iterator().hasNext()) {
-                    return@forEach
+        return this.topics
+                .filter { topic -> records.records(topic).iterator().hasNext() }
+                .map { topic -> topic to records.records(topic) }
+                .map { (topic, topicRecords) ->
+                    executeAction(action, topic, topicRecords)
+                    topicRecords.last()
                 }
-                val lastRecord = topicRecords.last()
-                val offsetAndMetadata = OffsetAndMetadata(lastRecord.offset(), "")
-                val topicPartition = TopicPartition(lastRecord.topic(), lastRecord.partition())
-                topicMap[topicPartition] = offsetAndMetadata
-                topicPartitionOffsetMap[topicPartition] = offsetAndMetadata
-                try {
-                    action(topic, topicRecords.map { JSONUtils.readValue<Any>(it.value()) })
-                } catch (e: Exception) {
-                    // TODO add dead letter queue
-                }
-            }
-        }
-        return topicMap
+                .map { it.topicPartition() to it.offsetAndMetadata() }
+                .toMap()
     }
 
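+    // Same offset-tracking contract as readSimple above, but restricted to the
+    // partitions listed in the KafkaTopicConfig after seeking to their offsets.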
     private fun readFromPartition(kafkaTopicConfig: KafkaTopicConfig,
                                   action: (String, List<Any>) -> Unit): Map<TopicPartition, OffsetAndMetadata> {
-        val topicMap = mutableMapOf<TopicPartition, OffsetAndMetadata>()
         setSeek(kafkaTopicConfig.topicPartitionsMap)
         val records = consumer.poll(0)
-        val consumerRecordsMap = toConsumerRecordsMap(kafkaTopicConfig.topicPartitionsMap, records)
-        if (consumerRecordsMap.isNotEmpty()) {
-            try {
-                consumerRecordsMap.forEach {
-                    val lastRecord = it.value.last()
-                    val topicPartition = TopicPartition(lastRecord.topic(), lastRecord.partition())
-                    val offsetAndMetadata = OffsetAndMetadata(lastRecord.offset(), "")
-                    topicMap[topicPartition] = offsetAndMetadata
-                    topicPartitionOffsetMap[topicPartition] = offsetAndMetadata
-                    action(it.key.topic(), it.value.map { JSONUtils.readValue<Any>(it.value()) })
+        return kafkaTopicConfig.topicPartitionsMap
+                .mapValues { records.records(it.key) }
+                .filterValues { it.isNotEmpty() }
+                .mapKeys { it.key.topic() }
+                .map { (topic, topicRecords) ->
+                    executeAction(action, topic, topicRecords)
+                    topicRecords.last()
                 }
-            } catch (e: Exception) {
-                // TODO add dead letter queue
-            }
-        }
-        return topicMap
+                .map { it.topicPartition() to it.offsetAndMetadata() }
+                .toMap()
     }
 
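+    // Assumes commitSync with an empty offsets map is effectively a no-op, so
+    // the map returned by the read methods is passed through unguarded.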
     private fun commitData(commit: Boolean, topicMap: Map<TopicPartition, OffsetAndMetadata>) {
-        if (commit && topicMap.isNotEmpty()) {
+        if (commit) {
             consumer.commitSync(topicMap)
         }
     }
@@ -322,41 +307,4 @@ class KafkaManualCommitEventConsumer(private val config: KafkaSinkConfiguration,
             }
         }
         commitData(kafkaTopicConfig.commit, topicMap)
-    }
-
-    class StreamsConsumerRebalanceListener(private val topicPartitionOffsetMap: Map<TopicPartition, OffsetAndMetadata>,
-                                           private val consumer: KafkaConsumer<String, ByteArray>,
-                                           private val autoOffsetReset: String,
-                                           private val log: Log): ConsumerRebalanceListener {
-
-        override fun onPartitionsRevoked(partitions: Collection<TopicPartition>) {
-            val offsets = partitions
-                    .map {
-                        val offset = consumer.position(it)
-                        if (log.isDebugEnabled) {
-                            log.debug("onPartitionsRevoked: for topic ${it.topic()} partition ${it.partition()}, the last saved offset is: $offset")
-                        }
-                        it to OffsetAndMetadata(offset, "")
-                    }
-                    .toMap()
-            consumer.commitSync(offsets)
-        }
-
-        override fun onPartitionsAssigned(partitions: Collection<TopicPartition>) {
-            for (partition in partitions) {
-                val offset = (topicPartitionOffsetMap[partition] ?: consumer.committed(partition))?.offset()
-                if (log.isDebugEnabled) {
-                    log.debug("onPartitionsAssigned: for ${partition.topic()} partition ${partition.partition()}, the retrieved offset is: $offset")
-                }
-                if (offset == null) {
-                    when (autoOffsetReset) {
-                        "latest" -> consumer.seekToEnd(listOf(partition))
-                        "earliest" -> consumer.seekToBeginning(listOf(partition))
-                        else -> throw RuntimeException("No kafka.auto.offset.reset property specified")
-                    }
-                } else {
-                    consumer.seek(partition, offset + 1)
-                }
-            }
-        }
 }