Skip to content

Commit 0fc73fc

Browse files
authored
Fixes #272: Allow to publish messages with key in Kafka topic (3.5) (#409)
* Fixes #272: Allow to publish messages with key in Kafka topic (4.0) * added sync config * updated test keyNull
1 parent d1e02b4 commit 0fc73fc

File tree

8 files changed

+240
-28
lines changed

8 files changed

+240
-28
lines changed

doc/asciidoc/procedures/index.adoc

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,23 @@ Input Parameters:
7676

7777
|===
7878

79+
Configuration parameters:
80+
[cols="3*",options="header"]
81+
|===
82+
|Name
83+
|Type
84+
|Description
85+
86+
|`key`
87+
|Object
88+
|The key value of the message that you want to stream. Please note that if the key doesn't exist, the message is sent with a random UUID as its key value
89+
90+
|`partition`
91+
|Int
92+
|The partition of the message that you want to stream
93+
94+
|===
95+
7996
You can send any kind of data in the payload, nodes, relationships, paths, lists, maps, scalar values and nested versions thereof.
8097

8198
In case of nodes or relationships if the topic is defined in the patterns provided by the configuration their properties will be filtered in according with the configuration.

doc/asciidoc/producer/index.adoc

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,12 @@ You can set the following configuration values in your `neo4j.conf`, here are th
1919
[source,adoc,subs="verbatim"]
2020
include::configuration.adoc[]
2121

