Skip to content

Commit 94363c4

Browse files
authored
fixes #266: [Multidatabase] Sink: Implement “sink.to” syntax / config (#278)
1 parent e17d6d6 commit 94363c4

File tree

48 files changed

+745
-415
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

48 files changed

+745
-415
lines changed

common/src/main/kotlin/streams/config/StreamsConfig.kt

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,35 +2,39 @@ package streams.config
22

33
import org.neo4j.dbms.api.DatabaseManagementService
44
import org.neo4j.kernel.lifecycle.LifecycleAdapter
5-
import org.neo4j.logging.internal.LogService
5+
import org.neo4j.logging.Log
66
import streams.extensions.getDefaultDbName
77
import java.io.FileInputStream
88
import java.io.FileNotFoundException
99
import java.util.Properties
1010
import java.util.concurrent.ConcurrentHashMap
1111

12-
class StreamsConfig(logService: LogService, private val dbms: DatabaseManagementService) : LifecycleAdapter() {
12+
data class StreamsConfig(private val log: Log, private val dbms: DatabaseManagementService) : LifecycleAdapter() {
1313

1414
val config = ConcurrentHashMap<String, String>()
1515

16-
private val log = logService.getUserLog(StreamsConfig::class.java)
1716

1817
private lateinit var neo4jConfFolder: String
1918

2019
companion object {
20+
private lateinit var INSTANCE: StreamsConfig
2121
private val SUPPORTED_PREFIXES = listOf("streams", "kafka")
2222
private const val SUN_JAVA_COMMAND = "sun.java.command"
2323
private const val CONF_DIR_ARG = "config-dir="
2424
const val SOURCE_ENABLED = "streams.source.enabled"
2525
const val SOURCE_ENABLED_VALUE = true
2626
const val PROCEDURES_ENABLED = "streams.procedures.enabled"
2727
const val PROCEDURES_ENABLED_VALUE = true
28+
const val SINK_ENABLED = "streams.sink.enabled"
29+
const val SINK_ENABLED_VALUE = false
2830
const val DEFAULT_PATH = "."
2931
private var afterInitListeners = mutableListOf<((MutableMap<String, String>) -> Unit)>()
3032

3133
fun registerListener(after: (MutableMap<String, String>) -> Unit) {
3234
afterInitListeners.add(after)
3335
}
36+
37+
fun getInstance() = INSTANCE
3438
}
3539

3640
override fun init() {
@@ -40,6 +44,7 @@ class StreamsConfig(logService: LogService, private val dbms: DatabaseManagement
4044
neo4jConfFolder = getNeo4jConfFolder()
4145
loadConfiguration()
4246
afterInitListeners.forEach { it(config) }
47+
INSTANCE = this
4348
}
4449

4550
override fun stop() {
@@ -88,10 +93,10 @@ class StreamsConfig(logService: LogService, private val dbms: DatabaseManagement
8893

8994
val properties = Properties()
9095
try {
91-
log.info("the retrieved NEO4J_CONF dirs is $neo4jConfFolder")
96+
log.info("The retrieved NEO4J_CONF dir is $neo4jConfFolder")
9297
properties.load(FileInputStream("$neo4jConfFolder/neo4j.conf"))
9398
} catch (e: FileNotFoundException) {
94-
log.error("the neo4j.conf file is not under the directory defined into the directory $neo4jConfFolder, please set the NEO4J_CONF env correctly")
99+
log.error("The neo4j.conf file is not under the directory defined into the directory $neo4jConfFolder, please set the NEO4J_CONF env correctly")
95100
}
96101
return properties
97102
}
@@ -124,4 +129,9 @@ class StreamsConfig(logService: LogService, private val dbms: DatabaseManagement
124129
fun hasProceduresGloballyEnabled() = this.config.getOrDefault(PROCEDURES_ENABLED, PROCEDURES_ENABLED_VALUE).toString().toBoolean()
125130

126131
fun hasProceduresEnabled(dbName: String) = this.config.getOrDefault("${PROCEDURES_ENABLED}.$dbName", hasProceduresGloballyEnabled()).toString().toBoolean()
132+
133+
fun isSinkGloballyEnabled() = this.config.getOrDefault(SINK_ENABLED, SINK_ENABLED_VALUE).toString().toBoolean()
134+
135+
fun isSinkEnabled(dbName: String) = this.config.getOrDefault("${SINK_ENABLED}.to.$dbName", isSinkGloballyEnabled()).toString().toBoolean()
136+
127137
}

common/src/main/kotlin/streams/config/StreamsConfigExtensionFactory.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,6 @@ class StreamsConfigExtensionFactory: ExtensionFactory<StreamsConfigExtensionFact
1616
}
1717

1818
override fun newInstance(context: ExtensionContext, dependencies: Dependencies): Lifecycle {
19-
return StreamsConfig(dependencies.log(), dependencies.dbms())
19+
return StreamsConfig(dependencies.log().getUserLog(StreamsConfig::class.java), dependencies.dbms())
2020
}
2121
}

common/src/main/kotlin/streams/service/Topics.kt

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,13 @@ package streams.service
33
import streams.service.sink.strategy.*
44
import kotlin.reflect.jvm.javaType
55

6+
class TopicValidationException(message: String): RuntimeException(message)
7+
8+
private fun TopicType.replaceKeyBy(replacePrefix: Pair<String, String>) = if (replacePrefix.first.isNullOrBlank())
9+
this.key
10+
else
11+
this.key.replace(replacePrefix.first, replacePrefix.second)
12+
613
data class Topics(val cypherTopics: Map<String, String> = emptyMap(),
714
val cdcSourceIdTopics: Set<String> = emptySet(),
815
val cdcSchemaTopics: Set<String> = emptySet(),
@@ -11,6 +18,16 @@ data class Topics(val cypherTopics: Map<String, String> = emptyMap(),
1118
val relPatternTopics: Map<String, RelationshipPatternConfiguration> = emptyMap(),
1219
val invalid: List<String> = emptyList()) {
1320

21+
operator fun plus(other: Topics): Topics {
22+
return Topics(cypherTopics = this.cypherTopics + other.cypherTopics,
23+
cdcSourceIdTopics = this.cdcSourceIdTopics + other.cdcSourceIdTopics,
24+
cdcSchemaTopics = this.cdcSchemaTopics + other.cdcSchemaTopics,
25+
cudTopics = this.cudTopics + other.cudTopics,
26+
nodePatternTopics = this.nodePatternTopics + other.nodePatternTopics,
27+
relPatternTopics = this.relPatternTopics + other.relPatternTopics,
28+
invalid = this.invalid + other.invalid)
29+
}
30+
1431
fun allTopics(): List<String> = this.asMap()
1532
.map {
1633
if (it.key.group == TopicTypeGroup.CDC || it.key.group == TopicTypeGroup.CUD) {
@@ -26,13 +43,16 @@ data class Topics(val cypherTopics: Map<String, String> = emptyMap(),
2643
TopicType.PATTERN_NODE to nodePatternTopics, TopicType.PATTERN_RELATIONSHIP to relPatternTopics)
2744

2845
companion object {
29-
fun from(config: Map<*, *>, prefix: String, toReplace: String = "", invalidTopics: List<String> = emptyList()): Topics {
30-
val cypherTopicPrefix = TopicType.CYPHER.key.replace(prefix, toReplace)
31-
val sourceIdKey = TopicType.CDC_SOURCE_ID.key.replace(prefix, toReplace)
32-
val schemaKey = TopicType.CDC_SCHEMA.key.replace(prefix, toReplace)
33-
val cudKey = TopicType.CUD.key.replace(prefix, toReplace)
34-
val nodePatterKey = TopicType.PATTERN_NODE.key.replace(prefix, toReplace)
35-
val relPatterKey = TopicType.PATTERN_RELATIONSHIP.key.replace(prefix, toReplace)
46+
fun from(map: Map<String, Any?>, replacePrefix: Pair<String, String> = ("" to ""), dbName: String = "", invalidTopics: List<String> = emptyList()): Topics {
47+
val config = map
48+
.filterKeys { if (dbName.isNotBlank()) it.endsWith(".to.$dbName") else !it.contains(".to.") }
49+
.mapKeys { if (dbName.isNotBlank()) it.key.replace(".to.$dbName", "") else it.key }
50+
val cypherTopicPrefix = TopicType.CYPHER.replaceKeyBy(replacePrefix)
51+
val sourceIdKey = TopicType.CDC_SOURCE_ID.replaceKeyBy(replacePrefix)
52+
val schemaKey = TopicType.CDC_SCHEMA.replaceKeyBy(replacePrefix)
53+
val cudKey = TopicType.CUD.replaceKeyBy(replacePrefix)
54+
val nodePatterKey = TopicType.PATTERN_NODE.replaceKeyBy(replacePrefix)
55+
val relPatterKey = TopicType.PATTERN_RELATIONSHIP.replaceKeyBy(replacePrefix)
3656
val cypherTopics = TopicUtils.filterByPrefix(config, cypherTopicPrefix)
3757
val nodePatternTopics = TopicUtils
3858
.filterByPrefix(config, nodePatterKey, invalidTopics)

consumer/src/main/kotlin/streams/StreamsEventConsumer.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package streams
22

33
import org.neo4j.logging.Log
4+
import streams.config.StreamsConfig
45
import streams.service.StreamsSinkEntity
56
import streams.service.errors.ErrorService
67

@@ -23,5 +24,5 @@ abstract class StreamsEventConsumer(private val log: Log, private val dlqService
2324

2425

2526
abstract class StreamsEventConsumerFactory {
26-
abstract fun createStreamsEventConsumer(config: Map<String, String>, log: Log): StreamsEventConsumer
27+
abstract fun createStreamsEventConsumer(config: StreamsConfig, log: Log): StreamsEventConsumer
2728
}

consumer/src/main/kotlin/streams/StreamsEventSink.kt

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,9 @@ package streams
33
import org.neo4j.configuration.Config
44
import org.neo4j.kernel.internal.GraphDatabaseAPI
55
import org.neo4j.logging.Log
6+
import streams.config.StreamsConfig
67

7-
abstract class StreamsEventSink(private val config: Map<String, String>,
8+
abstract class StreamsEventSink(private val config: StreamsConfig,
89
private val queryExecution: StreamsEventSinkQueryExecution,
910
private val streamsTopicService: StreamsTopicService,
1011
private val log: Log,
@@ -26,10 +27,10 @@ abstract class StreamsEventSink(private val config: Map<String, String>,
2627
}
2728

2829
object StreamsEventSinkFactory {
29-
fun getStreamsEventSink(config: Map<String, String>, streamsQueryExecution: StreamsEventSinkQueryExecution,
30+
fun getStreamsEventSink(config: StreamsConfig, streamsQueryExecution: StreamsEventSinkQueryExecution,
3031
streamsTopicService: StreamsTopicService, log: Log, db: GraphDatabaseAPI): StreamsEventSink {
31-
return Class.forName(config.getOrDefault("streams.sink", "streams.kafka.KafkaEventSink"))
32-
.getConstructor(Map::class.java,
32+
return Class.forName(config.config.getOrDefault("streams.sink", "streams.kafka.KafkaEventSink"))
33+
.getConstructor(StreamsConfig::class.java,
3334
StreamsEventSinkQueryExecution::class.java,
3435
StreamsTopicService::class.java,
3536
Log::class.java,

consumer/src/main/kotlin/streams/StreamsEventSinkExtensionFactory.kt

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import org.neo4j.kernel.lifecycle.Lifecycle
1111
import org.neo4j.kernel.lifecycle.LifecycleAdapter
1212
import org.neo4j.logging.internal.LogService
1313
import streams.config.StreamsConfig
14-
import streams.extensions.getSystemDb
1514
import streams.extensions.isSystemDb
1615
import streams.procedures.StreamsSinkProcedures
1716
import streams.service.TopicUtils
@@ -50,9 +49,10 @@ class StreamsEventSinkExtensionFactory : ExtensionFactory<StreamsEventSinkExtens
5049

5150
override fun available() {
5251
try {
52+
configuration.loadStreamsConfiguration()
5353
streamsLog.info("Initialising the Streams Sink module")
54-
val streamsSinkConfiguration = StreamsSinkConfiguration.from(configuration.config)
55-
val streamsTopicService = StreamsTopicService(dbms.getSystemDb())
54+
val streamsSinkConfiguration = StreamsSinkConfiguration.from(configuration, db.databaseName())
55+
val streamsTopicService = StreamsTopicService()
5656
val strategyMap = TopicUtils.toStrategyMap(streamsSinkConfiguration.topics,
5757
streamsSinkConfiguration.sourceIdStrategyConfig)
5858
val streamsQueryExecution = StreamsEventSinkQueryExecution(streamsTopicService, db,
@@ -62,7 +62,7 @@ class StreamsEventSinkExtensionFactory : ExtensionFactory<StreamsEventSinkExtens
6262
// Create the Sink
6363
val log = logService.getUserLog(StreamsEventSinkFactory::class.java)
6464
eventSink = StreamsEventSinkFactory
65-
.getStreamsEventSink(configuration.config,
65+
.getStreamsEventSink(configuration,
6666
streamsQueryExecution,
6767
streamsTopicService,
6868
log,

consumer/src/main/kotlin/streams/StreamsSinkConfiguration.kt

Lines changed: 25 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,48 +1,48 @@
11
package streams
22

3+
import streams.config.StreamsConfig
34
import streams.service.TopicUtils
5+
import streams.service.TopicValidationException
46
import streams.service.Topics
57
import streams.service.sink.strategy.SourceIdIngestionStrategyConfig
68

7-
8-
object StreamsSinkConfigurationConstants {
9-
const val STREAMS_CONFIG_PREFIX: String = "streams."
10-
const val ENABLED = "sink.enabled"
11-
const val PROCEDURES_ENABLED = "procedures.enabled"
12-
}
13-
14-
data class StreamsSinkConfiguration(val enabled: Boolean = false,
15-
val proceduresEnabled: Boolean = true,
9+
data class StreamsSinkConfiguration(val enabled: Boolean = StreamsConfig.SINK_ENABLED_VALUE,
10+
val proceduresEnabled: Boolean = StreamsConfig.PROCEDURES_ENABLED_VALUE,
1611
val sinkPollingInterval: Long = 10000,
1712
val topics: Topics = Topics(),
1813
val errorConfig: Map<String,Any?> = emptyMap(),
1914
val sourceIdStrategyConfig: SourceIdIngestionStrategyConfig = SourceIdIngestionStrategyConfig()) {
2015

2116
companion object {
22-
fun from(cfg: Map<String, String>, invalidTopics: List<String> = emptyList()): StreamsSinkConfiguration {
17+
fun from(cfg: StreamsConfig, dbName: String, invalidTopics: List<String> = emptyList()): StreamsSinkConfiguration {
2318
val default = StreamsSinkConfiguration()
24-
val config = cfg
25-
.filterKeys { it.startsWith(StreamsSinkConfigurationConstants.STREAMS_CONFIG_PREFIX) }
26-
.mapKeys { it.key.substring(StreamsSinkConfigurationConstants.STREAMS_CONFIG_PREFIX.length) }
2719

28-
val topics = Topics.from(config = config,
29-
prefix = StreamsSinkConfigurationConstants.STREAMS_CONFIG_PREFIX,
30-
invalidTopics = invalidTopics)
20+
var topics = Topics.from(map = cfg.config, dbName = dbName, invalidTopics = invalidTopics)
21+
val isDefaultDb = cfg.isDefaultDb(dbName)
22+
if (isDefaultDb) {
23+
topics += Topics.from(map = cfg.config, invalidTopics = invalidTopics)
24+
}
3125

32-
TopicUtils.validate<RuntimeException>(topics)
26+
TopicUtils.validate<TopicValidationException>(topics)
3327

28+
val sourceIdStrategyConfigPrefix = "streams.sink.topic.cdc.sourceId"
29+
val (sourceIdStrategyLabelNameKey, sourceIdStrategyIdNameKey) = if (isDefaultDb) {
30+
"labelName" to "idName"
31+
} else {
32+
"labelName.to.$dbName" to "idName.to.$dbName"
33+
}
3434
val defaultSourceIdStrategyConfig = SourceIdIngestionStrategyConfig()
35-
3635
val sourceIdStrategyConfig = SourceIdIngestionStrategyConfig(
37-
config.getOrDefault("sink.topic.cdc.sourceId.labelName", defaultSourceIdStrategyConfig.labelName),
38-
config.getOrDefault("sink.topic.cdc.sourceId.idName", defaultSourceIdStrategyConfig.idName))
36+
cfg.config.getOrDefault("$sourceIdStrategyConfigPrefix.$sourceIdStrategyLabelNameKey", defaultSourceIdStrategyConfig.labelName),
37+
cfg.config.getOrDefault("$sourceIdStrategyConfigPrefix.$sourceIdStrategyIdNameKey", defaultSourceIdStrategyConfig.idName))
3938

40-
val errorHandler = config.filterKeys { it.startsWith("sink.error") }.mapKeys { it.key.substring("sink.".length) }
39+
val errorHandler = cfg.config
40+
.filterKeys { it.startsWith("streams.sink.error") }
41+
.mapKeys { it.key.substring("streams.sink.".length) }
4142

42-
return default.copy(enabled = config.getOrDefault(StreamsSinkConfigurationConstants.ENABLED, default.enabled).toString().toBoolean(),
43-
proceduresEnabled = config.getOrDefault(StreamsSinkConfigurationConstants.PROCEDURES_ENABLED, default.proceduresEnabled)
44-
.toString().toBoolean(),
45-
sinkPollingInterval = config.getOrDefault("sink.polling.interval", default.sinkPollingInterval).toString().toLong(),
43+
return default.copy(enabled = cfg.isSinkEnabled(dbName),
44+
proceduresEnabled = cfg.hasProceduresEnabled(dbName),
45+
sinkPollingInterval = cfg.config.getOrDefault("streams.sink.polling.interval", default.sinkPollingInterval).toString().toLong(),
4646
topics = topics,
4747
errorConfig = errorHandler,
4848
sourceIdStrategyConfig = sourceIdStrategyConfig)

0 commit comments

Comments
 (0)