Skip to content

Commit 3bc0a26

Browse files
authored
Fixes #426: Add option to create non-existent nodes at relationship creation (#436)
* fixes #426: Add option to create non-existent nodes at relationship creation * updated adoc * added CudOperations validation * changed method as ext funct * fixed mixed merge and match rel statement - added test - changes review * added note in adoc * added note in adoc
1 parent f8e65d7 commit 3bc0a26

File tree

6 files changed

+723
-51
lines changed

6 files changed

+723
-51
lines changed

common/src/main/kotlin/streams/service/sink/strategy/CUDIngestionStrategy.kt

Lines changed: 24 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import streams.utils.IngestionUtils.getNodeKeysAsString
1111
import streams.utils.StreamsUtils
1212

1313

14-
enum class CUDOperations { create, merge, update, delete }
14+
enum class CUDOperations { create, merge, update, delete, match }
1515

1616
abstract class CUD {
1717
abstract val op: CUDOperations
@@ -35,7 +35,9 @@ data class CUDNode(override val op: CUDOperations,
3535
}
3636

3737
data class CUDNodeRel(val ids: Map<String, Any?> = emptyMap(),
38-
val labels: List<String>)
38+
val labels: List<String>,
39+
val op: CUDOperations = CUDOperations.match)
40+
3941
data class CUDRelationship(override val op: CUDOperations,
4042
override val properties: Map<String, Any?> = emptyMap(),
4143
val rel_type: String,
@@ -64,9 +66,16 @@ class CUDIngestionStrategy: IngestionStrategy {
6466
@JvmStatic val PHYSICAL_ID_KEY = "_id"
6567
@JvmStatic val FROM_KEY = "from"
6668
@JvmStatic val TO_KEY = "to"
69+
70+
private val LIST_VALID_CUD_NODE_REL = listOf(CUDOperations.merge, CUDOperations.create, CUDOperations.match)
71+
private val LIST_VALID_CUD_REL = listOf(CUDOperations.create, CUDOperations.merge, CUDOperations.update)
6772
}
6873

69-
data class NodeRelMetadata(val labels: List<String>, val ids: Set<String>)
74+
data class NodeRelMetadata(val labels: List<String>, val ids: Set<String>, val op: CUDOperations = CUDOperations.match)
75+
76+
private fun CUDRelationship.isValidOperation(): Boolean = from.op in LIST_VALID_CUD_NODE_REL && to.op in LIST_VALID_CUD_NODE_REL && op in LIST_VALID_CUD_REL
77+
78+
private fun NodeRelMetadata.getOperation() = op.toString().toUpperCase()
7079

7180
private fun buildNodeLookupByIds(keyword: String = "MATCH", ids: Set<String>, labels: List<String>, identifier: String = "n", field: String = ""): String {
7281
val fullField = if (field.isNotBlank()) "$field." else field
@@ -86,8 +95,9 @@ class CUDIngestionStrategy: IngestionStrategy {
8695
private fun buildRelCreateStatement(from: NodeRelMetadata, to: NodeRelMetadata,
8796
rel_type: String): String = """
8897
|${StreamsUtils.UNWIND}
89-
|${buildNodeLookupByIds(ids = from.ids, labels = from.labels, identifier = FROM_KEY, field = FROM_KEY)}
90-
|${buildNodeLookupByIds(ids = to.ids, labels = to.labels, identifier = TO_KEY, field = TO_KEY)}
98+
|${buildNodeLookupByIds(keyword = from.getOperation(), ids = from.ids, labels = from.labels, identifier = FROM_KEY, field = FROM_KEY)}
99+
|${StreamsUtils.WITH_EVENT_FROM}
100+
|${buildNodeLookupByIds(keyword = to.getOperation(), ids = to.ids, labels = to.labels, identifier = TO_KEY, field = TO_KEY)}
91101
|CREATE ($FROM_KEY)-[r:${rel_type.quote()}]->($TO_KEY)
92102
|SET r = event.properties
93103
""".trimMargin()
@@ -101,8 +111,9 @@ class CUDIngestionStrategy: IngestionStrategy {
101111
private fun buildRelMergeStatement(from: NodeRelMetadata, to: NodeRelMetadata,
102112
rel_type: String): String = """
103113
|${StreamsUtils.UNWIND}
104-
|${buildNodeLookupByIds(ids = from.ids, labels = from.labels, identifier = FROM_KEY, field = FROM_KEY)}
105-
|${buildNodeLookupByIds(ids = to.ids, labels = to.labels, identifier = TO_KEY, field = TO_KEY)}
114+
|${buildNodeLookupByIds(keyword = from.getOperation(), ids = from.ids, labels = from.labels, identifier = FROM_KEY, field = FROM_KEY)}
115+
|${StreamsUtils.WITH_EVENT_FROM}
116+
|${buildNodeLookupByIds(keyword = to.getOperation(), ids = to.ids, labels = to.labels, identifier = TO_KEY, field = TO_KEY)}
106117
|MERGE ($FROM_KEY)-[r:${rel_type.quote()}]->($TO_KEY)
107118
|SET r += event.properties
108119
""".trimMargin()
@@ -164,9 +175,9 @@ class CUDIngestionStrategy: IngestionStrategy {
164175
try {
165176
val data = toCUDEntity<CUDNode>(it)
166177
when (data?.op) {
167-
CUDOperations.delete, null -> null
168178
CUDOperations.merge -> if (data.ids.isNotEmpty() && data.properties.isNotEmpty()) data else null // TODO send to the DLQ the null
169-
else -> if (data.properties.isNotEmpty()) data else null // TODO send to the DLQ the null
179+
CUDOperations.update, CUDOperations.create -> if (data.properties.isNotEmpty()) data else null // TODO send to the DLQ the null
180+
else -> null
170181
}
171182
} catch (e: Exception) {
172183
null
@@ -221,9 +232,9 @@ class CUDIngestionStrategy: IngestionStrategy {
221232
it.value?.let {
222233
try {
223234
val data = toCUDEntity<CUDRelationship>(it)
224-
when (data?.op) {
225-
CUDOperations.delete, null -> null // TODO send to the DLQ the null
226-
else -> if (data.from.ids.isNotEmpty() && data.to.ids.isNotEmpty() && data.properties.isNotEmpty()) data else null // TODO send to the DLQ the null
235+
when {
236+
data!!.isValidOperation() -> if (data.from.ids.isNotEmpty() && data.to.ids.isNotEmpty() && data.properties.isNotEmpty()) data else null // TODO send to the DLQ the null
237+
else -> null // TODO send to the DLQ the null
227238
}
228239
} catch (e: Exception) {
229240
null
@@ -233,7 +244,7 @@ class CUDIngestionStrategy: IngestionStrategy {
233244
.groupBy({ it.op }, { it })
234245

235246
return data.flatMap { (op, list) ->
236-
list.groupBy { Triple(NodeRelMetadata(getLabels(it.from), it.from.ids.keys), NodeRelMetadata(getLabels(it.to), it.to.ids.keys), it.rel_type) }
247+
list.groupBy { Triple(NodeRelMetadata(getLabels(it.from), it.from.ids.keys, it.from.op), NodeRelMetadata(getLabels(it.to), it.to.ids.keys, it.to.op), it.rel_type) }
237248
.map {
238249
val (from, to, rel_type) = it.key
239250
val query = when (op) {

common/src/main/kotlin/streams/utils/StreamsUtils.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ object StreamsUtils {
1313

1414
@JvmStatic val UNWIND: String = "UNWIND \$events AS event"
1515

16+
@JvmStatic val WITH_EVENT_FROM: String = "WITH event, from"
17+
1618
@JvmStatic val STREAMS_CONFIG_PREFIX = "streams."
1719

1820
@JvmStatic val STREAMS_SINK_TOPIC_PREFIX = "sink.topic.cypher."

0 commit comments

Comments
 (0)