Skip to content

Commit 3a6d184

Browse files
Fixed #295: Lower-case and normalize all database names in 4.0 configs (#300)
1 parent 3d03279 commit 3a6d184

File tree

6 files changed

+81
-29
lines changed

6 files changed

+81
-29
lines changed

common/src/main/kotlin/streams/service/Topics.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,8 @@ data class Topics(val cypherTopics: Map<String, String> = emptyMap(),
4545
companion object {
4646
fun from(map: Map<String, Any?>, replacePrefix: Pair<String, String> = ("" to ""), dbName: String = "", invalidTopics: List<String> = emptyList()): Topics {
4747
val config = map
48-
.filterKeys { if (dbName.isNotBlank()) it.endsWith(".to.$dbName") else !it.contains(".to.") }
49-
.mapKeys { if (dbName.isNotBlank()) it.key.replace(".to.$dbName", "") else it.key }
48+
.filterKeys { if (dbName.isNotBlank()) it.toLowerCase().endsWith(".to.$dbName") else !it.contains(".to.") }
49+
.mapKeys { if (dbName.isNotBlank()) it.key.replace(".to.$dbName", "", true) else it.key }
5050
val cypherTopicPrefix = TopicType.CYPHER.replaceKeyBy(replacePrefix)
5151
val sourceIdKey = TopicType.CDC_SOURCE_ID.replaceKeyBy(replacePrefix)
5252
val schemaKey = TopicType.CDC_SCHEMA.replaceKeyBy(replacePrefix)

consumer/src/test/kotlin/integrations/kafka/KafkaEventSinkEnterpriseTSE.kt

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ class KafkaEventSinkEnterpriseTSE {
2828
companion object {
2929

3030
private var startedFromSuite = true
31-
val DB_NAME_NAMES = listOf("foo", "bar")
31+
val DB_NAME_NAMES = listOf("foo", "bar", "nonLowerCaseDb")
3232

3333
@JvmStatic
3434
val neo4j = Neo4jContainerExtension()
@@ -48,7 +48,8 @@ class KafkaEventSinkEnterpriseTSE {
4848
DB_NAME_NAMES.forEach { neo4j.withNeo4jConfig("streams.sink.enabled.to.$it", "true") } // we enable the sink plugin only for the instances
4949
neo4j.withNeo4jConfig("streams.sink.topic.cypher.enterpriseCypherTopic.to.foo", "MERGE (c:Customer_foo {id: event.id, foo: 'foo'})")
5050
neo4j.withNeo4jConfig("streams.sink.topic.cypher.enterpriseCypherTopic.to.bar", "MERGE (c:Customer_bar {id: event.id, bar: 'bar'})")
51-
neo4j.withDatabases("foo", "bar", "baz")
51+
neo4j.withNeo4jConfig("streams.sink.topic.cypher.enterpriseCypherTopic.to.nonLowerCaseDb", "MERGE (c:Customer_nonLowerCaseDb {id: event.id, nonLowerCaseDb: 'nonLowerCaseDb'})")
52+
neo4j.withDatabases("foo", "bar", "nonLowerCaseDb", "baz")
5253
neo4j.start()
5354
Assume.assumeTrue("Neo4j must be running", neo4j.isRunning)
5455
}, IllegalStateException::class.java)
@@ -111,6 +112,10 @@ class KafkaEventSinkEnterpriseTSE {
111112
val nodes = getData("bar")
112113
1 == nodes.size && mapOf("id" to 1L, "bar" to "bar") == nodes[0]
113114
}, Matchers.equalTo(true), 30, TimeUnit.SECONDS)
115+
Assert.assertEventually(ThrowingSupplier<Boolean, Exception> {
116+
val nodes = getData("nonLowerCaseDb")
117+
1 == nodes.size && mapOf("id" to 1L, "nonLowerCaseDb" to "nonLowerCaseDb") == nodes[0]
118+
}, Matchers.equalTo(true), 30, TimeUnit.SECONDS)
114119

115120
Assert.assertEventually(ThrowingSupplier<Boolean, Exception> {
116121
val nodes = getData("neo4j")

consumer/src/test/kotlin/streams/StreamsSinkConfigurationTest.kt

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,21 @@ class StreamsSinkConfigurationTest {
9898
assertEquals(customId, streamsSinkConf.sourceIdStrategyConfig.idName)
9999
}
100100

101+
@Test
102+
fun shouldReturnConfigurationFromMapWithNonLowerCaseDbName() {
103+
val topic = "mytopic"
104+
val topicKey = "streams.sink.topic.cypher.$topic.to.nonLowerCaseDb"
105+
val topicValue = "MERGE (n:Label{ id: event.id })"
106+
val config = mapOf(
107+
topicKey to topicValue,
108+
"streams.sink.enabled" to "false",
109+
"streams.sink.enabled.to.nonLowerCaseDb" to "true")
110+
streamsConfig.config.putAll(config)
111+
val streamsSinkConf = StreamsSinkConfiguration.from(streamsConfig, "nonlowercasedb")
112+
assertFalse { streamsSinkConf.enabled }
113+
assertEquals(topicValue, streamsSinkConf.topics.cypherTopics[topic])
114+
}
115+
101116
@Test(expected = TopicValidationException::class)
102117
fun shouldFailWithCrossDefinedTopics() {
103118
val topic = "topic-neo"

doc/asciidoc/consumer/configuration.adoc

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -37,13 +37,11 @@ streams.sink.topic.cdc.schema.to.<DB_NAME>=<LIST_OF_TOPICS_SEPARATE_BY_SEMICOLON
3737
streams.sink.topic.pattern.node.<TOPIC_NAME>.to.<DB_NAME>=<NODE_EXTRACTION_PATTERN>
3838
streams.sink.topic.pattern.relationship.<TOPIC_NAME>.to.<DB_NAME>=<RELATIONSHIP_EXTRACTION_PATTERN>
3939
streams.sink.enabled.to.<DB_NAME>=<true/false, default=true>
40-
streams.procedures.enabled.<DB_NAME>=<true/false, default=true>
4140
----
4241

4342
This means that for each db instance you can specify if:
4443
* use the source connector
4544
* the routing patterns
46-
* use the procedures
4745

4846
So if you have a instance name `foo` you can specify a configuration in this way:
4947

@@ -54,7 +52,6 @@ streams.sink.topic.cdc.schema.to.foo=<LIST_OF_TOPICS_SEPARATE_BY_SEMICOLON>
5452
streams.sink.topic.pattern.node.<TOPIC_NAME>.to.foo=<NODE_EXTRACTION_PATTERN>
5553
streams.sink.topic.pattern.relationship.<TOPIC_NAME>.to.foo=<RELATIONSHIP_EXTRACTION_PATTERN>
5654
streams.sink.enabled.to.foo=<true/false, default=true>
57-
streams.procedures.enabled.foo=<true/false, default=true>
5855
----
5956

6057
The old properties:
@@ -68,21 +65,22 @@ streams.sink.topic.pattern.relationship.<TOPIC_NAME>=<RELATIONSHIP_EXTRACTION_PA
6865
streams.sink.enabled=<true/false, default=true>
6966
----
7067

71-
are still valid and they refer to Neo4j's default db instance, which is usually called `neo4j`, but can be controlled by separate Neo4j system configuration.
68+
are still valid and they refer to Neo4j's default db instance, which is usually called `neo4j`, but can be controlled by
69+
separate Neo4j system configuration.
7270

73-
[abstract]
74-
.How manage the Neo4j's default db
75-
--
76-
The default database is controlled by Neo4j's dbms.default_database configuration property so we're being clear about which default database applies for this user.
77-
Database names are case-sensitive, and must follow Neo4j database naming rules (Reference: https://neo4j.com/docs/operations-manual/current/manage-databases/configuration/#manage-databases-administration)
78-
--
71+
[NOTE]
72+
====
73+
The default database is controlled by Neo4j's *dbms.default_database* configuration property so we're being clear about
74+
which default database applies for this user.
75+
Database names are case-insensitive and normalized to lowercase, and must follow Neo4j database naming rules.
76+
(Reference: https://neo4j.com/docs/operations-manual/current/manage-databases/configuration/#manage-databases-administration)
77+
====
7978

80-
In particular the following two properties will be used ad default values
81-
for non-default db instances, in case of the specific configuration params are not provided:
79+
In particular the following property will be used as default values
80+
for non-default db instances, in case of the specific configuration params is not provided:
8281

8382
----
8483
streams.sink.enabled=<true/false, default=true>
85-
streams.procedures.enabled=<true/false, default=true>
8684
----
8785

8886
This means that if you have Neo4j with 3 db instances:

doc/asciidoc/procedures/index.adoc

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,43 @@ image::../../images/procedure_not_found.png[title="Neo4j Streams procedure not f
4242
4343
====
4444

45+
==== Multi Database Support
46+
47+
Neo4j 4.0 Enterprise has https://neo4j.com/docs/operations-manual/4.0/manage-databases/[multi-tenancy support],
48+
in order to support this feature you can set for each database instance a configuration suffix with the following pattern
49+
`<DB_NAME>` to the properties in your neo4j.conf file.
50+
51+
So, to enable the Streams procedures the following property should be added:
52+
53+
.neo4j.conf
54+
[subs="verbatim"]
55+
----
56+
streams.procedures.enabled.<DB_NAME>=<true/false, default=true>
57+
----
58+
59+
So if you have a instance name `foo` you can specify a configuration in this way:
60+
61+
.neo4j.conf
62+
----
63+
streams.procedures.enabled.foo=<true/false, default=true>
64+
----
65+
66+
The old property:
67+
68+
.neo4j.conf
69+
----
70+
streams.procedures.enabled=<true/false, default=true>
71+
----
72+
73+
are still valid and it refers to Neo4j's default db instance.
74+
75+
In particular the following property will be used as default value
76+
for non-default db instances, in case of the specific configuration params is not provided:
77+
78+
----
79+
streams.procedures.enabled=<true/false, default=true>
80+
----
81+
4582
=== streams.publish
4683

4784
This procedure allows custom message streaming from Neo4j to the configured environment by using the underlying configured Producer.

doc/asciidoc/producer/configuration.adoc

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -53,21 +53,18 @@ Following the list of new properties that allows to support multi-tenancy:
5353
streams.source.topic.nodes.<TOPIC_NAME>.from.<DB_NAME>=<PATTERN>
5454
streams.source.topic.relationships.<TOPIC_NAME>.from.<DB_NAME>=<PATTERN>
5555
streams.source.enabled.from.<DB_NAME>=<true/false, default=true>
56-
streams.procedures.enabled.<DB_NAME>=<true/false, default=true>
5756
----
5857

5958
This means that for each db instance you can specify if:
6059
* use the source connector
6160
* the routing patterns
62-
* use the procedures
6361

6462
So if you have a instance name `foo` you can specify a configuration in this way:
6563

6664
----
6765
streams.source.topic.nodes.myTopic.from.foo=<PATTERN>
6866
streams.source.topic.relationships.myTopic.from.foo=<PATTERN>
6967
streams.source.enabled.from.foo=<true/false, default=true>
70-
streams.procedures.enabled.foo=<true/false, default=true>
7168
----
7269

7370
The old properties:
@@ -81,19 +78,19 @@ streams.procedures.enabled=<true/false, default=true>
8178

8279
are still valid and they refer to Neo4j's default db instance.
8380

84-
[abstract]
85-
.How manage the Neo4j's default db
86-
--
87-
The default database is controlled by Neo4j's dbms.default_database configuration property so we're being clear about which default database applies for this user.
88-
Database names are case-sensitive, and must follow Neo4j database naming rules (Reference: https://neo4j.com/docs/operations-manual/current/manage-databases/configuration/#manage-databases-administration)
89-
--
81+
[NOTE]
82+
====
83+
The default database is controlled by Neo4j's *dbms.default_database* configuration property so we're being clear about
84+
which default database applies for this user.
85+
Database names are case-insensitive and normalized to lowercase, and must follow Neo4j database naming rules.
86+
(Reference: https://neo4j.com/docs/operations-manual/current/manage-databases/configuration/#manage-databases-administration)
87+
====
9088

91-
In particular the following two properties will be used ad default values
92-
for non-default db instances, in case of the specific configuration params are not provided:
89+
In particular the following property will be used as default value
90+
for non-default db instances, in case of the specific configuration params is not provided:
9391

9492
----
9593
streams.source.enabled=<true/false, default=true>
96-
streams.procedures.enabled=<true/false, default=true>
9794
----
9895

9996
This means that if you have Neo4j with 3 db instances:

0 commit comments

Comments
 (0)