Skip to content

Commit cb65ed0

Browse files
authored
Merge pull request #208 from conker84/issue_198
fixes #198: Tombstone record Management
2 parents 8133dab + d9edc96 commit cb65ed0

32 files changed

+867
-502
lines changed

common/src/main/kotlin/streams/extensions/CommonExtensions.kt

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ import org.apache.kafka.clients.consumer.ConsumerRecord
44
import org.apache.kafka.clients.consumer.OffsetAndMetadata
55
import org.apache.kafka.common.TopicPartition
66
import org.neo4j.graphdb.Node
7+
import streams.serialization.JSONUtils
8+
import streams.service.StreamsSinkEntity
79
import javax.lang.model.SourceVersion
810

911
fun Map<String,String>.getInt(name:String, defaultValue: Int) = this.get(name)?.toInt() ?: defaultValue
@@ -30,5 +32,10 @@ fun Map<String, Any?>.flatten(map: Map<String, Any?> = this, prefix: String = ""
3032
}
3133
}.toMap()
3234
}
35+
3336
fun <K, V> ConsumerRecord<K, V>.topicPartition() = TopicPartition(this.topic(), this.partition())
34-
fun <K, V> ConsumerRecord<K, V>.offsetAndMetadata(metadata: String = "") = OffsetAndMetadata(this.offset() + 1, metadata)
37+
fun <K, V> ConsumerRecord<K, V>.offsetAndMetadata(metadata: String = "") = OffsetAndMetadata(this.offset() + 1, metadata)
38+
39+
fun ConsumerRecord<ByteArray, ByteArray>.toStreamsSinkEntity(): StreamsSinkEntity = StreamsSinkEntity(
40+
if (this.key() != null) try { JSONUtils.readValue<Any>(this.key()) } catch (e: Exception) { String(this.key()) } else null,
41+
if (this.value() != null) JSONUtils.readValue<Any>(this.value()) else null)

common/src/main/kotlin/streams/service/StreamsSinkService.kt

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
package streams.service
22

3-
import streams.serialization.JSONUtils
4-
import streams.service.sink.strategy.*
5-
import java.util.concurrent.ConcurrentHashMap
3+
import streams.service.sink.strategy.IngestionStrategy
64

75

86
const val STREAMS_TOPIC_KEY: String = "streams.sink.topic"
@@ -17,26 +15,28 @@ enum class TopicType(val group: TopicTypeGroup, val key: String) {
1715
CDC_SCHEMA(group = TopicTypeGroup.CDC, key = "$STREAMS_TOPIC_CDC_KEY.schema")
1816
}
1917

