Skip to content

Commit 7cdf9ed

Browse files
authored
Fixes #442: Enable production of all unique constraints on source messages for relationships (#461)
* fixes #442 * changed method name * added documentation * code clean * merged relKeyStrategy in relRoutingConf - various changes review * moved relType enum * removed comments * removed sortedSet - merged tests * changes router tests * changed assertions - simplified tests
1 parent 943e473 commit 7cdf9ed

File tree

17 files changed

+657
-54
lines changed

17 files changed

+657
-54
lines changed

common/src/main/kotlin/streams/events/StreamsEvent.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@ data class RelationshipPayload(override val id: String,
4646

4747
enum class StreamsConstraintType { UNIQUE, NODE_PROPERTY_EXISTS, RELATIONSHIP_PROPERTY_EXISTS }
4848

49+
enum class RelKeyStrategy { DEFAULT, ALL }
50+
4951
data class Constraint(val label: String?,
5052
val properties: Set<String>,
5153
val type: StreamsConstraintType)

common/src/main/kotlin/streams/utils/SchemaUtils.kt

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,34 @@
11
package streams.utils
22

33
import streams.events.Constraint
4+
import streams.events.RelKeyStrategy
45
import streams.events.StreamsConstraintType
56
import streams.events.StreamsTransactionEvent
67
import streams.serialization.JSONUtils
78
import streams.service.StreamsSinkEntity
89

910
object SchemaUtils {
10-
fun getNodeKeys(labels: List<String>, propertyKeys: Set<String>, constraints: List<Constraint>): Set<String> =
11+
fun getNodeKeys(labels: List<String>, propertyKeys: Set<String>, constraints: List<Constraint>, keyStrategy: RelKeyStrategy = RelKeyStrategy.DEFAULT): Set<String> =
1112
constraints
1213
.filter { constraint ->
1314
constraint.type == StreamsConstraintType.UNIQUE
1415
&& propertyKeys.containsAll(constraint.properties)
1516
&& labels.contains(constraint.label)
1617
}
17-
// we order first by properties.size, then by label name and finally by properties name alphabetically
18-
// with properties.sorted() we ensure that ("foo", "bar") and ("bar", "foo") are no different
19-
// with toString() we force it.properties to have the natural sort order, that is alphabetically
20-
.minWith((compareBy({ it.properties.size }, { it.label }, { it.properties.sorted().toString() })))
21-
?.properties
22-
.orEmpty()
18+
.let {
19+
when (keyStrategy) {
20+
RelKeyStrategy.DEFAULT -> {
21+
// with 'DEFAULT' we order first by properties.size, then by label name and finally by properties name alphabetically
22+
// with properties.sorted() we ensure that ("foo", "bar") and ("bar", "foo") are no different
23+
// with toString() we force it.properties to have the natural sort order, that is alphabetically
24+
it.minWith((compareBy({ it.properties.size }, { it.label }, { it.properties.sorted().toString() })))
25+
?.properties
26+
.orEmpty()
27+
}
28+
// with 'ALL' strategy we get a set with all properties
29+
RelKeyStrategy.ALL -> it.flatMap { it.properties }.toSet()
30+
}
31+
}
2332

2433
fun toStreamsTransactionEvent(streamsSinkEntity: StreamsSinkEntity,
2534
evaluation: (StreamsTransactionEvent) -> Boolean)

common/src/test/kotlin/streams/utils/SchemaUtilsTest.kt

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package streams.utils
22

33
import org.junit.Test
44
import streams.events.Constraint
5+
import streams.events.RelKeyStrategy
56
import streams.events.StreamsConstraintType
67
import streams.utils.SchemaUtils.getNodeKeys
78
import kotlin.test.assertEquals
@@ -47,6 +48,31 @@ class SchemaUtilsTest {
4748
assertEquals(expectedKeys, actualKeys)
4849
}
4950

51+
@Test
52+
fun `getNodeKeys should return all keys when RelKeyStrategy is ALL`() {
53+
54+
val pair1 = "LabelX" to setOf("foo", "aaa")
55+
val pair2 = "LabelB" to setOf("bar", "foo")
56+
val pair3 = "LabelC" to setOf("baz", "bar")
57+
val pair4 = "LabelB" to setOf("bar", "bez")
58+
val pair5 = "LabelA" to setOf("bar", "baa", "xcv")
59+
val pair6 = "LabelC" to setOf("aaa", "baa", "xcz")
60+
val pair7 = "LabelA" to setOf("foo", "aac")
61+
val pair8 = "LabelA" to setOf("foo", "aab")
62+
val props = listOf(pair1, pair2, pair3, pair4, pair5, pair6, pair7, pair8)
63+
64+
// we shuffle the constraints to ensure that the result doesn't depend from the ordering
65+
val constraints = props.map {
66+
Constraint(label = it.first, properties = it.second, type = StreamsConstraintType.UNIQUE)
67+
}.shuffled()
68+
69+
val propertyKeys = setOf("prop", "prop2", "foo", "bar", "baz", "bez", "aaa", "aab", "baa", "aac", "xcz", "xcv")
70+
val actualKeys = getNodeKeys(props.map { it.first }, propertyKeys, constraints, RelKeyStrategy.ALL)
71+
val expectedKeys = setOf("aaa", "aab", "aac", "baa", "bar", "baz", "bez", "foo", "xcv", "xcz")
72+
73+
assertEquals(expectedKeys, actualKeys)
74+
}
75+
5076
@Test
5177
fun `getNodeKeys should return the key sorted properly (with one label)`() {
5278
// the method getNodeKeys should select the constraint with lowest properties
@@ -70,6 +96,27 @@ class SchemaUtilsTest {
7096
assertEquals(expectedKeys, actualKeys)
7197
}
7298

99+
@Test
100+
fun `getNodeKeys should return all keys when RelKeyStrategy is ALL (with one label)`() {
101+
102+
val pair1 = "LabelA" to setOf("foo", "bar")
103+
val pair2 = "LabelA" to setOf("bar", "foo")
104+
val pair3 = "LabelA" to setOf("baz", "bar")
105+
val pair4 = "LabelA" to setOf("bar", "bez")
106+
val props = listOf(pair1, pair2, pair3, pair4)
107+
108+
// we shuffle the constraints to ensure that the result doesn't depend from the ordering
109+
val constraints = props.map {
110+
Constraint(label = it.first, properties = it.second, type = StreamsConstraintType.UNIQUE)
111+
}.shuffled()
112+
113+
val propertyKeys = setOf("prop", "foo", "bar", "baz", "bez")
114+
val actualKeys = getNodeKeys(listOf("LabelA"), propertyKeys, constraints, RelKeyStrategy.ALL)
115+
val expectedKeys = setOf("bar", "baz", "bez", "foo")
116+
117+
assertEquals(expectedKeys, actualKeys)
118+
}
119+
73120
@Test
74121
fun `getNodeKeys should return empty in case it didn't match anything`() {
75122
val props = mapOf("LabelA" to setOf("foo", "bar"),

doc/asciidoc/producer/configuration.adoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ kafka.streams.log.compaction.strategy=delete
1717
1818
streams.source.topic.nodes.<TOPIC_NAME>=<PATTERN>
1919
streams.source.topic.relationships.<TOPIC_NAME>=<PATTERN>
20+
streams.source.topic.relationships.<TOPIC_NAME>.key_strategy=<default/all>
2021
streams.source.enabled=<true/false, default=true>
2122
streams.source.schema.polling.interval=<MILLIS, the polling interval for getting the schema information>
2223
----

doc/asciidoc/producer/index.adoc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,11 @@ include::message-structure.adoc[]
3232

3333
include::patterns.adoc[]
3434

35+
=== Relationship key strategy
36+
37+
include::key-strategy.adoc[]
38+
39+
3540
=== Transaction Event Handler
3641

3742
The transaction event handler is the core of the Stream Producer and allows streaming of database changes.
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
With `streams.source.topic.relationships.<TOPIC_NAME>.key_strategy=default`,
2+
when a message is produced for a relationship, only one of the associated node constraints will be returned,
3+
based on the following rule.
4+
5+
If there are multiple constraints, we sort them by the number of properties associated with each constraint,
6+
then by label name (alphabetically) and finally by property names (alphabetically).
7+
8+
Finally, we take the properties of the first one.
9+
10+
So, if we have a start node with labels `Person` and `Other`,
11+
and we create 2 constraints like these:
12+
13+
[source, cypher]
14+
----
15+
CREATE CONSTRAINT ON (p:Other) ASSERT p.zxc IS UNIQUE;
16+
CREATE CONSTRAINT ON (p:Person) ASSERT p.name IS UNIQUE;
17+
----
18+
19+
the `start.ids` field produced will be:
20+
[source,json]
21+
----
22+
{"zxc": "FooBar"}
23+
----
24+
25+
because both constraints have the same number of properties (that is, 1), but `Other` comes first when the labels are sorted alphabetically.
26+
27+
Otherwise, with:
28+
29+
[source, cypher]
30+
----
31+
CREATE CONSTRAINT ON (p:Other) ASSERT p.zxc IS UNIQUE;
32+
CREATE CONSTRAINT ON (p:Other) ASSERT p.name IS UNIQUE;
33+
----
34+
35+
the `start.ids` field produced will be:
36+
[source,json]
37+
----
38+
{"name": "Sherlock"}
39+
----
40+
41+
because both constraints have the same label and number of properties, and the property name `name` sorts alphabetically before `zxc`.
42+
43+
44+
Otherwise, with `streams.source.topic.relationships.<TOPIC_NAME>.key_strategy=all`,
45+
any property participating in a unique constraint will be produced.
46+
So, with a start node with labels `Person`, `Other`, `Another` and with these constraints:
47+
48+
[source, cypher]
49+
----
50+
CREATE CONSTRAINT ON (p:Another) ASSERT p.zxc IS UNIQUE;
51+
CREATE CONSTRAINT ON (p:Other) ASSERT p.name IS UNIQUE;
52+
CREATE CONSTRAINT ON (p:Person) ASSERT p.surname IS UNIQUE;
53+
----
54+
55+
the `start.ids` field produced will be:
56+
[source,json]
57+
----
58+
{ "name": "Sherlock", "surname": "Holmes", "zxc": "FooBar"}
59+
----
60+

doc/asciidoc/producer/message-structure.adoc

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11

2-
== Message structure
2+
=== Message structure
33

44
The message key structure depends on `kafka.streams.log.compaction.strategy`.
55

@@ -111,4 +111,12 @@ We obtain this key:
111111
[source,json]
112112
----
113113
{"start": "0", "end": "1", "label": "BUYS"}
114-
----
114+
----
115+
116+
[NOTE]
117+
====
118+
For relationships with multiple constraints on the start or end node,
119+
the `ids` fields depend on `streams.source.topic.relationships.<TOPIC_NAME>.key_strategy` config.
120+
121+
xref:key-strategy.adoc[See the 'key-strategy' section for more details]
122+
====

producer/src/main/kotlin/streams/RoutingConfiguration.kt

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import org.apache.kafka.common.internals.Topic
44
import org.neo4j.graphdb.Entity
55
import org.neo4j.graphdb.Node
66
import org.neo4j.graphdb.Relationship
7+
import org.neo4j.logging.Log
78
import streams.events.*
89

910

@@ -152,6 +153,7 @@ data class NodeRoutingConfiguration(val labels: List<String> = emptyList(),
152153
}
153154

154155
data class RelationshipRoutingConfiguration(val name: String = "",
156+
val relKeyStrategy: RelKeyStrategy = RelKeyStrategy.DEFAULT,
155157
override val topic: String = "neo4j",
156158
override val all: Boolean = true,
157159
override val include: List<String> = emptyList(),
@@ -168,7 +170,7 @@ data class RelationshipRoutingConfiguration(val name: String = "",
168170
}
169171

170172
companion object {
171-
fun parse(topic: String, pattern: String): List<RelationshipRoutingConfiguration> {
173+
fun parse(topic: String, pattern: String, keyStrategyString: String = RelKeyStrategy.DEFAULT.toString(), log: Log? = null): List<RelationshipRoutingConfiguration> {
172174
Topic.validate(topic)
173175
if (pattern == PATTERN_WILDCARD) {
174176
return listOf(RelationshipRoutingConfiguration(topic = topic))
@@ -183,8 +185,16 @@ data class RelationshipRoutingConfiguration(val name: String = "",
183185
throw IllegalArgumentException("The pattern $pattern for topic $topic is invalid")
184186
}
185187
val properties = RoutingProperties.from(matcher)
188+
189+
val keyStrategy = try {
190+
RelKeyStrategy.valueOf(keyStrategyString.toUpperCase())
191+
} catch (e: IllegalArgumentException) {
192+
log?.warn("Invalid key strategy setting, switching to default value ${RelKeyStrategy.DEFAULT.toString().toLowerCase()}")
193+
RelKeyStrategy.DEFAULT
194+
}
195+
186196
RelationshipRoutingConfiguration(name = labels.first(), topic = topic, all = properties.all,
187-
include = properties.include, exclude = properties.exclude)
197+
include = properties.include, exclude = properties.exclude, relKeyStrategy= keyStrategy)
188198
}
189199
}
190200
}
@@ -226,11 +236,12 @@ data class RelationshipRoutingConfiguration(val name: String = "",
226236
}
227237
}
228238

239+
229240
object RoutingConfigurationFactory {
230-
fun getRoutingConfiguration(topic: String, line: String, entityType: EntityType): List<RoutingConfiguration> {
241+
fun getRoutingConfiguration(topic: String, line: String, entityType: EntityType, keyStrategy: String = RelKeyStrategy.DEFAULT.toString(), log: Log? = null): List<RoutingConfiguration> {
231242
return when (entityType) {
232243
EntityType.node -> NodeRoutingConfiguration.parse(topic, line)
233-
EntityType.relationship -> RelationshipRoutingConfiguration.parse(topic, line)
244+
EntityType.relationship -> RelationshipRoutingConfiguration.parse(topic, line, keyStrategy, log)
234245
}
235246
}
236247
}

producer/src/main/kotlin/streams/StreamsEventRouterConfiguration.kt

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,27 @@
11
package streams
22

33
import org.apache.commons.lang3.StringUtils
4+
import org.neo4j.logging.Log
45
import streams.events.EntityType
6+
import streams.events.RelKeyStrategy
57

68

7-
private inline fun <reified T> filterMap(config: Map<String, String>, routingPrefix: String): List<T> {
9+
private inline fun <reified T> filterMap(config: Map<String, String>, routingPrefix: String, routingSuffix: String? = null, log: Log? = null): List<T> {
810
val entityType = when (T::class) {
911
NodeRoutingConfiguration::class -> EntityType.node
1012
RelationshipRoutingConfiguration::class -> EntityType.relationship
1113
else -> throw IllegalArgumentException("The class must be an instance of RoutingConfiguration")
1214
}
1315
return config
14-
.filterKeys { it.startsWith(routingPrefix) }
15-
.flatMap { RoutingConfigurationFactory
16-
.getRoutingConfiguration(it.key.replace(routingPrefix, StringUtils.EMPTY) , it.value, entityType) as List<T>
16+
.filterKeys { it.startsWith(routingPrefix) && routingSuffix?.let { suffix -> !it.endsWith(suffix) } ?: true }
17+
.flatMap {
18+
val entryKey = it.key
19+
20+
val keyStrategy = routingSuffix?.let { suffix ->
21+
config.entries.firstOrNull { it.key.startsWith(entryKey) && it.key.endsWith(suffix) }?.value
22+
} ?: RelKeyStrategy.DEFAULT.toString().toLowerCase()
23+
24+
RoutingConfigurationFactory.getRoutingConfiguration(entryKey.replace(routingPrefix, StringUtils.EMPTY), it.value, entityType, keyStrategy, log) as List<T>
1725
}
1826
}
1927

@@ -23,6 +31,7 @@ private object StreamsRoutingConfigurationConstants {
2331
const val ENABLED = "streams.source.enabled"
2432
const val SCHEMA_POLLING_INTERVAL = "streams.source.schema.polling.interval"
2533
const val PROCEDURES_ENABLED = "streams.procedures.enabled"
34+
const val KEY_STRATEGY_SUFFIX = ".key_strategy"
2635
}
2736

2837
data class StreamsEventRouterConfiguration(val enabled: Boolean = true,
@@ -38,12 +47,15 @@ data class StreamsEventRouterConfiguration(val enabled: Boolean = true,
3847
}
3948

4049
companion object {
41-
fun from(config: Map<String, String>): StreamsEventRouterConfiguration {
50+
fun from(config: Map<String, String>, log: Log? = null): StreamsEventRouterConfiguration {
4251
val nodeRouting = filterMap<NodeRoutingConfiguration>(config = config,
4352
routingPrefix = StreamsRoutingConfigurationConstants.NODE_ROUTING_KEY_PREFIX)
4453

4554
val relRouting = filterMap<RelationshipRoutingConfiguration>(config = config,
46-
routingPrefix = StreamsRoutingConfigurationConstants.REL_ROUTING_KEY_PREFIX)
55+
routingPrefix = StreamsRoutingConfigurationConstants.REL_ROUTING_KEY_PREFIX,
56+
routingSuffix = StreamsRoutingConfigurationConstants.KEY_STRATEGY_SUFFIX,
57+
log = log
58+
)
4759

4860
val default = StreamsEventRouterConfiguration()
4961
return default.copy(enabled = config.getOrDefault(StreamsRoutingConfigurationConstants.ENABLED, default.enabled).toString().toBoolean(),

producer/src/main/kotlin/streams/StreamsRouterConfigurationListener.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ class StreamsRouterConfigurationListener(private val db: GraphDatabaseAPI,
5353
// i.e. the Schema Registry
5454
val config = KafkaConfiguration.create(configMap).excludeSinkProps()
5555
val lastConfig = lastConfig?.excludeSinkProps()
56-
val streamsConfig = StreamsEventRouterConfiguration.from(configMap)
56+
val streamsConfig = StreamsEventRouterConfiguration.from(configMap, log)
5757
config != lastConfig || streamsConfig != streamsEventRouterConfiguration
5858
}
5959
else -> true
@@ -79,7 +79,7 @@ class StreamsRouterConfigurationListener(private val db: GraphDatabaseAPI,
7979

8080
private fun start(configMap: Map<String, String>) {
8181
lastConfig = KafkaConfiguration.create(configMap)
82-
streamsEventRouterConfiguration = StreamsEventRouterConfiguration.from(configMap)
82+
streamsEventRouterConfiguration = StreamsEventRouterConfiguration.from(configMap, log)
8383
streamsEventRouter = StreamsEventRouterFactory.getStreamsEventRouter(configMap, log)
8484
streamsConstraintsService = StreamsConstraintsService(db, streamsEventRouterConfiguration!!.schemaPollingInterval)
8585
if (streamsEventRouterConfiguration?.enabled == true || streamsEventRouterConfiguration?.proceduresEnabled == true) {

0 commit comments

Comments
 (0)