 package streams.kafka
 
 import kotlinx.coroutines.*
-import org.apache.kafka.clients.consumer.KafkaConsumer
+import org.apache.kafka.clients.consumer.*
+import org.apache.kafka.common.TopicPartition
 import org.neo4j.kernel.configuration.Config
 import org.neo4j.logging.Log
 import streams.*
+import streams.kafka.KafkaTopicConfig.Companion.toTopicPartitionMap
 import streams.serialization.JSONUtils
 import streams.utils.StreamsUtils
+import java.util.concurrent.ConcurrentHashMap
+
 
 class KafkaEventSink(private val config: Config,
                      private val queryExecution: StreamsEventSinkQueryExecution,
                      private val streamsTopicService: StreamsTopicService,
                      private val log: Log): StreamsEventSink(config, queryExecution, streamsTopicService, log) {
 
-    private lateinit var eventConsumer: StreamsEventConsumer<*>
+    private lateinit var eventConsumer: StreamsEventConsumer
     private lateinit var job: Job
 
     private val streamsConfigMap = config.raw.filterKeys {
         it.startsWith("kafka.") || (it.startsWith("streams.") && !it.startsWith("streams.sink.topic.cypher."))
     }.toMap()
 
-    private val mappingKeys = mapOf("timeout" to "streams.sink.polling.interval",
-            "from" to "kafka.auto.offset.reset")
+    private val mappingKeys = mapOf(
+            "zookeeper" to "kafka.zookeeper.connect",
+            "broker" to "kafka.${ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG}",
+            "from" to "kafka.${ConsumerConfig.AUTO_OFFSET_RESET_CONFIG}",
+            "autoCommit" to "kafka.${ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG}",
+            "groupId" to "kafka.${ConsumerConfig.GROUP_ID_CONFIG}")
 
     override fun getEventConsumerFactory(): StreamsEventConsumerFactory {
         return object: StreamsEventConsumerFactory() {
-            override fun createStreamsEventConsumer(config: Map<String, String>, log: Log): StreamsEventConsumer<*> {
+            override fun createStreamsEventConsumer(config: Map<String, String>, log: Log): StreamsEventConsumer {
                 val kafkaConfig = KafkaSinkConfiguration.from(config)
-                val kafkaConsumer = KafkaConsumer<String, ByteArray>(kafkaConfig.asProperties())
-                return KafkaEventConsumer(kafkaConsumer, kafkaConfig.streamsSinkConfiguration, log)
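+                // kafka.enable.auto.commit now selects which consumer implementation is created.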
+                return if (kafkaConfig.enableAutoCommit) {
+                    KafkaAutoCommitEventConsumer(kafkaConfig, log)
+                } else {
+                    KafkaManualCommitEventConsumer(kafkaConfig, log)
+                }
             }
         }
     }
@@ -71,12 +82,11 @@ class KafkaEventSink(private val config: Config,
         return GlobalScope.launch(Dispatchers.IO) {
             try {
                 while (isActive) {
-                    val data = eventConsumer.read()
-                    data?.forEach {
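+                    // read() now pushes each polled batch through a callback instead of returning a map.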
+                    eventConsumer.read { topic, data ->
                         if (log.isDebugEnabled) {
-                            log.debug("Reading data from topic ${it.key}, with data ${it.value}")
+                            log.debug("Reading data from topic $topic")
                         }
-                        queryExecution.writeForTopic(it.key, it.value)
+                        queryExecution.writeForTopic(topic, data)
                     }
                 }
                 eventConsumer.stop()
@@ -90,13 +100,38 @@ class KafkaEventSink(private val config: Config,
 
 }
 
-class KafkaEventConsumer(private val consumer: KafkaConsumer<String, ByteArray>,
-                         private val config: StreamsSinkConfiguration,
-                         private val log: Log): StreamsEventConsumer<KafkaConsumer<String, ByteArray>>(consumer, config, log) {
+data class KafkaTopicConfig(val commit: Boolean, val topicPartitionsMap: Map<TopicPartition, Long>) {
+    companion object {
+        private fun toTopicPartitionMap(topicConfig: Map<String,
+                List<Map<String, Any>>>): Map<TopicPartition, Long> = topicConfig
+                .flatMap { topicConfigEntry ->
+                    topicConfigEntry.value.map {
+                        val partition = it.getValue("partition").toString().toInt()
+                        val offset = it.getValue("offset").toString().toLong()
+                        TopicPartition(topicConfigEntry.key, partition) to offset
+                    }
+                }
+                .associateBy({ it.first }, { it.second })
+
+        fun fromMap(map: Map<String, Any>): KafkaTopicConfig {
+            val commit = map.getOrDefault("commit", true).toString().toBoolean()
+            val topicPartitionsMap = toTopicPartitionMap(map
+                    .getOrDefault("partitions", emptyMap<String, List<Map<String, Any>>>()) as Map<String, List<Map<String, Any>>>)
+            return KafkaTopicConfig(commit = commit, topicPartitionsMap = topicPartitionsMap)
+        }
+    }
+}
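+// Illustrative shape of the map fromMap() expects (hypothetical topic and values):
+//   mapOf("commit" to true,
+//         "partitions" to mapOf("my-topic" to listOf(mapOf("partition" to 0, "offset" to 42))))
+// Offsets -1 and -2 are sentinels that setSeek() below treats as "beginning" and "end".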
+
+open class KafkaAutoCommitEventConsumer(private val config: KafkaSinkConfiguration,
+                                        private val log: Log): StreamsEventConsumer(log) {
 
-    private lateinit var topics: Set<String>
+    private var isSeekSet = false
 
-    override fun withTopics(topics: Set<String>): StreamsEventConsumer<KafkaConsumer<String, ByteArray>> {
+    val consumer = KafkaConsumer<String, ByteArray>(config.asProperties())
+
+    lateinit var topics: Set<String>
+
+    override fun withTopics(topics: Set<String>): StreamsEventConsumer {
         this.topics = topics
         return this
     }
@@ -113,15 +148,172 @@ class KafkaEventConsumer(private val consumer: KafkaConsumer<String, ByteArray>,
         StreamsUtils.ignoreExceptions({ consumer.close() }, UninitializedPropertyAccessException::class.java)
     }
 
-    override fun read(): Map<String, List<Any>>? {
-        val records = consumer.poll(config.sinkPollingInterval)
-        if (records != null && !records.isEmpty) {
-            return records
-                    .map {
-                        it.topic()!! to JSONUtils.readValue<Any>(it.value())
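+    // Hands each subscribed topic's polled records to the callback as a JSON-decoded batch.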
+    private fun readSimple(action: (String, List<Any>) -> Unit) {
+        val records = consumer.poll(0)
+        if (!records.isEmpty) {
+            try {
+                this.topics.forEach { topic ->
+                    action(topic, records.records(topic).map { JSONUtils.readValue<Any>(it.value()) })
+                }
+            } catch (e: Exception) {
+                // TODO add dead letter queue
+            }
+        }
+    }
+
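+    // Seeks once to the offsets requested in the topic config, then reads only those partitions.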
+    private fun readFromPartition(config: KafkaTopicConfig, action: (String, List<Any>) -> Unit) {
+        setSeek(config.topicPartitionsMap)
+        val records = consumer.poll(0)
+        val consumerRecordsMap = toConsumerRecordsMap(config.topicPartitionsMap, records)
+        if (consumerRecordsMap.isNotEmpty()) {
+            try {
+                consumerRecordsMap.forEach { action(it.key.topic(), it.value.map { JSONUtils.readValue<Any>(it.value()) }) }
+            } catch (e: Exception) {
+                // TODO add dead letter queue
+            }
+        }
+    }
+
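+    // With an explicit partition/offset map we read just those partitions, otherwise every subscribed topic.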
+    override fun read(topicConfig: Map<String, Any>, action: (String, List<Any>) -> Unit) {
+        val kafkaTopicConfig = KafkaTopicConfig.fromMap(topicConfig)
+        if (kafkaTopicConfig.topicPartitionsMap.isEmpty()) {
+            readSimple(action)
+        } else {
+            readFromPartition(kafkaTopicConfig, action)
+        }
+    }
+
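+    // Groups the polled records by requested TopicPartition, dropping partitions that returned nothing.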
+    fun toConsumerRecordsMap(topicPartitionsMap: Map<TopicPartition, Long>,
+                             records: ConsumerRecords<String, ByteArray>)
+            : Map<TopicPartition, List<ConsumerRecord<String, ByteArray>>> = topicPartitionsMap
+            .mapValues {
+                records.records(it.key)
+            }
+            .filterValues { it.isNotEmpty() }
+    fun setSeek(topicPartitionsMap: Map<TopicPartition, Long>) {
+        if (isSeekSet) {
+            return
+        }
+        isSeekSet = true
+        consumer.poll(0) // dummy call, see: https://stackoverflow.com/questions/41008610/kafkaconsumer-0-10-java-api-error-message-no-current-assignment-for-partition
+        topicPartitionsMap.forEach {
+            when (it.value) {
+                -1L -> consumer.seekToBeginning(listOf(it.key))
+                -2L -> consumer.seekToEnd(listOf(it.key))
+                else -> consumer.seek(it.key, it.value)
+            }
+        }
+    }
+}
+
+class KafkaManualCommitEventConsumer(private val config: KafkaSinkConfiguration,
+                                     private val log: Log): KafkaAutoCommitEventConsumer(config, log) {
+
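+    // Tracks the last offset handed to the callback per partition, for commitSync() and rebalances.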
+    private val topicPartitionOffsetMap = ConcurrentHashMap<TopicPartition, OffsetAndMetadata>()
+
+    override fun start() {
+        if (topics.isEmpty()) {
+            log.info("No topics specified, the Kafka consumer will not start")
+            return
+        }
+        this.consumer.subscribe(topics, StreamsConsumerRebalanceListener(topicPartitionOffsetMap, consumer, config.autoOffsetReset, log))
+    }
+
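+    // Like the auto-commit readSimple, but also records the last offset seen per partition.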
+    private fun readSimple(action: (String, List<Any>) -> Unit): Map<TopicPartition, OffsetAndMetadata> {
+        val topicMap = mutableMapOf<TopicPartition, OffsetAndMetadata>()
+        val records = consumer.poll(0)
+        if (!records.isEmpty) {
+            this.topics.forEach { topic ->
+                val topicRecords = records.records(topic)
+                val lastRecord = topicRecords.last()
+                val offsetAndMetadata = OffsetAndMetadata(lastRecord.offset(), "")
+                val topicPartition = TopicPartition(lastRecord.topic(), lastRecord.partition())
+                topicMap[topicPartition] = offsetAndMetadata
+                topicPartitionOffsetMap[topicPartition] = offsetAndMetadata
+                try {
+                    action(topic, topicRecords.map { JSONUtils.readValue<Any>(it.value()) })
+                } catch (e: Exception) {
+                    // TODO add dead letter queue
+                }
+            }
+        }
+        return topicMap
+    }
+
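+    // Partition-targeted variant: seeks first, then tracks the last offset per partition for the commit.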
+    private fun readFromPartition(kafkaTopicConfig: KafkaTopicConfig,
+                                  action: (String, List<Any>) -> Unit): Map<TopicPartition, OffsetAndMetadata> {
+        val topicMap = mutableMapOf<TopicPartition, OffsetAndMetadata>()
+        setSeek(kafkaTopicConfig.topicPartitionsMap)
+        val records = consumer.poll(0)
+        val consumerRecordsMap = toConsumerRecordsMap(kafkaTopicConfig.topicPartitionsMap, records)
+        if (consumerRecordsMap.isNotEmpty()) {
+            try {
+                consumerRecordsMap.forEach {
+                    val lastRecord = it.value.last()
+                    val topicPartition = TopicPartition(lastRecord.topic(), lastRecord.partition())
+                    val offsetAndMetadata = OffsetAndMetadata(lastRecord.offset(), "")
+                    topicMap[topicPartition] = offsetAndMetadata
+                    topicPartitionOffsetMap[topicPartition] = offsetAndMetadata
+                    action(it.key.topic(), it.value.map { JSONUtils.readValue<Any>(it.value()) })
+                }
+            } catch (e: Exception) {
+                // TODO add dead letter queue
+            }
+        }
+        return topicMap
+    }
+
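+    // Commits synchronously, but only when the topic config requests it and something was read.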
+    private fun commitData(commit: Boolean, topicMap: Map<TopicPartition, OffsetAndMetadata>) {
+        if (commit && topicMap.isNotEmpty()) {
+            consumer.commitSync(topicMap)
+        }
+    }
+
+    override fun read(topicConfig: Map<String, Any>, action: (String, List<Any>) -> Unit) {
+        val kafkaTopicConfig = KafkaTopicConfig.fromMap(topicConfig)
+        val topicMap = if (kafkaTopicConfig.topicPartitionsMap.isEmpty()) {
+            readSimple(action)
+        } else {
+            readFromPartition(kafkaTopicConfig, action)
+        }
+        commitData(kafkaTopicConfig.commit, topicMap)
+    }
+}
+
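+// Rebalance hook: persists and restores per-partition offsets around partition reassignment.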
+class StreamsConsumerRebalanceListener(private val topicPartitionOffsetMap: Map<TopicPartition, OffsetAndMetadata>,
+                                       private val consumer: KafkaConsumer<String, ByteArray>,
+                                       private val autoOffsetReset: String,
+                                       private val log: Log): ConsumerRebalanceListener {
+
+    override fun onPartitionsRevoked(partitions: Collection<TopicPartition>) {
+        val offsets = partitions
+                .map {
+                    val offset = consumer.position(it)
+                    if (log.isDebugEnabled) {
+                        log.debug("for topic ${it.topic()} partition ${it.partition()}, the last saved offset is: $offset")
                     }
-                .groupBy({ it.first }, { it.second })
+                    it to OffsetAndMetadata(offset, "")
+                }
+                .toMap()
+        consumer.commitSync(offsets)
+    }
+
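+    // Seeks one past the last processed offset when one is known; otherwise falls back to the
+    // auto.offset.reset policy ("earliest"/"latest").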
+    override fun onPartitionsAssigned(partitions: Collection<TopicPartition>) {
+        for (partition in partitions) {
+            val offset = topicPartitionOffsetMap[partition]?.offset()
+            if (log.isDebugEnabled) {
+                log.debug("for ${partition.topic()} partition ${partition.partition()}, the retrieved offset is: $offset")
+            }
+            if (offset == null) {
+                when (autoOffsetReset) {
+                    "latest" -> consumer.seekToEnd(listOf(partition))
+                    "earliest" -> consumer.seekToBeginning(listOf(partition))
+                    else -> throw RuntimeException("No kafka.auto.offset.reset property specified")
+                }
+            } else {
+                consumer.seek(partition, offset + 1)
+            }
         }
-        return null
     }
 }