22+
=== Serializers
23+
24+
To allow the insertion of keys in any format (e.g. through the `streams.publish` procedure),
25+
**the `key.serializer` is set to `org.apache.kafka.common.serialization.ByteArraySerializer`**,
26+
just like the `value.serializer`
27+
2228
[#source-patterns]
2329
=== Patterns
2430

producer/src/main/kotlin/streams/StreamsEventRouter.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,9 @@ import streams.events.StreamsEvent
66

77
abstract class StreamsEventRouter(val logService: LogService?, val config: Config?) {
88

9-
abstract fun sendEvents(topic: String, transactionEvents: List<out StreamsEvent>)
9+
abstract fun sendEvents(topic: String, transactionEvents: List<out StreamsEvent>, config: Map<String, Any?> = emptyMap())
1010

11-
abstract fun sendEventsSync(topic: String, transactionEvents: List<out StreamsEvent>): List<Map<String, Any>>
11+
abstract fun sendEventsSync(topic: String, transactionEvents: List<out StreamsEvent>, config: Map<String, Any?> = emptyMap()): List<Map<String, Any>>
1212

1313
abstract fun start()
1414

producer/src/main/kotlin/streams/kafka/KafkaConfiguration.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ data class KafkaConfiguration(val zookeeperConnect: String = "localhost:2181",
9191

9292
private fun addSerializers() : Properties {
9393
val props = Properties()
94-
props[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
94+
props[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = ByteArraySerializer::class.java
9595
props[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = ByteArraySerializer::class.java
9696
return props
9797
}

producer/src/main/kotlin/streams/kafka/KafkaEventRouter.kt

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import java.util.concurrent.ThreadLocalRandom
2525

2626
class KafkaEventRouter: StreamsEventRouter {
2727
private val log: Log
28-
private lateinit var producer: Neo4jKafkaProducer<String, ByteArray>
28+
private lateinit var producer: Neo4jKafkaProducer<ByteArray, ByteArray>
2929
private lateinit var kafkaConfig: KafkaConfiguration
3030
private lateinit var kafkaAdminService: KafkaAdminService
3131

@@ -57,7 +57,7 @@ class KafkaEventRouter: StreamsEventRouter {
5757
StreamsUtils.ignoreExceptions({ kafkaAdminService.stop() }, UninitializedPropertyAccessException::class.java)
5858
}
5959

60-
private fun send(producerRecord: ProducerRecord<String, ByteArray>, sync: Boolean = false): Map<String, Any>? {
60+
private fun send(producerRecord: ProducerRecord<ByteArray?, ByteArray>, sync: Boolean = false): Map<String, Any>? {
6161
if (!kafkaAdminService.isValidTopic(producerRecord.topic())) {
6262
// TODO add logging system here
6363
return null
@@ -80,46 +80,51 @@ class KafkaEventRouter: StreamsEventRouter {
8080
}
8181
}
8282

83-
private fun sendEvent(partition: Int, topic: String, event: StreamsEvent, sync: Boolean = false): Map<String, Any>? {
83+
// this method is used by the procedures
84+
private fun sendEvent(topic: String, event: StreamsEvent, config: Map<String, Any?>, sync: Boolean = false): Map<String, Any>? {
8485
if (log.isDebugEnabled) {
8586
log.debug("Trying to send a simple event with payload ${event.payload} to kafka")
8687
}
87-
val uuid = UUID.randomUUID().toString()
88-
val producerRecord = ProducerRecord(topic, partition, System.currentTimeMillis(), uuid,
88+
// in the procedures we allow to define a custom message key via the configuration property key
89+
// in order to have the backwards compatibility we define as default value the old key
90+
val key = config.getOrDefault("key", UUID.randomUUID().toString())
91+
val producerRecord = ProducerRecord(topic, getPartition(config), System.currentTimeMillis(), key?.let { JSONUtils.writeValueAsBytes(it) },
8992
JSONUtils.writeValueAsBytes(event))
9093
return send(producerRecord, sync)
9194
}
9295

93-
private fun sendEvent(partition: Int, topic: String, event: StreamsTransactionEvent) {
96+
// this method is used by the transaction event handler
97+
private fun sendEvent(topic: String, event: StreamsTransactionEvent, config: Map<String, Any?>) {
9498
if (log.isDebugEnabled) {
9599
log.debug("Trying to send a transaction event with txId ${event.meta.txId} and txEventId ${event.meta.txEventId} to kafka")
96100
}
97-
val producerRecord = ProducerRecord(topic, partition, System.currentTimeMillis(), "${event.meta.txId + event.meta.txEventId}-${event.meta.txEventId}",
101+
val producerRecord = ProducerRecord(topic, getPartition(config), System.currentTimeMillis(),
102+
JSONUtils.writeValueAsBytes("${event.meta.txId + event.meta.txEventId}-${event.meta.txEventId}"),
98103
JSONUtils.writeValueAsBytes(event))
99104
send(producerRecord)
100105
}
101106

102107

103-
override fun sendEventsSync(topic: String, transactionEvents: List<out StreamsEvent>): List<Map<String, Any>> {
108+
override fun sendEventsSync(topic: String, transactionEvents: List<out StreamsEvent>, config: Map<String, Any?>): List<Map<String, Any>> {
104109
producer.beginTransaction()
105110

106111
val results = transactionEvents.mapNotNull {
107-
sendEvent(ThreadLocalRandom.current().nextInt(kafkaConfig.numPartitions), topic, it, true)
112+
sendEvent(topic, it, config, true)
108113
}
109114
producer.commitTransaction()
110115

111116
return results
112117
}
113118

114-
override fun sendEvents(topic: String, transactionEvents: List<out StreamsEvent>) {
119+
override fun sendEvents(topic: String, transactionEvents: List<out StreamsEvent>, config: Map<String, Any?>) {
115120
try {
116121
producer.beginTransaction()
117122
transactionEvents.forEach {
118123
val partition = ThreadLocalRandom.current().nextInt(kafkaConfig.numPartitions)
119124
if (it is StreamsTransactionEvent) {
120-
sendEvent(partition, topic, it)
125+
sendEvent(topic, it, config)
121126
} else {
122-
sendEvent(partition, topic, it)
127+
sendEvent(topic, it, config)
123128
}
124129
}
125130
producer.commitTransaction()
@@ -138,6 +143,8 @@ class KafkaEventRouter: StreamsEventRouter {
138143
}
139144
}
140145

146+
private fun getPartition(config: Map<String, Any?>) = config.getOrDefault("partition", ThreadLocalRandom.current().nextInt(kafkaConfig.numPartitions)).toString().toInt()
147+
141148
}
142149

143150
class Neo4jKafkaProducer<K, V>: KafkaProducer<K, V> {

producer/src/main/kotlin/streams/procedures/StreamsProcedures.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ class StreamsProcedures {
2424

2525
val streamsEvent = buildStreamEvent(topic!!, payload!!)
2626

27-
return eventRouter.sendEventsSync(topic, listOf(streamsEvent))
27+
return eventRouter.sendEventsSync(topic, listOf(streamsEvent), config ?: emptyMap())
2828
.map { StreamPublishResult(it) }
2929
.stream()
3030
}
@@ -41,7 +41,7 @@ class StreamsProcedures {
4141

4242
val streamsEvent = buildStreamEvent(topic!!, payload!!)
4343

44-
eventRouter.sendEvents(topic, listOf(streamsEvent))
44+
eventRouter.sendEvents(topic, listOf(streamsEvent), config ?: emptyMap())
4545
}
4646

4747
private fun isTopicNullOrEmpty(topic: String?): Boolean {

0 commit comments

Comments
 (0)