
Commit 9bea0ee

Fixes #305: CDC and Kafka Log Compaction (#406)
* Fixes #272: Allow to publish messages with key in Kafka topic
* code clean
* changed git add . with ?:
* removed cast
* changes review
* changed key serializer
* added note in producer adoc
* added note in producer adoc
* added format
* changes review
* code clean
* Fixes #272: Allow to publish messages with key in Kafka topic
* changes review
* changed key serializer
* changes review
* issue 305
* added assertions
* changed rel key with ifEmpty
* changes review
* removed not necessary fun
* externalize AdminClient
* rebase
* code clean
* changes review
* commented pom.xml package
* rebase
* commentend slf4j-test in pom.xml
* assertEventually - log compaction validation - note into adoc
* updated doc - is node key test cases - changed compaction strategy
* improved tests
* moved enterprise compaction test
* fixed broken test
* fix broken test suite again
* moved bootstrapMap
* changed LogCompactionTestCommon to object
1 parent 8b93c71 commit 9bea0ee

File tree: 13 files changed, +1034 −34 lines changed
doc/docs/modules/ROOT/pages/message-structure.adoc

Lines changed: 113 additions & 0 deletions
@@ -0,0 +1,113 @@
== Message structure

The message key structure depends on `kafka.streams.log.compaction.strategy`.

With the `delete` strategy the key is a string built as `${meta.txId + meta.txEventId}-${meta.txEventId}`:

[source]
----
"[txId+txEventId]-[txEventId]"
----

where:

* `txId` identifies the transaction that affected the entity

* `txEventId` is a counter that identifies the internal order in which Neo4j handled the specific event

* `[txId+txEventId]` is the numeric sum of the two previous values
{nbsp}
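For illustration only, here is a minimal Kotlin sketch of how such a `delete`-strategy key could be computed. The `Meta` class below is a simplified stand-in for the event metadata, not a type from this repository; the expression mirrors `"${meta.txId + meta.txEventId}-${meta.txEventId}"` used by the producer code added in this commit:

[source,kotlin]
----
// Simplified stand-in for the transaction metadata carried by a streams event.
data class Meta(val txId: Long, val txEventId: Int)

// Key format used by the delete strategy: "<txId + txEventId>-<txEventId>".
fun deleteStrategyKey(meta: Meta): String =
        "${meta.txId + meta.txEventId}-${meta.txEventId}"

fun main() {
    // For txId = 12 and txEventId = 1 the key is "13-1".
    println(deleteStrategyKey(Meta(txId = 12, txEventId = 1)))
}
----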
With the `compact` strategy, instead:

For a node without a constrained label, the key is the string value of the node id.

For a node with a constrained label, the key is a JSON document in the form `{ids: mapOfConstraints, labels: listOfLabels}`.

For example, with this configuration:
[source]
----
streams.source.topic.nodes.<TOPIC_NAME>=Person{*}
kafka.streams.log.compaction.strategy=compact
----
this constraint:
[source, cypher]
----
CREATE CONSTRAINT ON (p:Person) ASSERT p.name IS UNIQUE
----
and this query:
[source, cypher]
----
CREATE (:Person {name:'Sherlock', surname: 'Holmes'})
----
we obtain this key:

[source,json]
----
{"ids": {"name": "Sherlock"}, "labels": ["Person"]}
----
{nbsp}

Otherwise, with the same configuration and query as above, but with the constraint:
[source, cypher]
----
CREATE CONSTRAINT ON (p:Person) ASSERT (p.name, p.surname) IS NODE KEY
----
we obtain this key:

[source,json]
----
{"ids": {"surname": "Holmes", "name": "Sherlock"}, "labels": ["Person"]}
----
{nbsp}
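As a sketch of the rule above, the following hypothetical Kotlin helper (`nodeKey` is illustrative, not part of this commit) mirrors the logic the producer code in this commit uses to build a compact-strategy node key: keep only the properties covered by constraints, and fall back to the node id when there are none:

[source,kotlin]
----
// Given a changed node's properties and the set of constrained property names,
// build the compact-strategy key: either the node id or {ids, labels}.
fun nodeKey(id: String, labels: List<String>, props: Map<String, Any>, constrainedKeys: Set<String>): Any {
    val ids = props.filterKeys { it in constrainedKeys }
    return if (ids.isEmpty()) id else mapOf("ids" to ids, "labels" to labels)
}

fun main() {
    val key = nodeKey(
            id = "0",
            labels = listOf("Person"),
            props = mapOf("name" to "Sherlock", "surname" to "Holmes"),
            constrainedKeys = setOf("name"))
    println(key) // prints {ids={name=Sherlock}, labels=[Person]}
}
----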
For a relationship, the key is a JSON document in the form `{start: START_NODE, end: END_NODE, label: typeOfRelationship}` +
where START_NODE and END_NODE follow the same rules as the node keys above.

For example, with this configuration:
[source]
----
streams.source.topic.nodes.<TOPIC_NAME>=Person{*}
streams.source.topic.relationships.<TOPIC_NAME>=Person{*}
kafka.streams.log.compaction.strategy=compact
----
these constraints:
[source, cypher]
----
CREATE CONSTRAINT ON (p:Person) ASSERT p.name IS UNIQUE;
CREATE CONSTRAINT ON (p:Product) ASSERT p.code IS UNIQUE;
----
and these queries:
[source, cypher]
----
CREATE (:Person {name:'Pippo'});
CREATE (p:Product {code:'1367', name: 'Notebook'});
MATCH (pe:Person {name:'Pippo'}), (pr:Product {name:'Notebook'}) MERGE (pe)-[:BUYS]->(pr);
----
we obtain this key:
[source,json]
----
{"start": {"ids": {"name": "Pippo"}, "labels": ["Person"]}, "end": {"ids": {"code": "1367"}, "labels": ["Product"]},
"label": "BUYS"}
----
{nbsp}

Otherwise, with this configuration:
[source]
----
streams.source.topic.nodes.<TOPIC_NAME>=Person{*}
streams.source.topic.relationships.<TOPIC_NAME>=Person{*}
kafka.streams.log.compaction.strategy=compact
----
without constraints, and with these queries:
[source, cypher]
----
CREATE (:Person {name:'Pippo'});
CREATE (p:Product {code:'1367', name: 'Notebook'});
MATCH (pe:Person {name:'Pippo'}), (pr:Product {name:'Notebook'}) MERGE (pe)-[:BUYS]->(pr);
----
we obtain this key, where `start` and `end` are the node ids because no constraints are defined:
[source,json]
----
{"start": "0", "end": "1", "label": "BUYS"}
----

doc/docs/modules/ROOT/pages/producer-configuration.adoc

Lines changed: 10 additions & 0 deletions
@@ -19,6 +19,7 @@ kafka.replication=1
 kafka.linger.ms=1
 kafka.transactional.id=
 kafka.topic.discovery.polling.interval=300000
+kafka.streams.log.compaction.strategy=delete

 streams.source.topic.nodes.<TOPIC_NAME>=<PATTERN>
 streams.source.topic.relationships.<TOPIC_NAME>=<PATTERN>
@@ -45,6 +46,15 @@ You can tune the custom property `kafka.topic.discovery.polling.interval` in ord
 periodically check for new topics into the Kafka cluster so the plugin will be able
 to send events to the defined topics.

+
+With `kafka.streams.log.compaction.strategy=delete`, the Neo4j Streams Source generates a sequence of unique keys.
+With `kafka.streams.log.compaction.strategy=compact`, the keys are instead shaped to enable
+https://kafka.apache.org/documentation.html#compaction[Log Compaction] on the Kafka side:
+operations that involve the same nodes or relationships share the same key.
+Please note that the delete strategy does not actually delete records; it is named after the topic config `cleanup.policy=delete/compact`.
+
+xref:message-structure.adoc[See the 'message structure' section for key examples]
+
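Note that choosing the `compact` strategy on the plugin side shapes the message keys; for the broker to actually compact the topic, the topic itself must also be configured with `cleanup.policy=compact`. A minimal Kotlin sketch of that Kafka-side setup, assuming a local broker at `localhost:9092` and a hypothetical topic name `my-topic`:

[source,kotlin]
----
import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.admin.AdminClientConfig
import org.apache.kafka.clients.admin.NewTopic
import org.apache.kafka.common.config.TopicConfig

fun main() {
    // Assumed broker address; adjust to your environment.
    val props = mapOf<String, Any>(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG to "localhost:9092")
    AdminClient.create(props).use { admin ->
        // cleanup.policy=compact lets the broker keep only the latest record per key.
        val topic = NewTopic("my-topic", 1, 1.toShort())
                .configs(mapOf(TopicConfig.CLEANUP_POLICY_CONFIG to TopicConfig.CLEANUP_POLICY_COMPACT))
        admin.createTopics(listOf(topic)).all().get()
    }
}
----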
 === Multi Database Support

 Neo4j 4.0 Enterprise has https://neo4j.com/docs/operations-manual/4.0/manage-databases/[multi-tenancy support],

doc/docs/modules/ROOT/pages/producer.adoc

Lines changed: 3 additions & 0 deletions
@@ -21,6 +21,9 @@ To allow insertion of keys in any format (e.g. through `streams.publish` procedu
 **the `key.serializer` is set with the `org.apache.kafka.common.serialization.ByteArraySerializer`**
 like `value.serializer`

+
+include::message-structure.adoc[]
+
 [#source-patterns]
 == Patterns
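Since both key and value travel as raw bytes, a consumer typically reads them with `ByteArrayDeserializer` and decodes the JSON itself. A minimal Kotlin sketch, assuming a local broker, a hypothetical topic name `my-topic`, and Jackson on the classpath:

[source,kotlin]
----
import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import java.time.Duration
import java.util.Properties

fun main() {
    val props = Properties().apply {
        put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
        put(ConsumerConfig.GROUP_ID_CONFIG, "example-group")
        put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer::class.java.name)
        put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer::class.java.name)
        put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
    }
    val mapper = ObjectMapper()
    KafkaConsumer<ByteArray, ByteArray>(props).use { consumer ->
        consumer.subscribe(listOf("my-topic"))
        consumer.poll(Duration.ofSeconds(5)).forEach { record ->
            // Depending on the compaction strategy, the key is either a JSON string
            // ("txId+txEventId-txEventId") or a JSON document with ids/labels.
            val key = record.key()?.let { mapper.readTree(it) }
            println("key=$key, value bytes=${record.value()?.size ?: 0}")
        }
    }
}
----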

pom.xml

Lines changed: 6 additions & 6 deletions
@@ -300,12 +300,12 @@
 <scope>test</scope>
 </dependency>

-<dependency>
-<groupId>org.slf4j</groupId>
-<artifactId>slf4j-simple</artifactId>
-<version>1.7.30</version>
-<scope>test</scope>
-</dependency>
+<!-- <dependency>-->
+<!-- <groupId>org.slf4j</groupId>-->
+<!-- <artifactId>slf4j-simple</artifactId>-->
+<!-- <version>1.7.30</version>-->
+<!-- <scope>test</scope>-->
+<!-- </dependency>-->

 <dependency>
 <groupId>org.neo4j.community</groupId>

producer/src/main/kotlin/streams/Extensions.kt

Lines changed: 33 additions & 3 deletions
@@ -1,13 +1,14 @@
 package streams

 import org.apache.kafka.clients.producer.RecordMetadata
+import org.apache.kafka.common.config.TopicConfig
 import org.neo4j.graphdb.Node
 import org.neo4j.graphdb.Relationship
 import org.neo4j.graphdb.schema.ConstraintDefinition
 import org.neo4j.graphdb.schema.ConstraintType
-import streams.events.EntityType
-import streams.events.StreamsConstraintType
+import streams.events.*
 import streams.extensions.labelNames
+import streams.utils.SchemaUtils

 fun Node.toMap(): Map<String, Any?> {
     return mapOf("id" to id.toString(), "properties" to allProperties, "labels" to labelNames(), "type" to EntityType.node)
@@ -41,4 +42,33 @@ fun ConstraintDefinition.isNodeConstraint(): Boolean {

 fun ConstraintDefinition.isRelationshipConstraint(): Boolean {
     return try { this.relationshipType; true } catch (e: IllegalStateException) { false }
-}
+}
+
+fun StreamsTransactionEvent.asSourceRecordValue(strategy: String): StreamsTransactionEvent? =
+        if(isStrategyCompact(strategy) && meta.operation == OperationType.deleted) null else this
+
+fun StreamsTransactionEvent.asSourceRecordKey(strategy: String): Any =
+        when {
+            isStrategyCompact(strategy) && payload is NodePayload -> nodePayloadAsMessageKey(payload as NodePayload, schema)
+            isStrategyCompact(strategy) && payload is RelationshipPayload -> relationshipAsMessageKey(payload as RelationshipPayload)
+            else -> "${meta.txId + meta.txEventId}-${meta.txEventId}"
+        }
+
+private fun nodePayloadAsMessageKey(payload: NodePayload, schema: Schema) = run {
+    val nodeChange: NodeChange = payload.after ?: payload.before!!
+    val labels = nodeChange.labels ?: emptyList()
+    val props: Map<String, Any> = nodeChange.properties ?: emptyMap()
+    val keys = SchemaUtils.getNodeKeys(labels, props.keys, schema.constraints)
+    val ids = props.filterKeys { keys.contains(it) }
+
+    if (ids.isEmpty()) payload.id else mapOf("ids" to ids, "labels" to labels)
+}
+
+private fun RelationshipNodeChange.toKey(): Any = if (ids.isEmpty()) id else mapOf("ids" to ids, "labels" to labels)
+
+private fun relationshipAsMessageKey(payload: RelationshipPayload) = mapOf(
+        "start" to payload.start.toKey(),
+        "end" to payload.end.toKey(),
+        "label" to payload.label)
+
+private fun isStrategyCompact(strategy: String) = strategy == TopicConfig.CLEANUP_POLICY_COMPACT
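Worth noting: with the `compact` strategy, `asSourceRecordValue` returns `null` for delete operations, so the router publishes a record carrying the entity's key and a `null` value, i.e. a Kafka tombstone that log compaction can later use to drop the older records for that key. A minimal, self-contained sketch of that pattern (the `sendTombstone` helper below is hypothetical, not part of this commit):

[source,kotlin]
----
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord

// Publishes a tombstone: the same key used by earlier records for the entity, with a null value.
// Once compaction runs, older records with this key are removed from the topic.
fun sendTombstone(producer: KafkaProducer<ByteArray, ByteArray?>, topic: String, key: ByteArray) {
    producer.send(ProducerRecord<ByteArray, ByteArray?>(topic, key, null))
}
----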

producer/src/main/kotlin/streams/kafka/KafkaConfiguration.kt

Lines changed: 15 additions & 4 deletions
@@ -3,15 +3,18 @@ package streams.kafka
 import org.apache.commons.lang3.StringUtils
 import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.clients.producer.ProducerConfig
+import org.apache.kafka.common.config.TopicConfig
 import org.apache.kafka.common.serialization.ByteArraySerializer
-import org.apache.kafka.common.serialization.StringSerializer
+import org.neo4j.logging.Log
 import streams.extensions.getInt
 import streams.extensions.toPointCase
 import streams.utils.JSONUtils
 import streams.utils.ValidationUtils.validateConnection
 import java.util.Properties
 import java.util.concurrent.TimeUnit

+enum class LogStrategy { delete, compact }
+
 private val configPrefix = "kafka."

 data class KafkaConfiguration(val zookeeperConnect: String = "localhost:2181",
@@ -28,6 +31,7 @@ data class KafkaConfiguration(val zookeeperConnect: String = "localhost:2181",
                               val transactionalId: String = StringUtils.EMPTY,
                               val lingerMs: Int = 1,
                               val topicDiscoveryPollingInterval: Long = TimeUnit.MINUTES.toMillis(5),
+                              val streamsLogCompactionStrategy: String = LogStrategy.delete.toString(),
                               val extraProperties: Map<String, String> = emptyMap()) {

     companion object {
@@ -55,19 +59,26 @@ data class KafkaConfiguration(val zookeeperConnect: String = "localhost:2181",
                     lingerMs = config.getInt("linger.ms", default.lingerMs),
                     topicDiscoveryPollingInterval = config.getOrDefault("topic.discovery.polling.interval",
                             default.topicDiscoveryPollingInterval).toString().toLong(),
+                    streamsLogCompactionStrategy = config.getOrDefault("streams.log.compaction.strategy", default.streamsLogCompactionStrategy),
                     extraProperties = extraProperties // for what we don't provide a default configuration
             )
         }

-        fun from(cfg: Map<String, String>): KafkaConfiguration {
+        fun from(cfg: Map<String, String>, log: Log): KafkaConfiguration {
             val kafkaCfg = create(cfg)
-            validate(kafkaCfg, cfg)
+            validate(kafkaCfg, cfg, log)
             return kafkaCfg
         }

-        private fun validate(config: KafkaConfiguration, rawConfig: Map<String, String>) {
+        private fun validate(config: KafkaConfiguration, rawConfig: Map<String, String>, log: Log? = null) {
             validateConnection(config.zookeeperConnect, "zookeeper.connect", false)
             validateConnection(config.bootstrapServers, CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, false)
+            try {
+                LogStrategy.valueOf(config.streamsLogCompactionStrategy)
+            } catch (e: IllegalArgumentException) {
+                log?.warn("Invalid log compaction strategy setting, switching to default value ${TopicConfig.CLEANUP_POLICY_DELETE}")
+                config.streamsLogCompactionStrategy.apply { LogStrategy.delete.toString() }
+            }
         }

 }

producer/src/main/kotlin/streams/kafka/KafkaEventRouter.kt

Lines changed: 7 additions & 5 deletions
@@ -11,6 +11,8 @@ import org.neo4j.logging.Log
 import org.neo4j.logging.internal.LogService
 import streams.StreamsEventRouter
 import streams.StreamsEventRouterConfiguration
+import streams.asSourceRecordKey
+import streams.asSourceRecordValue
 import streams.config.StreamsConfig
 import streams.events.StreamsEvent
 import streams.events.StreamsTransactionEvent
@@ -42,7 +44,7 @@ class KafkaEventRouter: StreamsEventRouter {

     override fun start() {
         log.info("Initialising Kafka Connector")
-        kafkaConfig = KafkaConfiguration.from(config.config)
+        kafkaConfig = KafkaConfiguration.from(config.config, log)
         val props = kafkaConfig.asProperties()
         val definedTopics = StreamsEventRouterConfiguration
                 .from(config, dbName)
@@ -59,7 +61,7 @@
         StreamsUtils.ignoreExceptions({ kafkaAdminService.stop() }, Exception::class.java)
     }

-    private fun send(producerRecord: ProducerRecord<ByteArray?, ByteArray>, sync: Boolean = false): Map<String, Any>? {
+    private fun send(producerRecord: ProducerRecord<ByteArray?, ByteArray?>, sync: Boolean = false): Map<String, Any>? {
         if (!kafkaAdminService.isValidTopic(producerRecord.topic())) {
             if (log.isDebugEnabled) {
                 log.debug("Error while sending record to ${producerRecord.topic()}, because it doesn't exists")
@@ -104,9 +106,9 @@ class KafkaEventRouter: StreamsEventRouter {
         if (log.isDebugEnabled) {
             log.debug("Trying to send a transaction event with txId ${event.meta.txId} and txEventId ${event.meta.txEventId} to kafka")
         }
-        val producerRecord = ProducerRecord(topic, getPartition(config), System.currentTimeMillis(),
-                JSONUtils.writeValueAsBytes("${event.meta.txId + event.meta.txEventId}-${event.meta.txEventId}"),
-                JSONUtils.writeValueAsBytes(event))
+        val key = JSONUtils.writeValueAsBytes(event.asSourceRecordKey(kafkaConfig.streamsLogCompactionStrategy))
+        val value = event.asSourceRecordValue(kafkaConfig.streamsLogCompactionStrategy)?.let { JSONUtils.writeValueAsBytes(it) }
+        val producerRecord = ProducerRecord(topic, getPartition(config), System.currentTimeMillis(), key, value)
         send(producerRecord)
     }
