Skip to content

Commit f34d041

Browse files
conker84 authored and mneedham committed
fixes #99: Provide a roundtrip-sink-config (#169)
1 parent a7b73bd commit f34d041

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+2561
-227
lines changed

common/src/main/kotlin/streams/extensions/CommonExtensions.kt

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package streams.extensions
22

33
import org.neo4j.graphdb.Node
4+
import javax.lang.model.SourceVersion
45

56
/**
 * Reads the entry stored under [name] and parses it as an [Int].
 *
 * @param name key to look up in the receiver map
 * @param defaultValue value returned when the map has no entry for [name]
 * @return the parsed integer, or [defaultValue] when the key is absent
 * @throws NumberFormatException if the stored value is not a valid integer
 */
fun Map<String, String>.getInt(name: String, defaultValue: Int): Int {
    val raw = this[name] ?: return defaultValue
    return raw.toInt()
}
67

@@ -10,4 +11,8 @@ fun Node.labelNames() : List<String> {
1011

1112
/**
 * Converts a camelCase string to lower-case "point case" by inserting a dot at every
 * boundary between a lower-case letter and an upper-case letter, e.g. `"fooBarBaz"`
 * becomes `"foo.bar.baz"`.
 *
 * Lower-casing uses [java.util.Locale.ROOT] so the result does not depend on the JVM
 * default locale (under a Turkish locale `"I"` would otherwise become a dotless `"ı"`).
 *
 * @return the dotted, lower-cased form of the receiver
 */
fun String.toPointCase(): String {
    return this.split("(?<=[a-z])(?=[A-Z])".toRegex())
            .joinToString(separator = ".")
            .toLowerCase(java.util.Locale.ROOT)
}
15+
16+
/**
 * Wraps the receiver in backticks unless it is already a plain identifier.
 *
 * A value accepted by [SourceVersion.isIdentifier] is returned unchanged. Anything else is
 * surrounded with backticks, and backticks that occur inside the value are doubled so that
 * the value cannot terminate the quoted name early.
 *
 * @return the receiver itself, or its backtick-quoted and escaped form
 */
fun String.quote(): String {
    if (SourceVersion.isIdentifier(this)) {
        return this
    }
    // A doubled backtick is the escape for a literal backtick inside a quoted name.
    val escaped = this.replace("`", "``")
    return "`$escaped`"
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package streams.service
2+
3+
import streams.serialization.JSONUtils
4+
import streams.service.sink.strategy.IngestionStrategy
5+
import streams.service.sink.strategy.SourceIdIngestionStrategy
6+
import streams.service.sink.strategy.SchemaIngestionStrategy
7+
8+
9+
/** Configuration key prefix under which sink topic settings are declared. */
const val STREAMS_TOPIC_KEY: String = "streams.sink.topic"

/** Configuration key prefix under which the CDC sink topic settings are declared. */
const val STREAMS_TOPIC_CDC_KEY: String = "streams.sink.topic.cdc"

/** Family a [TopicType] belongs to. */
enum class TopicTypeGroup {
    CYPHER,
    CDC
}

/**
 * Kind of sink topic.
 *
 * @property group family the topic type belongs to
 * @property key configuration key associated with the topic type
 */
enum class TopicType(val group: TopicTypeGroup, val key: String) {
    CDC_SOURCE_ID(TopicTypeGroup.CDC, STREAMS_TOPIC_CDC_KEY + ".sourceId"),
    CYPHER(TopicTypeGroup.CYPHER, STREAMS_TOPIC_KEY + ".cypher"),
    CDC_SCHEMA(TopicTypeGroup.CDC, STREAMS_TOPIC_CDC_KEY + ".schema")
}
18+
19+
/**
 * Base class that routes the data received for a topic either to a Cypher template or to
 * an [IngestionStrategy], and hands the resulting query/parameter batches to [write].
 *
 * @param strategyMap values looked up by [TopicType] for CDC topics; each looked-up value
 * is cast to [IngestionStrategy]
 */
abstract class StreamsSinkService(private val strategyMap: Map<TopicType, Any>) {

    /** Returns the type of [topic], or `null`, in which case [writeForTopic] does nothing. */
    abstract fun getTopicType(topic: String): TopicType?

    /** Returns the Cypher template for [topic], or `null`, in which case nothing is written. */
    abstract fun getCypherTemplate(topic: String): String?

    /** Executes [query] with [events] as its batch of parameters. */
    abstract fun write(query: String, events: Collection<Any>)

    /**
     * Dispatches [params] received on [topic] according to the group of its topic type.
     *
     * @throws NoSuchElementException if a CDC topic type has no entry in the strategy map
     * @throws ClassCastException if that entry is not an [IngestionStrategy]
     */
    fun writeForTopic(topic: String, params: Collection<Any>) {
        val type = getTopicType(topic) ?: return
        when (type.group) {
            TopicTypeGroup.CDC -> {
                val strategy = strategyMap.getValue(type) as IngestionStrategy
                writeWithStrategy(params, strategy)
            }
            TopicTypeGroup.CYPHER -> writeWithCypherTemplate(topic, params)
        }
    }

    /** Writes [params] with the template configured for [topic], if there is one. */
    private fun writeWithCypherTemplate(topic: String, params: Collection<Any>) {
        val template = getCypherTemplate(topic)
        if (template != null) {
            write(template, params)
        }
    }

    /**
     * Converts [params] to transaction events and writes what [strategy] produces, in this
     * order: node merges, node deletes, relationship merges, relationship deletes.
     */
    private fun writeWithStrategy(params: Collection<Any>, strategy: IngestionStrategy) {
        val events = params.map { JSONUtils.asStreamsTransactionEvent(it) }

        // Each step is evaluated and fully written before the next step is evaluated.
        val steps = listOf(
                strategy::mergeNodeEvents,
                strategy::deleteNodeEvents,
                strategy::mergeRelationshipEvents,
                strategy::deleteRelationshipEvents)
        steps.forEach { step ->
            step(events).forEach { batch -> write(batch.query, batch.events) }
        }
    }
}
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
package streams.service
2+
3+
import streams.service.sink.strategy.SchemaIngestionStrategy
4+
import streams.service.sink.strategy.SourceIdIngestionStrategy
5+
import streams.service.sink.strategy.SourceIdIngestionStrategyConfig
6+
import kotlin.reflect.jvm.javaType
7+
8+
/**
 * Sink topic definitions split by topic type.
 *
 * @property cypherTopics map whose keys are topic names; built by
 * [TopicUtils.filterByPrefix] in [from]
 * @property cdcSourceIdTopics names of the topics of type [TopicType.CDC_SOURCE_ID]
 * @property cdcSchemaTopics names of the topics of type [TopicType.CDC_SCHEMA]
 */
data class Topics(val cypherTopics: Map<String, String> = emptyMap(),
                  val cdcSourceIdTopics: Set<String> = emptySet(),
                  val cdcSchemaTopics: Set<String> = emptySet()) {

    /**
     * Returns every topic name: first the Cypher topics, then the CDC schema topics, then
     * the CDC source-id topics (the same order as the entries of [asMap]).
     */
    fun allTopics(): List<String> = cypherTopics.keys.toList() + cdcSchemaTopics + cdcSourceIdTopics

    /** Returns the three topic collections keyed by their [TopicType]. */
    fun asMap(): Map<TopicType, Any> = mapOf(
            TopicType.CYPHER to cypherTopics,
            TopicType.CDC_SCHEMA to cdcSchemaTopics,
            TopicType.CDC_SOURCE_ID to cdcSourceIdTopics)

    companion object {
        /**
         * Builds a [Topics] from a raw configuration map.
         *
         * The configuration keys are the [TopicType.key] values with every occurrence of
         * [prefix] replaced by [toReplace].
         *
         * @param config raw configuration entries
         * @param prefix text to replace inside each topic type key
         * @param toReplace replacement for [prefix]; empty by default
         */
        fun from(config: Map<*, *>, prefix: String, toReplace: String = ""): Topics {
            val cypherPrefix = TopicType.CYPHER.key.replace(prefix, toReplace)
            val sourceIdSetting = config[TopicType.CDC_SOURCE_ID.key.replace(prefix, toReplace)] as? String
            val schemaSetting = config[TopicType.CDC_SCHEMA.key.replace(prefix, toReplace)] as? String
            return Topics(
                    cypherTopics = TopicUtils.filterByPrefix(config, cypherPrefix),
                    cdcSourceIdTopics = TopicUtils.filterCDCTopics(sourceIdSetting),
                    cdcSchemaTopics = TopicUtils.filterCDCTopics(schemaSetting))
        }
    }
}
37+
38+
/**
 * Helpers for reading sink topic settings from a raw configuration map, validating a
 * [Topics] definition and creating the ingestion strategies for its CDC topic types.
 */
object TopicUtils {

    /** Separator between topic names in a CDC topic list setting. */
    @JvmStatic val CDC_TOPIC_SEPARATOR = ";"

    /**
     * Selects the entries of [config] whose key starts with `"$prefix."`.
     *
     * Only the leading prefix is removed from each key; the same text occurring later in
     * the key is kept. Keys and values are converted with `toString()`.
     *
     * @return the remaining key suffix mapped to the stringified value
     */
    fun filterByPrefix(config: Map<*, *>, prefix: String): Map<String, String> {
        val fullPrefix = "$prefix."
        return config
                .filterKeys { it.toString().startsWith(fullPrefix) }
                .mapKeys { it.key.toString().removePrefix(fullPrefix) }
                .mapValues { it.value.toString() }
    }

    /**
     * Splits a [CDC_TOPIC_SEPARATOR]-separated list of topic names.
     *
     * Whitespace around each name is trimmed and empty entries (for example the one left
     * by a trailing separator) are dropped.
     *
     * @param cdcMergeTopicsString the raw setting, possibly `null` or blank
     * @return the distinct topic names; empty when the setting is `null` or blank
     */
    fun filterCDCTopics(cdcMergeTopicsString: String?): Set<String> {
        return if (cdcMergeTopicsString.isNullOrBlank()) {
            emptySet()
        } else {
            cdcMergeTopicsString.split(CDC_TOPIC_SEPARATOR)
                    .map { it.trim() }
                    .filter { it.isNotEmpty() }
                    .toSet()
        }
    }

    /**
     * Checks that no topic is declared under more than one topic type.
     *
     * @param T exception type to throw; it must declare a constructor taking exactly one
     * `String` parameter. The lookup happens on every call, so a type without such a
     * constructor fails with [NoSuchElementException] even for valid [topics].
     * @throws T if a topic is both a Cypher topic and a CDC topic, or is both a CDC
     * source-id topic and a CDC schema topic
     */
    inline fun <reified T: Throwable> validate(topics: Topics) {
        val crossDefinedTopics = (topics.cdcSourceIdTopics + topics.cdcSchemaTopics).intersect(topics.cypherTopics.keys)
        val exceptionStringConstructor = T::class.constructors
                .first { it.parameters.size == 1 && it.parameters[0].type.javaType == String::class.java }
        if (crossDefinedTopics.isNotEmpty()) {
            throw exceptionStringConstructor
                    .call("The following topics are cross defined between Cypher template configuration and CDC configuration: $crossDefinedTopics")
        }

        val cdcCrossDefinedTopics = topics.cdcSourceIdTopics.intersect(topics.cdcSchemaTopics)
        if (cdcCrossDefinedTopics.isNotEmpty()) {
            throw exceptionStringConstructor
                    .call("The following topics are cross defined between CDC Merge and CDC Schema configuration: $cdcCrossDefinedTopics")
        }
    }

    /**
     * Creates one ingestion strategy per CDC topic type of [topics].
     *
     * @param topics topic definition whose [Topics.asMap] keys are used
     * @param sourceIdStrategyConfig configuration passed to [SourceIdIngestionStrategy]
     * @return the strategy instance for every topic type except [TopicType.CYPHER]
     */
    fun toStrategyMap(topics: Topics,
                      sourceIdStrategyConfig: SourceIdIngestionStrategyConfig): Map<TopicType, Any> = topics.asMap()
            .filterKeys { it != TopicType.CYPHER }
            .mapValues {
                when (it.key) {
                    TopicType.CDC_SOURCE_ID -> SourceIdIngestionStrategy(sourceIdStrategyConfig)
                    TopicType.CDC_SCHEMA -> SchemaIngestionStrategy()
                    // Filtered out above; listed explicitly so a new TopicType fails to compile here.
                    TopicType.CYPHER -> throw RuntimeException("Unsupported topic type ${it.key}")
                }
            }
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package streams.service.sink.strategy
2+
3+
import streams.events.*
4+
import streams.utils.Neo4jUtils
5+
import streams.utils.StreamsUtils
6+
7+
8+
/**
 * A query together with the batch of event maps it is executed with.
 *
 * @property query the query text
 * @property events one parameter map per event
 */
data class QueryEvents(
        val query: String,
        val events: List<Map<String, Any?>>
)
9+
10+
/**
 * Turns a list of transaction events into the [QueryEvents] batches to execute, with one
 * method per entity kind and operation.
 */
interface IngestionStrategy {

    /** Returns the batches that merge the nodes described by [events]. */
    fun mergeNodeEvents(events: List<StreamsTransactionEvent>): List<QueryEvents>

    /** Returns the batches that delete the nodes described by [events]. */
    fun deleteNodeEvents(events: List<StreamsTransactionEvent>): List<QueryEvents>

    /** Returns the batches that merge the relationships described by [events]. */
    fun mergeRelationshipEvents(events: List<StreamsTransactionEvent>): List<QueryEvents>

    /** Returns the batches that delete the relationships described by [events]. */
    fun deleteRelationshipEvents(events: List<StreamsTransactionEvent>): List<QueryEvents>
}
16+
17+
/**
 * Shape of a relationship event: its label plus the labels and id keys of both endpoints.
 *
 * @property label label of the relationship
 * @property startLabels labels of the start node
 * @property endLabels labels of the end node
 * @property startKeys names of the id properties of the start node
 * @property endKeys names of the id properties of the end node
 */
data class RelationshipSchemaMetadata(
        val label: String,
        val startLabels: List<String>,
        val endLabels: List<String>,
        val startKeys: Set<String>,
        val endKeys: Set<String>
) {
    /** Extracts the metadata from [payload]; missing endpoint labels become empty lists. */
    constructor(payload: RelationshipPayload) : this(
            payload.label,
            payload.start.labels.orEmpty(),
            payload.end.labels.orEmpty(),
            payload.start.ids.keys,
            payload.end.ids.keys)
}
28+
29+
/**
 * Shape of a node merge event.
 *
 * @property constraints constraints associated with the node
 * @property labelsToAdd labels to set on the node
 * @property labelsToDelete labels to remove from the node
 * @property keys names of the properties that identify the node
 */
data class NodeSchemaMetadata(
        val constraints: List<Constraint>,
        val labelsToAdd: List<String>,
        val labelsToDelete: List<String>,
        val keys: Set<String>
)
33+
34+
35+
36+
/**
 * Label changes carried by a node merge.
 *
 * @property labelsToAdd labels to set on the node
 * @property labelsToDelete labels to remove from the node
 */
data class NodeMergeMetadata(
        val labelsToAdd: Set<String>,
        val labelsToDelete: Set<String>
)
Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
package streams.service.sink.strategy
2+
3+
import streams.events.*
4+
import streams.extensions.quote
5+
import streams.utils.IngestionUtils.getLabelsAsString
6+
import streams.utils.IngestionUtils.getNodeKeysAsString
7+
import streams.utils.IngestionUtils.keySeparator
8+
import streams.utils.IngestionUtils.labelSeparator
9+
import streams.utils.SchemaUtils.getNodeKeys
10+
import streams.utils.StreamsUtils
11+
12+
13+
/**
 * Groups relationship events by their [RelationshipSchemaMetadata].
 *
 * Events whose start or end node carries no ids are skipped. Every remaining event
 * contributes one parameter map with the `start` and `end` ids and, when [withProperties]
 * is true, a `properties` entry taken from the `after` state, else from the `before`
 * state, else empty.
 *
 * @param events events whose payload is a [RelationshipPayload]; any other payload makes
 * the cast fail with [ClassCastException]
 * @param withProperties whether each parameter map includes the `properties` entry
 * @return the parameter maps of the events, grouped by metadata
 */
private fun prepareRelationshipEvents(events: List<StreamsTransactionEvent>, withProperties: Boolean = true): Map<RelationshipSchemaMetadata, List<Map<String, Any>>> {
    val keyedParams = events.mapNotNull { event ->
        val payload = event.payload as RelationshipPayload

        if (payload.start.ids.isNotEmpty() && payload.end.ids.isNotEmpty()) {
            val properties = payload.after?.properties ?: payload.before?.properties ?: emptyMap()

            val metadata = RelationshipSchemaMetadata(payload)
            val params = when (withProperties) {
                true -> mapOf("start" to payload.start.ids, "end" to payload.end.ids, "properties" to properties)
                false -> mapOf("start" to payload.start.ids, "end" to payload.end.ids)
            }

            metadata to params
        } else {
            null
        }
    }
    return keyedParams.groupBy({ it.first }, { it.second })
}
35+
/**
 * [IngestionStrategy] that derives its queries from the schema information carried by the
 * events: nodes are matched through their UNIQUE constraints, relationships through the
 * labels and id keys of their start and end nodes.
 */
class SchemaIngestionStrategy: IngestionStrategy {

    /**
     * Builds one query per [RelationshipSchemaMetadata] for the relationship events that
     * are not deletions: both endpoints and the relationship are merged, then the
     * relationship properties are replaced with `event.properties`.
     */
    override fun mergeRelationshipEvents(events: List<StreamsTransactionEvent>): List<QueryEvents> {
        if (events.isEmpty()) {
            return emptyList()
        }
        val upserts = events.filter { it.payload.type == EntityType.relationship && it.meta.operation != OperationType.deleted }
        return prepareRelationshipEvents(upserts).map { (metadata, params) ->
            val relationshipType = metadata.label.quote()
            val startNode = endpointPattern("start", metadata.startLabels, metadata.startKeys)
            val endNode = endpointPattern("end", metadata.endLabels, metadata.endKeys)
            val query = """
                |${StreamsUtils.UNWIND}
                |MERGE $startNode
                |MERGE $endNode
                |MERGE (start)-[r:$relationshipType]->(end)
                |SET r = event.properties
            """.trimMargin()
            QueryEvents(query, params)
        }
    }

    /**
     * Builds one query per [RelationshipSchemaMetadata] for the deleted relationship
     * events: both endpoints and the relationship are matched, then the relationship is
     * deleted. The parameter maps carry no `properties` entry.
     */
    override fun deleteRelationshipEvents(events: List<StreamsTransactionEvent>): List<QueryEvents> {
        if (events.isEmpty()) {
            return emptyList()
        }
        val deletions = events.filter { it.payload.type == EntityType.relationship && it.meta.operation == OperationType.deleted }
        return prepareRelationshipEvents(deletions, false).map { (metadata, params) ->
            val relationshipType = metadata.label.quote()
            val startNode = endpointPattern("start", metadata.startLabels, metadata.startKeys)
            val endNode = endpointPattern("end", metadata.endLabels, metadata.endKeys)
            val query = """
                |${StreamsUtils.UNWIND}
                |MATCH $startNode
                |MATCH $endNode
                |MATCH (start)-[r:$relationshipType]->(end)
                |DELETE r
            """.trimMargin()
            QueryEvents(query, params)
        }
    }

    /**
     * Builds one `DETACH DELETE` query per distinct list of UNIQUE constraints for the
     * deleted node events. Events without a UNIQUE constraint are skipped. The parameter
     * maps carry the `before` properties of the node.
     */
    override fun deleteNodeEvents(events: List<StreamsTransactionEvent>): List<QueryEvents> {
        if (events.isEmpty()) {
            return emptyList()
        }
        return events
                .filter { it.payload.type == EntityType.node && it.meta.operation == OperationType.deleted }
                .mapNotNull { event ->
                    val before = event.payload.before as NodeChange
                    val constraints = uniqueConstraints(event)
                    if (constraints.isEmpty()) null else constraints to mapOf("properties" to before.properties)
                }
                .groupBy({ it.first }, { it.second })
                .map { (constraints, params) ->
                    val labels = constraints.mapNotNull { it.label }
                    val nodeKeys = constraints.flatMap { it.properties }.toSet()
                    val query = """
                        |${StreamsUtils.UNWIND}
                        |MATCH (n:${getLabelsAsString(labels)}{${getNodeKeysAsString(keys = nodeKeys)}})
                        |DETACH DELETE n
                    """.trimMargin()
                    QueryEvents(query, params)
                }
    }

    /**
     * Builds one query per [NodeSchemaMetadata] for the node events that are not
     * deletions: the node is merged on its constraint labels and keys, its properties are
     * replaced with `event.properties`, and the labels that changed between the `before`
     * and `after` state are set or removed. Events without a UNIQUE constraint, or for
     * which `getNodeKeys` returns no key, are skipped.
     */
    override fun mergeNodeEvents(events: List<StreamsTransactionEvent>): List<QueryEvents> {
        if (events.isEmpty()) {
            return emptyList()
        }

        return events
                .filter { it.payload.type == EntityType.node && it.meta.operation != OperationType.deleted }
                .mapNotNull { event ->
                    val after = event.payload.after as NodeChange
                    val labelsAfter = after.labels ?: emptyList()
                    val labelsBefore = (event.payload.before as? NodeChange)?.labels.orEmpty()

                    val constraints = uniqueConstraints(event)
                    if (constraints.isEmpty()) {
                        return@mapNotNull null
                    }

                    val labelsToAdd = unconstrainedLabels(labelsAfter - labelsBefore, constraints)
                    val labelsToDelete = unconstrainedLabels(labelsBefore - labelsAfter, constraints)

                    val propertyKeys = after.properties?.keys ?: emptySet()
                    val keys = getNodeKeys(labelsAfter, propertyKeys, constraints)
                    if (keys.isEmpty()) {
                        return@mapNotNull null
                    }

                    NodeSchemaMetadata(constraints, labelsToAdd, labelsToDelete, keys) to mapOf("properties" to after.properties)
                }
                .groupBy({ it.first }, { it.second })
                .map { (metadata, params) ->
                    val query = buildString {
                        append("""
                            |${StreamsUtils.UNWIND}
                            |MERGE (n:${getLabelsAsString(metadata.constraints.mapNotNull { it.label })}{${getNodeKeysAsString(keys = metadata.keys)}})
                            |SET n = event.properties
                        """.trimMargin())
                        if (metadata.labelsToAdd.isNotEmpty()) {
                            append("\nSET n:${getLabelsAsString(metadata.labelsToAdd)}")
                        }
                        if (metadata.labelsToDelete.isNotEmpty()) {
                            append("\nREMOVE n:${getLabelsAsString(metadata.labelsToDelete)}")
                        }
                    }
                    QueryEvents(query, params)
                }
    }

    /** Returns the constraints of type UNIQUE declared in the schema of [event]. */
    private fun uniqueConstraints(event: StreamsTransactionEvent): List<Constraint> =
            event.schema.constraints.filter { it.type == StreamsConstraintType.UNIQUE }

    /** Returns, quoted, the [labels] that are not the label of any of the [constraints]. */
    private fun unconstrainedLabels(labels: List<String>, constraints: List<Constraint>): List<String> =
            labels.filter { label -> constraints.none { constraint -> constraint.label == label } }
                    .map { it.quote() }

    /** Renders the node pattern `(alias:labels{keys})` for a relationship endpoint. */
    private fun endpointPattern(alias: String, labels: List<String>, keys: Set<String>): String =
            "($alias:${getLabelsAsString(labels)}{${getNodeKeysAsString(alias, keys)}})"

}

0 commit comments

Comments
 (0)