Skip to content

Commit 5646428

Browse files
conker84moxious
authored andcommitted
fixes #188: Support Specialized Sink Classes: support CUD format (#224)
See documentation added for CUD format
1 parent 9a90164 commit 5646428

File tree

23 files changed

+1944
-205
lines changed

23 files changed

+1944
-205
lines changed

common/src/main/kotlin/streams/service/StreamsSinkService.kt

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,14 @@ import streams.service.sink.strategy.IngestionStrategy
66
const val STREAMS_TOPIC_KEY: String = "streams.sink.topic"
77
const val STREAMS_TOPIC_CDC_KEY: String = "streams.sink.topic.cdc"
88

9-
enum class TopicTypeGroup { CYPHER, CDC, PATTERN }
9+
enum class TopicTypeGroup { CYPHER, CDC, PATTERN, CUD }
1010
enum class TopicType(val group: TopicTypeGroup, val key: String) {
1111
CDC_SOURCE_ID(group = TopicTypeGroup.CDC, key = "$STREAMS_TOPIC_CDC_KEY.sourceId"),
1212
CYPHER(group = TopicTypeGroup.CYPHER, key = "$STREAMS_TOPIC_KEY.cypher"),
1313
PATTERN_NODE(group = TopicTypeGroup.PATTERN, key = "$STREAMS_TOPIC_KEY.pattern.node"),
1414
PATTERN_RELATIONSHIP(group = TopicTypeGroup.PATTERN, key = "$STREAMS_TOPIC_KEY.pattern.relationship"),
15-
CDC_SCHEMA(group = TopicTypeGroup.CDC, key = "$STREAMS_TOPIC_CDC_KEY.schema")
15+
CDC_SCHEMA(group = TopicTypeGroup.CDC, key = "$STREAMS_TOPIC_CDC_KEY.schema"),
16+
CUD(group = TopicTypeGroup.CUD, key = "$STREAMS_TOPIC_KEY.cud")
1617
}
1718

