Skip to content

Commit 21142e7

Browse files
authored
Fixes #382: Unable to add a topic for a label with a specific pattern (#418)
* fixes #382: Unable to add a topic for a label with a specific pattern * added test cases in RoutingConfigurationTest * added more test cases to cover the patterns * changes tests * reset config db in test
1 parent a4063ec commit 21142e7

File tree

3 files changed

+150
-5
lines changed

3 files changed

+150
-5
lines changed

producer/src/main/kotlin/streams/RoutingConfiguration.kt

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,20 @@
11
package streams
22

3+
import org.apache.commons.lang3.StringUtils
34
import org.apache.kafka.common.internals.Topic
45
import org.neo4j.graphdb.Entity
56
import org.neo4j.graphdb.Node
67
import org.neo4j.graphdb.Relationship
78
import streams.events.*
89

910

10-
private val PATTERN_REG: Regex = "^(\\w+\\s*(?::\\s*(?:[\\w|\\*]+)\\s*)*)\\s*(?:\\{\\s*(-?[\\w|\\*]+\\s*(?:,\\s*-?[\\w|\\*]+\\s*)*)\\})?\$".toRegex()
11-
private val PATTERN_COLON_REG = "\\s*:\\s*".toRegex()
11+
private val PATTERN_REG: Regex = "^(\\s*\\:*\\s*\\`*\\s*\\w+\\s*(?:\\:*\\s*\\`*\\s*\\:?(?:[\\w\\`|\\*]+)\\s*)*\\`*\\:?)\\s*(?:\\{\\s*(-?[\\w|\\*]+\\s*(?:,\\s*-?[\\w|\\*]+\\s*)*)\\})?\$".toRegex()
12+
private val PATTERN_COLON_REG = "\\s*:\\s*(?=(?:[^\\`]*\\`[^\\`]*\\`)*[^\\`]*\$)".toRegex()
1213
private val PATTERN_COMMA = "\\s*,\\s*".toRegex()
1314
private const val PATTERN_WILDCARD = "*"
1415
private const val PATTERN_PROP_MINUS = '-'
1516
private const val PATTERN_SPLIT = ";"
17+
private const val BACKTICK_CHAR = "`"
1618

