Skip to content

Commit f066934

Browse files
authored
Fixes #442: (4.0v) Enable production of all unique constraints on source messages for relationships (#462)
* wip: enterprise test * added enterprise test * simplified tests
1 parent ff9fdee commit f066934

19 files changed

+817
-78
lines changed

common/src/main/kotlin/streams/events/StreamsEvent.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@ data class RelationshipPayload(override val id: String,
4646

4747
enum class StreamsConstraintType { UNIQUE, NODE_PROPERTY_EXISTS, RELATIONSHIP_PROPERTY_EXISTS }
4848

49+
enum class RelKeyStrategy { DEFAULT, ALL }
50+
4951
data class Constraint(val label: String?,
5052
val properties: Set<String>,
5153
val type: StreamsConstraintType)

common/src/main/kotlin/streams/utils/SchemaUtils.kt

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,34 @@
11
package streams.utils
22

33
import streams.events.Constraint
4+
import streams.events.RelKeyStrategy
45
import streams.events.StreamsConstraintType
56
import streams.events.StreamsTransactionEvent
67
import streams.service.StreamsSinkEntity
78

89
object SchemaUtils {
9-
fun getNodeKeys(labels: List<String>, propertyKeys: Set<String>, constraints: List<Constraint>): Set<String> =
10+
fun getNodeKeys(labels: List<String>, propertyKeys: Set<String>, constraints: List<Constraint>, keyStrategy: RelKeyStrategy = RelKeyStrategy.DEFAULT): Set<String> =
1011
constraints
1112
.filter { constraint ->
1213
constraint.type == StreamsConstraintType.UNIQUE
1314
&& propertyKeys.containsAll(constraint.properties)
1415
&& labels.contains(constraint.label)
1516
}
16-
// we order first by properties.size, then by label name and finally by properties name alphabetically
17-
// with properties.sorted() we ensure that ("foo", "bar") and ("bar", "foo") are no different
18-
// with toString() we force it.properties to have the natural sort order, that is alphabetically
19-
.minWithOrNull((compareBy({ it.properties.size }, { it.label }, { it.properties.sorted().toString() })))
20-
?.properties
21-
.orEmpty()
17+
.let {
18+
when(keyStrategy) {
19+
RelKeyStrategy.DEFAULT -> {
20+
// we order first by properties.size, then by label name and finally by properties name alphabetically
21+
// with properties.sorted() we ensure that ("foo", "bar") and ("bar", "foo") are no different
22+
// with toString() we force it.properties to have the natural sort order, that is alphabetically
23+
it.minWithOrNull((compareBy({ it.properties.size }, { it.label }, { it.properties.sorted().toString() })))
24+
?.properties
25+
.orEmpty()
26+
}
27+
// with 'ALL' strategy we get a set with all properties
28+
RelKeyStrategy.ALL -> it.flatMap { it.properties }.toSet()
29+
}
30+
}
31+
2232

2333
fun toStreamsTransactionEvent(streamsSinkEntity: StreamsSinkEntity,
2434
evaluation: (StreamsTransactionEvent) -> Boolean)

common/src/test/kotlin/streams/utils/SchemaUtilsTest.kt

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package streams.utils
22

33
import org.junit.Test
44
import streams.events.Constraint
5+
import streams.events.RelKeyStrategy
56
import streams.events.StreamsConstraintType
67
import streams.utils.SchemaUtils.getNodeKeys
78
import kotlin.test.assertEquals
@@ -48,6 +49,31 @@ class SchemaUtilsTest {
4849
assertEquals(expectedKeys, actualKeys)
4950
}
5051

52+
@Test
53+
fun `getNodeKeys should return all keys when RelKeyStrategy is ALL`() {
54+
55+
val pair1 = "LabelX" to setOf("foo", "aaa")
56+
val pair2 = "LabelB" to setOf("bar", "foo")
57+
val pair3 = "LabelC" to setOf("baz", "bar")
58+
val pair4 = "LabelB" to setOf("bar", "bez")
59+
val pair5 = "LabelA" to setOf("bar", "baa", "xcv")
60+
val pair6 = "LabelC" to setOf("aaa", "baa", "xcz")
61+
val pair7 = "LabelA" to setOf("foo", "aac")
62+
val pair8 = "LabelA" to setOf("foo", "aab")
63+
val props = listOf(pair1, pair2, pair3, pair4, pair5, pair6, pair7, pair8)
64+
65+
// we shuffle the constraints to ensure that the result doesn't depend on the ordering
66+
val constraints = props.map {
67+
Constraint(label = it.first, properties = it.second, type = StreamsConstraintType.UNIQUE)
68+
}.shuffled()
69+
70+
val propertyKeys = setOf("prop", "prop2", "foo", "bar", "baz", "bez", "aaa", "aab", "baa", "aac", "xcz", "xcv")
71+
val actualKeys = getNodeKeys(props.map { it.first }, propertyKeys, constraints, RelKeyStrategy.ALL)
72+
val expectedKeys = setOf("aaa", "aab", "aac", "baa", "bar", "baz", "bez", "foo", "xcv", "xcz")
73+
74+
assertEquals(expectedKeys, actualKeys)
75+
}
76+
5177
@Test
5278
fun `getNodeKeys should return the key sorted properly (with one label)`() {
5379
// the method getNodeKeys should select the constraint with lowest properties
@@ -70,6 +96,26 @@ class SchemaUtilsTest {
7096

7197
assertEquals(expectedKeys, actualKeys)
7298
}
99+
@Test
100+
fun `getNodeKeys should return all keys when RelKeyStrategy is ALL (with one label)`() {
101+
102+
val pair1 = "LabelA" to setOf("foo", "bar")
103+
val pair2 = "LabelA" to setOf("bar", "foo")
104+
val pair3 = "LabelA" to setOf("baz", "bar")
105+
val pair4 = "LabelA" to setOf("bar", "bez")
106+
val props = listOf(pair1, pair2, pair3, pair4)
107+
108+
// we shuffle the constraints to ensure that the result doesn't depend on the ordering
109+
val constraints = props.map {
110+
Constraint(label = it.first, properties = it.second, type = StreamsConstraintType.UNIQUE)
111+
}.shuffled()
112+
113+
val propertyKeys = setOf("prop", "foo", "bar", "baz", "bez")
114+
val actualKeys = getNodeKeys(listOf("LabelA"), propertyKeys, constraints, RelKeyStrategy.ALL)
115+
val expectedKeys = setOf("bar", "baz", "bez", "foo")
116+
117+
assertEquals(expectedKeys, actualKeys)
118+
}
73119

74120
@Test
75121
fun `getNodeKeys should return empty in case it didn't match anything`() {
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
With `streams.source.topic.relationships.<TOPIC_NAME>.key_strategy=default`,
2+
when I produce a message for relationships, only one of the associated node constraints will be returned,
3+
based on the following rule.
4+
5+
If there are multiple constraints, we sort them by the number of properties associated with the constraint,
6+
then by label name (alphabetically) and finally by property names (alphabetically).
7+
8+
Finally, we take the properties of the first one.
9+
10+
So, if we have a start node with labels `Person` and `Other`,
11+
and we create 2 constraints like these:
12+
13+
[source, cypher]
14+
----
15+
CREATE CONSTRAINT ON (p:Other) ASSERT p.zxc IS UNIQUE;
16+
CREATE CONSTRAINT ON (p:Person) ASSERT p.name IS UNIQUE;
17+
----
18+
19+
the produced `start.ids` field will be:
20+
[source,json]
21+
----
22+
{"zxc": "FooBar"}
23+
----
24+
25+
because the number of properties is the same (that is, 1), but the first label in alphabetical order is `Other`.
26+
27+
Otherwise, with:
28+
29+
[source, cypher]
30+
----
31+
CREATE CONSTRAINT ON (p:Other) ASSERT p.zxc IS UNIQUE;
32+
CREATE CONSTRAINT ON (p:Other) ASSERT p.name IS UNIQUE;
33+
----
34+
35+
the produced `start.ids` field will be:
36+
[source,json]
37+
----
38+
{"name": "Sherlock"}
39+
----
40+
41+
because the number of properties and the label are the same, so the property names are compared and `name` sorts alphabetically before `zxc`.
42+
43+
44+
Otherwise with `streams.source.topic.relationships.<TOPIC_NAME>.key_strategy=all`,
45+
any property participating in a unique constraint will be produced.
46+
So, with a start node with labels `Person`, `Other`, `Another` and with these constraints:
47+
48+
[source, cypher]
49+
----
50+
CREATE CONSTRAINT ON (p:Another) ASSERT p.zxc IS UNIQUE;
51+
CREATE CONSTRAINT ON (p:Other) ASSERT p.name IS UNIQUE;
52+
CREATE CONSTRAINT ON (p:Person) ASSERT p.surname IS UNIQUE;
53+
----
54+
55+
the produced `start.ids` field will be:
56+
[source,json]
57+
----
58+
{ "name": "Sherlock", "surname": "Holmes", "zxc": "FooBar"}
59+
----

doc/docs/modules/ROOT/pages/message-structure.adoc

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
== Message structure
1+
=== Message structure
22

33
The message key structure depends on `kafka.streams.log.compaction.strategy`.
44

@@ -111,3 +111,10 @@ We obtain this key:
111111
----
112112
{"start": "0", "end": "1", "label": "BUYS"}
113113
----
114+
115+
[NOTE]
116+
====
117+
In case of relationships with multiple constraints on start or end node,
118+
the `ids` fields depend on `streams.source.topic.relationships.<TOPIC_NAME>.key_strategy` config.
119+
xref:key-strategy.adoc[See the 'key-strategy' section for more details]
120+
====

doc/docs/modules/ROOT/pages/producer-configuration.adoc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,10 @@ kafka.streams.log.compaction.strategy=delete
2121
2222
streams.source.topic.nodes.<TOPIC_NAME>=<PATTERN>
2323
streams.source.topic.relationships.<TOPIC_NAME>=<PATTERN>
24+
streams.source.topic.relationships.<TOPIC_NAME>.key_strategy=<default/all>
2425
streams.source.topic.nodes.<TOPIC_NAME>.from.<DB_NAME>=<PATTERN>
2526
streams.source.topic.relationships.<TOPIC_NAME>.from.<DB_NAME>=<PATTERN>
27+
streams.source.topic.relationships.<TOPIC_NAME>.from.<DB_NAME>.key_strategy=<default/all>
2628
streams.source.enabled=<true/false, default=true>
2729
streams.source.enabled.from.<DB_NAME>=<true/false, default=true>
2830
streams.procedures.enabled.from.<DB_NAME>=<true/false, default=true>
@@ -67,6 +69,7 @@ Following the list of new properties that allows to support multi-tenancy:
6769
----
6870
streams.source.topic.nodes.<TOPIC_NAME>.from.<DB_NAME>=<PATTERN>
6971
streams.source.topic.relationships.<TOPIC_NAME>.from.<DB_NAME>=<PATTERN>
72+
streams.source.topic.relationships.<TOPIC_NAME>.from.<DB_NAME>.key_strategy=<default/all>
7073
streams.source.enabled.from.<DB_NAME>=<true/false, default=true>
7174
----
7275

doc/docs/modules/ROOT/pages/producer.adoc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,11 @@ include::message-structure.adoc[]
2929

3030
include::producer-patterns.adoc[]
3131

32+
=== Relationship key strategy
33+
34+
include::key-strategy.adoc[]
35+
36+
3237
== Transaction Event Handler
3338

3439
The transaction event handler is the core of the Stream Producer and allows to stream database changes.

producer/src/main/kotlin/streams/RoutingConfiguration.kt

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import org.apache.kafka.common.internals.Topic
55
import org.neo4j.graphdb.Entity
66
import org.neo4j.graphdb.Node
77
import org.neo4j.graphdb.Relationship
8+
import org.neo4j.logging.Log
89
import streams.events.*
910

1011

@@ -154,6 +155,7 @@ data class NodeRoutingConfiguration(val labels: List<String> = emptyList(),
154155
}
155156

156157
data class RelationshipRoutingConfiguration(val name: String = "",
158+
val relKeyStrategy: RelKeyStrategy = RelKeyStrategy.DEFAULT,
157159
override val topic: String = "neo4j",
158160
override val all: Boolean = true,
159161
override val include: List<String> = emptyList(),
@@ -170,7 +172,7 @@ data class RelationshipRoutingConfiguration(val name: String = "",
170172
}
171173

172174
companion object {
173-
fun parse(topic: String, pattern: String): List<RelationshipRoutingConfiguration> {
175+
fun parse(topic: String, pattern: String, keyStrategyString: String = RelKeyStrategy.DEFAULT.toString(), log: Log? = null): List<RelationshipRoutingConfiguration> {
174176
Topic.validate(topic)
175177
if (pattern == PATTERN_WILDCARD) {
176178
return listOf(RelationshipRoutingConfiguration(topic = topic))
@@ -185,9 +187,17 @@ data class RelationshipRoutingConfiguration(val name: String = "",
185187
throw IllegalArgumentException("The pattern $pattern for topic $topic is invalid")
186188
}
187189
val properties = RoutingProperties.from(matcher)
190+
191+
val relKeyStrategy = try {
192+
RelKeyStrategy.valueOf(keyStrategyString.toUpperCase())
193+
} catch (e: IllegalArgumentException) {
194+
log?.warn("Invalid key strategy setting, switching to default value ${RelKeyStrategy.DEFAULT.toString().toLowerCase()}")
195+
RelKeyStrategy.DEFAULT
196+
}
197+
188198
RelationshipRoutingConfiguration(name = labels.first().trim().replace(BACKTICK_CHAR, StringUtils.EMPTY),
189199
topic = topic, all = properties.all,
190-
include = properties.include, exclude = properties.exclude)
200+
include = properties.include, exclude = properties.exclude, relKeyStrategy = relKeyStrategy)
191201
}
192202
}
193203
}
@@ -230,10 +240,10 @@ data class RelationshipRoutingConfiguration(val name: String = "",
230240
}
231241

232242
object RoutingConfigurationFactory {
233-
fun getRoutingConfiguration(topic: String, line: String, entityType: EntityType): List<RoutingConfiguration> {
243+
fun getRoutingConfiguration(topic: String, line: String, entityType: EntityType, keyStrategy: String = RelKeyStrategy.DEFAULT.toString(), log: Log? = null): List<RoutingConfiguration> {
234244
return when (entityType) {
235245
EntityType.node -> NodeRoutingConfiguration.parse(topic, line)
236-
EntityType.relationship -> RelationshipRoutingConfiguration.parse(topic, line)
246+
EntityType.relationship -> RelationshipRoutingConfiguration.parse(topic, line, keyStrategy, log)
237247
}
238248
}
239249
}

producer/src/main/kotlin/streams/StreamsEventRouterConfiguration.kt

Lines changed: 26 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,31 +1,41 @@
11
package streams
22

33
import org.apache.commons.lang3.StringUtils
4+
import org.neo4j.logging.Log
45
import streams.config.StreamsConfig
56
import streams.events.EntityType
7+
import streams.events.RelKeyStrategy
68

79

8-
private inline fun <reified T> filterMap(config: Map<String, String>, routingPrefix: String, dbName: String = ""): List<T> {
10+
private inline fun <reified T> filterMap(config: Map<String, String>, routingPrefix: String, dbName: String = "", routingSuffix: String? = null, log: Log? = null): List<T> {
911
val entityType = when (T::class) {
1012
NodeRoutingConfiguration::class -> EntityType.node
1113
RelationshipRoutingConfiguration::class -> EntityType.relationship
1214
else -> throw IllegalArgumentException("The class must be an instance of RoutingConfiguration")
1315
}
1416
return config
1517
.filterKeys {
18+
val startWithPrefixAndNotEndWithSuffix = it.startsWith(routingPrefix) && routingSuffix?.let { suffix -> !it.endsWith(suffix) } ?: true
1619
if (it.contains(StreamsRoutingConfigurationConstants.FROM)) {
1720
val topicDbName = it.replace(routingPrefix, StringUtils.EMPTY)
1821
.split(StreamsRoutingConfigurationConstants.FROM)[1]
19-
it.startsWith(routingPrefix) && topicDbName == dbName // for `from.<db>` we compare the routing prefix and the db name
22+
startWithPrefixAndNotEndWithSuffix && topicDbName == dbName // for `from.<db>` we compare the routing prefix and the db name
2023
} else {
21-
dbName == "" && it.startsWith(routingPrefix) // for the default db we only filter by routingPrefix
24+
// for the default db we only filter by routingPrefix
25+
dbName == "" && startWithPrefixAndNotEndWithSuffix
2226
}
2327
}
2428
.flatMap {
25-
val topic = it.key.replace(routingPrefix, StringUtils.EMPTY)
26-
.split(StreamsRoutingConfigurationConstants.FROM)[0]
29+
val prefixAndTopic = it.key.split(StreamsRoutingConfigurationConstants.FROM)[0]
30+
31+
val keyStrategy = routingSuffix?.let { suffix ->
32+
print("suffix - $suffix")
33+
config.entries.firstOrNull{ it.key.startsWith(prefixAndTopic) && it.key.endsWith(suffix) }?.value
34+
} ?: RelKeyStrategy.DEFAULT.toString().toLowerCase()
35+
2736
RoutingConfigurationFactory
28-
.getRoutingConfiguration(topic, it.value, entityType) as List<T>
37+
.getRoutingConfiguration(prefixAndTopic.replace(routingPrefix, StringUtils.EMPTY),
38+
it.value, entityType, keyStrategy, log) as List<T>
2939
}
3040
}
3141

@@ -34,6 +44,7 @@ private object StreamsRoutingConfigurationConstants {
3444
const val REL_ROUTING_KEY_PREFIX: String = "streams.source.topic.relationships."
3545
const val SCHEMA_POLLING_INTERVAL = "streams.source.schema.polling.interval"
3646
const val FROM = ".from."
47+
const val KEY_STRATEGY_SUFFIX = ".key_strategy"
3748
}
3849

3950
data class StreamsEventRouterConfiguration(val enabled: Boolean = StreamsConfig.SOURCE_ENABLED_VALUE,
@@ -50,19 +61,23 @@ data class StreamsEventRouterConfiguration(val enabled: Boolean = StreamsConfig.
5061

5162
companion object {
5263

53-
fun from(streamsConfig: Map<String, String>, dbName: String, isDefaultDb: Boolean): StreamsEventRouterConfiguration {
64+
fun from(streamsConfig: Map<String, String>, dbName: String, isDefaultDb: Boolean, log: Log? = null): StreamsEventRouterConfiguration {
5465
var nodeRouting = filterMap<NodeRoutingConfiguration>(config = streamsConfig,
5566
routingPrefix = StreamsRoutingConfigurationConstants.NODE_ROUTING_KEY_PREFIX,
5667
dbName = dbName)
5768
var relRouting = filterMap<RelationshipRoutingConfiguration>(config = streamsConfig,
5869
routingPrefix = StreamsRoutingConfigurationConstants.REL_ROUTING_KEY_PREFIX,
59-
dbName = dbName)
70+
dbName = dbName,
71+
routingSuffix = StreamsRoutingConfigurationConstants.KEY_STRATEGY_SUFFIX,
72+
log = log)
6073

6174
if (isDefaultDb) {
62-
nodeRouting += filterMap<NodeRoutingConfiguration>(config = streamsConfig,
75+
nodeRouting += filterMap(config = streamsConfig,
6376
routingPrefix = StreamsRoutingConfigurationConstants.NODE_ROUTING_KEY_PREFIX)
64-
relRouting += filterMap<RelationshipRoutingConfiguration>(config = streamsConfig,
65-
routingPrefix = StreamsRoutingConfigurationConstants.REL_ROUTING_KEY_PREFIX)
77+
relRouting += filterMap(config = streamsConfig,
78+
routingPrefix = StreamsRoutingConfigurationConstants.REL_ROUTING_KEY_PREFIX,
79+
routingSuffix = StreamsRoutingConfigurationConstants.KEY_STRATEGY_SUFFIX,
80+
log = log)
6681
}
6782

6883
val default = StreamsEventRouterConfiguration()

producer/src/main/kotlin/streams/StreamsRouterConfigurationListener.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ class StreamsRouterConfigurationListener(private val db: GraphDatabaseAPI,
5454
// i.e. the Schema Registry
5555
val config = KafkaConfiguration.create(configMap).excludeSinkProps()
5656
val lastConfig = lastConfig?.excludeSinkProps()
57-
val streamsConfig = StreamsEventRouterConfiguration.from(configMap, db.databaseName(), isDefaultDb = db.isDefaultDb())
57+
val streamsConfig = StreamsEventRouterConfiguration.from(configMap, db.databaseName(), isDefaultDb = db.isDefaultDb(), log)
5858
config != lastConfig || streamsConfig != streamsEventRouterConfiguration
5959
}
6060
else -> true
@@ -80,7 +80,7 @@ class StreamsRouterConfigurationListener(private val db: GraphDatabaseAPI,
8080

8181
private fun start(configMap: Map<String, String>) {
8282
lastConfig = KafkaConfiguration.create(configMap)
83-
streamsEventRouterConfiguration = StreamsEventRouterConfiguration.from(configMap, db.databaseName(), isDefaultDb = db.isDefaultDb())
83+
streamsEventRouterConfiguration = StreamsEventRouterConfiguration.from(configMap, db.databaseName(), isDefaultDb = db.isDefaultDb(), log)
8484
streamsEventRouter = StreamsEventRouterFactory.getStreamsEventRouter(configMap, db, log)
8585
streamsConstraintsService = StreamsConstraintsService(db, streamsEventRouterConfiguration!!.schemaPollingInterval)
8686
if (streamsEventRouterConfiguration?.enabled == true || streamsEventRouterConfiguration?.proceduresEnabled == true) {

0 commit comments

Comments
 (0)