
Commit 8c7927b

Fixes #305: CDC and Kafka Log Compaction (v 3.5) (#434)

* Fixes #272: Allow to publish messages with key in Kafka topic (4.0)
* WIP: rebase issue-272-3.5
* WIP: changes review
* WIP: tests
* tests
* changes review
* last changes review
* changes review except enterprise tests
* changes after merging 4.0 version
* removed wildcard imports - statement from setUp KafkaEventRouterBaseIT
* changes after rebase

1 parent a947457 commit 8c7927b

10 files changed: +967 additions, -109 deletions

doc/asciidoc/producer/configuration.adoc

Lines changed: 11 additions & 1 deletion
@@ -14,6 +14,7 @@ kafka.replication=1
 kafka.linger.ms=1
 kafka.transactional.id=
 kafka.topic.discovery.polling.interval=300000
+kafka.streams.log.compaction.strategy=delete

 streams.source.topic.nodes.<TOPIC_NAME>=<PATTERN>
 streams.source.topic.relationships.<TOPIC_NAME>=<PATTERN>
@@ -35,4 +36,13 @@ all the messages sent to topics that don't exist are discarded;
 this because the `KafkaProducer.send()` method blocks the execution, as explained in https://issues.apache.org/jira/browse/KAFKA-3539[KAFKA-3539].
 You can tune the custom property `kafka.topic.discovery.polling.interval` in order to
 periodically check for new topics into the Kafka cluster so the plugin will be able
-to send events to the defined topics.
+to send events to the defined topics.
+
+With `kafka.streams.log.compaction.strategy=delete`, the Neo4j Streams Source generates a sequence of unique keys;
+with `kafka.streams.log.compaction.strategy=compact`, the keys are adapted to enable
+https://kafka.apache.org/documentation.html#compaction[Log Compaction] on the Kafka side.
+Please note that the `delete` strategy does not actually delete records; it has this name to match the topic configuration `cleanup.policy=delete/compact`.
+That is, operations that involve the same nodes or relationships will have the same key.
+
+xref:message-structure.adoc[See the 'message structure' section for key examples]
+
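The `compact` strategy only shapes the message keys; log compaction itself is enabled per topic on the broker via `cleanup.policy=compact`. A minimal Kotlin sketch of creating such a topic with the Kafka `AdminClient`; the broker address (`localhost:9092`) and topic name (`my-topic`) are illustrative assumptions, not part of the plugin:

[source,kotlin]
----
import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.admin.NewTopic
import org.apache.kafka.common.config.TopicConfig
import java.util.Properties

fun main() {
    // Assumed broker address; adjust for your environment
    val props = Properties().apply { put("bootstrap.servers", "localhost:9092") }
    AdminClient.create(props).use { admin ->
        // One partition, replication factor 1, with log compaction enabled on the topic
        val topic = NewTopic("my-topic", 1, 1.toShort())
                .configs(mapOf(TopicConfig.CLEANUP_POLICY_CONFIG to TopicConfig.CLEANUP_POLICY_COMPACT))
        admin.createTopics(listOf(topic)).all().get()
    }
}
----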

doc/asciidoc/producer/index.adoc

Lines changed: 2 additions & 0 deletions
@@ -25,6 +25,8 @@ To allow insertion of keys in any format (e.g. through `streams.publish` procedu
 **the `key.serializer` is set with the `org.apache.kafka.common.serialization.ByteArraySerializer`**
 like `value.serializer`

+include::message-structure.adoc[]
+
 [#source-patterns]
 === Patterns

doc/asciidoc/producer/message-structure.adoc

Lines changed: 114 additions & 0 deletions
@@ -0,0 +1,114 @@
== Message structure

The message key structure depends on `kafka.streams.log.compaction.strategy`.

With `delete`, the key is a string built as `${meta.txId + meta.txEventId}-${meta.txEventId}`:

[source]
----
"[txId+txEventId]-txEventId"
----

where:

* `txId` identifies the transaction that affected the entity

* `txEventId` is a counter that identifies the internal order in which Neo4j handled the specific event

* `[txId+txEventId]` is the numeric sum of the two previous values

For example, an event with `txId=16` and `txEventId=1` gets the key `"17-1"`.

{nbsp}

With `compact`, instead:

For a node without a constrained label, the key is the string value of the node id.

For a node with a constrained label, the key is a JSON document of the form `{ids: mapOfConstraints, labels: listOfLabels}`.

For example, with this configuration:
[source]
----
streams.source.topic.nodes.<TOPIC_NAME>=Person{*}
kafka.streams.log.compaction.strategy=compact
----
this constraint:
[source, cypher]
----
CREATE CONSTRAINT ON (p:Person) ASSERT p.name IS UNIQUE
----
and this query:
[source, cypher]
----
CREATE (:Person {name:'Sherlock', surname: 'Holmes'})
----
we obtain this key:

[source,json]
----
{"ids": {"name": "Sherlock"}, "labels": ["Person"]}
----
{nbsp}

Otherwise, with the same configuration and query as above, but with the constraint:
[source, cypher]
----
CREATE CONSTRAINT ON (p:Person) ASSERT (p.name, p.surname) IS NODE KEY
----
we obtain this key:

[source,json]
----
{"ids": {"surname": "Holmes", "name": "Sherlock"}, "labels": ["Person"]}
----

{nbsp}

For a relationship, the key is a JSON document of the form `{start: START_NODE, end: END_NODE, label: typeOfRelationship}` +
where START_NODE and END_NODE follow the same rules as above.

For example, with this configuration:
[source]
----
streams.source.topic.nodes.<TOPIC_NAME>=Person{*}
streams.source.topic.relationships.<TOPIC_NAME>=Person{*}
kafka.streams.log.compaction.strategy=compact
----
these constraints:
[source, cypher]
----
CREATE CONSTRAINT ON (p:Person) ASSERT p.name IS UNIQUE;
CREATE CONSTRAINT ON (p:Product) ASSERT p.code IS UNIQUE;
----
and these queries:
[source, cypher]
----
CREATE (:Person {name:'Pippo'});
CREATE (p:Product {code:'1367', name: 'Notebook'});
MATCH (pe:Person {name:'Pippo'}), (pr:Product {name:'Notebook'}) MERGE (pe)-[:BUYS]->(pr);
----
we obtain this key:
[source,json]
----
{"start": {"ids": {"name": "Pippo"}, "labels": ["Person"]}, "end": {"ids": {"code": "1367"}, "labels": ["Product"]},
"label": "BUYS"}
----
{nbsp}

Otherwise, with this configuration:
[source]
----
streams.source.topic.nodes.<TOPIC_NAME>=Person{*}
streams.source.topic.relationships.<TOPIC_NAME>=Person{*}
kafka.streams.log.compaction.strategy=compact
----
without constraints, and with these queries:
[source, cypher]
----
CREATE (:Person {name:'Pippo'});
CREATE (p:Product {code:'1367', name: 'Notebook'});
MATCH (pe:Person {name:'Pippo'}), (pr:Product {name:'Notebook'}) MERGE (pe)-[:BUYS]->(pr);
----
we obtain this key, where start and end are the plain node ids:
[source,json]
----
{"start": "0", "end": "1", "label": "BUYS"}
----
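Since the producer serializes both key and value with `ByteArraySerializer`, a consumer sees the keys above as raw JSON bytes. A minimal sketch of inspecting keys and tombstones on the consumer side; it assumes a Kafka client 2.x, and the broker address, group id and topic name are illustrative:

[source,kotlin]
----
import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.kafka.clients.consumer.KafkaConsumer
import java.time.Duration
import java.util.Properties

fun main() {
    // Assumed connection settings; adjust for your environment
    val props = Properties().apply {
        put("bootstrap.servers", "localhost:9092")
        put("group.id", "key-inspector")
        put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
        put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
        put("auto.offset.reset", "earliest")
    }
    val mapper = ObjectMapper()
    KafkaConsumer<ByteArray, ByteArray>(props).use { consumer ->
        consumer.subscribe(listOf("my-topic"))
        consumer.poll(Duration.ofSeconds(5)).forEach { record ->
            // With the compact strategy the key is a JSON document or a plain id,
            // with the delete strategy it is the "txId+txEventId-txEventId" string
            val key = record.key()?.let { mapper.readValue(it, Any::class.java) }
            println("key=$key, tombstone=${record.value() == null}")
        }
    }
}
----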

producer/src/main/kotlin/streams/Extensions.kt

Lines changed: 39 additions & 2 deletions
@@ -1,13 +1,22 @@
 package streams
 
 import org.apache.kafka.clients.producer.RecordMetadata
+import org.apache.kafka.common.config.TopicConfig
 import org.neo4j.graphdb.Node
 import org.neo4j.graphdb.Relationship
 import org.neo4j.graphdb.schema.ConstraintDefinition
 import org.neo4j.graphdb.schema.ConstraintType
 import streams.events.EntityType
+import streams.events.NodeChange
+import streams.events.NodePayload
+import streams.events.RelationshipPayload
+import streams.events.Schema
 import streams.events.StreamsConstraintType
+import streams.events.StreamsTransactionEvent
+import streams.events.RelationshipNodeChange
+import streams.events.OperationType
 import streams.extensions.labelNames
+import streams.utils.SchemaUtils
 
 fun Node.toMap(): Map<String, Any?> {
     return mapOf("id" to id.toString(), "properties" to allProperties, "labels" to labelNames(), "type" to EntityType.node)
@@ -28,7 +37,6 @@ fun RecordMetadata.toMap(): Map<String, Any> = mapOf(
         "partition" to partition()
 )
 
-
 fun ConstraintDefinition.streamsConstraintType(): StreamsConstraintType {
     return when (this.constraintType) {
         ConstraintType.UNIQUENESS, ConstraintType.NODE_KEY -> StreamsConstraintType.UNIQUE
@@ -42,4 +50,33 @@ fun ConstraintDefinition.isNodeConstraint(): Boolean {
 
 fun ConstraintDefinition.isRelationshipConstraint(): Boolean {
     return try { this.relationshipType; true } catch (e: IllegalStateException) { false }
-}
+}
+
+fun StreamsTransactionEvent.asSourceRecordValue(strategy: String): StreamsTransactionEvent? =
+        if(isStrategyCompact(strategy) && meta.operation == OperationType.deleted) null else this
+
+fun StreamsTransactionEvent.asSourceRecordKey(strategy: String): Any =
+        when {
+            isStrategyCompact(strategy) && payload is NodePayload -> nodePayloadAsMessageKey(payload as NodePayload, schema)
+            isStrategyCompact(strategy) && payload is RelationshipPayload -> relationshipAsMessageKey(payload as RelationshipPayload)
+            else -> "${meta.txId + meta.txEventId}-${meta.txEventId}"
+        }
+
+private fun nodePayloadAsMessageKey(payload: NodePayload, schema: Schema) = run {
+    val nodeChange: NodeChange = payload.after ?: payload.before!!
+    val labels = nodeChange.labels ?: emptyList()
+    val props: Map<String, Any> = nodeChange.properties ?: emptyMap()
+    val keys = SchemaUtils.getNodeKeys(labels, props.keys, schema.constraints)
+    val ids = props.filterKeys { keys.contains(it) }
+
+    if (ids.isEmpty()) payload.id else mapOf("ids" to ids, "labels" to labels)
+}
+
+private fun RelationshipNodeChange.toKey(): Any = if (ids.isEmpty()) id else mapOf("ids" to ids, "labels" to labels)
+
+private fun relationshipAsMessageKey(payload: RelationshipPayload) = mapOf(
+        "start" to payload.start.toKey(),
+        "end" to payload.end.toKey(),
+        "label" to payload.label)
+
+fun isStrategyCompact(strategy: String) = strategy == TopicConfig.CLEANUP_POLICY_COMPACT
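For reference, the `else` branch above (the `delete` strategy) only concatenates transaction metadata into a string. A standalone sketch of that format with hypothetical values:

[source,kotlin]
----
// Illustrative stand-in for the delete-strategy key format:
// "${meta.txId + meta.txEventId}-${meta.txEventId}"
fun deleteStrategyKey(txId: Long, txEventId: Int): String = "${txId + txEventId}-$txEventId"

fun main() {
    println(deleteStrategyKey(txId = 16, txEventId = 1)) // prints "17-1"
}
----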

producer/src/main/kotlin/streams/kafka/KafkaConfiguration.kt

Lines changed: 3 additions & 0 deletions
@@ -3,6 +3,7 @@ package streams.kafka
 import org.apache.commons.lang3.StringUtils
 import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.clients.producer.ProducerConfig
+import org.apache.kafka.common.config.TopicConfig
 import org.apache.kafka.common.serialization.ByteArraySerializer
 import org.apache.kafka.common.serialization.StringSerializer
 import streams.extensions.getInt
@@ -28,6 +29,7 @@ data class KafkaConfiguration(val zookeeperConnect: String = "localhost:2181",
                               val transactionalId: String = StringUtils.EMPTY,
                               val lingerMs: Int = 1,
                               val topicDiscoveryPollingInterval: Long = TimeUnit.MINUTES.toMillis(5),
+                              val streamsLogCompactionStrategy: String = TopicConfig.CLEANUP_POLICY_DELETE,
                               val extraProperties: Map<String, String> = emptyMap()) {
 
     companion object {
@@ -55,6 +57,7 @@ data class KafkaConfiguration(val zookeeperConnect: String = "localhost:2181",
                 lingerMs = config.getInt("linger.ms", default.lingerMs),
                 topicDiscoveryPollingInterval = config.getOrDefault("topic.discovery.polling.interval",
                         default.topicDiscoveryPollingInterval).toString().toLong(),
+                streamsLogCompactionStrategy = config.getOrDefault("streams.log.compaction.strategy", default.streamsLogCompactionStrategy),
                 extraProperties = extraProperties // for what we don't provide a default configuration
             )
         }
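Note that the `kafka.` prefix used in neo4j.conf is not part of the key looked up here: `kafka.streams.log.compaction.strategy` is read as `streams.log.compaction.strategy`, just as `kafka.topic.discovery.polling.interval` is read as `topic.discovery.polling.interval` above. A minimal sketch of the same lookup-with-default pattern; the sample map is illustrative only:

[source,kotlin]
----
import org.apache.kafka.common.config.TopicConfig

// Illustrative stand-in for the "kafka."-prefixed properties after the prefix is stripped
val config: Map<String, String> = mapOf("streams.log.compaction.strategy" to "compact")

fun main() {
    // Same lookup-with-default pattern as in KafkaConfiguration: fall back to "delete"
    val strategy = config.getOrDefault("streams.log.compaction.strategy", TopicConfig.CLEANUP_POLICY_DELETE)
    println(strategy) // prints "compact"; with an empty map it would print "delete"
}
----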

producer/src/main/kotlin/streams/kafka/KafkaEventRouter.kt

Lines changed: 6 additions & 5 deletions
@@ -12,6 +12,8 @@ import org.apache.kafka.common.errors.ProducerFencedException
 import org.neo4j.logging.Log
 import streams.StreamsEventRouter
 import streams.StreamsEventRouterConfiguration
+import streams.asSourceRecordKey
+import streams.asSourceRecordValue
 import streams.events.StreamsEvent
 import streams.events.StreamsPluginStatus
 import streams.events.StreamsTransactionEvent
@@ -71,7 +73,7 @@ class KafkaEventRouter(private val config: Map<String, String>, private val log:
         }
     }
 
-    private fun send(producerRecord: ProducerRecord<ByteArray?, ByteArray>, sync: Boolean = false): Map<String, Any>? {
+    private fun send(producerRecord: ProducerRecord<ByteArray?, ByteArray?>, sync: Boolean = false): Map<String, Any>? {
         if (!kafkaAdminService.isValidTopic(producerRecord.topic())) {
             // TODO add logging system here
             return null
@@ -112,13 +114,12 @@ class KafkaEventRouter(private val config: Map<String, String>, private val log:
         if (log.isDebugEnabled) {
             log.debug("Trying to send a transaction event with txId ${event.meta.txId} and txEventId ${event.meta.txEventId} to kafka")
         }
-        val producerRecord = ProducerRecord(topic, getPartition(config), System.currentTimeMillis(),
-                JSONUtils.writeValueAsBytes("${event.meta.txId + event.meta.txEventId}-${event.meta.txEventId}"),
-                JSONUtils.writeValueAsBytes(event))
+        val key = JSONUtils.writeValueAsBytes(event.asSourceRecordKey(kafkaConfig.streamsLogCompactionStrategy))
+        val value = event.asSourceRecordValue(kafkaConfig.streamsLogCompactionStrategy)?.let { JSONUtils.writeValueAsBytes(it) }
+        val producerRecord = ProducerRecord(topic, getPartition(config), System.currentTimeMillis(), key, value)
         send(producerRecord)
     }
 
-
     override fun sendEventsSync(topic: String, transactionEvents: List<out StreamsEvent>, config: Map<String, Any?>): List<Map<String, Any>> {
         producer?.beginTransaction()
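Because `asSourceRecordValue` returns `null` for delete operations under the `compact` strategy, the record built here can carry a non-null key and a `null` value, which Kafka log compaction treats as a tombstone for that key. A minimal, generic sketch of that producer-side pattern; the broker address, topic name and key are illustrative assumptions:

[source,kotlin]
----
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord
import java.util.Properties

fun main() {
    // Assumed broker address; key and value are sent as raw bytes like in the plugin
    val props = Properties().apply {
        put("bootstrap.servers", "localhost:9092")
        put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")
        put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")
    }
    KafkaProducer<ByteArray, ByteArray?>(props).use { producer ->
        val key = """{"ids": {"name": "Sherlock"}, "labels": ["Person"]}""".toByteArray()
        // A null value with an existing key is a tombstone: after compaction,
        // earlier records with the same key become eligible for removal
        producer.send(ProducerRecord("my-topic", key, null)).get()
    }
}
----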

producer/src/test/kotlin/streams/integrations/KafkaEventRouterBaseIT.kt

Lines changed: 83 additions & 0 deletions
@@ -0,0 +1,83 @@
package streams.integrations

import extension.newDatabase
import org.junit.After
import org.junit.AfterClass
import org.junit.Assume
import org.junit.Before
import org.junit.BeforeClass
import org.junit.Rule
import org.junit.rules.TestName
import org.neo4j.graphdb.factory.GraphDatabaseBuilder
import org.neo4j.kernel.impl.proc.Procedures
import org.neo4j.kernel.internal.GraphDatabaseAPI
import org.neo4j.test.TestGraphDatabaseFactory
import org.testcontainers.containers.KafkaContainer
import streams.procedures.StreamsProcedures
import streams.utils.StreamsUtils


@Suppress("UNCHECKED_CAST", "DEPRECATION")
open class KafkaEventRouterBaseIT {

    companion object {
        /**
         * Kafka TestContainers uses Confluent OSS images.
         * We need to keep in mind which is the right Confluent Platform version for the Kafka version this project uses
         *
         * Confluent Platform | Apache Kafka
         *                    |
         * 4.0.x              | 1.0.x
         * 4.1.x              | 1.1.x
         * 5.0.x              | 2.0.x
         *
         * Please see also https://docs.confluent.io/current/installation/versions-interoperability.html#cp-and-apache-kafka-compatibility
         */
        private const val confluentPlatformVersion = "4.0.2"
        @JvmStatic
        lateinit var kafka: KafkaContainer

        @BeforeClass @JvmStatic
        fun setUpContainer() {
            var exists = false
            StreamsUtils.ignoreExceptions({
                kafka = KafkaContainer(confluentPlatformVersion)
                kafka.start()
                exists = true
            }, IllegalStateException::class.java)
            Assume.assumeTrue("Kafka container has to exist", exists)
            Assume.assumeTrue("Kafka must be running", this::kafka.isInitialized && kafka.isRunning)
        }

        @AfterClass @JvmStatic
        fun tearDownContainer() {
            StreamsUtils.ignoreExceptions({
                kafka.stop()
            }, UninitializedPropertyAccessException::class.java)
        }
    }

    lateinit var db: GraphDatabaseAPI

    lateinit var graphDatabaseBuilder: GraphDatabaseBuilder

    @Rule
    @JvmField
    var testName = TestName()

    @Before
    open fun setUp() {
        graphDatabaseBuilder = TestGraphDatabaseFactory()
                .newImpermanentDatabaseBuilder()
                .setConfig("kafka.bootstrap.servers", kafka.bootstrapServers)

        db = graphDatabaseBuilder.newDatabase() as GraphDatabaseAPI
        db.dependencyResolver.resolveDependency(Procedures::class.java)
                .registerProcedure(StreamsProcedures::class.java, true)
    }

    @After
    fun tearDown() {
        db.shutdown()
    }
}
