Commit 8a16d91
Fixes #304: Random selection of partitions and order guarantees (#435)
* Fixes #272: Allow to publish messages with key in Kafka topic
* changed git add . with ?:
* removed cast
* changes review
* changed key serializer
* added note in producer adoc
* changes review
* code clean
* Fixes #272: Allow to publish messages with key in Kafka topic
* changes review
* changed key serializer
* changes review
* rebase
* code clean
* rebase
* commented slf4j-test in pom.xml
* assertEventually - log compaction validation - note into adoc
* updated doc - is node key test cases - changed compaction strategy
* wip - fixes #304
* removed num.partition
* rebase
* code clean
* removed empty files
* added comments, renamed test class and methods
* refactor - create checkPartitionEquality method
* added doc note
1 parent 3bc0a26 commit 8a16d91

File tree

8 files changed: +145 −25 lines changed
doc/docs/modules/ROOT/pages/kafka-ssl.adoc

Lines changed: 0 additions & 1 deletion
@@ -93,7 +93,6 @@ kafka.group.id=neo4j
 streams.sink.dlq=neo4j-dlq
 
 kafka.acks=all
-kafka.num.partitions=1
 kafka.retries=2
 kafka.batch.size=16384
 kafka.buffer.memory=33554432

doc/docs/modules/ROOT/pages/producer-configuration.adoc

Lines changed: 2 additions & 1 deletion
@@ -8,7 +8,6 @@ You can set the following configuration values in your `neo4j.conf`, here are th
 kafka.zookeeper.connect=localhost:2181
 kafka.bootstrap.servers=localhost:9092
 kafka.acks=1
-kafka.num.partitions=1
 kafka.retries=2
 kafka.batch.size=16384
 kafka.buffer.memory=33554432
@@ -53,6 +52,8 @@ https://kafka.apache.org/documentation.html#compaction[Log Compaction] on the Ka
 Please note that delete strategy does not actually delete records, it has this name to match the topic config `cleanup.policy=delete/compact`.
 Namely, the operations which will involve the same nodes or relationships, will have the same key.
 
+When `kafka.streams.log.compaction.strategy=compact`, for partitioning we leverage internal Kafka mechanism.
+
 xref:message-structure.adoc[See 'message structure' section to see key examples]
 
 === Multi Database Support
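
For context, the "internal Kafka mechanism" the added doc line refers to is the client's default partitioner: when a record carries a key and a null partition, the partitioner hashes the serialized key and takes it modulo the topic's partition count, so every record with the same key lands on the same partition and per-entity ordering is preserved. A minimal sketch of that assignment, assuming the Kafka clients library is on the classpath (the helper name partitionForKey is ours, not part of the codebase):

import org.apache.kafka.common.utils.Utils

// Approximates Kafka's default partitioner for keyed records: murmur2 hash of
// the serialized key, made non-negative, modulo the partition count.
// Same key => same partition => ordering per node/relationship is preserved.
fun partitionForKey(keyBytes: ByteArray, numPartitions: Int): Int =
        Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions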

producer/src/main/kotlin/streams/kafka/KafkaConfiguration.kt

Lines changed: 0 additions & 2 deletions
@@ -20,7 +20,6 @@ private val configPrefix = "kafka."
 data class KafkaConfiguration(val zookeeperConnect: String = "localhost:2181",
                               val bootstrapServers: String = "localhost:9092",
                               val acks: String = "1",
-                              val numPartitions: Int = 1,
                               val retries: Int = 2,
                               val batchSize: Int = 16384,
                               val bufferMemory: Int = 33554432,
@@ -47,7 +46,6 @@ data class KafkaConfiguration(val zookeeperConnect: String = "localhost:2181",
         return default.copy(zookeeperConnect = config.getOrDefault("zookeeper.connect",default.zookeeperConnect),
                 bootstrapServers = config.getOrDefault("bootstrap.servers", default.bootstrapServers),
                 acks = config.getOrDefault("acks", default.acks),
-                numPartitions = config.getInt("num.partitions", default.numPartitions),
                 retries = config.getInt("retries", default.retries),
                 batchSize = config.getInt("batch.size", default.batchSize),
                 bufferMemory = config.getInt("buffer.memory", default.bufferMemory),
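
With numPartitions gone from KafkaConfiguration, the partition count is no longer a producer-side setting; it is a property of the topic itself. A minimal sketch of creating a multi-partition topic up front via the admin API (the broker address and topic name are placeholders):

import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.admin.NewTopic

fun createTenPartitionTopic() {
    // Partition count (10) and replication factor (1) live on the topic,
    // not in the producer configuration.
    AdminClient.create(mapOf<String, Any>("bootstrap.servers" to "localhost:9092")).use { admin ->
        admin.createTopics(listOf(NewTopic("my-topic", 10, 1.toShort()))).all().get()
    }
}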

producer/src/main/kotlin/streams/kafka/KafkaEventRouter.kt

Lines changed: 6 additions & 8 deletions
@@ -2,27 +2,25 @@ package streams.kafka
 
 import org.apache.kafka.clients.producer.KafkaProducer
 import org.apache.kafka.clients.producer.ProducerRecord
-import org.apache.kafka.clients.producer.RecordMetadata
 import org.apache.kafka.common.KafkaException
 import org.apache.kafka.common.errors.AuthorizationException
 import org.apache.kafka.common.errors.OutOfOrderSequenceException
 import org.apache.kafka.common.errors.ProducerFencedException
 import org.neo4j.logging.Log
 import org.neo4j.logging.internal.LogService
-import streams.StreamsEventRouter
 import streams.StreamsEventRouterConfiguration
 import streams.asSourceRecordKey
 import streams.asSourceRecordValue
+import streams.toMap
+import streams.StreamsEventRouter
 import streams.config.StreamsConfig
 import streams.events.StreamsEvent
 import streams.events.StreamsTransactionEvent
 import streams.utils.JSONUtils
 import streams.utils.KafkaValidationUtils.getInvalidTopicsError
 import streams.utils.StreamsUtils
-import streams.toMap
 import java.util.Properties
 import java.util.UUID
-import java.util.concurrent.ThreadLocalRandom
 
 
 class KafkaEventRouter: StreamsEventRouter {
@@ -95,8 +93,9 @@ class KafkaEventRouter: StreamsEventRouter {
         // in the procedures we allow to define a custom message key via the configuration property key
         // in order to have the backwards compatibility we define as default value the old key
         val key = config.getOrDefault("key", UUID.randomUUID().toString())
+        val partition = (config["partition"])?.toString()?.toInt()
 
-        val producerRecord = ProducerRecord(topic, getPartition(config), System.currentTimeMillis(), key?.let { JSONUtils.writeValueAsBytes(it) },
+        val producerRecord = ProducerRecord(topic, partition, System.currentTimeMillis(), key?.let { JSONUtils.writeValueAsBytes(it) },
                 JSONUtils.writeValueAsBytes(event))
         return send(producerRecord, sync)
     }
@@ -108,7 +107,8 @@ class KafkaEventRouter: StreamsEventRouter {
         }
         val key = JSONUtils.writeValueAsBytes(event.asSourceRecordKey(kafkaConfig.streamsLogCompactionStrategy))
         val value = event.asSourceRecordValue(kafkaConfig.streamsLogCompactionStrategy)?.let { JSONUtils.writeValueAsBytes(it) }
-        val producerRecord = ProducerRecord(topic, getPartition(config), System.currentTimeMillis(), key, value)
+
+        val producerRecord = ProducerRecord(topic, null, System.currentTimeMillis(), key, value)
         send(producerRecord)
     }
@@ -149,8 +149,6 @@ class KafkaEventRouter: StreamsEventRouter {
         }
     }
 
-    private fun getPartition(config: Map<String, Any?>) = config.getOrDefault("partition", ThreadLocalRandom.current().nextInt(kafkaConfig.numPartitions)).toString().toInt()
-
 }
 
 class Neo4jKafkaProducer<K, V>: KafkaProducer<K, V> {
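
The removed getPartition helper is what previously scattered records across random partitions. After this change, streams.publish callers can still pin a partition through the config map, while transaction events pass null and let the partitioner derive the partition from the record key. A minimal illustration of the two resulting record shapes (topic, key and value are placeholders):

import org.apache.kafka.clients.producer.ProducerRecord

fun buildRecords(topic: String, key: ByteArray, value: ByteArray) {
    // Explicit partition: the record goes exactly where the caller asked.
    val pinned = ProducerRecord(topic, 3, System.currentTimeMillis(), key, value)

    // Null partition: the configured partitioner decides; with a non-null key the
    // default partitioner hashes it, keeping each entity's records on one partition.
    val keyed = ProducerRecord<ByteArray, ByteArray>(topic, null, System.currentTimeMillis(), key, value)
}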

producer/src/test/kotlin/streams/integrations/KafkaEventRouterLogCompactionTSE.kt renamed to producer/src/test/kotlin/streams/integrations/KafkaEventRouterCompactionStrategyTSE.kt

Lines changed: 127 additions & 6 deletions
@@ -1,5 +1,6 @@
 package streams.integrations
 
+import org.apache.kafka.clients.consumer.ConsumerRecord
 import org.apache.kafka.common.config.TopicConfig
 import org.junit.Test
 import org.neo4j.internal.helpers.collection.Iterators
@@ -13,7 +14,7 @@ import java.time.Duration
 import java.util.*
 import kotlin.test.*
 
-class KafkaEventRouterLogCompactionTSE : KafkaEventRouterBaseTSE() {
+class KafkaEventRouterCompactionStrategyTSE : KafkaEventRouterBaseTSE() {
 
     private val bootstrapServerMap = mapOf("bootstrap.servers" to KafkaEventRouterSuiteIT.kafka.bootstrapServers)
 
@@ -35,7 +36,7 @@ class KafkaEventRouterLogCompactionTSE : KafkaEventRouterBaseTSE() {
     fun `compact message with streams publish`() {
         val topic = UUID.randomUUID().toString()
         initDbWithLogStrategy(TopicConfig.CLEANUP_POLICY_COMPACT)
-        KafkaLogCompactionTestCommon.createCompactTopic(topic, bootstrapServerMap)
+        KafkaLogCompactionTestCommon.createTopic(topic, bootstrapServerMap)
 
         KafkaEventRouterSuiteIT.registerPublishProcedure(db)
         kafkaConsumer.subscribe(listOf(topic))
@@ -73,7 +74,7 @@ class KafkaEventRouterLogCompactionTSE : KafkaEventRouterBaseTSE() {
                 "streams.source.topic.relationships.$topic" to "$keyRel{*}")
         val queries = listOf("CREATE CONSTRAINT ON (p:Person) ASSERT p.name IS UNIQUE")
         initDbWithLogStrategy(TopicConfig.CLEANUP_POLICY_COMPACT, sourceTopics, queries)
-        KafkaLogCompactionTestCommon.createCompactTopic(topic, bootstrapServerMap)
+        KafkaLogCompactionTestCommon.createTopic(topic, bootstrapServerMap)
 
         kafkaConsumer.subscribe(listOf(topic))
 
@@ -106,7 +107,7 @@ class KafkaEventRouterLogCompactionTSE : KafkaEventRouterBaseTSE() {
         val sourceTopics = mapOf("streams.source.topic.nodes.$topic" to "Person{*}",
                 "streams.source.topic.relationships.$topic" to "$relType{*}")
         initDbWithLogStrategy(TopicConfig.CLEANUP_POLICY_COMPACT, sourceTopics)
-        KafkaLogCompactionTestCommon.createCompactTopic(topic, bootstrapServerMap)
+        KafkaLogCompactionTestCommon.createTopic(topic, bootstrapServerMap)
 
         kafkaConsumer.subscribe(listOf(topic))
 
@@ -139,7 +140,7 @@ class KafkaEventRouterLogCompactionTSE : KafkaEventRouterBaseTSE() {
         val topic = UUID.randomUUID().toString()
         val sourceTopics = mapOf("streams.source.topic.nodes.$topic" to "Person{*}")
         initDbWithLogStrategy(TopicConfig.CLEANUP_POLICY_COMPACT, sourceTopics)
-        KafkaLogCompactionTestCommon.createCompactTopic(topic, bootstrapServerMap)
+        KafkaLogCompactionTestCommon.createTopic(topic, bootstrapServerMap)
 
         kafkaConsumer.subscribe(listOf(topic))
 
@@ -165,7 +166,7 @@ class KafkaEventRouterLogCompactionTSE : KafkaEventRouterBaseTSE() {
         val sourceTopics = mapOf("streams.source.topic.nodes.$topic" to "Person{*}")
         val queries = listOf("CREATE CONSTRAINT ON (p:Person) ASSERT p.name IS UNIQUE")
         initDbWithLogStrategy(TopicConfig.CLEANUP_POLICY_COMPACT, sourceTopics, queries)
-        KafkaLogCompactionTestCommon.createCompactTopic(topic, bootstrapServerMap)
+        KafkaLogCompactionTestCommon.createTopic(topic, bootstrapServerMap)
 
        kafkaConsumer.subscribe(listOf(topic))
 
@@ -612,4 +613,124 @@ class KafkaEventRouterLogCompactionTSE : KafkaEventRouterBaseTSE() {
         val meta = JSONUtils.asStreamsTransactionEvent(record.value()).meta
         assertEquals(createProducerRecordKeyForDeleteStrategy(meta), JSONUtils.readValue(record.key()))
     }
+
+    @Test
+    fun `same nodes and entities in same partitions with strategy compact and without constraints`() {
+
+        // db without constraints
+        createTopicAndEntitiesAndAssertPartition(false,
+                "0", "1", "3", "4")
+    }
+
+    @Test
+    fun `same nodes and entities in same partitions with strategy compact and with constraints`() {
+
+        val personLabel = "Person"
+        val otherLabel = "Other"
+
+        val expectedKeyPippo = mapOf("ids" to mapOf("name" to "Pippo"), "labels" to listOf(personLabel))
+        val expectedKeyPluto = mapOf("ids" to mapOf("name" to "Pluto"), "labels" to listOf(personLabel, otherLabel))
+        val expectedKeyFoo = mapOf("ids" to mapOf("name" to "Foo"), "labels" to listOf(personLabel))
+        val expectedKeyBar = mapOf("ids" to mapOf("name" to "Bar"), "labels" to listOf(personLabel, otherLabel))
+
+        // db with unique constraint
+        createTopicAndEntitiesAndAssertPartition(true, expectedKeyPippo, expectedKeyPluto, expectedKeyFoo, expectedKeyBar)
+    }
+
+    // we create a topic with kafka.streams.log.compaction.strategy=compact
+    // after that, we create and update some nodes and relationships
+    // finally, we check that each node/relationship has no records spread across multiple partitions but only in a single partition
+    private fun createTopicAndEntitiesAndAssertPartition(withConstraints: Boolean,
+                                                         firstExpectedKey: Any,
+                                                         secondExpectedKey: Any,
+                                                         thirdExpectedKey: Any,
+                                                         fourthExpectedKey: Any) {
+        val relType = "KNOWS"
+
+        // we create a topic with strategy compact
+        val topic = UUID.randomUUID().toString()
+        val sourceTopics = mapOf("streams.source.topic.nodes.$topic" to "Person{*}",
+                "streams.source.topic.relationships.$topic" to "$relType{*}",
+                "kafka.num.partitions" to "10")
+        // we optionally create a constraint for Person.name property
+        val queries = if (withConstraints) listOf("CREATE CONSTRAINT ON (p:Person) ASSERT p.name IS UNIQUE") else null
+
+        initDbWithLogStrategy(TopicConfig.CLEANUP_POLICY_COMPACT, sourceTopics, queries)
+        KafkaLogCompactionTestCommon.createTopic(topic, bootstrapServerMap, 10, false)
+
+        kafkaConsumer.subscribe(listOf(topic))
+
+        // we create some nodes and rels
+        db.execute("CREATE (:Person {name:'Pippo', surname: 'Pippo_1'})")
+        db.execute("CREATE (:Person:Other {name:'Pluto', surname: 'Pluto_1'})")
+        db.execute("CREATE (:Person:Other {name:'Paperino', surname: 'Paperino_1'})")
+        db.execute("CREATE (:Person {name:'Foo'})")
+        db.execute("CREATE (:Person:Other {name:'Bar'})")
+        db.execute("CREATE (:Person {name:'Baz'})")
+        db.execute("CREATE (:Person {name:'Baa'})")
+        db.execute("CREATE (:Person {name:'One'})")
+        db.execute("CREATE (:Person {name:'Two'})")
+        db.execute("CREATE (:Person {name:'Three'})")
+        db.execute("""
+            |MATCH (pippo:Person {name:'Pippo'})
+            |MATCH (pluto:Person {name:'Pluto'})
+            |MERGE (pippo)-[:KNOWS]->(pluto)
+        """.trimMargin())
+        db.execute("""
+            |MATCH (foo:Person {name:'Foo'})
+            |MATCH (bar:Person {name:'Bar'})
+            |MERGE (foo)-[:KNOWS]->(bar)
+        """.trimMargin())
+        db.execute("""
+            |MATCH (one:Person {name:'One'})
+            |MATCH (two:Person {name:'Two'})
+            |MERGE (one)-[:KNOWS]->(two)
+        """.trimMargin())
+
+        // we update 4 nodes and 2 rels
+        db.execute("MATCH (p:Person {name:'Pippo'}) SET p.surname = 'Pippo_update'")
+        db.execute("MATCH (p:Person {name:'Pippo'}) SET p.address = 'Rome'")
+
+        db.execute("MATCH (p:Person {name:'Pluto'}) SET p.surname = 'Pluto_update'")
+        db.execute("MATCH (p:Person {name:'Pluto'}) SET p.address = 'London'")
+
+        db.execute("MATCH (p:Person {name:'Foo'}) SET p.surname = 'Foo_update'")
+        db.execute("MATCH (p:Person {name:'Foo'}) SET p.address = 'Rome'")
+
+        db.execute("MATCH (p:Person {name:'Bar'}) SET p.surname = 'Bar_update'")
+        db.execute("MATCH (p:Person {name:'Bar'}) SET p.address = 'Tokyo'")
+
+        db.execute("MATCH (:Person {name:'Foo'})-[rel:KNOWS]->(:Person {name:'Bar'}) SET rel.since = 1999")
+        db.execute("MATCH (:Person {name:'Pippo'})-[rel:KNOWS]->(:Person {name:'Pluto'}) SET rel.since = 2019")
+
+        val records = kafkaConsumer.poll(Duration.ofSeconds(30))
+
+        assertEquals(23, records.count())
+
+        // we take the records for each node
+        val firstRecordNode = records.filter { JSONUtils.readValue<Any>(it.key()) == firstExpectedKey }
+        val secondRecordNode = records.filter { JSONUtils.readValue<Any>(it.key()) == secondExpectedKey }
+        val thirdRecordNode = records.filter { JSONUtils.readValue<Any>(it.key()) == thirdExpectedKey }
+        val fourthRecordNode = records.filter { JSONUtils.readValue<Any>(it.key()) == fourthExpectedKey }
+        val firstRecordRel = records.filter { JSONUtils.readValue<Any>(it.key()) == mapOf("start" to thirdExpectedKey, "end" to fourthExpectedKey, "label" to relType) }
+        val secondRecordRel = records.filter { JSONUtils.readValue<Any>(it.key()) == mapOf("start" to firstExpectedKey, "end" to secondExpectedKey, "label" to relType) }
+
+        // we check that all queries produced record
+        assertEquals(3, firstRecordNode.count())
+        assertEquals(3, secondRecordNode.count())
+        assertEquals(3, thirdRecordNode.count())
+        assertEquals(3, fourthRecordNode.count())
+        assertEquals(2, firstRecordRel.count())
+        assertEquals(2, secondRecordRel.count())
+
+        // we check that each node/relationship has no records spread across multiple partitions
+        assertEquals(1, checkPartitionEquality(firstRecordNode))
+        assertEquals(1, checkPartitionEquality(secondRecordNode))
+        assertEquals(1, checkPartitionEquality(thirdRecordNode))
+        assertEquals(1, checkPartitionEquality(fourthRecordNode))
+        assertEquals(1, checkPartitionEquality(firstRecordRel))
+        assertEquals(1, checkPartitionEquality(secondRecordRel))
+    }
+
+    private fun checkPartitionEquality(records: List<ConsumerRecord<String, ByteArray>>) = records.groupBy { it.partition() }.count()
 }

producer/src/test/kotlin/streams/integrations/KafkaEventRouterSuiteIT.kt

Lines changed: 1 addition & 1 deletion
@@ -17,7 +17,7 @@ import streams.utils.StreamsUtils
         KafkaEventRouterSimpleTSE::class,
         KafkaEventRouterWithConstraintsTSE::class,
         KafkaEventRouterEnterpriseTSE::class,
-        KafkaEventRouterLogCompactionTSE::class
+        KafkaEventRouterCompactionStrategyTSE::class
 )
 class KafkaEventRouterSuiteIT {
 
producer/src/test/kotlin/streams/integrations/KafkaLogCompactionTestCommon.kt

Lines changed: 9 additions & 4 deletions
@@ -12,16 +12,21 @@ import java.util.concurrent.TimeUnit
 
 object KafkaLogCompactionTestCommon {
 
-    private fun compactTopic(topic: String) =
-            NewTopic(topic, 1, 1).configs(mapOf(
+    private fun createTopic(topic: String, numTopics: Int, withCompact: Boolean) = run {
+        val newTopic = NewTopic(topic, numTopics, 1)
+        if (withCompact) {
+            newTopic.configs(mapOf(
                 "cleanup.policy" to "compact",
                 "segment.ms" to "10",
                 "retention.ms" to "1",
                 "min.cleanable.dirty.ratio" to "0.01"))
+        }
+        newTopic
+    }
 
-    fun createCompactTopic(topic: String, bootstrapServerMap: Map<String, Any>) {
+    fun createTopic(topic: String, bootstrapServerMap: Map<String, Any>, numTopics: Int = 1, withCompact: Boolean = true) {
         AdminClient.create(bootstrapServerMap).use {
-            val topics = listOf(compactTopic(topic))
+            val topics = listOf(createTopic(topic, numTopics, withCompact))
             it.createTopics(topics).all().get()
         }
    }
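
For reference, the refactored helper now covers two call shapes: the compact single-partition default (the old createCompactTopic behaviour) and the plain multi-partition topic used by the new partition tests. A usage sketch, assuming a local broker (topic names are placeholders):

fun createTestTopics() {
    val bootstrap = mapOf<String, Any>("bootstrap.servers" to "localhost:9092")

    // Defaults: one partition, compaction configs applied.
    KafkaLogCompactionTestCommon.createTopic("compacted-topic", bootstrap)

    // Ten partitions, no compaction overrides, as in createTopicAndEntitiesAndAssertPartition.
    KafkaLogCompactionTestCommon.createTopic("partitioned-topic", bootstrap, 10, false)
}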

producer/src/test/kotlin/streams/kafka/KafkaConfigurationTest.kt

Lines changed: 0 additions & 2 deletions
@@ -12,7 +12,6 @@ class KafkaConfigurationTest {
         val map = mapOf("kafka.zookeeper.connect" to "zookeeper:1234",
                 "kafka.bootstrap.servers" to "kafka:5678",
                 "kafka.acks" to "10",
-                "kafka.num.partitions" to 1,
                 "kafka.retries" to 1,
                 "kafka.batch.size" to 10,
                 "kafka.buffer.memory" to 1000,
@@ -37,7 +36,6 @@ class KafkaConfigurationTest {
         assertEquals(map["kafka.zookeeper.connect"], properties["zookeeper.connect"])
         assertEquals(map["kafka.bootstrap.servers"], properties["bootstrap.servers"])
         assertEquals(map["kafka.acks"], properties["acks"])
-        assertEquals(map["kafka.num.partitions"], properties["num.partitions"])
         assertEquals(map["kafka.retries"], properties["retries"])
         assertEquals(map["kafka.batch.size"], properties["batch.size"])
         assertEquals(map["kafka.buffer.memory"], properties["buffer.memory"])