18+
data class StreamsSinkEntity(val key: Any?, val value: Any?)
19+
2020
abstract class StreamsSinkService(private val strategyMap: Map<TopicType, Any>) {
2121

2222
abstract fun getTopicType(topic: String): TopicType?
2323
abstract fun getCypherTemplate(topic: String): String?
2424
abstract fun write(query: String, events: Collection<Any>)
2525

26-
private fun writeWithStrategy(data: Collection<Any>, strategy: IngestionStrategy) {
26+
private fun writeWithStrategy(data: Collection<StreamsSinkEntity>, strategy: IngestionStrategy) {
2727
strategy.mergeNodeEvents(data).forEach { write(it.query, it.events) }
2828
strategy.deleteNodeEvents(data).forEach { write(it.query, it.events) }
2929

3030
strategy.mergeRelationshipEvents(data).forEach { write(it.query, it.events) }
3131
strategy.deleteRelationshipEvents(data).forEach { write(it.query, it.events) }
3232
}
3333

34-
private fun writeWithCypherTemplate(topic: String, params: Collection<Any>) {
34+
private fun writeWithCypherTemplate(topic: String, params: Collection<StreamsSinkEntity>) {
3535
val query = getCypherTemplate(topic) ?: return
36-
write(query, params)
36+
write(query, params.mapNotNull { it.value })
3737
}
3838

39-
fun writeForTopic(topic: String, params: Collection<Any>) {
39+
fun writeForTopic(topic: String, params: Collection<StreamsSinkEntity>) {
4040
val topicType = getTopicType(topic) ?: return
4141
when (topicType.group) {
4242
TopicTypeGroup.CYPHER -> writeWithCypherTemplate(topic, params)

common/src/main/kotlin/streams/service/dlq/DeadLetterQueueService.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,14 @@ data class DLQData(val originalTopic: String,
1313
val exception: Exception?) {
1414

1515
companion object {
16-
fun from(consumerRecord: ConsumerRecord<String, ByteArray>, exception: Exception?, executingClass: Class<*>?): DLQData {
16+
fun from(consumerRecord: ConsumerRecord<ByteArray, ByteArray>, exception: Exception?, executingClass: Class<*>?): DLQData {
1717
return DLQData(offset = consumerRecord.offset().toString(),
1818
originalTopic = consumerRecord.topic(),
1919
partition = consumerRecord.partition().toString(),
2020
timestamp = consumerRecord.timestamp(),
2121
exception = exception,
2222
executingClass = executingClass,
23-
key = consumerRecord.key().toByteArray(),
23+
key = consumerRecord.key(),
2424
value = consumerRecord.value())
2525
}
2626
}

common/src/main/kotlin/streams/service/sink/strategy/IngestionStrategy.kt

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,18 @@
11
package streams.service.sink.strategy
22

33
import streams.events.*
4+
import streams.service.StreamsSinkEntity
45
import streams.utils.Neo4jUtils
56
import streams.utils.StreamsUtils
67

78

89
data class QueryEvents(val query: String, val events: List<Map<String, Any?>>)
910

1011
interface IngestionStrategy {
11-
fun mergeNodeEvents(events: Collection<Any>): List<QueryEvents>
12-
fun deleteNodeEvents(events: Collection<Any>): List<QueryEvents>
13-
fun mergeRelationshipEvents(events: Collection<Any>): List<QueryEvents>
14-
fun deleteRelationshipEvents(events: Collection<Any>): List<QueryEvents>
12+
fun mergeNodeEvents(events: Collection<StreamsSinkEntity>): List<QueryEvents>
13+
fun deleteNodeEvents(events: Collection<StreamsSinkEntity>): List<QueryEvents>
14+
fun mergeRelationshipEvents(events: Collection<StreamsSinkEntity>): List<QueryEvents>
15+
fun deleteRelationshipEvents(events: Collection<StreamsSinkEntity>): List<QueryEvents>
1516
}
1617

1718
data class RelationshipSchemaMetadata(val label: String,

common/src/main/kotlin/streams/service/sink/strategy/NodePatternIngestionStrategy.kt

Lines changed: 38 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,16 @@
11
package streams.service.sink.strategy
22

33
import streams.extensions.flatten
4-
import streams.utils.IngestionUtils
4+
import streams.serialization.JSONUtils
5+
import streams.service.StreamsSinkEntity
56
import streams.utils.IngestionUtils.containsProp
67
import streams.utils.IngestionUtils.getLabelsAsString
78
import streams.utils.IngestionUtils.getNodeMergeKeys
89
import streams.utils.StreamsUtils
910

1011
class NodePatternIngestionStrategy(private val nodePatternConfiguration: NodePatternConfiguration): IngestionStrategy {
1112

12-
private val nodeTemplate: String = """
13+
private val mergeNodeTemplate: String = """
1314
|${StreamsUtils.UNWIND}
1415
|MERGE (n:${getLabelsAsString(nodePatternConfiguration.labels)}{${
1516
getNodeMergeKeys("keys", nodePatternConfiguration.keys)
@@ -18,32 +19,47 @@ class NodePatternIngestionStrategy(private val nodePatternConfiguration: NodePat
1819
|SET n += event.keys
1920
""".trimMargin()
2021

21-
override fun mergeNodeEvents(events: Collection<Any>): List<QueryEvents> {
22-
if (events.isNullOrEmpty()) {
23-
return emptyList()
24-
}
22+
private val deleteNodeTemplate: String = """
23+
|${StreamsUtils.UNWIND}
24+
|MATCH (n:${getLabelsAsString(nodePatternConfiguration.labels)}{${
25+
getNodeMergeKeys("keys", nodePatternConfiguration.keys)
26+
}})
27+
|DETACH DELETE n
28+
""".trimMargin()
2529

26-
val data = (events as List<Map<String, Any?>>)
27-
.mapNotNull {
28-
toData(nodePatternConfiguration, it)
29-
}
30-
return listOf(QueryEvents(nodeTemplate, data))
30+
override fun mergeNodeEvents(events: Collection<StreamsSinkEntity>): List<QueryEvents> {
31+
val data = events
32+
.mapNotNull { if (it.value != null) JSONUtils.asMap(it.value) else null }
33+
.mapNotNull { toData(nodePatternConfiguration, it) }
34+
return if (data.isEmpty()) {
35+
emptyList()
36+
} else {
37+
listOf(QueryEvents(mergeNodeTemplate, data))
38+
}
3139
}
3240

33-
override fun deleteNodeEvents(events: Collection<Any>): List<QueryEvents> {
34-
return emptyList()
41+
override fun deleteNodeEvents(events: Collection<StreamsSinkEntity>): List<QueryEvents> {
42+
val data = events
43+
.filter { it.value == null && it.key != null }
44+
.mapNotNull { if (it.key != null) JSONUtils.asMap(it.key) else null }
45+
.mapNotNull { toData(nodePatternConfiguration, it, false) }
46+
return if (data.isEmpty()) {
47+
emptyList()
48+
} else {
49+
listOf(QueryEvents(deleteNodeTemplate, data))
50+
}
3551
}
3652

37-
override fun mergeRelationshipEvents(events: Collection<Any>): List<QueryEvents> {
53+
override fun mergeRelationshipEvents(events: Collection<StreamsSinkEntity>): List<QueryEvents> {
3854
return emptyList()
3955
}
4056

41-
override fun deleteRelationshipEvents(events: Collection<Any>): List<QueryEvents> {
57+
override fun deleteRelationshipEvents(events: Collection<StreamsSinkEntity>): List<QueryEvents> {
4258
return emptyList()
4359
}
4460

4561
companion object {
46-
fun toData(nodePatternConfiguration: NodePatternConfiguration, props: Map<String, Any?>): Map<String, Map<String, Any?>>? {
62+
fun toData(nodePatternConfiguration: NodePatternConfiguration, props: Map<String, Any?>, withProperties: Boolean = true): Map<String, Map<String, Any?>>? {
4763
val properties = props.flatten()
4864
val containsKeys = nodePatternConfiguration.keys.all { properties.containsKey(it) }
4965
return if (containsKeys) {
@@ -58,8 +74,12 @@ class NodePatternIngestionStrategy(private val nodePatternConfiguration: NodePat
5874
!nodePatternConfiguration.keys.contains(key) && containsProp
5975
}
6076
}
61-
mapOf("keys" to properties.filterKeys { nodePatternConfiguration.keys.contains(it) },
62-
"properties" to filteredProperties)
77+
if (withProperties) {
78+
mapOf("keys" to properties.filterKeys { nodePatternConfiguration.keys.contains(it) },
79+
"properties" to filteredProperties)
80+
} else {
81+
mapOf("keys" to properties.filterKeys { nodePatternConfiguration.keys.contains(it) })
82+
}
6383
} else {
6484
null
6585
}

common/src/main/kotlin/streams/service/sink/strategy/RelationshipPatternIngestionStrategy.kt

Lines changed: 49 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,16 @@
11
package streams.service.sink.strategy
22

33
import streams.extensions.flatten
4+
import streams.serialization.JSONUtils
5+
import streams.service.StreamsSinkEntity
46
import streams.utils.IngestionUtils.containsProp
57
import streams.utils.IngestionUtils.getLabelsAsString
68
import streams.utils.IngestionUtils.getNodeMergeKeys
79
import streams.utils.StreamsUtils
810

911
class RelationshipPatternIngestionStrategy(private val relationshipPatternConfiguration: RelationshipPatternConfiguration): IngestionStrategy {
1012

11-
private val relationshipTemplate: String = """
13+
private val mergeRelationshipTemplate: String = """
1214
|${StreamsUtils.UNWIND}
1315
|MERGE (start:${getLabelsAsString(relationshipPatternConfiguration.start.labels)}{${
1416
getNodeMergeKeys("start.keys", relationshipPatternConfiguration.start.keys)
@@ -24,23 +26,29 @@ class RelationshipPatternIngestionStrategy(private val relationshipPatternConfig
2426
|SET r = event.properties
2527
""".trimMargin()
2628

27-
private val needsToBeFlattened = relationshipPatternConfiguration.properties.isEmpty()
28-
|| relationshipPatternConfiguration.properties.any { it.contains(".") }
29-
29+
private val deleteRelationshipTemplate: String = """
30+
|${StreamsUtils.UNWIND}
31+
|MATCH (start:${getLabelsAsString(relationshipPatternConfiguration.start.labels)}{${
32+
getNodeMergeKeys("start.keys", relationshipPatternConfiguration.start.keys)
33+
}})
34+
|MATCH (end:${getLabelsAsString(relationshipPatternConfiguration.end.labels)}{${
35+
getNodeMergeKeys("end.keys", relationshipPatternConfiguration.end.keys)
36+
}})
37+
|MATCH (start)-[r:${relationshipPatternConfiguration.relType}]->(end)
38+
|DELETE r
39+
""".trimMargin()
3040

31-
override fun mergeNodeEvents(events: Collection<Any>): List<QueryEvents> {
41+
override fun mergeNodeEvents(events: Collection<StreamsSinkEntity>): List<QueryEvents> {
3242
return emptyList()
3343
}
3444

35-
override fun deleteNodeEvents(events: Collection<Any>): List<QueryEvents> {
45+
override fun deleteNodeEvents(events: Collection<StreamsSinkEntity>): List<QueryEvents> {
3646
return emptyList()
3747
}
3848

39-
override fun mergeRelationshipEvents(events: Collection<Any>): List<QueryEvents> {
40-
if (events.isNullOrEmpty()) {
41-
return emptyList()
42-
}
43-
val data = (events as List<Map<String, Any?>>)
49+
override fun mergeRelationshipEvents(events: Collection<StreamsSinkEntity>): List<QueryEvents> {
50+
val data = events
51+
.mapNotNull { if (it.value != null) JSONUtils.asMap(it.value) else null }
4452
.mapNotNull { props ->
4553
val properties = props.flatten()
4654
val containsKeys = relationshipPatternConfiguration.start.keys.all { properties.containsKey(it) }
@@ -68,7 +76,11 @@ class RelationshipPatternIngestionStrategy(private val relationshipPatternConfig
6876
null
6977
}
7078
}
71-
return listOf(QueryEvents(relationshipTemplate, data))
79+
return if (data.isEmpty()) {
80+
emptyList()
81+
} else {
82+
listOf(QueryEvents(mergeRelationshipTemplate, data))
83+
}
7284
}
7385

7486
private fun isRelationshipProperty(propertyName: String): Boolean {
@@ -78,8 +90,31 @@ class RelationshipPatternIngestionStrategy(private val relationshipPatternConfig
7890
&& !relationshipPatternConfiguration.end.properties.contains(propertyName))
7991
}
8092

81-
override fun deleteRelationshipEvents(events: Collection<Any>): List<QueryEvents> {
82-
return emptyList()
93+
override fun deleteRelationshipEvents(events: Collection<StreamsSinkEntity>): List<QueryEvents> {
94+
val data = events
95+
.filter { it.value == null && it.key != null }
96+
.mapNotNull { if (it.key != null) JSONUtils.asMap(it.key) else null }
97+
.mapNotNull { props ->
98+
val properties = props.flatten()
99+
val containsKeys = relationshipPatternConfiguration.start.keys.all { properties.containsKey(it) }
100+
&& relationshipPatternConfiguration.end.keys.all { properties.containsKey(it) }
101+
if (containsKeys) {
102+
val startConf = relationshipPatternConfiguration.start
103+
val endConf = relationshipPatternConfiguration.end
104+
105+
val start = NodePatternIngestionStrategy.toData(startConf, props)
106+
val end = NodePatternIngestionStrategy.toData(endConf, props)
107+
108+
mapOf("start" to start, "end" to end)
109+
} else {
110+
null
111+
}
112+
}
113+
return if (data.isEmpty()) {
114+
emptyList()
115+
} else {
116+
listOf(QueryEvents(deleteRelationshipTemplate, data))
117+
}
83118
}
84119

85120
}

0 commit comments

Comments
 (0)