1819
data class StreamsSinkEntity(val key: Any?, val value: Any?)
@@ -40,7 +41,7 @@ abstract class StreamsSinkService(private val strategyMap: Map<TopicType, Any>)
4041
val topicType = getTopicType(topic) ?: return
4142
when (topicType.group) {
4243
TopicTypeGroup.CYPHER -> writeWithCypherTemplate(topic, params)
43-
TopicTypeGroup.CDC -> writeWithStrategy(params, strategyMap.getValue(topicType) as IngestionStrategy)
44+
TopicTypeGroup.CDC, TopicTypeGroup.CUD -> writeWithStrategy(params, strategyMap.getValue(topicType) as IngestionStrategy)
4445
TopicTypeGroup.PATTERN -> {
4546
val strategyMap = strategyMap[topicType] as Map<String, IngestionStrategy>
4647
writeWithStrategy(params, strategyMap.getValue(topic))

common/src/main/kotlin/streams/service/Topics.kt

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,20 +6,21 @@ import kotlin.reflect.jvm.javaType
66
data class Topics(val cypherTopics: Map<String, String> = emptyMap(),
77
val cdcSourceIdTopics: Set<String> = emptySet(),
88
val cdcSchemaTopics: Set<String> = emptySet(),
9+
val cudTopics: Set<String> = emptySet(),
910
val nodePatternTopics: Map<String, NodePatternConfiguration> = emptyMap(),
1011
val relPatternTopics: Map<String, RelationshipPatternConfiguration> = emptyMap()) {
1112

1213
fun allTopics(): List<String> = this.asMap()
1314
.map {
14-
if (it.key.group == TopicTypeGroup.CDC) {
15+
if (it.key.group == TopicTypeGroup.CDC || it.key.group == TopicTypeGroup.CUD) {
1516
(it.value as Set<String>).toList()
1617
} else {
1718
(it.value as Map<String, Any>).keys.toList()
1819
}
1920
}
2021
.flatten()
2122

22-
fun asMap(): Map<TopicType, Any> = mapOf(TopicType.CYPHER to cypherTopics,
23+
fun asMap(): Map<TopicType, Any> = mapOf(TopicType.CYPHER to cypherTopics, TopicType.CUD to cudTopics,
2324
TopicType.CDC_SCHEMA to cdcSchemaTopics, TopicType.CDC_SOURCE_ID to cdcSourceIdTopics,
2425
TopicType.PATTERN_NODE to nodePatternTopics, TopicType.PATTERN_RELATIONSHIP to relPatternTopics)
2526

@@ -28,6 +29,7 @@ data class Topics(val cypherTopics: Map<String, String> = emptyMap(),
2829
val cypherTopicPrefix = TopicType.CYPHER.key.replace(prefix, toReplace)
2930
val sourceIdKey = TopicType.CDC_SOURCE_ID.key.replace(prefix, toReplace)
3031
val schemaKey = TopicType.CDC_SCHEMA.key.replace(prefix, toReplace)
32+
val cudKey = TopicType.CUD.key.replace(prefix, toReplace)
3133
val nodePatterKey = TopicType.PATTERN_NODE.key.replace(prefix, toReplace)
3234
val relPatterKey = TopicType.PATTERN_RELATIONSHIP.key.replace(prefix, toReplace)
3335
val cypherTopics = TopicUtils.filterByPrefix(config, cypherTopicPrefix)
@@ -37,16 +39,17 @@ data class Topics(val cypherTopics: Map<String, String> = emptyMap(),
3739
val relPatternTopics = TopicUtils
3840
.filterByPrefix(config, relPatterKey)
3941
.mapValues { RelationshipPatternConfiguration.parse(it.value) }
40-
val cdcSourceIdTopics = TopicUtils.filterCDCTopics(config[sourceIdKey] as? String)
41-
val cdcSchemaTopics = TopicUtils.filterCDCTopics(config[schemaKey] as? String)
42-
return Topics(cypherTopics, cdcSourceIdTopics, cdcSchemaTopics, nodePatternTopics, relPatternTopics)
42+
val cdcSourceIdTopics = TopicUtils.splitTopics(config[sourceIdKey] as? String)
43+
val cdcSchemaTopics = TopicUtils.splitTopics(config[schemaKey] as? String)
44+
val cudTopics = TopicUtils.splitTopics(config[cudKey] as? String)
45+
return Topics(cypherTopics, cdcSourceIdTopics, cdcSchemaTopics, cudTopics, nodePatternTopics, relPatternTopics)
4346
}
4447
}
4548
}
4649

4750
object TopicUtils {
4851

49-
@JvmStatic val CDC_TOPIC_SEPARATOR = ";"
52+
@JvmStatic val TOPIC_SEPARATOR = ";"
5053

5154
fun filterByPrefix(config: Map<*, *>, prefix: String): Map<String, String> {
5255
val fullPrefix = "$prefix."
@@ -56,11 +59,11 @@ object TopicUtils {
5659
.mapValues { it.value.toString() }
5760
}
5861

59-
fun filterCDCTopics(cdcMergeTopicsString: String?): Set<String> {
62+
fun splitTopics(cdcMergeTopicsString: String?): Set<String> {
6063
return if (cdcMergeTopicsString.isNullOrBlank()) {
6164
emptySet()
6265
} else {
63-
cdcMergeTopicsString.split(CDC_TOPIC_SEPARATOR).toSet()
66+
cdcMergeTopicsString.split(TOPIC_SEPARATOR).toSet()
6467
}
6568
}
6669

@@ -84,6 +87,7 @@ object TopicUtils {
8487
when (type) {
8588
TopicType.CDC_SOURCE_ID -> SourceIdIngestionStrategy(sourceIdStrategyConfig)
8689
TopicType.CDC_SCHEMA -> SchemaIngestionStrategy()
90+
TopicType.CUD -> CUDIngestionStrategy()
8791
TopicType.PATTERN_NODE -> {
8892
val map = config as Map<String, NodePatternConfiguration>
8993
map.mapValues { NodePatternIngestionStrategy(it.value) }
Lines changed: 271 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,271 @@
1+
package streams.service.sink.strategy
2+
3+
import streams.events.EntityType
4+
import streams.extensions.quote
5+
import streams.serialization.JSONUtils
6+
import streams.service.StreamsSinkEntity
7+
import streams.service.sink.strategy.CUDIngestionStrategy.Companion.FROM_KEY
8+
import streams.service.sink.strategy.CUDIngestionStrategy.Companion.TO_KEY
9+
import streams.utils.IngestionUtils.getLabelsAsString
10+
import streams.utils.IngestionUtils.getNodeKeysAsString
11+
import streams.utils.StreamsUtils
12+
13+
14+
enum class CUDOperations { create, merge, update, delete }
15+
16+
abstract class CUD {
17+
abstract val op: CUDOperations
18+
abstract val type: EntityType
19+
abstract val properties: Map<String, Any?>
20+
}
21+
22+
data class CUDNode(override val op: CUDOperations,
23+
override val properties: Map<String, Any?> = emptyMap(),
24+
val ids: Map<String, Any?> = emptyMap(),
25+
val detach: Boolean = true,
26+
val labels: List<String> = emptyList()): CUD() {
27+
override val type = EntityType.node
28+
29+
fun toMap(): Map<String, Any> {
30+
return when (op) {
31+
CUDOperations.delete -> mapOf("ids" to ids)
32+
else -> mapOf("ids" to ids, "properties" to properties)
33+
}
34+
}
35+
}
36+
37+
data class CUDNodeRel(val ids: Map<String, Any?> = emptyMap(),
38+
val labels: List<String>)
39+
data class CUDRelationship(override val op: CUDOperations,
40+
override val properties: Map<String, Any?> = emptyMap(),
41+
val rel_type: String,
42+
val from: CUDNodeRel,
43+
val to: CUDNodeRel): CUD() {
44+
override val type = EntityType.relationship
45+
46+
fun toMap(): Map<String, Any> {
47+
val from = mapOf("ids" to from.ids)
48+
val to = mapOf("ids" to to.ids)
49+
return when (op) {
50+
CUDOperations.delete -> mapOf(FROM_KEY to from,
51+
TO_KEY to to)
52+
else -> mapOf(FROM_KEY to from,
53+
TO_KEY to to,
54+
"properties" to properties)
55+
}
56+
}
57+
}
58+
59+
60+
class CUDIngestionStrategy: IngestionStrategy {
61+
62+
companion object {
63+
@JvmStatic val ID_KEY = "ids"
64+
@JvmStatic val PHYSICAL_ID_KEY = "_id"
65+
@JvmStatic val FROM_KEY = "from"
66+
@JvmStatic val TO_KEY = "to"
67+
}
68+
69+
data class NodeRelMetadata(val labels: List<String>, val ids: Set<String>)
70+
71+
private fun buildNodeLookupByIds(keyword: String = "MATCH", ids: Set<String>, labels: List<String>, identifier: String = "n", field: String = ""): String {
72+
val fullField = if (field.isNotBlank()) "$field." else field
73+
val quotedIdentifier = identifier.quote()
74+
return when (ids.contains(PHYSICAL_ID_KEY)) {
75+
true -> "MATCH ($quotedIdentifier) WHERE id($quotedIdentifier) = event.$fullField$ID_KEY._id"
76+
else -> "$keyword ($quotedIdentifier${getLabelsAsString(labels)} {${getNodeKeysAsString(keys = ids, prefix = "$fullField$ID_KEY")}})"
77+
}
78+
}
79+
80+
private fun buildNodeCreateStatement(labels: List<String>): String = """
81+
|${StreamsUtils.UNWIND}
82+
|CREATE (n${getLabelsAsString(labels)})
83+
|SET n = event.properties
84+
""".trimMargin()
85+
86+
private fun buildRelCreateStatement(from: NodeRelMetadata, to: NodeRelMetadata,
87+
rel_type: String): String = """
88+
|${StreamsUtils.UNWIND}
89+
|${buildNodeLookupByIds(ids = from.ids, labels = from.labels, identifier = FROM_KEY, field = FROM_KEY)}
90+
|${buildNodeLookupByIds(ids = to.ids, labels = to.labels, identifier = TO_KEY, field = TO_KEY)}
91+
|CREATE ($FROM_KEY)-[r:${rel_type.quote()}]->($TO_KEY)
92+
|SET r = event.properties
93+
""".trimMargin()
94+
95+
private fun buildNodeMergeStatement(labels: List<String>, ids: Set<String>): String = """
96+
|${StreamsUtils.UNWIND}
97+
|${buildNodeLookupByIds(keyword = "MERGE", ids = ids, labels = labels)}
98+
|SET n += event.properties
99+
""".trimMargin()
100+
101+
private fun buildRelMergeStatement(from: NodeRelMetadata, to: NodeRelMetadata,
102+
rel_type: String): String = """
103+
|${StreamsUtils.UNWIND}
104+
|${buildNodeLookupByIds(ids = from.ids, labels = from.labels, identifier = FROM_KEY, field = FROM_KEY)}
105+
|${buildNodeLookupByIds(ids = to.ids, labels = to.labels, identifier = TO_KEY, field = TO_KEY)}
106+
|MERGE ($FROM_KEY)-[r:${rel_type.quote()}]->($TO_KEY)
107+
|SET r += event.properties
108+
""".trimMargin()
109+
110+
private fun buildNodeUpdateStatement(labels: List<String>, ids: Set<String>): String = """
111+
|${StreamsUtils.UNWIND}
112+
|${buildNodeLookupByIds(ids = ids, labels = labels)}
113+
|SET n += event.properties
114+
""".trimMargin()
115+
116+
private fun buildRelUpdateStatement(from: NodeRelMetadata, to: NodeRelMetadata,
117+
rel_type: String): String = """
118+
|${StreamsUtils.UNWIND}
119+
|${buildNodeLookupByIds(ids = from.ids, labels = from.labels, identifier = FROM_KEY, field = FROM_KEY)}
120+
|${buildNodeLookupByIds(ids = to.ids, labels = to.labels, identifier = TO_KEY, field = TO_KEY)}
121+
|MATCH ($FROM_KEY)-[r:${rel_type.quote()}]->($TO_KEY)
122+
|SET r += event.properties
123+
""".trimMargin()
124+
125+
private fun buildDeleteStatement(labels: List<String>, ids: Set<String>, detach: Boolean): String = """
126+
|${StreamsUtils.UNWIND}
127+
|${buildNodeLookupByIds(ids = ids, labels = labels)}
128+
|${if (detach) "DETACH " else ""}DELETE n
129+
""".trimMargin()
130+
131+
private fun buildRelDeleteStatement(from: NodeRelMetadata, to: NodeRelMetadata,
132+
rel_type: String): String = """
133+
|${StreamsUtils.UNWIND}
134+
|${buildNodeLookupByIds(ids = from.ids, labels = from.labels, identifier = FROM_KEY, field = FROM_KEY)}
135+
|${buildNodeLookupByIds(ids = to.ids, labels = to.labels, identifier = TO_KEY, field = TO_KEY)}
136+
|MATCH ($FROM_KEY)-[r:${rel_type.quote()}]->($TO_KEY)
137+
|DELETE r
138+
""".trimMargin()
139+
140+
private inline fun <reified T: CUD> toCUDEntity(it: Any): T? {
141+
return when (it) {
142+
is T -> it
143+
is Map<*, *> -> {
144+
val type = it["type"]?.toString()
145+
val entityType = if (type == null) null else EntityType.valueOf(type)
146+
when {
147+
entityType == null -> throw RuntimeException("No `type` field found")
148+
entityType != null && EntityType.node == entityType && T::class.java != CUDNode::class.java -> null
149+
entityType != null && EntityType.relationship == entityType && T::class.java != CUDRelationship::class.java -> null
150+
else -> JSONUtils.convertValue<T>(it)
151+
}
152+
}
153+
else -> null
154+
}
155+
}
156+
157+
private fun getLabels(relNode: CUDNodeRel) = if (relNode.ids.containsKey(PHYSICAL_ID_KEY)) emptyList() else relNode.labels
158+
private fun getLabels(node: CUDNode) = if (node.ids.containsKey(PHYSICAL_ID_KEY)) emptyList() else node.labels
159+
160+
override fun mergeNodeEvents(events: Collection<StreamsSinkEntity>): List<QueryEvents> {
161+
val data = events
162+
.mapNotNull {
163+
it.value?.let {
164+
try {
165+
val data = toCUDEntity<CUDNode>(it)
166+
when (data?.op) {
167+
CUDOperations.delete, null -> null
168+
CUDOperations.merge -> if (data.ids.isNotEmpty() && data.properties.isNotEmpty()) data else null // TODO send to the DLQ the null
169+
else -> if (data.properties.isNotEmpty()) data else null // TODO send to the DLQ the null
170+
}
171+
} catch (e: Exception) {
172+
null
173+
}
174+
}
175+
}
176+
.groupBy({ it.op }, { it })
177+
178+
val create = data[CUDOperations.create]
179+
.orEmpty()
180+
.groupBy { getLabels(it) }
181+
.map { QueryEvents(buildNodeCreateStatement(it.key), it.value.map { it.toMap() }) }
182+
val merge = data[CUDOperations.merge]
183+
.orEmpty()
184+
.groupBy { getLabels(it) to it.ids.keys }
185+
.map { QueryEvents(buildNodeMergeStatement(it.key.first, it.key.second), it.value.map { it.toMap() }) }
186+
val update = data[CUDOperations.update]
187+
.orEmpty()
188+
.groupBy { getLabels(it) to it.ids.keys }
189+
.map { QueryEvents(buildNodeUpdateStatement(it.key.first, it.key.second), it.value.map { it.toMap() }) }
190+
return (create + merge + update) // we'll group the data because of in case of `_id` key is present the generated queries are the same for update/merge
191+
.map { it.query to it.events }
192+
.groupBy({ it.first }, { it.second })
193+
.map { QueryEvents(it.key, it.value.flatten()) }
194+
}
195+
196+
override fun deleteNodeEvents(events: Collection<StreamsSinkEntity>): List<QueryEvents> {
197+
return events
198+
.mapNotNull {
199+
it.value?.let {
200+
try {
201+
val data = toCUDEntity<CUDNode>(it)
202+
when (data?.op) {
203+
CUDOperations.delete -> if (data.ids.isNotEmpty() && data.properties.isEmpty()) data else null // TODO send to the DLQ the null
204+
else -> null // TODO send to the DLQ the null
205+
}
206+
} catch (e: Exception) {
207+
null
208+
}
209+
}
210+
}
211+
.groupBy { Triple(it.labels, it.ids.keys, it.detach) }
212+
.map {
213+
val (labels, keys, detach) = it.key
214+
QueryEvents(buildDeleteStatement(labels, keys, detach), it.value.map { it.toMap() })
215+
}
216+
}
217+
218+
override fun mergeRelationshipEvents(events: Collection<StreamsSinkEntity>): List<QueryEvents> {
219+
val data = events
220+
.mapNotNull {
221+
it.value?.let {
222+
try {
223+
val data = toCUDEntity<CUDRelationship>(it)
224+
when (data?.op) {
225+
CUDOperations.delete, null -> null // TODO send to the DLQ the null
226+
else -> if (data.from.ids.isNotEmpty() && data.to.ids.isNotEmpty() && data.properties.isNotEmpty()) data else null // TODO send to the DLQ the null
227+
}
228+
} catch (e: Exception) {
229+
null
230+
}
231+
}
232+
}
233+
.groupBy({ it.op }, { it })
234+
235+
return data.flatMap { (op, list) ->
236+
list.groupBy { Triple(NodeRelMetadata(getLabels(it.from), it.from.ids.keys), NodeRelMetadata(getLabels(it.to), it.to.ids.keys), it.rel_type) }
237+
.map {
238+
val (from, to, rel_type) = it.key
239+
val query = when (op) {
240+
CUDOperations.create -> buildRelCreateStatement(from, to, rel_type)
241+
CUDOperations.merge -> buildRelMergeStatement(from, to, rel_type)
242+
else -> buildRelUpdateStatement(from, to, rel_type)
243+
}
244+
QueryEvents(query, it.value.map { it.toMap() })
245+
}
246+
}
247+
}
248+
249+
override fun deleteRelationshipEvents(events: Collection<StreamsSinkEntity>): List<QueryEvents> {
250+
return events
251+
.mapNotNull {
252+
it.value?.let {
253+
try {
254+
val data = toCUDEntity<CUDRelationship>(it)
255+
when (data?.op) {
256+
CUDOperations.delete -> if (data.from.ids.isNotEmpty() && data.to.ids.isNotEmpty() && data.properties.isEmpty()) data else null // TODO send to the DLQ the null
257+
else -> null // TODO send to the DLQ the null
258+
}
259+
} catch (e: Exception) {
260+
null
261+
}
262+
}
263+
}
264+
.groupBy { Triple(NodeRelMetadata(getLabels(it.from), it.from.ids.keys), NodeRelMetadata(getLabels(it.to), it.to.ids.keys), it.rel_type) }
265+
.map {
266+
val (from, to, rel_type) = it.key
267+
QueryEvents(buildRelDeleteStatement(from, to, rel_type), it.value.map { it.toMap() })
268+
}
269+
}
270+
271+
}

common/src/main/kotlin/streams/service/sink/strategy/NodePatternIngestionStrategy.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ class NodePatternIngestionStrategy(private val nodePatternConfiguration: NodePat
1212

1313
private val mergeNodeTemplate: String = """
1414
|${StreamsUtils.UNWIND}
15-
|MERGE (n:${getLabelsAsString(nodePatternConfiguration.labels)}{${
15+
|MERGE (n${getLabelsAsString(nodePatternConfiguration.labels)}{${
1616
getNodeMergeKeys("keys", nodePatternConfiguration.keys)
1717
}})
1818
|SET n = event.properties
@@ -21,7 +21,7 @@ class NodePatternIngestionStrategy(private val nodePatternConfiguration: NodePat
2121

2222
private val deleteNodeTemplate: String = """
2323
|${StreamsUtils.UNWIND}
24-
|MATCH (n:${getLabelsAsString(nodePatternConfiguration.labels)}{${
24+
|MATCH (n${getLabelsAsString(nodePatternConfiguration.labels)}{${
2525
getNodeMergeKeys("keys", nodePatternConfiguration.keys)
2626
}})
2727
|DETACH DELETE n

0 commit comments

Comments
 (0)