Skip to content

Commit 6917083

Browse files
conker84jexp
authored andcommitted
fixes #181: Validate configuration before attempting to start (#217)
1 parent 4555942 commit 6917083

File tree

11 files changed

+283
-77
lines changed

11 files changed

+283
-77
lines changed
Lines changed: 35 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,42 @@
11
package streams.utils
22

3+
import java.io.IOException
4+
import java.net.Socket
5+
import java.net.URI
6+
37
object ValidationUtils {
48

5-
fun validateTopics(cdcMergeTopics: Set<String>, cdcSchemaTopics: Set<String>,
6-
cypherTopics: Set<String>, nodePatternTopics: Set<String>, relPatternTopics: Set<String>) {
7-
val allTopicsLists = mutableListOf<String>()
8-
allTopicsLists.addAll(cdcMergeTopics)
9-
allTopicsLists.addAll(cdcSchemaTopics)
10-
allTopicsLists.addAll(cypherTopics)
11-
allTopicsLists.addAll(nodePatternTopics)
12-
allTopicsLists.addAll(relPatternTopics)
13-
val crossDefinedTopics = allTopicsLists.map { it to 1 }
14-
.groupBy({ it.first }, { it.second })
15-
.mapValues { it.value.reduce { acc, i -> acc + i } }
16-
.filterValues { it > 1 }
17-
.keys
18-
if (crossDefinedTopics.isNotEmpty()) {
19-
throw RuntimeException("The following topics are cross defined: $crossDefinedTopics")
9+
fun isServerReachable(url: String, port: Int): Boolean = try {
10+
Socket(url, port).use { true }
11+
} catch (e: IOException) {
12+
false
13+
}
14+
15+
fun checkServersUnreachable(urls: String, separator: String = ","): List<String> = urls
16+
.split(separator)
17+
.map {
18+
val uri = URI.create(it)
19+
when (uri.host.isNullOrBlank()) {
20+
true -> {
21+
val splitted = it.split(":")
22+
URI("fake-scheme", "", splitted.first(), splitted.last().toInt(),
23+
"", "", "")
24+
}
25+
else -> uri
26+
}
27+
}
28+
.filter { uri -> !isServerReachable(uri.host, uri.port) }
29+
.map { if (it.scheme == "fake-scheme") "${it.host}:${it.port}" else it.toString() }
30+
31+
fun validateConnection(url: String, kafkaPropertyKey: String, checkReachable: Boolean = true) {
32+
if (url.isBlank()) {
33+
throw RuntimeException("The `kafka.$kafkaPropertyKey` property is empty")
34+
} else if (checkReachable) {
35+
val unreachableServers = ValidationUtils.checkServersUnreachable(url)
36+
if (unreachableServers.isNotEmpty()) {
37+
throw RuntimeException("The servers defined into the property `kafka.$kafkaPropertyKey` are not reachable: $unreachableServers")
38+
}
2039
}
2140
}
41+
2242
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package streams.utils
2+
3+
import org.junit.Test
4+
import org.testcontainers.containers.GenericContainer
5+
import kotlin.test.assertEquals
6+
import kotlin.test.assertTrue
7+
8+
class FakeWebServer: GenericContainer<FakeWebServer>("alpine") {
9+
override fun start() {
10+
this.withCommand("/bin/sh", "-c", "while true; do { echo -e 'HTTP/1.1 200 OK'; echo ; } | nc -l -p 8000; done")
11+
.withExposedPorts(8000)
12+
super.start()
13+
}
14+
15+
fun getUrl() = "http://localhost:${getMappedPort(8000)}"
16+
}
17+
18+
class ValidationUtilsTest {
19+
20+
@Test
21+
fun `should reach the server`() {
22+
val httpServer = FakeWebServer()
23+
httpServer.start()
24+
assertTrue { ValidationUtils.checkServersUnreachable(httpServer.getUrl()).isEmpty() }
25+
httpServer.stop()
26+
}
27+
28+
@Test
29+
fun `should not reach the server`() {
30+
val urls = "http://my.fake.host:1234,PLAINTEXT://my.fake.host1:1234,my.fake.host2:1234"
31+
val checkServersUnreachable = ValidationUtils
32+
.checkServersUnreachable(urls)
33+
assertTrue { checkServersUnreachable.isNotEmpty() }
34+
assertEquals(urls.split(",").toList(), checkServersUnreachable)
35+
}
36+
}

consumer/src/main/kotlin/streams/StreamsEventSinkExtensionFactory.kt

Lines changed: 32 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -37,54 +37,52 @@ class StreamsEventSinkExtensionFactory : KernelExtensionFactory<StreamsEventSink
3737
private lateinit var eventSink: StreamsEventSink
3838

3939
override fun start() {
40-
try {
4140
dependencies.availabilityGuard().addListener(object: AvailabilityListener {
4241
override fun unavailable() {}
4342

4443
override fun available() {
45-
streamsLog.info("Initialising the Streams Sink module")
46-
val streamsSinkConfiguration = StreamsSinkConfiguration.from(configuration)
47-
val streamsTopicService = StreamsTopicService(db)
48-
val strategyMap = TopicUtils.toStrategyMap(streamsSinkConfiguration.topics,
49-
streamsSinkConfiguration.sourceIdStrategyConfig)
50-
val streamsQueryExecution = StreamsEventSinkQueryExecution(streamsTopicService, db,
51-
logService.getUserLog(StreamsEventSinkQueryExecution::class.java),
52-
strategyMap)
44+
try {
45+
streamsLog.info("Initialising the Streams Sink module")
46+
val streamsSinkConfiguration = StreamsSinkConfiguration.from(configuration)
47+
val streamsTopicService = StreamsTopicService(db)
48+
val strategyMap = TopicUtils.toStrategyMap(streamsSinkConfiguration.topics,
49+
streamsSinkConfiguration.sourceIdStrategyConfig)
50+
val streamsQueryExecution = StreamsEventSinkQueryExecution(streamsTopicService, db,
51+
logService.getUserLog(StreamsEventSinkQueryExecution::class.java),
52+
strategyMap)
5353

54-
// Create the Sink
55-
val log = logService.getUserLog(StreamsEventSinkFactory::class.java)
56-
eventSink = StreamsEventSinkFactory
57-
.getStreamsEventSink(configuration,
58-
streamsQueryExecution,
59-
streamsTopicService,
60-
log,
61-
db)
62-
// start the Sink
63-
if (Neo4jUtils.isCluster(db)) {
64-
log.info("The Sink module is running in a cluster, checking for the ${Neo4jUtils.LEADER}")
65-
Neo4jUtils.waitForTheLeader(db, log) { initSinkModule(streamsTopicService, streamsSinkConfiguration) }
66-
} else {
67-
// check if is writeable instance
68-
Neo4jUtils.executeInWriteableInstance(db) { initSinkModule(streamsTopicService, streamsSinkConfiguration) }
69-
}
54+
// Create the Sink
55+
val log = logService.getUserLog(StreamsEventSinkFactory::class.java)
56+
eventSink = StreamsEventSinkFactory
57+
.getStreamsEventSink(configuration,
58+
streamsQueryExecution,
59+
streamsTopicService,
60+
log,
61+
db)
62+
// start the Sink
63+
if (Neo4jUtils.isCluster(db)) {
64+
log.info("The Sink module is running in a cluster, checking for the ${Neo4jUtils.LEADER}")
65+
Neo4jUtils.waitForTheLeader(db, log) { initSinkModule(streamsTopicService, streamsSinkConfiguration) }
66+
} else {
67+
// check if is writeable instance
68+
Neo4jUtils.executeInWriteableInstance(db) { initSinkModule(streamsTopicService, streamsSinkConfiguration) }
69+
}
7070

71-
// Register required services for the Procedures
72-
StreamsSinkProcedures.registerStreamsSinkConfiguration(streamsSinkConfiguration)
73-
StreamsSinkProcedures.registerStreamsEventConsumerFactory(eventSink.getEventConsumerFactory())
74-
StreamsSinkProcedures.registerStreamsEventSinkConfigMapper(eventSink.getEventSinkConfigMapper())
71+
// Register required services for the Procedures
72+
StreamsSinkProcedures.registerStreamsSinkConfiguration(streamsSinkConfiguration)
73+
StreamsSinkProcedures.registerStreamsEventConsumerFactory(eventSink.getEventConsumerFactory())
74+
StreamsSinkProcedures.registerStreamsEventSinkConfigMapper(eventSink.getEventSinkConfigMapper())
75+
} catch (e: Exception) {
76+
streamsLog.error("Error initializing the streaming sink:", e)
77+
}
7578
}
76-
7779
})
78-
} catch (e: Exception) {
79-
streamsLog.error("Error initializing the streaming sink", e)
80-
}
8180
}
8281

8382
private fun initSinkModule(streamsTopicService: StreamsTopicService, streamsSinkConfiguration: StreamsSinkConfiguration) {
8483
streamsTopicService.clearAll()
8584
streamsTopicService.setAll(streamsSinkConfiguration.topics)
8685
eventSink.start()
87-
streamsLog.info("Streams Sink module initialised")
8886
}
8987

9088
override fun stop() {

consumer/src/main/kotlin/streams/kafka/KafkaSinkConfiguration.kt

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -8,25 +8,25 @@ import org.neo4j.kernel.configuration.Config
88
import streams.StreamsSinkConfiguration
99
import streams.extensions.toPointCase
1010
import streams.serialization.JSONUtils
11+
import streams.utils.ValidationUtils
12+
import streams.utils.ValidationUtils.validateConnection
1113
import java.util.*
1214

1315

1416
private const val kafkaConfigPrefix = "kafka."
1517

1618
private val SUPPORTED_DESERIALIZER = listOf(ByteArrayDeserializer::class.java.name, KafkaAvroDeserializer::class.java.name)
1719

18-
private fun validateDeserializers(config: Map<String, String>) {
19-
val kafkaKeyConfig = config[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG].orEmpty()
20-
val kafkaValueConfig = config[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG].orEmpty()
21-
val key = if (kafkaKeyConfig.isNotBlank() && !SUPPORTED_DESERIALIZER.contains(kafkaKeyConfig)) {
20+
private fun validateDeserializers(config: KafkaSinkConfiguration) {
21+
val key = if (!SUPPORTED_DESERIALIZER.contains(config.keyDeserializer)) {
2222
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG
23-
} else if (kafkaValueConfig.isNotBlank() && !SUPPORTED_DESERIALIZER.contains(kafkaValueConfig)) {
23+
} else if (!SUPPORTED_DESERIALIZER.contains(config.valueDeserializer)) {
2424
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG
2525
} else {
2626
""
2727
}
2828
if (key.isNotBlank()) {
29-
throw RuntimeException("The property kafka.$key contains an invalid deserializer. Supported deserializers are $SUPPORTED_DESERIALIZER")
29+
throw RuntimeException("The property `kafka.$key` contains an invalid deserializer. Supported deserializers are $SUPPORTED_DESERIALIZER")
3030
}
3131
}
3232

@@ -42,19 +42,25 @@ data class KafkaSinkConfiguration(val zookeeperConnect: String = "localhost:2181
4242

4343
companion object {
4444

45-
fun from(cfg: Config) : KafkaSinkConfiguration {
45+
fun from(cfg: Config): KafkaSinkConfiguration {
4646
return from(cfg.raw)
4747
}
4848

49-
fun from(cfg: Map<String, String>) : KafkaSinkConfiguration {
49+
fun from(cfg: Map<String, String>): KafkaSinkConfiguration {
50+
val kafkaCfg = create(cfg)
51+
validate(kafkaCfg)
52+
return kafkaCfg
53+
}
54+
55+
// Visible for testing
56+
fun create(cfg: Map<String, String>): KafkaSinkConfiguration {
5057
val config = cfg
5158
.filterKeys { it.startsWith(kafkaConfigPrefix) }
5259
.mapKeys { it.key.substring(kafkaConfigPrefix.length) }
5360
val default = KafkaSinkConfiguration()
5461

5562

5663
val keys = JSONUtils.asMap(default).keys.map { it.toPointCase() }
57-
validate(config)
5864
val extraProperties = config.filterKeys { !keys.contains(it) }
5965

6066
val streamsSinkConfiguration = StreamsSinkConfiguration.from(cfg)
@@ -71,7 +77,14 @@ data class KafkaSinkConfiguration(val zookeeperConnect: String = "localhost:2181
7177
)
7278
}
7379

74-
private fun validate(config: Map<String, String>) {
80+
private fun validate(config: KafkaSinkConfiguration) {
81+
validateConnection(config.zookeeperConnect, "zookeeper.connect", false)
82+
validateConnection(config.bootstrapServers, CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)
83+
val schemaRegistryUrlKey = "schema.registry.url"
84+
if (config.extraProperties.containsKey(schemaRegistryUrlKey)) {
85+
val schemaRegistryUrl = config.extraProperties.getOrDefault(schemaRegistryUrlKey, "")
86+
validateConnection(schemaRegistryUrl, schemaRegistryUrlKey, false)
87+
}
7588
validateDeserializers(config)
7689
}
7790
}

consumer/src/test/kotlin/integrations/kafka/KafkaEventSinkBase.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ open class KafkaEventSinkBase {
6060
graphDatabaseBuilder = org.neo4j.test.TestGraphDatabaseFactory()
6161
.newImpermanentDatabaseBuilder()
6262
.setConfig("kafka.bootstrap.servers", KafkaEventSinkSuiteIT.kafka.bootstrapServers)
63+
.setConfig("kafka.zookeeper.connect", KafkaEventSinkSuiteIT.kafka.envMap["KAFKA_ZOOKEEPER_CONNECT"])
6364
.setConfig("streams.sink.enabled", "true")
6465
kafkaProducer = createProducer()
6566
kafkaAvroProducer = createProducer(valueSerializer = KafkaAvroSerializer::class.java.name,
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package integrations.kafka
2+
3+
import io.confluent.kafka.serializers.KafkaAvroDeserializer
4+
import org.junit.Test
5+
import org.neo4j.kernel.internal.GraphDatabaseAPI
6+
import org.neo4j.test.TestGraphDatabaseFactory
7+
import org.testcontainers.containers.GenericContainer
8+
import kotlin.test.assertEquals
9+
10+
11+
class FakeWebServer: GenericContainer<FakeWebServer>("alpine") {
12+
override fun start() {
13+
this.withCommand("/bin/sh", "-c", "while true; do { echo -e 'HTTP/1.1 200 OK'; echo ; } | nc -l -p 8000; done")
14+
.withExposedPorts(8000)
15+
super.start()
16+
}
17+
18+
fun getUrl() = "http://localhost:${getMappedPort(8000)}"
19+
}
20+
21+
class KafkaEventSinkNoConfigurationIT {
22+
23+
private val topic = "no-config"
24+
25+
@Test
26+
fun `the db should start even with no bootstrap servers provided()`() {
27+
val db = TestGraphDatabaseFactory()
28+
.newImpermanentDatabaseBuilder()
29+
.setConfig("kafka.bootstrap.servers", "")
30+
.setConfig("streams.sink.enabled", "true")
31+
.setConfig("streams.sink.topic.cypher.$topic", "CREATE (p:Place{name: event.name, coordinates: event.coordinates, citizens: event.citizens})")
32+
.newGraphDatabase() as GraphDatabaseAPI
33+
val count = db.execute("MATCH (n) RETURN COUNT(n) AS count").columnAs<Long>("count").next()
34+
assertEquals(0L, count)
35+
}
36+
37+
@Test
38+
fun `the db should start even with AVRO serializers and no schema registry url provided()`() {
39+
val fakeWebServer = FakeWebServer()
40+
fakeWebServer.start()
41+
val url = fakeWebServer.getUrl().replace("http://", "")
42+
val db = TestGraphDatabaseFactory()
43+
.newImpermanentDatabaseBuilder()
44+
.setConfig("kafka.bootstrap.servers", url)
45+
.setConfig("kafka.zookeeper.connect", url)
46+
.setConfig("streams.sink.enabled", "true")
47+
.setConfig("streams.sink.topic.cypher.$topic", "CREATE (p:Place{name: event.name, coordinates: event.coordinates, citizens: event.citizens})")
48+
.setConfig("kafka.key.deserializer", KafkaAvroDeserializer::class.java.name)
49+
.setConfig("kafka.value.deserializer", KafkaAvroDeserializer::class.java.name)
50+
.newGraphDatabase() as GraphDatabaseAPI
51+
val count = db.execute("MATCH (n) RETURN COUNT(n) AS count").columnAs<Long>("count").next()
52+
assertEquals(0L, count)
53+
fakeWebServer.stop()
54+
}
55+
}

0 commit comments

Comments
 (0)