Skip to content

Commit 3fa6faf

Browse files
conker84jexp
authored andcommitted
fixes #104 (#105)
1 parent e3f5338 commit 3fa6faf

File tree

3 files changed

+86
-10
lines changed

3 files changed

+86
-10
lines changed

producer/src/main/kotlin/streams/StreamsEventRouterConfiguration.kt

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,20 @@
11
package streams
22

3+
import org.apache.commons.lang3.StringUtils
34
import streams.events.EntityType
45

56

6-
private fun <T> filterMap(config: Map<String, String>, routingPrefix: String): List<T> {
7+
private inline fun <reified T> filterMap(config: Map<String, String>, routingPrefix: String): List<T> {
8+
val entityType = when (T::class) {
9+
NodeRoutingConfiguration::class -> EntityType.node
10+
RelationshipRoutingConfiguration::class -> EntityType.relationship
11+
else -> throw IllegalArgumentException("The class must be an instance of RoutingConfiguration")
12+
}
713
return config
814
.filterKeys { it.startsWith(routingPrefix) }
9-
.flatMap { RoutingConfigurationFactory.getRoutingConfiguration(it.key.replace(routingPrefix, "") , it.value, EntityType.node) as List<T> }
15+
.flatMap { RoutingConfigurationFactory
16+
.getRoutingConfiguration(it.key.replace(routingPrefix, StringUtils.EMPTY) , it.value, entityType) as List<T>
17+
}
1018
}
1119

1220
private object StreamsRoutingConfigurationConstants {

producer/src/test/kotlin/streams/RoutingConfigurationTest.kt

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,24 @@ import kotlin.test.assertTrue
88

99
class RoutingConfigurationTest {
1010

11-
@Test(expected = IllegalArgumentException::class)
11+
@Test
1212
fun badPatternShouldThrowIllegalArgumentException() {
13-
RoutingConfigurationFactory.getRoutingConfiguration("topic1", "Label(1,2)", EntityType.node)
13+
val topic = "topic1"
14+
assertIllegalArgumentException(topic, "Label(1,2)", EntityType.node)
15+
assertIllegalArgumentException(topic, "Label{}", EntityType.node)
16+
assertIllegalArgumentException(topic, "KNOWS{}", EntityType.relationship)
17+
}
18+
19+
private fun assertIllegalArgumentException(topic: String, pattern: String, entityType: EntityType) {
20+
var hasException = false
21+
try {
22+
RoutingConfigurationFactory.getRoutingConfiguration(topic, pattern, entityType)
23+
} catch (e: Exception) {
24+
assertTrue { e is IllegalArgumentException }
25+
assertEquals("The pattern $pattern for topic $topic is invalid", e.message)
26+
hasException = true
27+
}
28+
assertTrue { hasException }
1429
}
1530

1631
@Test

producer/src/test/kotlin/streams/integrations/KafkaEventRouterIT.kt

Lines changed: 59 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,8 @@ import org.apache.kafka.clients.consumer.ConsumerConfig
44
import org.apache.kafka.clients.consumer.KafkaConsumer
55
import org.apache.kafka.common.serialization.ByteArrayDeserializer
66
import org.apache.kafka.common.serialization.StringDeserializer
7-
import org.junit.After
8-
import org.junit.Before
9-
import org.junit.ClassRule
10-
import org.junit.Test
7+
import org.junit.*
8+
import org.junit.rules.TestName
119
import org.neo4j.kernel.impl.proc.Procedures
1210
import org.neo4j.kernel.internal.GraphDatabaseAPI
1311
import org.neo4j.test.TestGraphDatabaseFactory
@@ -39,11 +37,26 @@ class KafkaEventRouterIT {
3937

4038
lateinit var db: GraphDatabaseAPI
4139

40+
private val WITH_REL_ROUTING_METHOD_SUFFIX = "WithRelRouting"
41+
private val WITH_NODE_ROUTING_METHOD_SUFFIX = "WithNodeRouting"
42+
43+
@Rule
44+
@JvmField
45+
var testName = TestName()
46+
4247
@Before
4348
fun setUp() {
44-
db = TestGraphDatabaseFactory().newImpermanentDatabaseBuilder()
49+
var graphDatabaseBuilder = TestGraphDatabaseFactory()
50+
.newImpermanentDatabaseBuilder()
4551
.setConfig("kafka.bootstrap.servers", kafka.bootstrapServers)
46-
.newGraphDatabase() as GraphDatabaseAPI
52+
if (testName.methodName.endsWith(WITH_REL_ROUTING_METHOD_SUFFIX)) {
53+
graphDatabaseBuilder.setConfig("streams.source.topic.relationships.knows", "KNOWS{*}")
54+
}
55+
if (testName.methodName.endsWith(WITH_NODE_ROUTING_METHOD_SUFFIX)) {
56+
graphDatabaseBuilder.setConfig("streams.source.topic.nodes.person", "Person{*}")
57+
}
58+
db = graphDatabaseBuilder.newGraphDatabase() as GraphDatabaseAPI
59+
4760
db.dependencyResolver.resolveDependency(Procedures::class.java)
4861
.registerProcedure(StreamsProcedures::class.java, true)
4962

@@ -76,6 +89,46 @@ class KafkaEventRouterIT {
7689
consumer.close()
7790
}
7891

92+
@Test
93+
fun testCreateRelationshipWithRelRouting() {
94+
val config = KafkaConfiguration(bootstrapServers = kafka.bootstrapServers)
95+
val consumer = createConsumer(config)
96+
consumer.subscribe(listOf("knows"))
97+
db.execute("CREATE (:Person {name:'Andrea'})-[:KNOWS{since: 2014}]->(:Person {name:'Michael'})").close()
98+
val records = consumer.poll(5000)
99+
assertEquals(1, records.count())
100+
assertEquals(true, records.all {
101+
JSONUtils.readValue(it.value(), Map::class.java).let {
102+
var payload = it["payload"] as Map<String, Any?>
103+
val after = payload["after"] as Map<String, Any?>
104+
val properties = after["properties"] as Map<String, Any?>
105+
payload["type"] == "relationship" && payload["label"] == "KNOWS" && properties["since"] == 2014
106+
}
107+
})
108+
consumer.close()
109+
}
110+
111+
@Test
112+
fun testCreateNodeWithNodeRouting() {
113+
val config = KafkaConfiguration(bootstrapServers = kafka.bootstrapServers)
114+
val consumer = createConsumer(config)
115+
consumer.subscribe(listOf("person"))
116+
db.execute("CREATE (:Person {name:'Andrea'})").close()
117+
val records = consumer.poll(5000)
118+
assertEquals(1, records.count())
119+
assertEquals(true, records.all {
120+
JSONUtils.readValue(it.value(), Map::class.java).let {
121+
var payload = it["payload"] as Map<String, Any?>
122+
val after = payload["after"] as Map<String, Any?>
123+
val labels = after["labels"] as List<String>
124+
val propertiesAfter = after["properties"] as Map<String, Any?>
125+
val meta = it["meta"] as Map<String, Any?>
126+
labels == listOf("Person") && propertiesAfter == mapOf("name" to "Andrea") && meta["operation"] == "created"
127+
}
128+
})
129+
consumer.close()
130+
}
131+
79132
private fun createConsumer(config: KafkaConfiguration): KafkaConsumer<String, ByteArray> {
80133
val props = config.asProperties()
81134
props["group.id"] = "neo4j"

0 commit comments

Comments
 (0)