Skip to content

Commit 73fe355

Browse files
committed
added StreamsConfig
1 parent e165b2c commit 73fe355

30 files changed

+366
-290
lines changed
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
package streams.config
2+
3+
import org.neo4j.configuration.Config
4+
import org.neo4j.kernel.lifecycle.LifecycleAdapter
5+
import org.neo4j.logging.internal.LogService
6+
import java.io.FileInputStream
7+
import java.io.FileNotFoundException
8+
import java.util.Properties
9+
import java.util.concurrent.ConcurrentHashMap
10+
import java.util.regex.Matcher
11+
import java.util.regex.Pattern
12+
13+
class StreamsConfig(private val neo4jConfig: Config, logService: LogService) : LifecycleAdapter() {
14+
15+
val config = ConcurrentHashMap<String, String>()
16+
17+
private val log = logService.getInternalLog(StreamsConfig::class.java)
18+
19+
private val SUN_JAVA_COMMAND = "sun.java.command"
20+
private val CONF_DIR_PATTERN = Pattern.compile("--config-dir=(\\S+)")
21+
22+
companion object {
23+
private var afterInitListeners = mutableListOf<((MutableMap<String, String>) -> Unit)>()
24+
25+
fun registerListener(after: (MutableMap<String, String>) -> Unit) {
26+
afterInitListeners.add(after)
27+
}
28+
}
29+
30+
override fun init() {
31+
log.debug("Init StreamsConfig")
32+
33+
loadConfiguration()
34+
afterInitListeners.forEach { it(config) }
35+
}
36+
37+
38+
39+
private fun loadConfiguration() {
40+
val neo4jConfFolder = System.getenv().getOrDefault("NEO4J_CONF", determineNeo4jConfFolder())
41+
42+
val properties = Properties()
43+
try {
44+
log.info("the retrieved NEO4J_CONF dirs is $neo4jConfFolder")
45+
properties.load(FileInputStream("$neo4jConfFolder/neo4j.conf"))
46+
} catch (e: FileNotFoundException) {
47+
log.error("the neo4j.conf file is not under the directory defined into the directory $neo4jConfFolder, please set the NEO4J_CONF env correctly")
48+
}
49+
50+
val filteredValues = properties
51+
.filterKeys { it.toString().startsWith("streams") || it.toString().startsWith("kafka") }
52+
.mapNotNull {
53+
if (it.value == null) {
54+
null
55+
} else {
56+
it.key.toString() to it.value.toString()
57+
}
58+
}
59+
.toMap()
60+
log.debug("Neo4j Streams configuration from neo4j.conf file: $filteredValues")
61+
62+
config.putAll(filteredValues)
63+
}
64+
65+
private fun determineNeo4jConfFolder(): String? { // sun.java.command=com.neo4j.server.enterprise.CommercialEntryPoint --home-dir=/home/myid/neo4j-enterprise-4.0.0-alpha09mr02 --config-dir=/home/myid/neo4j-enterprise-4.0.0-alpha09mr02/conf
66+
val command = System.getProperty(SUN_JAVA_COMMAND)
67+
val matcher: Matcher = CONF_DIR_PATTERN.matcher(command)
68+
return if (matcher.find()) {
69+
val neo4jConfFolder = matcher.group(1)
70+
log.info("from system properties: NEO4J_CONF=%s", neo4jConfFolder)
71+
neo4jConfFolder
72+
} else {
73+
log.info("cannot determine conf folder from sys property %s, assuming '.' ", command)
74+
"."
75+
}
76+
}
77+
78+
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package streams.config
2+
3+
import org.neo4j.configuration.Config
4+
import org.neo4j.dbms.api.DatabaseManagementService
5+
import org.neo4j.kernel.extension.ExtensionFactory
6+
import org.neo4j.kernel.extension.ExtensionType
7+
import org.neo4j.kernel.extension.context.ExtensionContext
8+
import org.neo4j.kernel.lifecycle.Lifecycle
9+
import org.neo4j.logging.internal.LogService
10+
11+
class StreamsConfigExtensionFactory: ExtensionFactory<StreamsConfigExtensionFactory.Dependencies>(ExtensionType.GLOBAL, StreamsConfig::class.java.simpleName) {
12+
interface Dependencies {
13+
fun log(): LogService
14+
fun config(): Config
15+
}
16+
17+
override fun newInstance(context: ExtensionContext, dependencies: Dependencies): Lifecycle {
18+
return StreamsConfig(dependencies.config(), dependencies.log())
19+
}
20+
}
Lines changed: 6 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,14 @@
11
package streams.extensions
22

3-
import org.neo4j.configuration.Config
43
import org.neo4j.graphdb.GraphDatabaseService
4+
import org.neo4j.graphdb.Result
55

66
fun GraphDatabaseService.execute(cypher: String) = this.execute(cypher, emptyMap())
7+
fun GraphDatabaseService.execute(cypher: String, params: Map<String, Any>) = this.executeTransactionally(cypher, params)
78

8-
fun GraphDatabaseService.execute(cypher: String, params: Map<String, Any>) = this.beginTx().use {
9+
fun <T> GraphDatabaseService.execute(cypher: String, lambda: ((Result) -> T)) = this.execute(cypher, emptyMap(), lambda)
10+
fun <T> GraphDatabaseService.execute(cypher: String, params: Map<String, Any>, lambda: ((Result) -> T)) = this.beginTx().use {
911
val resp = it.execute(cypher, params)
1012
it.commit()
11-
resp
12-
}
13-
14-
fun Config.raw() = this.values.mapNotNull {
15-
val value = it.value?.toString()
16-
if (value == null) {
17-
null
18-
} else {
19-
val key = it.key.name()
20-
key to value
21-
}
22-
}.toMap()
13+
lambda.let { it(resp) }
14+
}

common/src/main/kotlin/streams/utils/Neo4jUtils.kt

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ object Neo4jUtils {
2727
return false
2828
}
2929

30-
val role = db.execute("CALL dbms.cluster.role()").columnAs<String>("role").next()
30+
val role = db.execute("CALL dbms.cluster.role()") { it.columnAs<String>("role").next() }
3131
return role.equals(LEADER, ignoreCase = true)
3232
} catch (e: QueryExecutionException) {
3333
if (e.statusCode.equals("Neo.ClientError.Procedure.ProcedureNotFound", ignoreCase = true)) {
@@ -44,7 +44,7 @@ object Neo4jUtils {
4444

4545
fun isCluster(db: GraphDatabaseAPI): Boolean {
4646
try {
47-
db.execute("CALL dbms.cluster.role()").columnAs<String>("role").next()
47+
db.execute("CALL dbms.cluster.role()") { it.columnAs<String>("role").next() }
4848
return true
4949
} catch (e: QueryExecutionException) {
5050
if (e.statusCode.equals("Neo.ClientError.Procedure.ProcedureNotFound", ignoreCase = true)) {
@@ -59,11 +59,12 @@ object Neo4jUtils {
5959
return db.execute("""
6060
CALL dbms.cluster.overview() YIELD role
6161
RETURN role
62-
""".trimIndent())
63-
.columnAs<String>("role")
64-
.stream()
65-
.toList()
66-
.contains(LEADER)
62+
""".trimIndent()) {
63+
it.columnAs<String>("role")
64+
.stream()
65+
.toList()
66+
.contains(LEADER)
67+
}
6768
} catch (e: QueryExecutionException) {
6869
if (e.statusCode.equals("Neo.ClientError.Procedure.ProcedureNotFound", ignoreCase = true)) {
6970
return false
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
streams.config.StreamsConfigExtensionFactory

consumer/src/main/kotlin/streams/StreamsEventSink.kt

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,8 @@ package streams
33
import org.neo4j.configuration.Config
44
import org.neo4j.kernel.internal.GraphDatabaseAPI
55
import org.neo4j.logging.Log
6-
import streams.extensions.raw
76

8-
abstract class StreamsEventSink(private val config: Config,
7+
abstract class StreamsEventSink(private val config: Map<String, String>,
98
private val queryExecution: StreamsEventSinkQueryExecution,
109
private val streamsTopicService: StreamsTopicService,
1110
private val log: Log,
@@ -27,9 +26,9 @@ abstract class StreamsEventSink(private val config: Config,
2726
}
2827

2928
object StreamsEventSinkFactory {
30-
fun getStreamsEventSink(config: Config, streamsQueryExecution: StreamsEventSinkQueryExecution,
29+
fun getStreamsEventSink(config: Map<String, String>, streamsQueryExecution: StreamsEventSinkQueryExecution,
3130
streamsTopicService: StreamsTopicService, log: Log, db: GraphDatabaseAPI): StreamsEventSink {
32-
return Class.forName(config.raw().getOrDefault("streams.sink", "streams.kafka.KafkaEventSink"))
31+
return Class.forName(config.getOrDefault("streams.sink", "streams.kafka.KafkaEventSink"))
3332
.getConstructor(Config::class.java,
3433
StreamsEventSinkQueryExecution::class.java,
3534
StreamsTopicService::class.java,

consumer/src/main/kotlin/streams/StreamsEventSinkExtensionFactory.kt

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import org.neo4j.kernel.internal.GraphDatabaseAPI
1010
import org.neo4j.kernel.lifecycle.Lifecycle
1111
import org.neo4j.kernel.lifecycle.LifecycleAdapter
1212
import org.neo4j.logging.internal.LogService
13+
import streams.config.StreamsConfig
1314
import streams.procedures.StreamsSinkProcedures
1415
import streams.service.TopicUtils
1516
import streams.utils.Neo4jUtils
@@ -24,14 +25,14 @@ class StreamsEventSinkExtensionFactory : ExtensionFactory<StreamsEventSinkExtens
2425
interface Dependencies {
2526
fun graphdatabaseAPI(): GraphDatabaseAPI
2627
fun log(): LogService
27-
fun config(): Config
28+
fun streamsConfig(): StreamsConfig
2829
fun availabilityGuard(): AvailabilityGuard
2930
}
3031

3132
class StreamsEventLifecycle(private val dependencies: Dependencies): LifecycleAdapter() {
3233
private val db = dependencies.graphdatabaseAPI()
3334
private val logService = dependencies.log()
34-
private val configuration = dependencies.config()
35+
private val configuration = dependencies.streamsConfig()
3536
private var streamsLog = logService.getUserLog(StreamsEventLifecycle::class.java)
3637

3738
private lateinit var eventSink: StreamsEventSink
@@ -43,7 +44,7 @@ class StreamsEventSinkExtensionFactory : ExtensionFactory<StreamsEventSinkExtens
4344
override fun available() {
4445
try {
4546
streamsLog.info("Initialising the Streams Sink module")
46-
val streamsSinkConfiguration = StreamsSinkConfiguration.from(configuration)
47+
val streamsSinkConfiguration = StreamsSinkConfiguration.from(configuration.config)
4748
val streamsTopicService = StreamsTopicService(db)
4849
val strategyMap = TopicUtils.toStrategyMap(streamsSinkConfiguration.topics,
4950
streamsSinkConfiguration.sourceIdStrategyConfig)
@@ -54,7 +55,7 @@ class StreamsEventSinkExtensionFactory : ExtensionFactory<StreamsEventSinkExtens
5455
// Create the Sink
5556
val log = logService.getUserLog(StreamsEventSinkFactory::class.java)
5657
eventSink = StreamsEventSinkFactory
57-
.getStreamsEventSink(configuration,
58+
.getStreamsEventSink(configuration.config,
5859
streamsQueryExecution,
5960
streamsTopicService,
6061
log,

consumer/src/main/kotlin/streams/StreamsEventSinkQueryExecution.kt

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,12 @@ class StreamsEventSinkQueryExecution(private val streamsTopicService: StreamsTop
2525

2626
override fun write(query: String, params: Collection<Any>) {
2727
if (Neo4jUtils.isWriteableInstance(db)) {
28-
val result = db.execute(query, mapOf("events" to params))
29-
if (log.isDebugEnabled) {
30-
log.debug("Query statistics:\n${result.queryStatistics}")
28+
db.execute(query, mapOf("events" to params)) { result ->
29+
if (log.isDebugEnabled) {
30+
log.debug("Query statistics:\n${result.queryStatistics}")
31+
}
32+
result.close()
3133
}
32-
result.close()
3334
} else {
3435
if (log.isDebugEnabled) {
3536
log.debug("Not writeable instance")

consumer/src/main/kotlin/streams/StreamsSinkConfiguration.kt

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,8 @@
11
package streams
22

3-
import org.neo4j.configuration.Config
4-
import streams.extensions.raw
53
import streams.service.TopicUtils
6-
import streams.service.sink.strategy.SourceIdIngestionStrategyConfig
74
import streams.service.Topics
5+
import streams.service.sink.strategy.SourceIdIngestionStrategyConfig
86

97

108
object StreamsSinkConfigurationConstants {
@@ -21,10 +19,6 @@ data class StreamsSinkConfiguration(val enabled: Boolean = false,
2119
val sourceIdStrategyConfig: SourceIdIngestionStrategyConfig = SourceIdIngestionStrategyConfig()) {
2220

2321
companion object {
24-
fun from(cfg: Config): StreamsSinkConfiguration {
25-
return from(cfg.raw())
26-
}
27-
2822
fun from(cfg: Map<String, String>, invalidTopics: List<String> = emptyList()): StreamsSinkConfiguration {
2923
val default = StreamsSinkConfiguration()
3024
val config = cfg

consumer/src/main/kotlin/streams/kafka/KafkaEventSink.kt

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import kotlinx.coroutines.isActive
99
import kotlinx.coroutines.launch
1010
import kotlinx.coroutines.runBlocking
1111
import org.apache.kafka.clients.consumer.ConsumerConfig
12-
import org.neo4j.configuration.Config
1312
import org.neo4j.kernel.internal.GraphDatabaseAPI
1413
import org.neo4j.logging.Log
1514
import streams.StreamsEventConsumer
@@ -19,7 +18,6 @@ import streams.StreamsEventSinkQueryExecution
1918
import streams.StreamsSinkConfiguration
2019
import streams.StreamsSinkConfigurationConstants
2120
import streams.StreamsTopicService
22-
import streams.extensions.raw
2321
import streams.service.errors.ErrorService
2422
import streams.service.errors.KafkaErrorService
2523
import streams.utils.KafkaValidationUtils.getInvalidTopicsError
@@ -28,7 +26,7 @@ import streams.utils.StreamsUtils
2826
import java.util.concurrent.TimeUnit
2927

3028

31-
class KafkaEventSink(private val config: Config,
29+
class KafkaEventSink(private val config: Map<String, String>,
3230
private val queryExecution: StreamsEventSinkQueryExecution,
3331
private val streamsTopicService: StreamsTopicService,
3432
private val log: Log,
@@ -37,7 +35,7 @@ class KafkaEventSink(private val config: Config,
3735
private lateinit var eventConsumer: StreamsEventConsumer
3836
private lateinit var job: Job
3937

40-
override val streamsConfigMap = config.raw().filterKeys {
38+
override val streamsConfigMap = config.filterKeys {
4139
it.startsWith("kafka.") || (it.startsWith("streams.") && !it.startsWith("streams.sink.topic.cypher."))
4240
}.toMap()
4341

@@ -78,7 +76,7 @@ class KafkaEventSink(private val config: Config,
7876
}
7977
log.info("Starting the Kafka Sink")
8078
this.eventConsumer = getEventConsumerFactory()
81-
.createStreamsEventConsumer(config.raw(), log)
79+
.createStreamsEventConsumer(config, log)
8280
.withTopics(topics)
8381
this.eventConsumer.start()
8482
this.job = createJob()

0 commit comments

Comments
 (0)