Skip to content

Commit 3b19274

Browse files
authored
Fixes #470: Relationships are not generated using CUD file format as sink ingestion strategy (#471)
1 parent 8e67bce commit 3b19274

File tree

4 files changed

+152
-3
lines changed

4 files changed

+152
-3
lines changed

common/src/main/kotlin/streams/service/sink/strategy/CUDIngestionStrategy.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,7 @@ class CUDIngestionStrategy: IngestionStrategy {
233233
try {
234234
val data = toCUDEntity<CUDRelationship>(it)
235235
when {
236-
data!!.isValidOperation() -> if (data.from.ids.isNotEmpty() && data.to.ids.isNotEmpty() && data.properties.isNotEmpty()) data else null // TODO send to the DLQ the null
236+
data!!.isValidOperation() -> if (data.from.ids.isNotEmpty() && data.to.ids.isNotEmpty()) data else null // TODO send to the DLQ the null
237237
else -> null // TODO send to the DLQ the null
238238
}
239239
} catch (e: Exception) {
@@ -264,7 +264,7 @@ class CUDIngestionStrategy: IngestionStrategy {
264264
try {
265265
val data = toCUDEntity<CUDRelationship>(it)
266266
when (data?.op) {
267-
CUDOperations.delete -> if (data.from.ids.isNotEmpty() && data.to.ids.isNotEmpty() && data.properties.isEmpty()) data else null // TODO send to the DLQ the null
267+
CUDOperations.delete -> if (data.from.ids.isNotEmpty() && data.to.ids.isNotEmpty()) data else null // TODO send to the DLQ the null
268268
else -> null // TODO send to the DLQ the null
269269
}
270270
} catch (e: Exception) {

common/src/test/kotlin/streams/service/sink/strategy/CUDIngestionStrategyTest.kt

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -512,6 +512,54 @@ class CUDIngestionStrategyTest {
512512
assertRelationshipEventsContainsKey(updateRelFooBarLabel, key, key)
513513
}
514514

515+
@Test
516+
fun `should create and delete relationship also without properties field`() {
517+
val key = "key"
518+
val startNode = "SourceNode"
519+
val endNode = "TargetNode"
520+
val relType = "MY_REL"
521+
val start = CUDNodeRel(ids = mapOf(key to 1), labels = listOf(startNode))
522+
val end = CUDNodeRel(ids = mapOf(key to 2), labels = listOf(endNode))
523+
val list = listOf(CUDOperations.create, CUDOperations.delete, CUDOperations.update).map {
524+
val rel = CUDRelationship(op = it, from = start, to = end, rel_type = relType)
525+
StreamsSinkEntity(null, rel)
526+
}
527+
528+
// when
529+
val cudQueryStrategy = CUDIngestionStrategy()
530+
val relationshipEvents = cudQueryStrategy.mergeRelationshipEvents(list)
531+
val relationshipDeleteEvents = cudQueryStrategy.deleteRelationshipEvents(list)
532+
533+
assertEquals(2, relationshipEvents.size)
534+
val createRel = findEventByQuery("""
535+
|${StreamsUtils.UNWIND}
536+
|MATCH (from:$startNode {key: event.from.${CUDIngestionStrategy.ID_KEY}.key})
537+
|${StreamsUtils.WITH_EVENT_FROM}
538+
|MATCH (to:$endNode {key: event.to.${CUDIngestionStrategy.ID_KEY}.key})
539+
|CREATE (from)-[r:$relType]->(to)
540+
|SET r = event.properties
541+
""".trimMargin(), relationshipEvents)
542+
assertEquals(1, createRel.events.size)
543+
val updateRel = findEventByQuery("""
544+
|${StreamsUtils.UNWIND}
545+
|MATCH (from:$startNode {key: event.from.${CUDIngestionStrategy.ID_KEY}.key})
546+
|MATCH (to:$endNode {key: event.to.${CUDIngestionStrategy.ID_KEY}.key})
547+
|MATCH (from)-[r:MY_REL]->(to)
548+
|SET r += event.properties
549+
""".trimMargin(), relationshipEvents)
550+
assertEquals(1, updateRel.events.size)
551+
552+
assertEquals(1, relationshipDeleteEvents.size)
553+
val deleteRel = findEventByQuery("""
554+
|${StreamsUtils.UNWIND}
555+
|MATCH (from:$startNode {key: event.from.${CUDIngestionStrategy.ID_KEY}.key})
556+
|MATCH (to:$endNode {key: event.to.${CUDIngestionStrategy.ID_KEY}.key})
557+
|MATCH (from)-[r:$relType]->(to)
558+
|DELETE r
559+
""".trimMargin(), relationshipDeleteEvents)
560+
assertEquals(1, deleteRel.events.size)
561+
}
562+
515563
@Test
516564
fun `should create, merge and update relationships with merge op in 'from' and 'to' node`() {
517565
// given

consumer/src/test/kotlin/integrations/kafka/KafkaEventSinkCUDFormatTSE.kt

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -240,7 +240,7 @@ class KafkaEventSinkCUDFormatTSE : KafkaEventSinkBaseTSE() {
240240

241241
// when
242242
list.forEach {
243-
var producerRecord = ProducerRecord(topic, UUID.randomUUID().toString(), it)
243+
val producerRecord = ProducerRecord(topic, UUID.randomUUID().toString(), it)
244244
kafkaProducer.send(producerRecord).get()
245245
}
246246

@@ -266,6 +266,57 @@ class KafkaEventSinkCUDFormatTSE : KafkaEventSinkBaseTSE() {
266266
}, Matchers.equalTo(true), 30, TimeUnit.SECONDS)
267267
}
268268

269+
@Test
270+
fun `should ingest relationship data from CUD Events without properties field`() {
271+
272+
val relType = "MY_REL"
273+
val key = "key"
274+
val startNode = "SourceNode"
275+
val endNode = "TargetNode"
276+
277+
val start = CUDNodeRel(ids = mapOf(key to 1), labels = listOf(startNode))
278+
val end = CUDNodeRel(ids = mapOf(key to 1), labels = listOf(endNode))
279+
val relMerge = CUDRelationship(op = CUDOperations.merge, from = start, to = end, rel_type = relType)
280+
val relMergeAsBytes = JSONUtils.writeValueAsBytes(relMerge)
281+
282+
val topic = UUID.randomUUID().toString()
283+
db.setConfig("streams.sink.topic.cud", topic)
284+
db.start()
285+
db.beginTx().use {
286+
db.execute("""
287+
CREATE (:$startNode {key: 1})
288+
CREATE (:$endNode {key: 1})
289+
""".trimIndent())
290+
assertEquals(2, it.allNodes.count())
291+
it.commit()
292+
}
293+
294+
val producerRecord = ProducerRecord(topic, UUID.randomUUID().toString(), relMergeAsBytes)
295+
kafkaProducer.send(producerRecord).get()
296+
297+
val query = "MATCH p = (:$startNode)-[:$relType]->(:$endNode) RETURN count(p) AS count"
298+
299+
Assert.assertEventually(ThrowingSupplier {
300+
db.execute(query) {
301+
val count = it.columnAs<Long>("count")
302+
count.hasNext() && count.next() == 1L && !count.hasNext()
303+
}
304+
}, Matchers.equalTo(true), 30, TimeUnit.SECONDS)
305+
306+
val relDelete = CUDRelationship(op = CUDOperations.delete, from = start, to = end, rel_type = relType)
307+
val relDeleteAsBytes = JSONUtils.writeValueAsBytes(relDelete)
308+
309+
val producerRecordDelete = ProducerRecord(topic, UUID.randomUUID().toString(), relDeleteAsBytes)
310+
kafkaProducer.send(producerRecordDelete).get()
311+
312+
Assert.assertEventually(ThrowingSupplier {
313+
db.execute(query) {
314+
val count = it.columnAs<Long>("count")
315+
count.hasNext() && count.next() == 0L && !count.hasNext()
316+
}
317+
}, Matchers.equalTo(true), 30, TimeUnit.SECONDS)
318+
}
319+
269320
@Test
270321
fun `should delete relationship data from CUD Events`() {
271322
// given

kafka-connect-neo4j/src/test/kotlin/streams/kafka/connect/sink/Neo4jSinkTaskTest.kt

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -987,6 +987,56 @@ class Neo4jSinkTaskTest {
987987
}
988988
}
989989

990+
@Test
991+
fun `should create and delete relationship from CUD event without properties field`() {
992+
val relType = "MY_REL"
993+
val key = "key"
994+
val startNode = "SourceNode"
995+
val endNode = "TargetNode"
996+
val topic = UUID.randomUUID().toString()
997+
998+
db.defaultDatabaseService().beginTx().use {
999+
it.execute("CREATE (:$startNode {key: 1}) CREATE (:$endNode {key: 1})").close()
1000+
it.commit()
1001+
}
1002+
1003+
val start = CUDNodeRel(ids = mapOf(key to 1), labels = listOf(startNode))
1004+
val end = CUDNodeRel(ids = mapOf(key to 1), labels = listOf(endNode))
1005+
val relMerge = CUDRelationship(op = CUDOperations.merge, from = start, to = end, rel_type = relType)
1006+
val sinkRecordMerge = SinkRecord(topic, 1, null, null, null, JSONUtils.asMap(relMerge), 0L)
1007+
1008+
val props = mutableMapOf<String, String>()
1009+
props[Neo4jSinkConnectorConfig.SERVER_URI] = db.boltURI().toString()
1010+
props[Neo4jSinkConnectorConfig.TOPIC_CUD] = topic
1011+
props[Neo4jSinkConnectorConfig.AUTHENTICATION_TYPE] = AuthenticationType.NONE.toString()
1012+
props[SinkTask.TOPICS_CONFIG] = topic
1013+
1014+
val task = Neo4jSinkTask()
1015+
task.initialize(mock(SinkTaskContext::class.java))
1016+
task.start(props)
1017+
task.put(listOf(sinkRecordMerge))
1018+
1019+
val queryCount = "MATCH p = (:$startNode)-[:$relType]->(:$endNode) RETURN count(p) AS count"
1020+
1021+
db.defaultDatabaseService().beginTx().use {
1022+
val countRels = it.execute(queryCount)
1023+
.columnAs<Long>("count")
1024+
.next()
1025+
assertEquals(1L, countRels)
1026+
}
1027+
1028+
val relDelete = CUDRelationship(op = CUDOperations.delete, from = start, to = end, rel_type = relType)
1029+
val sinkRecordDelete = SinkRecord(topic, 1, null, null, null, JSONUtils.asMap(relDelete), 1L)
1030+
task.put(listOf(sinkRecordDelete))
1031+
1032+
db.defaultDatabaseService().beginTx().use {
1033+
val countRels = it.execute(queryCount)
1034+
.columnAs<Long>("count")
1035+
.next()
1036+
assertEquals(0L, countRels)
1037+
}
1038+
}
1039+
9901040
@Test
9911041
fun `should ingest node data from CUD Events`() {
9921042
// given

0 commit comments

Comments
 (0)