Skip to content

Commit 4aa4bba

Browse files
conker84jexp
authored andcommitted
Multiple Node Topic Patterns Don't Work fixes #110 (#127)
1 parent 7e61c00 commit 4aa4bba

File tree

2 files changed

+41
-3
lines changed

2 files changed

+41
-3
lines changed

producer/src/main/kotlin/streams/RoutingConfiguration.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ data class NodeRoutingConfiguration(val labels: List<String> = emptyList(),
117117
fun prepareEvent(streamsTransactionEvent: StreamsTransactionEvent, routingConf: List<NodeRoutingConfiguration>): Map<String, StreamsTransactionEvent> {
118118
return routingConf
119119
.filter {
120-
it.all || it.labels.any { hasLabel(it, streamsTransactionEvent) }
120+
it.labels.isEmpty() || it.labels.any { hasLabel(it, streamsTransactionEvent) }
121121
}
122122
.map {
123123
val nodePayload = streamsTransactionEvent.payload as NodePayload
@@ -192,7 +192,7 @@ data class RelationshipRoutingConfiguration(val name: String = "",
192192
fun prepareEvent(streamsTransactionEvent: StreamsTransactionEvent, routingConf: List<RelationshipRoutingConfiguration>): Map<String, StreamsTransactionEvent> {
193193
return routingConf
194194
.filter {
195-
it.all || isRelationshipType(it.name, streamsTransactionEvent)
195+
it.name.isNullOrBlank() || isRelationshipType(it.name, streamsTransactionEvent)
196196
}
197197
.map {
198198
val relationshipPayload = streamsTransactionEvent.payload as RelationshipPayload

producer/src/test/kotlin/streams/integrations/KafkaEventRouterIT.kt

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package streams.integrations
22

3+
import kotlinx.coroutines.async
4+
import kotlinx.coroutines.runBlocking
35
import org.apache.kafka.clients.consumer.ConsumerConfig
46
import org.apache.kafka.clients.consumer.KafkaConsumer
57
import org.apache.kafka.common.serialization.ByteArrayDeserializer
@@ -39,6 +41,7 @@ class KafkaEventRouterIT {
3941

4042
private val WITH_REL_ROUTING_METHOD_SUFFIX = "WithRelRouting"
4143
private val WITH_NODE_ROUTING_METHOD_SUFFIX = "WithNodeRouting"
44+
private val MULTI_NODE_PATTERN_TEST: String = "MultiTopicPatternConfig"
4245

4346
@Rule
4447
@JvmField
@@ -55,8 +58,14 @@ class KafkaEventRouterIT {
5558
if (testName.methodName.endsWith(WITH_NODE_ROUTING_METHOD_SUFFIX)) {
5659
graphDatabaseBuilder.setConfig("streams.source.topic.nodes.person", "Person{*}")
5760
}
61+
if (testName.methodName.endsWith(MULTI_NODE_PATTERN_TEST)) {
62+
graphDatabaseBuilder.setConfig("streams.source.topic.nodes.neo4j-product", "Product{name, code}")
63+
.setConfig("streams.source.topic.nodes.neo4j-color", "Color{*}")
64+
.setConfig("streams.source.topic.nodes.neo4j-basket", "Basket{*}")
65+
.setConfig("streams.source.topic.relationships.neo4j-isin", "IS_IN{month,day}")
66+
.setConfig("streams.source.topic.relationships.neo4j-hascolor", "HAS_COLOR{*}")
67+
}
5868
db = graphDatabaseBuilder.newGraphDatabase() as GraphDatabaseAPI
59-
6069
db.dependencyResolver.resolveDependency(Procedures::class.java)
6170
.registerProcedure(StreamsProcedures::class.java, true)
6271

@@ -157,4 +166,33 @@ class KafkaEventRouterIT {
157166
consumer.close()
158167
}
159168

169+
private fun getRecordCount(config: KafkaConfiguration, topic: String): Int {
170+
val consumer = createConsumer(config)
171+
consumer.subscribe(listOf(topic))
172+
val count = consumer.poll(5000).count()
173+
consumer.close()
174+
return count
175+
}
176+
177+
@Test
178+
fun testMultiTopicPatternConfig() = runBlocking {
179+
val config = KafkaConfiguration(bootstrapServers = kafka.bootstrapServers)
180+
db.execute("""
181+
CREATE (p:Product{id: "A1", code: "X1", name: "Name X1", price: 1000})-[:IS_IN{month:4, day:4, year:2018}]->(b:Basket{name:"Basket-A", created: "20181228"}),
182+
(p)-[:HAS_COLOR]->(c:Color{name: "Red"})
183+
""".trimIndent()).close()
184+
185+
val recordsProduct = async { getRecordCount(config, "neo4j-product") }
186+
val recordsColor = async { getRecordCount(config, "neo4j-color") }
187+
val recordsBasket = async { getRecordCount(config, "neo4j-basket") }
188+
val recordsIsIn = async { getRecordCount(config, "neo4j-isin") }
189+
val recordsHasColor = async { getRecordCount(config, "neo4j-hascolor") }
190+
191+
assertEquals(1, recordsProduct.await())
192+
assertEquals(1, recordsColor.await())
193+
assertEquals(1, recordsBasket.await())
194+
assertEquals(1, recordsIsIn.await())
195+
assertEquals(1, recordsHasColor.await())
196+
}
197+
160198
}

0 commit comments

Comments
 (0)