
Commit aaac1ea

Fixes #304: Random selection of partitions and order guarantees (#452)
1 parent 8f986d6 commit aaac1ea
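
What changed: the producer no longer picks a random partition for each event. The `kafka.num.partitions` setting and the `ThreadLocalRandom`-based `getPartition` helper are removed; source records are now built with a null partition (or with an explicit `partition` passed through the procedure configuration), so partition assignment is left to Kafka's default key-hash partitioner. Since all change events for the same node or relationship share a key, they land on a single partition, where Kafka preserves their order.

A minimal Kotlin sketch of the idea (illustration only, not code from this commit; it assumes the standard kafka-clients ProducerRecord API):

    import org.apache.kafka.clients.producer.ProducerRecord

    // A null partition delegates the choice to the producer's partitioner,
    // which hashes the key: records sharing a key always land on the same
    // partition, so their relative order is preserved.
    fun keyedRecord(topic: String, key: ByteArray?, value: ByteArray): ProducerRecord<ByteArray, ByteArray> =
            ProducerRecord(topic, null, System.currentTimeMillis(), key, value)

    // The old code instead drew ThreadLocalRandom.current().nextInt(numPartitions)
    // per event, scattering records for one entity across partitions and
    // giving up any ordering guarantee.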

File tree

6 files changed (+139 / -17 lines)

doc/asciidoc/kafka-ssl/index.adoc

Lines changed: 0 additions & 1 deletion
@@ -91,7 +91,6 @@ kafka.group.id=neo4j
 streams.sink.dlq=neo4j-dlq
 
 kafka.acks=all
-kafka.num.partitions=1
 kafka.retries=2
 kafka.batch.size=16384
 kafka.buffer.memory=33554432

doc/asciidoc/producer/configuration.adoc

Lines changed: 2 additions & 1 deletion
@@ -3,7 +3,6 @@
 kafka.zookeeper.connect=localhost:2181
 kafka.bootstrap.servers=localhost:9092
 kafka.acks=1
-kafka.num.partitions=1
 kafka.retries=2
 kafka.batch.size=16384
 kafka.buffer.memory=33554432
@@ -44,5 +43,7 @@ https://kafka.apache.org/documentation.html#compaction[Log Compaction] on the Ka
 Please note that the delete strategy does not actually delete records; it has this name to match the topic config `cleanup.policy=delete/compact`.
 Namely, operations that involve the same nodes or relationships will have the same key.
 
+When `kafka.streams.log.compaction.strategy=compact`, partitioning is delegated to Kafka's internal key-based mechanism.
+
 xref:message-structure.adoc[See 'message structure' section to see key examples]
 
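
The "internal Kafka mechanism" referenced above is the producer's default partitioner. A hedged Kotlin sketch of its behavior for keyed records (an approximation of Kafka's DefaultPartitioner, built on hash utilities that ship with kafka-clients):

    import org.apache.kafka.common.utils.Utils

    // Identical key bytes always hash to the same value and therefore the
    // same partition; this is what keeps all records of one entity together.
    fun partitionForKey(keyBytes: ByteArray, numPartitions: Int): Int =
            Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions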

producer/src/main/kotlin/streams/kafka/KafkaConfiguration.kt

Lines changed: 0 additions & 2 deletions
@@ -18,7 +18,6 @@ private val configPrefix = "kafka."
 data class KafkaConfiguration(val zookeeperConnect: String = "localhost:2181",
                               val bootstrapServers: String = "localhost:9092",
                               val acks: String = "1",
-                              val numPartitions: Int = 1,
                               val retries: Int = 2,
                               val batchSize: Int = 16384,
                               val bufferMemory: Int = 33554432,
@@ -45,7 +44,6 @@ data class KafkaConfiguration(val zookeeperConnect: String = "localhost:2181",
         return default.copy(zookeeperConnect = config.getOrDefault("zookeeper.connect", default.zookeeperConnect),
                 bootstrapServers = config.getOrDefault("bootstrap.servers", default.bootstrapServers),
                 acks = config.getOrDefault("acks", default.acks),
-                numPartitions = config.getInt("num.partitions", default.numPartitions),
                 retries = config.getInt("retries", default.retries),
                 batchSize = config.getInt("batch.size", default.batchSize),
                 bufferMemory = config.getInt("buffer.memory", default.bufferMemory),
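
For readers of this hunk in isolation: `getInt` is a project-internal extension rather than a standard Map method. A hypothetical equivalent, shown only as an assumption about its behavior:

    // Hypothetical stand-in for the project's getInt helper (not copied from
    // the repository): parse the entry when present, else use the default.
    private fun Map<String, String>.getInt(name: String, defaultValue: Int) =
            this[name]?.toInt() ?: defaultValue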

producer/src/main/kotlin/streams/kafka/KafkaEventRouter.kt

Lines changed: 3 additions & 5 deletions
@@ -104,7 +104,8 @@ class KafkaEventRouter(private val config: Map<String, String>, private val log:
         // in the procedures we allow defining a custom message key via the configuration property "key";
         // for backwards compatibility the old key remains the default value
         val key = config.getOrDefault("key", UUID.randomUUID().toString())
-        val producerRecord = ProducerRecord(topic, getPartition(config), System.currentTimeMillis(), key?.let { JSONUtils.writeValueAsBytes(it) },
+        val partition = (config["partition"])?.toString()?.toInt()
+        val producerRecord = ProducerRecord(topic, partition, System.currentTimeMillis(), key?.let { JSONUtils.writeValueAsBytes(it) },
                 JSONUtils.writeValueAsBytes(event))
         return send(producerRecord, sync)
     }
@@ -116,7 +117,7 @@ class KafkaEventRouter(private val config: Map<String, String>, private val log:
         }
         val key = JSONUtils.writeValueAsBytes(event.asSourceRecordKey(kafkaConfig.streamsLogCompactionStrategy))
         val value = event.asSourceRecordValue(kafkaConfig.streamsLogCompactionStrategy)?.let { JSONUtils.writeValueAsBytes(it) }
-        val producerRecord = ProducerRecord(topic, getPartition(config), System.currentTimeMillis(), key, value)
+        val producerRecord = ProducerRecord(topic, null, System.currentTimeMillis(), key, value)
         send(producerRecord)
     }
 
@@ -135,7 +136,6 @@ class KafkaEventRouter(private val config: Map<String, String>, private val log:
         try {
             producer?.beginTransaction()
             transactionEvents.forEach {
-                val partition = ThreadLocalRandom.current().nextInt(kafkaConfig.numPartitions)
                 if (it is StreamsTransactionEvent) {
                     sendEvent(topic, it, config)
                 } else {
@@ -158,8 +158,6 @@ class KafkaEventRouter(private val config: Map<String, String>, private val log:
         }
     }
 
-    private fun getPartition(config: Map<String, Any?>) = config.getOrDefault("partition", ThreadLocalRandom.current().nextInt(kafkaConfig.numPartitions)).toString().toInt()
-
 }
 
 class Neo4jKafkaProducer<K, V>: KafkaProducer<K, V> {
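
Net effect of these hunks: callers of the publish procedure can still pin a partition explicitly through the `partition` config entry, while transaction events always pass null and let the record key decide. A minimal sketch of that resolution (hypothetical helper name):

    // Returns null when the procedure config carries no "partition" entry;
    // a null partition lets the producer's key-hash partitioner choose.
    fun resolvePartition(config: Map<String, Any?>): Int? =
            config["partition"]?.toString()?.toInt()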

producer/src/test/kotlin/streams/integrations/KafkaEventRouterLogCompactionIT.kt renamed to producer/src/test/kotlin/streams/integrations/KafkaEventRouterStrategyCompactIT.kt

Lines changed: 134 additions & 6 deletions
@@ -3,6 +3,7 @@ package streams.integrations
 import extension.newDatabase
 import org.apache.kafka.clients.admin.AdminClient
 import org.apache.kafka.clients.admin.NewTopic
+import org.apache.kafka.clients.consumer.ConsumerRecord
 import org.apache.kafka.clients.consumer.ConsumerRecords
 import org.apache.kafka.clients.consumer.KafkaConsumer
 import org.apache.kafka.common.config.TopicConfig
@@ -26,7 +27,7 @@ import streams.serialization.JSONUtils
 import java.util.concurrent.TimeUnit
 
 @Suppress("DEPRECATION")
-class KafkaEventRouterLogCompactionIT: KafkaEventRouterBaseIT() {
+class KafkaEventRouterStrategyCompactIT: KafkaEventRouterBaseIT() {
 
     private fun compactTopic(topic: String, numTopics: Int, withCompact: Boolean) = run {
         val newTopic = NewTopic(topic, numTopics, 1)
@@ -40,7 +41,7 @@ class KafkaEventRouterLogCompactionIT: KafkaEventRouterBaseIT() {
         newTopic
     }
 
-    private fun createCompactTopic(topic: String, numTopics: Int = 1, withCompact: Boolean = true) {
+    private fun createTopic(topic: String, numTopics: Int = 1, withCompact: Boolean = true) {
         AdminClient.create(mapOf("bootstrap.servers" to kafka.bootstrapServers)).use {
             val topics = listOf(compactTopic(topic, numTopics, withCompact))
             it.createTopics(topics).all().get()
@@ -75,7 +76,7 @@ class KafkaEventRouterLogCompactionIT: KafkaEventRouterBaseIT() {
     fun `compact message with streams publish`() {
         val topic = UUID.randomUUID().toString()
         initDbWithLogStrategy(TopicConfig.CLEANUP_POLICY_COMPACT)
-        createCompactTopic(topic)
+        createTopic(topic)
         db.dependencyResolver.resolveDependency(Procedures::class.java)
                 .registerProcedure(StreamsProcedures::class.java, true)
 
@@ -117,7 +118,7 @@ class KafkaEventRouterLogCompactionIT: KafkaEventRouterBaseIT() {
                 "streams.source.topic.relationships.$topic" to "KNOWS{*}")
         val queries = listOf("CREATE CONSTRAINT ON (p:Person) ASSERT p.name IS UNIQUE")
         initDbWithLogStrategy(TopicConfig.CLEANUP_POLICY_COMPACT, sourceTopics, queries)
-        createCompactTopic(topic)
+        createTopic(topic)
         createConsumer(KafkaConfiguration(bootstrapServers = kafka.bootstrapServers)).use { consumer ->
             consumer.subscribe(listOf(topic))
 
@@ -153,7 +154,7 @@ class KafkaEventRouterLogCompactionIT: KafkaEventRouterBaseIT() {
         val sourceTopics = mapOf("streams.source.topic.nodes.$topic" to "Person{*}",
                 "streams.source.topic.relationships.$topic" to "$relType{*}")
         initDbWithLogStrategy(TopicConfig.CLEANUP_POLICY_COMPACT, sourceTopics)
-        createCompactTopic(topic)
+        createTopic(topic)
         createConsumer(KafkaConfiguration(bootstrapServers = kafka.bootstrapServers)).use { consumer ->
             consumer.subscribe(listOf(topic))
 
@@ -191,7 +192,7 @@ class KafkaEventRouterLogCompactionIT: KafkaEventRouterBaseIT() {
         val sourceTopics = mapOf("streams.source.topic.nodes.$topic" to "Person{*}")
         val queries = listOf("CREATE CONSTRAINT ON (p:Person) ASSERT p.name IS UNIQUE")
         initDbWithLogStrategy(TopicConfig.CLEANUP_POLICY_COMPACT, sourceTopics, queries)
-        createCompactTopic(topic)
+        createTopic(topic)
         createConsumer(KafkaConfiguration(bootstrapServers = kafka.bootstrapServers)).use { consumer ->
             consumer.subscribe(listOf(topic))
 
@@ -663,4 +664,131 @@
             assertEquals(createProducerRecordKeyForDeleteStrategy(meta), JSONUtils.readValue(record.key()))
         }
     }
+
+    @Test
+    fun `same nodes and entities in same partitions with strategy compact and without constraints`() {
+
+        // db without constraints
+        createTopicAndEntitiesAndAssertPartition(false)
+    }
+
+    @Test
+    fun `same nodes and entities in same partitions with strategy compact and with constraints`() {
+
+        val personLabel = "Person"
+        val otherLabel = "Other"
+
+        val expectedKeyPippo = mapOf("ids" to mapOf("name" to "Pippo"), "labels" to listOf(personLabel))
+        val expectedKeyPluto = mapOf("ids" to mapOf("name" to "Pluto"), "labels" to listOf(personLabel, otherLabel))
+        val expectedKeyFoo = mapOf("ids" to mapOf("name" to "Foo"), "labels" to listOf(personLabel))
+        val expectedKeyBar = mapOf("ids" to mapOf("name" to "Bar"), "labels" to listOf(personLabel, otherLabel))
+
+        // db with unique constraint
+        createTopicAndEntitiesAndAssertPartition(true, expectedKeyPippo, expectedKeyPluto, expectedKeyFoo, expectedKeyBar)
+    }
+
+    // we create a topic with kafka.streams.log.compaction.strategy=compact;
+    // after that, we create and update some nodes and relationships;
+    // finally, we check that the records for each node/relationship sit in a single partition instead of being spread across several
+    private fun createTopicAndEntitiesAndAssertPartition(withConstraints: Boolean,
+                                                         firstKey: Any? = null,
+                                                         secondKey: Any? = null,
+                                                         thirdKey: Any? = null,
+                                                         fourthKey: Any? = null) {
+        val relType = "KNOWS"
+
+        // we create a topic with strategy compact
+        val topic = UUID.randomUUID().toString()
+        val sourceTopics = mapOf("streams.source.topic.nodes.$topic" to "Person{*}",
+                "streams.source.topic.relationships.$topic" to "$relType{*}",
+                "kafka.num.partitions" to "10"
+        )
+        // we optionally create a constraint for the Person.name property
+        val queries = if (withConstraints) listOf("CREATE CONSTRAINT ON (p:Person) ASSERT p.name IS UNIQUE") else null
+
+        initDbWithLogStrategy(TopicConfig.CLEANUP_POLICY_COMPACT, sourceTopics, queries)
+        createTopic(topic, 10, false)
+
+        createConsumer(KafkaConfiguration(bootstrapServers = kafka.bootstrapServers)).use { consumer ->
+            consumer.subscribe(listOf(topic))
+
+            // we create some nodes and rels
+            val idPippo = db.execute("CREATE (n:Person {name:'Pippo', surname: 'Pippo_1'}) RETURN id(n) as id").columnAs<Long>("id").next()
+            val idPluto = db.execute("CREATE (n:Person:Other {name:'Pluto', surname: 'Pluto_1'}) RETURN id(n) as id").columnAs<Long>("id").next()
+            val idFoo = db.execute("CREATE (n:Person {name:'Foo'}) RETURN id(n) as id").columnAs<Long>("id").next()
+            val idBar = db.execute("CREATE (n:Person:Other {name:'Bar'}) RETURN id(n) as id").columnAs<Long>("id").next()
+
+            db.execute("CREATE (n:Person:Other {name:'Paperino', surname: 'Paperino_1'})").close()
+            db.execute("CREATE (:Person {name:'Baz'})").close()
+            db.execute("CREATE (:Person {name:'Baa'})").close()
+            db.execute("CREATE (:Person {name:'One'})").close()
+            db.execute("CREATE (:Person {name:'Two'})").close()
+            db.execute("CREATE (:Person {name:'Three'})").close()
+            db.execute("""
+                |MATCH (pippo:Person {name:'Pippo'})
+                |MATCH (pluto:Person {name:'Pluto'})
+                |MERGE (pippo)-[:KNOWS]->(pluto)
+            """.trimMargin()).close()
+            db.execute("""
+                |MATCH (foo:Person {name:'Foo'})
+                |MATCH (bar:Person {name:'Bar'})
+                |MERGE (foo)-[:KNOWS]->(bar)
+            """.trimMargin()).close()
+            db.execute("""
+                |MATCH (one:Person {name:'One'})
+                |MATCH (two:Person {name:'Two'})
+                |MERGE (one)-[:KNOWS]->(two)
+            """.trimMargin()).close()
+
+            // we update 4 nodes and 2 rels
+            db.execute("MATCH (p:Person {name:'Pippo'}) SET p.surname = 'Pippo_update'").close()
+            db.execute("MATCH (p:Person {name:'Pippo'}) SET p.address = 'Rome'").close()
+
+            db.execute("MATCH (p:Person {name:'Pluto'}) SET p.surname = 'Pluto_update'").close()
+            db.execute("MATCH (p:Person {name:'Pluto'}) SET p.address = 'London'").close()
+
+            db.execute("MATCH (p:Person {name:'Foo'}) SET p.surname = 'Foo_update'").close()
+            db.execute("MATCH (p:Person {name:'Foo'}) SET p.address = 'Rome'").close()
+
+            db.execute("MATCH (p:Person {name:'Bar'}) SET p.surname = 'Bar_update'").close()
+            db.execute("MATCH (p:Person {name:'Bar'}) SET p.address = 'Tokyo'").close()
+
+            db.execute("MATCH (:Person {name:'Foo'})-[rel:KNOWS]->(:Person {name:'Bar'}) SET rel.since = 1999").close()
+            db.execute("MATCH (:Person {name:'Pippo'})-[rel:KNOWS]->(:Person {name:'Pluto'}) SET rel.since = 2019").close()
+
+            val records = consumer.poll(30000)
+
+            assertEquals(23, records.count())
+
+            val firstExpectedKey = firstKey ?: idPippo.toString()
+            val secondExpectedKey = secondKey ?: idPluto.toString()
+            val thirdExpectedKey = thirdKey ?: idFoo.toString()
+            val fourthExpectedKey = fourthKey ?: idBar.toString()
+            // we take the records for each node
+            val firstRecordNode = records.filter { JSONUtils.readValue<Any>(it.key()) == firstExpectedKey }
+            val secondRecordNode = records.filter { JSONUtils.readValue<Any>(it.key()) == secondExpectedKey }
+            val thirdRecordNode = records.filter { JSONUtils.readValue<Any>(it.key()) == thirdExpectedKey }
+            val fourthRecordNode = records.filter { JSONUtils.readValue<Any>(it.key()) == fourthExpectedKey }
+            val firstRecordRel = records.filter { JSONUtils.readValue<Any>(it.key()) == mapOf("start" to thirdExpectedKey, "end" to fourthExpectedKey, "label" to relType) }
+            val secondRecordRel = records.filter { JSONUtils.readValue<Any>(it.key()) == mapOf("start" to firstExpectedKey, "end" to secondExpectedKey, "label" to relType) }
+
+            // we check that all queries produced records
+            assertEquals(3, firstRecordNode.count())
+            assertEquals(3, secondRecordNode.count())
+            assertEquals(3, thirdRecordNode.count())
+            assertEquals(3, fourthRecordNode.count())
+            assertEquals(2, firstRecordRel.count())
+            assertEquals(2, secondRecordRel.count())
+
+            // we check that no node/relationship has records spread across multiple partitions
+            assertEquals(1, checkPartitionEquality(firstRecordNode))
+            assertEquals(1, checkPartitionEquality(secondRecordNode))
+            assertEquals(1, checkPartitionEquality(thirdRecordNode))
+            assertEquals(1, checkPartitionEquality(fourthRecordNode))
+            assertEquals(1, checkPartitionEquality(firstRecordRel))
+            assertEquals(1, checkPartitionEquality(secondRecordRel))
+        }
+    }
+
+    private fun checkPartitionEquality(records: List<ConsumerRecord<String, ByteArray>>) = records.groupBy { it.partition() }.count()
 }
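
The `checkPartitionEquality` helper makes the final assertions easy to read: grouping a record list by partition yields exactly one group precisely when every record landed on the same partition. A standalone illustration with hypothetical data, not taken from the test run:

    // Three records for one node, all observed on partition 7:
    val partitions = listOf(7, 7, 7)
    // groupBy produces a single group, so the distinct-partition count is 1
    check(partitions.groupBy { it }.count() == 1)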

producer/src/test/kotlin/streams/kafka/KafkaConfigurationTest.kt

Lines changed: 0 additions & 2 deletions
@@ -12,7 +12,6 @@ class KafkaConfigurationTest {
         val map = mapOf("kafka.zookeeper.connect" to "zookeeper:1234",
                 "kafka.bootstrap.servers" to "kafka:5678",
                 "kafka.acks" to "10",
-                "kafka.num.partitions" to 1,
                 "kafka.retries" to 1,
                 "kafka.batch.size" to 10,
                 "kafka.buffer.memory" to 1000,
@@ -37,7 +36,6 @@ class KafkaConfigurationTest {
         assertEquals(map["kafka.zookeeper.connect"], properties["zookeeper.connect"])
         assertEquals(map["kafka.bootstrap.servers"], properties["bootstrap.servers"])
        assertEquals(map["kafka.acks"], properties["acks"])
-        assertEquals(map["kafka.num.partitions"], properties["num.partitions"])
         assertEquals(map["kafka.retries"], properties["retries"])
         assertEquals(map["kafka.batch.size"], properties["batch.size"])
         assertEquals(map["kafka.buffer.memory"], properties["buffer.memory"])