1719
data class RoutingProperties(val all: Boolean,
1820
val include: List<String>,
@@ -106,7 +108,7 @@ data class NodeRoutingConfiguration(val labels: List<String> = emptyList(),
106108
if (matcher == null) {
107109
throw IllegalArgumentException("The pattern $pattern for topic $topic is invalid")
108110
} else {
109-
val labels = matcher.groupValues[1].split(PATTERN_COLON_REG)
111+
val labels = matcher.groupValues[1].trim().split(PATTERN_COLON_REG).map { it.replace(BACKTICK_CHAR, StringUtils.EMPTY) }.filter{ it.isNotBlank() }
110112
val properties = RoutingProperties.from(matcher)
111113
NodeRoutingConfiguration(labels = labels, topic = topic, all = properties.all,
112114
include = properties.include, exclude = properties.exclude)
@@ -183,7 +185,8 @@ data class RelationshipRoutingConfiguration(val name: String = "",
183185
throw IllegalArgumentException("The pattern $pattern for topic $topic is invalid")
184186
}
185187
val properties = RoutingProperties.from(matcher)
186-
RelationshipRoutingConfiguration(name = labels.first(), topic = topic, all = properties.all,
188+
RelationshipRoutingConfiguration(name = labels.first().trim().replace(BACKTICK_CHAR, StringUtils.EMPTY),
189+
topic = topic, all = properties.all,
187190
include = properties.include, exclude = properties.exclude)
188191
}
189192
}

producer/src/test/kotlin/streams/RoutingConfigurationTest.kt

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,46 @@ class RoutingConfigurationTest {
8383
assertEquals(listOf("Label4"), routing[0].labels)
8484
assertTrue { routing[0].exclude.isEmpty() }
8585
assertEquals(listOf("p1","p2","p3","p4"), routing[0].include)
86+
87+
routing = RoutingConfigurationFactory.getRoutingConfiguration("topic7", "Label:`labels::label`{ p1,p2, p3, p4}", EntityType.node) as List<NodeRoutingConfiguration>
88+
assertEquals(1, routing.size)
89+
assertEquals("topic7", routing[0].topic)
90+
assertFalse { routing[0].all }
91+
assertEquals(listOf("Label", "labels::label"), routing[0].labels)
92+
assertTrue { routing[0].exclude.isEmpty() }
93+
assertEquals(listOf("p1","p2","p3","p4"), routing[0].include)
94+
95+
routing = RoutingConfigurationFactory.getRoutingConfiguration("topic8", " Label : ` lorem : ipsum : dolor : sit `{name, surname}", EntityType.node) as List<NodeRoutingConfiguration>
96+
assertEquals(1, routing.size)
97+
assertEquals("topic8", routing[0].topic)
98+
assertFalse { routing[0].all }
99+
assertEquals(listOf("Label", " lorem : ipsum : dolor : sit "), routing[0].labels)
100+
assertTrue { routing[0].exclude.isEmpty() }
101+
assertEquals(listOf("name","surname"), routing[0].include)
102+
103+
routing = RoutingConfigurationFactory.getRoutingConfiguration("topic9", " `labels::label`:Label:Label1{name, surname}", EntityType.node) as List<NodeRoutingConfiguration>
104+
assertEquals(1, routing.size)
105+
assertEquals("topic9", routing[0].topic)
106+
assertFalse { routing[0].all }
107+
assertEquals(listOf("labels::label", "Label", "Label1"), routing[0].labels)
108+
assertTrue { routing[0].exclude.isEmpty() }
109+
assertEquals(listOf("name","surname"), routing[0].include)
110+
111+
routing = RoutingConfigurationFactory.getRoutingConfiguration("topic10", ":Label:```labels::label```:Label1{one, two}", EntityType.node) as List<NodeRoutingConfiguration>
112+
assertEquals(1, routing.size)
113+
assertEquals("topic10", routing[0].topic)
114+
assertFalse { routing[0].all }
115+
assertEquals(listOf("Label", "labels::label", "Label1"), routing[0].labels)
116+
assertTrue { routing[0].exclude.isEmpty() }
117+
assertEquals(listOf("one","two"), routing[0].include)
118+
119+
routing = RoutingConfigurationFactory.getRoutingConfiguration("topic11", ":Label:`labels::label`:`labels1::label1`:Label1{name, surname}", EntityType.node) as List<NodeRoutingConfiguration>
120+
assertEquals(1, routing.size)
121+
assertEquals("topic11", routing[0].topic)
122+
assertFalse { routing[0].all }
123+
assertEquals(listOf("Label", "labels::label", "labels1::label1", "Label1"), routing[0].labels)
124+
assertTrue { routing[0].exclude.isEmpty() }
125+
assertEquals(listOf("name","surname"), routing[0].include)
86126
}
87127

88128
@Test
@@ -134,6 +174,14 @@ class RoutingConfigurationTest {
134174
assertEquals("LOVES",routing[0].name)
135175
assertTrue { routing[0].include.isEmpty() }
136176
assertEquals(listOf("p1","p2"),routing[0].exclude)
177+
178+
routing = RoutingConfigurationFactory.getRoutingConfiguration("topic6", "`KNOWS::VERY:WELL`{one, -two }", EntityType.relationship) as List<RelationshipRoutingConfiguration>
179+
assertEquals(1, routing.size)
180+
assertEquals("topic6", routing[0].topic)
181+
assertFalse { routing[0].all }
182+
assertEquals("KNOWS::VERY:WELL",routing[0].name)
183+
assertEquals(listOf("one"),routing[0].include)
184+
assertEquals(listOf("two"),routing[0].exclude)
137185
}
138186

139187
@Test(expected = IllegalArgumentException::class)
@@ -147,7 +195,7 @@ class RoutingConfigurationTest {
147195
// Given
148196
val payload = NodePayloadBuilder()
149197
.withBefore(NodeChange(properties = mapOf("prop1" to 1, "prop2" to "pippo", "prop3" to 3), labels = listOf("Label1", "Label2")))
150-
.withAfter(NodeChange(properties = mapOf("prop1" to 1, "prop2" to "pippo", "prop3" to 3, "prop4" to 4), labels = listOf("Label1", "Label2")))
198+
.withAfter(NodeChange(properties = mapOf("prop1" to 1, "prop2" to "pippo", "prop3" to 3, "prop4" to 4), labels = listOf("Label1", "Label2", "Label3 :: Label4")))
151199
.build()
152200
val streamsEvent = StreamsTransactionEventBuilder()
153201
.withMeta(StreamsEventMetaBuilder()

producer/src/test/kotlin/streams/integrations/KafkaEventRouterSimpleTSE.kt

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,10 @@ import streams.KafkaTestUtils
1717
import streams.utils.JSONUtils
1818
import streams.setConfig
1919
import streams.start
20+
import java.util.*
2021
import java.util.concurrent.TimeUnit
2122
import kotlin.test.assertEquals
23+
import kotlin.test.assertTrue
2224

2325
class KafkaEventRouterSimpleTSE: KafkaEventRouterBaseTSE() {
2426

@@ -167,4 +169,96 @@ class KafkaEventRouterSimpleTSE: KafkaEventRouterBaseTSE() {
167169
count > 0
168170
}, Matchers.equalTo(true), 30, TimeUnit.SECONDS)
169171
}
172+
173+
@Test
174+
fun testIssue382() {
175+
val topic = UUID.randomUUID().toString()
176+
db.setConfig("streams.source.topic.nodes.$topic", "Label:`labels::label`{*}").start()
177+
kafkaConsumer.subscribe(listOf(topic))
178+
db.execute("CREATE (:Label:`labels::label` {name:'John Doe', age:42})")
179+
val records = kafkaConsumer.poll(5000)
180+
assertEquals(1, records.count())
181+
182+
assertTrue { records.all {
183+
JSONUtils.asStreamsTransactionEvent(it.value()).let {
184+
val after = it.payload.after as NodeChange
185+
val labels = after.labels
186+
val propertiesAfter = after.properties
187+
labels == listOf("Label", "labels::label") && propertiesAfter == mapOf("name" to "John Doe", "age" to 42)
188+
&& it.meta.operation == OperationType.created
189+
&& it.schema.properties == mapOf("name" to "String", "age" to "Long")
190+
&& it.schema.constraints.isEmpty()
191+
}
192+
}}
193+
}
194+
195+
@Test
196+
fun testCreateNodeWithMultiplePatternAndWithMultipeTwoPointsAndWhiteSpaces() {
197+
val topic = UUID.randomUUID().toString()
198+
db.setConfig("streams.source.topic.nodes.$topic", "Label : ` lorem : ipsum : dolor : sit `{name, surname}").start()
199+
kafkaConsumer.subscribe(listOf(topic))
200+
db.execute("CREATE (:Label:` lorem : ipsum : dolor : sit ` {name:'John', surname:'Doe', age: 42})")
201+
val records = kafkaConsumer.poll(5000)
202+
assertEquals(1, records.count())
203+
204+
assertTrue { records.all {
205+
JSONUtils.asStreamsTransactionEvent(it.value()).let {
206+
val after = it.payload.after as NodeChange
207+
val labels = after.labels
208+
val propertiesAfter = after.properties
209+
labels == listOf("Label", " lorem : ipsum : dolor : sit ") && propertiesAfter == mapOf("name" to "John", "surname" to "Doe")
210+
&& it.meta.operation == OperationType.created
211+
&& it.schema.properties == mapOf("name" to "String", "surname" to "String", "age" to "Long")
212+
&& it.schema.constraints.isEmpty()
213+
}
214+
}}
215+
}
216+
217+
@Test
218+
fun testCreateNodeWithSinglePatternAndWithMultipeTwoPoints() {
219+
val topic = UUID.randomUUID().toString()
220+
db.setConfig("streams.source.topic.nodes.$topic", "` lorem:ipsum:dolor:sit `{*}").start()
221+
kafkaConsumer.subscribe(listOf(topic))
222+
db.execute("CREATE (:` lorem:ipsum:dolor:sit ` {name:'John Doe', age:42})")
223+
val records = kafkaConsumer.poll(5000)
224+
assertEquals(1, records.count())
225+
226+
assertTrue { records.all {
227+
JSONUtils.asStreamsTransactionEvent(it.value()).let {
228+
val after = it.payload.after as NodeChange
229+
val labels = after.labels
230+
val propertiesAfter = after.properties
231+
labels == listOf(" lorem:ipsum:dolor:sit ") && propertiesAfter == mapOf("name" to "John Doe", "age" to 42)
232+
&& it.meta.operation == OperationType.created
233+
&& it.schema.properties == mapOf("name" to "String", "age" to "Long")
234+
&& it.schema.constraints.isEmpty()
235+
}
236+
}}
237+
}
238+
239+
@Test
240+
fun testCreateRelWithBacktickPattern() {
241+
val topic = listOf(UUID.randomUUID().toString(), UUID.randomUUID().toString())
242+
db.setConfig("streams.source.topic.nodes.${topic[0]}", " `Note :: Test`{*}")
243+
.setConfig("streams.source.topic.relationships.${topic[1]}", "`KNOWS::VERY:WELL`{*}")
244+
.start()
245+
kafkaConsumer.subscribe(topic)
246+
db.execute("CREATE (:`Note :: Test`{name:'Foo'}), (:`Note :: Test`{name:'Bar'})")
247+
val recordsNodes = kafkaConsumer.poll(5000)
248+
assertEquals(2, recordsNodes.count())
249+
250+
db.execute("MATCH (a:`Note :: Test`{name:'Foo'}), (b:`Note :: Test`{name:'Bar'}) CREATE (a)-[:`KNOWS::VERY:WELL`{since: 2014}]->(b)")
251+
val records = kafkaConsumer.poll(5000)
252+
assertEquals(1, records.count())
253+
assertTrue { records.all {
254+
JSONUtils.asStreamsTransactionEvent(it.value()).let {
255+
val payload = it.payload as RelationshipPayload
256+
val properties = payload.after!!.properties!!
257+
payload.type == EntityType.relationship && payload.label == "KNOWS::VERY:WELL"
258+
&& properties["since"] == 2014
259+
&& it.schema.properties == mapOf("since" to "Long")
260+
&& it.schema.constraints.isEmpty()
261+
}
262+
}}
263+
}
170264
}

0 commit comments

Comments
 (0)