Skip to content

Commit e6bf29b

Browse files
conker84moxious
authored andcommitted
fixes #252: Source with non-existant topic hangs indefinitely (#257)
1 parent 486d8a1 commit e6bf29b

29 files changed

+531
-99
lines changed

common/src/main/kotlin/streams/service/Topics.kt

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,8 @@ data class Topics(val cypherTopics: Map<String, String> = emptyMap(),
88
val cdcSchemaTopics: Set<String> = emptySet(),
99
val cudTopics: Set<String> = emptySet(),
1010
val nodePatternTopics: Map<String, NodePatternConfiguration> = emptyMap(),
11-
val relPatternTopics: Map<String, RelationshipPatternConfiguration> = emptyMap()) {
11+
val relPatternTopics: Map<String, RelationshipPatternConfiguration> = emptyMap(),
12+
val invalid: List<String> = emptyList()) {
1213

1314
fun allTopics(): List<String> = this.asMap()
1415
.map {
@@ -25,7 +26,7 @@ data class Topics(val cypherTopics: Map<String, String> = emptyMap(),
2526
TopicType.PATTERN_NODE to nodePatternTopics, TopicType.PATTERN_RELATIONSHIP to relPatternTopics)
2627

2728
companion object {
28-
fun from(config: Map<*, *>, prefix: String, toReplace: String = ""): Topics {
29+
fun from(config: Map<*, *>, prefix: String, toReplace: String = "", invalidTopics: List<String> = emptyList()): Topics {
2930
val cypherTopicPrefix = TopicType.CYPHER.key.replace(prefix, toReplace)
3031
val sourceIdKey = TopicType.CDC_SOURCE_ID.key.replace(prefix, toReplace)
3132
val schemaKey = TopicType.CDC_SCHEMA.key.replace(prefix, toReplace)
@@ -34,14 +35,14 @@ data class Topics(val cypherTopics: Map<String, String> = emptyMap(),
3435
val relPatterKey = TopicType.PATTERN_RELATIONSHIP.key.replace(prefix, toReplace)
3536
val cypherTopics = TopicUtils.filterByPrefix(config, cypherTopicPrefix)
3637
val nodePatternTopics = TopicUtils
37-
.filterByPrefix(config, nodePatterKey)
38+
.filterByPrefix(config, nodePatterKey, invalidTopics)
3839
.mapValues { NodePatternConfiguration.parse(it.value) }
3940
val relPatternTopics = TopicUtils
40-
.filterByPrefix(config, relPatterKey)
41+
.filterByPrefix(config, relPatterKey, invalidTopics)
4142
.mapValues { RelationshipPatternConfiguration.parse(it.value) }
42-
val cdcSourceIdTopics = TopicUtils.splitTopics(config[sourceIdKey] as? String)
43-
val cdcSchemaTopics = TopicUtils.splitTopics(config[schemaKey] as? String)
44-
val cudTopics = TopicUtils.splitTopics(config[cudKey] as? String)
43+
val cdcSourceIdTopics = TopicUtils.splitTopics(config[sourceIdKey] as? String, invalidTopics)
44+
val cdcSchemaTopics = TopicUtils.splitTopics(config[schemaKey] as? String, invalidTopics)
45+
val cudTopics = TopicUtils.splitTopics(config[cudKey] as? String, invalidTopics)
4546
return Topics(cypherTopics, cdcSourceIdTopics, cdcSchemaTopics, cudTopics, nodePatternTopics, relPatternTopics)
4647
}
4748
}
@@ -51,19 +52,22 @@ object TopicUtils {
5152

5253
@JvmStatic val TOPIC_SEPARATOR = ";"
5354

54-
fun filterByPrefix(config: Map<*, *>, prefix: String): Map<String, String> {
55+
fun filterByPrefix(config: Map<*, *>, prefix: String, invalidTopics: List<String> = emptyList()): Map<String, String> {
5556
val fullPrefix = "$prefix."
5657
return config
5758
.filterKeys { it.toString().startsWith(fullPrefix) }
5859
.mapKeys { it.key.toString().replace(fullPrefix, "") }
60+
.filterKeys { !invalidTopics.contains(it) }
5961
.mapValues { it.value.toString() }
6062
}
6163

62-
fun splitTopics(cdcMergeTopicsString: String?): Set<String> {
64+
fun splitTopics(cdcMergeTopicsString: String?, invalidTopics: List<String> = emptyList()): Set<String> {
6365
return if (cdcMergeTopicsString.isNullOrBlank()) {
6466
emptySet()
6567
} else {
66-
cdcMergeTopicsString.split(TOPIC_SEPARATOR).toSet()
68+
cdcMergeTopicsString.split(TOPIC_SEPARATOR)
69+
.filter { !invalidTopics.contains(it) }
70+
.toSet()
6771
}
6872
}
6973

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package streams.utils
2+
3+
import org.apache.kafka.clients.admin.AdminClient
4+
import org.apache.kafka.common.config.ConfigResource
5+
import java.util.*
6+
7+
object KafkaValidationUtils {
8+
fun getInvalidTopicsError(invalidTopics: List<String>) = "The BROKER config `auto.create.topics.enable` is false, the following topics need to be created into the Kafka cluster otherwise the messages will be discarded: $invalidTopics"
9+
10+
fun getInvalidTopics(kafkaProps: Properties, allTopics: List<String>) = getInvalidTopics(AdminClient.create(kafkaProps), allTopics)
11+
12+
fun getInvalidTopics(client: AdminClient, allTopics: List<String>): List<String> {
13+
val kafkaTopics = client.listTopics().names().get()
14+
val invalidTopics = allTopics.filter { !kafkaTopics.contains(it) }
15+
return if (invalidTopics.isNotEmpty()) {
16+
if (isAutoCreateTopicsEnabled(client)) {
17+
emptyList()
18+
} else {
19+
invalidTopics
20+
}
21+
} else {
22+
invalidTopics
23+
}
24+
}
25+
26+
fun isAutoCreateTopicsEnabled(kafkaProps: Properties) = isAutoCreateTopicsEnabled(AdminClient.create(kafkaProps))
27+
28+
fun isAutoCreateTopicsEnabled(client: AdminClient): Boolean {
29+
val firstNodeId = client.describeCluster().nodes().get().first().id()
30+
val configs = client.describeConfigs(listOf(ConfigResource(ConfigResource.Type.BROKER, firstNodeId.toString()))).all().get()
31+
return configs.values
32+
.flatMap { it.entries() }
33+
.find { it.name() == "auto.create.topics.enable" }
34+
?.value()
35+
?.toBoolean() ?: false
36+
}
37+
}

consumer/src/main/kotlin/streams/StreamsEventConsumer.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ abstract class StreamsEventConsumer(private val log: Log, private val dlqService
1717

1818
abstract fun read(action: (String, List<StreamsSinkEntity>) -> Unit)
1919

20+
abstract fun invalidTopics(): List<String>
21+
2022
}
2123

2224

consumer/src/main/kotlin/streams/StreamsEventSink.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ abstract class StreamsEventSink(private val config: Config,
2121

2222
open fun getEventSinkConfigMapper(): StreamsEventSinkConfigMapper = StreamsEventSinkConfigMapper(streamsConfigMap, mappingKeys)
2323

24+
open fun printInvalidTopics() {}
25+
2426
}
2527

2628
object StreamsEventSinkFactory {

consumer/src/main/kotlin/streams/StreamsEventSinkExtensionFactory.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ class StreamsEventSinkExtensionFactory : KernelExtensionFactory<StreamsEventSink
8383
streamsTopicService.clearAll()
8484
streamsTopicService.setAll(streamsSinkConfiguration.topics)
8585
eventSink.start()
86+
eventSink.printInvalidTopics()
8687
}
8788

8889
override fun stop() {

consumer/src/main/kotlin/streams/StreamsSinkConfiguration.kt

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,15 @@ data class StreamsSinkConfiguration(val enabled: Boolean = false,
2424
return from(cfg.raw)
2525
}
2626

27-
fun from(cfg: Map<String, String>): StreamsSinkConfiguration {
27+
fun from(cfg: Map<String, String>, invalidTopics: List<String> = emptyList()): StreamsSinkConfiguration {
2828
val default = StreamsSinkConfiguration()
2929
val config = cfg
3030
.filterKeys { it.startsWith(StreamsSinkConfigurationConstants.STREAMS_CONFIG_PREFIX) }
3131
.mapKeys { it.key.substring(StreamsSinkConfigurationConstants.STREAMS_CONFIG_PREFIX.length) }
3232

33-
val topics = Topics.from(config, StreamsSinkConfigurationConstants.STREAMS_CONFIG_PREFIX)
33+
val topics = Topics.from(config = config,
34+
prefix = StreamsSinkConfigurationConstants.STREAMS_CONFIG_PREFIX,
35+
invalidTopics = invalidTopics)
3436

3537
TopicUtils.validate<RuntimeException>(topics)
3638

consumer/src/main/kotlin/streams/StreamsTopicService.kt

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,13 @@
11
package streams
22

3-
import org.apache.commons.lang3.StringUtils
43
import org.neo4j.kernel.impl.core.EmbeddedProxySPI
54
import org.neo4j.kernel.impl.core.GraphProperties
65
import org.neo4j.kernel.internal.GraphDatabaseAPI
76
import streams.serialization.JSONUtils
87
import streams.service.STREAMS_TOPIC_KEY
98
import streams.service.TopicType
10-
import streams.service.TopicTypeGroup
11-
import streams.service.sink.strategy.NodePatternConfiguration
12-
import streams.service.sink.strategy.RelationshipPatternConfiguration
13-
import streams.utils.Neo4jUtils
14-
import streams.service.TopicUtils
159
import streams.service.Topics
10+
import streams.utils.Neo4jUtils
1611

1712
class StreamsTopicService(private val db: GraphDatabaseAPI) {
1813
private val properties: GraphProperties = db.dependencyResolver.resolveDependency(EmbeddedProxySPI::class.java).newGraphPropertiesProxy()

consumer/src/main/kotlin/streams/kafka/KafkaAutoCommitEventConsumer.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ open class KafkaAutoCommitEventConsumer(private val config: KafkaSinkConfigurati
4242
private val log: Log,
4343
private val errorService: ErrorService): StreamsEventConsumer(log, errorService) {
4444

45+
override fun invalidTopics(): List<String> = config.streamsSinkConfiguration.topics.invalid
46+
4547
private var isSeekSet = false
4648

4749
val consumer: KafkaConsumer<*, *> = when {

consumer/src/main/kotlin/streams/kafka/KafkaEventSink.kt

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,29 @@
11
package streams.kafka
22

3-
import kotlinx.coroutines.*
3+
import kotlinx.coroutines.Dispatchers
4+
import kotlinx.coroutines.GlobalScope
5+
import kotlinx.coroutines.Job
6+
import kotlinx.coroutines.cancelAndJoin
7+
import kotlinx.coroutines.delay
8+
import kotlinx.coroutines.isActive
9+
import kotlinx.coroutines.launch
10+
import kotlinx.coroutines.runBlocking
411
import org.apache.kafka.clients.consumer.ConsumerConfig
512
import org.neo4j.kernel.configuration.Config
613
import org.neo4j.kernel.internal.GraphDatabaseAPI
714
import org.neo4j.logging.Log
8-
import streams.*
15+
import streams.StreamsEventConsumer
16+
import streams.StreamsEventConsumerFactory
17+
import streams.StreamsEventSink
18+
import streams.StreamsEventSinkQueryExecution
19+
import streams.StreamsSinkConfiguration
20+
import streams.StreamsSinkConfigurationConstants
21+
import streams.StreamsTopicService
922
import streams.service.errors.ErrorService
1023
import streams.service.errors.KafkaErrorService
24+
import streams.utils.KafkaValidationUtils.getInvalidTopicsError
1125
import streams.utils.Neo4jUtils
26+
import streams.utils.StreamsUtils
1227
import java.util.concurrent.TimeUnit
1328

1429

@@ -117,5 +132,13 @@ class KafkaEventSink(private val config: Config,
117132
}
118133
}
119134

135+
override fun printInvalidTopics() {
136+
StreamsUtils.ignoreExceptions({
137+
if (eventConsumer.invalidTopics().isNotEmpty()) {
138+
log.warn(getInvalidTopicsError(eventConsumer.invalidTopics()))
139+
}
140+
}, UninitializedPropertyAccessException::class.java)
141+
}
142+
120143
}
121144

consumer/src/main/kotlin/streams/kafka/KafkaSinkConfiguration.kt

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import org.neo4j.kernel.configuration.Config
88
import streams.StreamsSinkConfiguration
99
import streams.extensions.toPointCase
1010
import streams.serialization.JSONUtils
11-
import streams.utils.ValidationUtils
11+
import streams.utils.KafkaValidationUtils.getInvalidTopics
1212
import streams.utils.ValidationUtils.validateConnection
1313
import java.util.*
1414

@@ -49,7 +49,12 @@ data class KafkaSinkConfiguration(val zookeeperConnect: String = "localhost:2181
4949
fun from(cfg: Map<String, String>): KafkaSinkConfiguration {
5050
val kafkaCfg = create(cfg)
5151
validate(kafkaCfg)
52-
return kafkaCfg
52+
val invalidTopics = getInvalidTopics(kafkaCfg.asProperties(), kafkaCfg.streamsSinkConfiguration.topics.allTopics())
53+
return if (invalidTopics.isNotEmpty()) {
54+
kafkaCfg.copy(streamsSinkConfiguration = StreamsSinkConfiguration.from(cfg, invalidTopics))
55+
} else {
56+
kafkaCfg
57+
}
5358
}
5459

5560
// Visible for testing
@@ -59,12 +64,10 @@ data class KafkaSinkConfiguration(val zookeeperConnect: String = "localhost:2181
5964
.mapKeys { it.key.substring(kafkaConfigPrefix.length) }
6065
val default = KafkaSinkConfiguration()
6166

62-
6367
val keys = JSONUtils.asMap(default).keys.map { it.toPointCase() }
6468
val extraProperties = config.filterKeys { !keys.contains(it) }
6569

6670
val streamsSinkConfiguration = StreamsSinkConfiguration.from(cfg)
67-
6871
return default.copy(zookeeperConnect = config.getOrDefault("zookeeper.connect",default.zookeeperConnect),
6972
keyDeserializer = config.getOrDefault(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, default.keyDeserializer),
7073
valueDeserializer = config.getOrDefault(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, default.valueDeserializer),

0 commit comments

Comments
 (0)