Skip to content

Commit ff04c71

Browse files
authored
Fixes #466: Problem of streams.publish procedure (#469)
1 parent 1488e8f commit ff04c71

File tree

2 files changed

+50
-34
lines changed

2 files changed

+50
-34
lines changed

producer/src/main/kotlin/streams/Extensions.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ fun Node.toMap(): Map<String, Any?> {
1515
}
1616

1717
fun Relationship.toMap(): Map<String, Any?> {
18-
return mapOf("id" to id.toString(), "properties" to allProperties, "label" to type,
18+
return mapOf("id" to id.toString(), "properties" to allProperties, "label" to type.name(),
1919
"start" to startNode.toMap(),
2020
"end" to endNode.toMap(),
2121
"type" to EntityType.relationship)

producer/src/test/kotlin/streams/integrations/KafkaEventRouterProcedureTSE.kt

Lines changed: 49 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import org.apache.kafka.clients.admin.AdminClient
44
import org.apache.kafka.clients.admin.NewTopic
55
import org.junit.Test
66
import org.neo4j.graphdb.QueryExecutionException
7+
import org.neo4j.graphdb.Result
78
import streams.events.StreamsEvent
89
import streams.extensions.execute
910
import streams.utils.JSONUtils
@@ -121,14 +122,7 @@ class KafkaEventRouterProcedureTSE : KafkaEventRouterBaseTSE() {
121122
"CALL streams.publish.sync('neo4j', n) \n" +
122123
"YIELD value \n" +
123124
"RETURN value") {
124-
assertTrue { it.hasNext() }
125-
val resultMap = (it.next())["value"] as Map<String, Any>
126-
assertNotNull(resultMap["offset"])
127-
assertNotNull(resultMap["partition"])
128-
assertNotNull(resultMap["keySize"])
129-
assertNotNull(resultMap["valueSize"])
130-
assertNotNull(resultMap["timestamp"])
131-
assertFalse { it.hasNext() }
125+
assertSyncResult(it)
132126
}
133127

134128
val records = kafkaConsumer.poll(5000)
@@ -143,14 +137,7 @@ class KafkaEventRouterProcedureTSE : KafkaEventRouterBaseTSE() {
143137
setUpProcedureTests()
144138
val message = "Hello World"
145139
db.execute("CALL streams.publish.sync('neo4j', '$message')") {
146-
assertTrue { it.hasNext() }
147-
val resultMap = (it.next())["value"] as Map<String, Any>
148-
assertNotNull(resultMap["offset"])
149-
assertNotNull(resultMap["partition"])
150-
assertNotNull(resultMap["keySize"])
151-
assertNotNull(resultMap["valueSize"])
152-
assertNotNull(resultMap["timestamp"])
153-
assertFalse { it.hasNext() }
140+
assertSyncResult(it)
154141
}
155142

156143
val records = kafkaConsumer.poll(5000)
@@ -160,6 +147,38 @@ class KafkaEventRouterProcedureTSE : KafkaEventRouterBaseTSE() {
160147
}}
161148
}
162149

150+
151+
@Test
152+
fun testProcedureWithRelationship() {
153+
setUpProcedureTests()
154+
db.execute("CREATE (:Foo {one: 'two'})-[:KNOWS {alpha: 'beta'}]->(:Bar {three: 'four'})")
155+
val recordsCreation = kafkaConsumer.poll(5000)
156+
assertEquals(3, recordsCreation.count())
157+
158+
db.execute("""
159+
MATCH (:Foo)-[r:KNOWS]->(:Bar)
160+
|CALL streams.publish.sync('neo4j', r)
161+
|YIELD value RETURN value""".trimMargin()) {
162+
assertSyncResult(it)
163+
}
164+
val records = kafkaConsumer.poll(5000)
165+
assertEquals(1, records.count())
166+
167+
val payload = JSONUtils.readValue<StreamsEvent>(records.first().value()).payload as Map<String, Any>
168+
assertTrue(payload["id"] is String)
169+
assertEquals(mapOf("alpha" to "beta"), payload["properties"])
170+
assertEquals("KNOWS", payload["label"])
171+
assertEquals("relationship", payload["type"])
172+
val start = payload["start"] as Map<String, Any>
173+
assertEquals(listOf("Foo"), start["labels"])
174+
assertEquals(mapOf("one" to "two"), start["properties"])
175+
assertEquals("node", start["type"])
176+
val end = payload["end"] as Map<String, Any>
177+
assertEquals(listOf("Bar"), end["labels"])
178+
assertEquals(mapOf("three" to "four"), end["properties"])
179+
assertEquals("node", end["type"])
180+
}
181+
163182
@Test
164183
fun testProcedureSyncWithKeyNull() {
165184
setUpProcedureTests()
@@ -169,15 +188,8 @@ class KafkaEventRouterProcedureTSE : KafkaEventRouterBaseTSE() {
169188
assertEquals(1, recordsCreation.count())
170189

171190
val message = "Hello World"
172-
db.execute("MATCH (n:Foo {id: 1}) CALL streams.publish.sync('neo4j', '$message', {key: n.foo}) YIELD value RETURN value") {
173-
assertTrue { it.hasNext() }
174-
val resultMap = (it.next())["value"] as Map<String, Any>
175-
assertNotNull(resultMap["offset"])
176-
assertNotNull(resultMap["partition"])
177-
assertNotNull(resultMap["keySize"])
178-
assertNotNull(resultMap["valueSize"])
179-
assertNotNull(resultMap["timestamp"])
180-
assertFalse { it.hasNext() }
191+
db.execute("MATCH (n:Foo {id: 1}) CALL streams.publish.sync('neo4j', '$message', {key: n.foo}) YIELD value RETURN value") {
192+
assertSyncResult(it)
181193
}
182194

183195
val records = kafkaConsumer.poll(5000)
@@ -204,14 +216,7 @@ class KafkaEventRouterProcedureTSE : KafkaEventRouterBaseTSE() {
204216
val keyRecord = "test"
205217
val partitionRecord = 1
206218
db.execute("CALL streams.publish.sync('$topic', '$message', {key: '$keyRecord', partition: $partitionRecord })") {
207-
assertTrue { it.hasNext() }
208-
val resultMap = (it.next())["value"] as Map<String, Any>
209-
assertNotNull(resultMap["offset"])
210-
assertEquals(partitionRecord, resultMap["partition"])
211-
assertNotNull(resultMap["keySize"])
212-
assertNotNull(resultMap["valueSize"])
213-
assertNotNull(resultMap["timestamp"])
214-
assertFalse { it.hasNext() }
219+
assertSyncResult(it)
215220
}
216221

217222
val records = kafkaConsumer.poll(5000)
@@ -280,4 +285,15 @@ class KafkaEventRouterProcedureTSE : KafkaEventRouterBaseTSE() {
280285
KafkaEventRouterSuiteIT.registerPublishProcedure(db)
281286
kafkaConsumer.subscribe(listOf("neo4j"))
282287
}
288+
289+
private fun assertSyncResult(it: Result) {
290+
assertTrue { it.hasNext() }
291+
val resultMap = (it.next())["value"] as Map<String, Any>
292+
assertNotNull(resultMap["offset"])
293+
assertNotNull(resultMap["partition"])
294+
assertNotNull(resultMap["keySize"])
295+
assertNotNull(resultMap["valueSize"])
296+
assertNotNull(resultMap["timestamp"])
297+
assertFalse { it.hasNext() }
298+
}
283299
}

0 commit comments

Comments
 (0)