Skip to content

Commit e7abb9f

Browse files
committed
fixes #195: High CPU usage when kafka even sink is enabled
1 parent 3d92c99 commit e7abb9f

File tree

3 files changed

+19
-6
lines changed

3 files changed

+19
-6
lines changed

consumer/src/main/kotlin/streams/StreamsEventSink.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ abstract class StreamsEventConsumer(private val log: Log) {
3030

3131
abstract fun read(topicConfig: Map<String, Any> = emptyMap(), action: (String, List<Any>) -> Unit)
3232

33+
abstract fun read(action: (String, List<Any>) -> Unit)
34+
3335
}
3436

3537
abstract class StreamsEventConsumerFactory {

consumer/src/main/kotlin/streams/StreamsEventSinkExtensionFactory.kt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,6 @@ class StreamsEventSinkExtensionFactory : KernelExtensionFactory<StreamsEventSink
7878

7979
})
8080
} catch (e: Exception) {
81-
e.printStackTrace()
8281
streamsLog.error("Error initializing the streaming sink", e)
8382
}
8483
}

consumer/src/main/kotlin/streams/kafka/KafkaEventSink.kt

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -75,9 +75,10 @@ class KafkaEventSink(private val config: Config,
7575
}
7676

7777
override fun stop() = runBlocking {
78-
log.info("Stopping Sink daemon Job")
78+
log.info("Stopping Kafka Sink daemon Job")
7979
try {
8080
job.cancelAndJoin()
81+
log.info("Kafka Sink daemon Job stopped")
8182
} catch (e : UninitializedPropertyAccessException) { /* ignoring this one only */ }
8283
}
8384

@@ -98,20 +99,22 @@ class KafkaEventSink(private val config: Config,
9899
return GlobalScope.launch(Dispatchers.IO) { // TODO improve exception management
99100
try {
100101
while (isActive) {
101-
if (Neo4jUtils.isWriteableInstance(db)) {
102+
val timeMillis = if (Neo4jUtils.isWriteableInstance(db)) {
102103
eventConsumer.read { topic, data ->
103104
if (log.isDebugEnabled) {
104105
log.debug("Reading data from topic $topic")
105106
}
106107
queryExecution.writeForTopic(topic, data)
107108
}
109+
TimeUnit.SECONDS.toMillis(1)
108110
} else {
109-
val timeMillis = TimeUnit.MILLISECONDS.toMinutes(5)
111+
val timeMillis = TimeUnit.MINUTES.toMillis(5)
110112
if (log.isDebugEnabled) {
111113
log.debug("Not in a writeable instance, new check in $timeMillis millis")
112114
}
113-
delay(timeMillis)
115+
timeMillis
114116
}
117+
delay(timeMillis)
115118
}
116119
eventConsumer.stop()
117120
} catch (e: Throwable) {
@@ -135,7 +138,7 @@ data class KafkaTopicConfig(val commit: Boolean, val topicPartitionsMap: Map<Top
135138
TopicPartition(topicConfigEntry.key, partition) to offset
136139
}
137140
}
138-
.associateBy({ it.first }, { it.second })
141+
.toMap()
139142

140143
fun fromMap(map: Map<String, Any>): KafkaTopicConfig {
141144
val commit = map.getOrDefault("commit", true).toString().toBoolean()
@@ -202,6 +205,10 @@ open class KafkaAutoCommitEventConsumer(private val config: KafkaSinkConfigurati
202205
}
203206
}
204207

208+
override fun read(action: (String, List<Any>) -> Unit) {
209+
readSimple(action)
210+
}
211+
205212
override fun read(topicConfig: Map<String, Any>, action: (String, List<Any>) -> Unit) {
206213
val kafkaTopicConfig = KafkaTopicConfig.fromMap(topicConfig)
207214
if (kafkaTopicConfig.topicPartitionsMap.isEmpty()) {
@@ -301,6 +308,11 @@ class KafkaManualCommitEventConsumer(private val config: KafkaSinkConfiguration,
301308
}
302309
}
303310

311+
override fun read(action: (String, List<Any>) -> Unit) {
312+
val topicMap = readSimple(action)
313+
commitData(true, topicMap)
314+
}
315+
304316
override fun read(topicConfig: Map<String, Any>, action: (String, List<Any>) -> Unit) {
305317
val kafkaTopicConfig = KafkaTopicConfig.fromMap(topicConfig)
306318
val topicMap = if (kafkaTopicConfig.topicPartitionsMap.isEmpty()) {

0 commit comments

Comments
 (0)