Commit 4555942

conker84 authored and jexp committed
fixes #211: Add AVRO Support to the Streams Sink Plugin (#216)
1 parent 19b3baf commit 4555942

18 files changed: +470 / -51 lines
common/pom.xml

Lines changed: 5 additions & 0 deletions

@@ -24,6 +24,11 @@
             <version>${neo4j.java.driver.version}</version>
             <scope>provided</scope>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.avro</groupId>
+            <artifactId>avro</artifactId>
+        </dependency>
     </dependencies>

 </project>

common/src/main/kotlin/streams/extensions/CommonExtensions.kt

Lines changed: 39 additions & 6 deletions

@@ -1,12 +1,17 @@
 package streams.extensions

-import com.fasterxml.jackson.core.JsonParseException
+import org.apache.avro.Schema
+import org.apache.avro.generic.GenericEnumSymbol
+import org.apache.avro.generic.GenericFixed
+import org.apache.avro.generic.GenericRecord
+import org.apache.avro.generic.IndexedRecord
 import org.apache.kafka.clients.consumer.ConsumerRecord
 import org.apache.kafka.clients.consumer.OffsetAndMetadata
 import org.apache.kafka.common.TopicPartition
 import org.neo4j.graphdb.Node
 import streams.serialization.JSONUtils
 import streams.service.StreamsSinkEntity
+import java.nio.ByteBuffer
 import javax.lang.model.SourceVersion

 fun Map<String,String>.getInt(name:String, defaultValue: Int) = this.get(name)?.toInt() ?: defaultValue

@@ -34,9 +39,37 @@ fun Map<String, Any?>.flatten(map: Map<String, Any?> = this, prefix: String = ""
     }.toMap()
 }

-fun <K, V> ConsumerRecord<K, V>.topicPartition() = TopicPartition(this.topic(), this.partition())
-fun <K, V> ConsumerRecord<K, V>.offsetAndMetadata(metadata: String = "") = OffsetAndMetadata(this.offset() + 1, metadata)
+fun ConsumerRecord<*, *>.topicPartition() = TopicPartition(this.topic(), this.partition())
+fun ConsumerRecord<*, *>.offsetAndMetadata(metadata: String = "") = OffsetAndMetadata(this.offset() + 1, metadata)

-fun ConsumerRecord<ByteArray, ByteArray>.toStreamsSinkEntity(): StreamsSinkEntity = StreamsSinkEntity(
-        if (this.key() == null) null else JSONUtils.readValue<Any>(this.key(), true),
-        if (this.value() == null) null else JSONUtils.readValue<Any>(this.value()))
+private fun convertAvroData(rawValue: Any?): Any? = when (rawValue) {
+    is IndexedRecord -> rawValue.toMap()
+    is Collection<*> -> rawValue.map(::convertAvroData)
+    is Array<*> -> if (rawValue.javaClass.componentType.isPrimitive) rawValue else rawValue.map(::convertAvroData)
+    is Map<*, *> -> rawValue
+            .mapKeys { it.key.toString() }
+            .mapValues { convertAvroData(it.value) }
+    is GenericFixed -> rawValue.bytes()
+    is ByteBuffer -> rawValue.array()
+    is GenericEnumSymbol, is CharSequence -> rawValue.toString()
+    else -> rawValue
+}
+fun IndexedRecord.toMap() = this.schema.fields
+        .map { it.name() to convertAvroData(this[it.pos()]) }
+        .toMap()
+
+fun Schema.toMap() = JSONUtils.asMap(this.toString())
+
+private fun convertData(data: Any?, stringWhenFailure: Boolean = false): Any? {
+    return when (data) {
+        null -> null
+        is ByteArray -> JSONUtils.readValue<Any>(data, stringWhenFailure)
+        is GenericRecord -> data.toMap()
+        else -> if (stringWhenFailure) data.toString() else throw RuntimeException("Unsupported type ${data::class.java.name}")
+    }
+}
+fun ConsumerRecord<*, *>.toStreamsSinkEntity(): StreamsSinkEntity {
+    val key = convertData(this.key(), true)
+    val value = convertData(this.value())
+    return StreamsSinkEntity(key, value)
+}
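
The reworked toStreamsSinkEntity extension accepts whatever key/value types the configured deserializers produce. Below is a minimal sketch (not part of this commit) of how a JSON byte-array key and an Avro GenericRecord value flow through it; the Person schema, topic name, and values are illustrative only:

    import org.apache.avro.SchemaBuilder
    import org.apache.avro.generic.GenericRecordBuilder
    import org.apache.kafka.clients.consumer.ConsumerRecord
    import streams.extensions.toStreamsSinkEntity

    fun main() {
        // illustrative schema; any GenericRecord emitted by the Avro deserializer behaves the same
        val schema = SchemaBuilder.builder("org.neo4j.example")
                .record("Person").fields()
                .requiredString("name")
                .requiredLong("born")
                .endRecord()
        val person = GenericRecordBuilder(schema)
                .set("name", "Keanu Reeves")
                .set("born", 1964L)
                .build()
        // key deserialized as a JSON ByteArray, value deserialized as an Avro GenericRecord
        val record = ConsumerRecord("my-topic", 0, 0L, """{"id": 1}""".toByteArray(), person)
        val entity = record.toStreamsSinkEntity()
        // key -> mapOf("id" to 1), value -> mapOf("name" to "Keanu Reeves", "born" to 1964L)
        println(entity)
    }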

common/src/main/kotlin/streams/service/errors/ErrorService.kt

Lines changed: 9 additions & 4 deletions

@@ -1,8 +1,10 @@
 package streams.service.errors

+import org.apache.avro.generic.GenericRecord
 import org.apache.kafka.clients.consumer.ConsumerRecord
 import org.apache.kafka.common.record.RecordBatch
-import java.lang.RuntimeException
+import streams.extensions.toMap
+import streams.serialization.JSONUtils
 import java.util.*


@@ -20,20 +22,23 @@ data class ErrorData(val originalTopic: String,
             this(originalTopic, timestamp ?: RecordBatch.NO_TIMESTAMP, toByteArray(key), toByteArray(value), partition.toString(), offset.toString(), executingClass, exception)

     companion object {
-        fun from(consumerRecord: ConsumerRecord<ByteArray, ByteArray>, exception: Exception?, executingClass: Class<*>?): ErrorData {
+
+        fun from(consumerRecord: ConsumerRecord<out Any, out Any>, exception: Exception?, executingClass: Class<*>?): ErrorData {
             return ErrorData(offset = consumerRecord.offset().toString(),
                     originalTopic = consumerRecord.topic(),
                     partition = consumerRecord.partition().toString(),
                     timestamp = consumerRecord.timestamp(),
                     exception = exception,
                     executingClass = executingClass,
-                    key = consumerRecord.key(),
-                    value = consumerRecord.value())
+                    key = toByteArray(consumerRecord.key()),
+                    value = toByteArray(consumerRecord.value()))
         }
+
         fun toByteArray(v: Any?) = try {
             when (v) {
                 null -> null
                 is ByteArray -> v
+                is GenericRecord -> JSONUtils.writeValueAsBytes(mapOf("schema" to v.schema.toMap(), "record" to v.toMap()))
                 else -> v.toString().toByteArray(Charsets.UTF_8)
             }
         } catch (e: Exception) {
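
A hedged sketch of the dead-letter payload ErrorData.toByteArray now builds when the failing record is an Avro GenericRecord: a JSON document carrying both the schema and the converted record. The Person schema here is illustrative, not taken from this commit:

    import org.apache.avro.SchemaBuilder
    import org.apache.avro.generic.GenericRecordBuilder
    import streams.service.errors.ErrorData

    fun main() {
        val schema = SchemaBuilder.builder("org.neo4j.example")
                .record("Person").fields()
                .requiredString("name")
                .endRecord()
        val record = GenericRecordBuilder(schema).set("name", "Alice").build()
        val payload = ErrorData.toByteArray(record) as ByteArray
        // prints something like {"schema":{...the schema as a map...},"record":{"name":"Alice"}}
        println(String(payload))
    }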

common/src/test/kotlin/streams/extensions/CommonExtensionsTest.kt

Lines changed: 73 additions & 0 deletions

@@ -0,0 +1,73 @@
+package streams.extensions
+
+import org.apache.avro.SchemaBuilder
+import org.apache.avro.generic.GenericRecordBuilder
+import org.junit.Test
+import kotlin.test.assertEquals
+
+class CommonExtensionsTest {
+
+    @Test
+    fun `should convert AVRO record to Map`() {
+        // given
+        // this test generates a simple tree structure like this
+        //  body
+        //  /    \
+        // p      ul
+        //        |
+        //        li
+        val BODY_SCHEMA = SchemaBuilder.builder("org.neo4j.example.html")
+                .record("BODY").fields()
+                .name("ul").type().array().items()
+                .record("UL").namespace("org.neo4j.example.html").fields()
+                .name("value").type().array().items()
+                .record("LI").namespace("org.neo4j.example.html").fields()
+                .optionalString("value")
+                .name("class").type().nullable().array().items().stringType().noDefault()
+                .endRecord().noDefault()
+                .endRecord().noDefault()
+                .name("p").type().array().items()
+                .record("P").namespace("org.neo4j.example.html").fields()
+                .optionalString("value")
+                .endRecord().noDefault()
+                .endRecord()
+        val UL_SCHEMA = BODY_SCHEMA.getField("ul").schema().elementType
+        val LI_SCHEMA = UL_SCHEMA.getField("value").schema().elementType
+        val firstLi = listOf(
+                GenericRecordBuilder(LI_SCHEMA).set("value", "First UL - First Element").set("class", null).build(),
+                GenericRecordBuilder(LI_SCHEMA).set("value", "First UL - Second Element").set("class", listOf("ClassA", "ClassB")).build()
+        )
+        val secondLi = listOf(
+                GenericRecordBuilder(LI_SCHEMA).set("value", "Second UL - First Element").set("class", null).build(),
+                GenericRecordBuilder(LI_SCHEMA).set("value", "Second UL - Second Element").set("class", null).build()
+        )
+        val structUL = listOf(
+                GenericRecordBuilder(UL_SCHEMA).set("value", firstLi).build(),
+                GenericRecordBuilder(UL_SCHEMA).set("value", secondLi).build()
+        )
+        val structP = listOf(
+                GenericRecordBuilder(BODY_SCHEMA.getField("p").schema().elementType).set("value", "First Paragraph").build(),
+                GenericRecordBuilder(BODY_SCHEMA.getField("p").schema().elementType).set("value", "Second Paragraph").build()
+        )
+        val struct = GenericRecordBuilder(BODY_SCHEMA)
+                .set("ul", structUL)
+                .set("p", structP)
+                .build()
+
+        // when
+        val actual = struct.toMap()
+
+        // then
+        val firstULMap = mapOf("value" to listOf(
+                mapOf("value" to "First UL - First Element", "class" to null),
+                mapOf("value" to "First UL - Second Element", "class" to listOf("ClassA", "ClassB"))))
+        val secondULMap = mapOf("value" to listOf(
+                mapOf("value" to "Second UL - First Element", "class" to null),
+                mapOf("value" to "Second UL - Second Element", "class" to null)))
+        val ulListMap = listOf(firstULMap, secondULMap)
+        val pListMap = listOf(mapOf("value" to "First Paragraph"),
+                mapOf("value" to "Second Paragraph"))
+        val bodyMap = mapOf("ul" to ulListMap, "p" to pListMap)
+        assertEquals(bodyMap, actual)
+    }
+}

consumer/pom.xml

Lines changed: 22 additions & 1 deletion

@@ -17,13 +17,34 @@
         <version>3.5.3</version>
     </parent>

+    <repositories>
+        <repository>
+            <id>confluent</id>
+            <name>Confluent</name>
+            <url>https://packages.confluent.io/maven/</url>
+        </repository>
+    </repositories>
+    <pluginRepositories>
+        <pluginRepository>
+            <id>confluent</id>
+            <url>https://packages.confluent.io/maven/</url>
+        </pluginRepository>
+    </pluginRepositories>
+
     <dependencies>
         <dependency>
             <groupId>org.neo4j</groupId>
             <artifactId>neo4j-streams-common</artifactId>
             <version>3.5.3</version>
         </dependency>
-
+        <dependency>
+            <groupId>org.apache.avro</groupId>
+            <artifactId>avro</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.confluent</groupId>
+            <artifactId>kafka-avro-serializer</artifactId>
+        </dependency>
     </dependencies>

 </project>

consumer/src/main/kotlin/streams/kafka/KafkaAutoCommitEventConsumer.kt

Lines changed: 12 additions & 3 deletions

@@ -1,9 +1,12 @@
 package streams.kafka

+import io.confluent.kafka.serializers.KafkaAvroDeserializer
+import org.apache.avro.generic.GenericRecord
 import org.apache.kafka.clients.consumer.ConsumerRecord
 import org.apache.kafka.clients.consumer.KafkaConsumer
 import org.apache.kafka.clients.consumer.OffsetAndMetadata
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.serialization.ByteArrayDeserializer
 import org.neo4j.logging.Log
 import streams.StreamsEventConsumer
 import streams.extensions.offsetAndMetadata

@@ -41,7 +44,13 @@ open class KafkaAutoCommitEventConsumer(private val config: KafkaSinkConfigurati

     private var isSeekSet = false

-    val consumer = KafkaConsumer<ByteArray, ByteArray>(config.asProperties())
+    val consumer: KafkaConsumer<*, *> = when {
+        config.keyDeserializer == ByteArrayDeserializer::class.java.name && config.valueDeserializer == ByteArrayDeserializer::class.java.name -> KafkaConsumer<ByteArray, ByteArray>(config.asProperties())
+        config.keyDeserializer == ByteArrayDeserializer::class.java.name && config.valueDeserializer == KafkaAvroDeserializer::class.java.name -> KafkaConsumer<ByteArray, GenericRecord>(config.asProperties())
+        config.keyDeserializer == KafkaAvroDeserializer::class.java.name && config.valueDeserializer == KafkaAvroDeserializer::class.java.name -> KafkaConsumer<GenericRecord, GenericRecord>(config.asProperties())
+        config.keyDeserializer == KafkaAvroDeserializer::class.java.name && config.valueDeserializer == ByteArrayDeserializer::class.java.name -> KafkaConsumer<GenericRecord, ByteArray>(config.asProperties())
+        else -> throw RuntimeException("Invalid config")
+    }

     lateinit var topics: Set<String>

@@ -79,15 +88,15 @@ open class KafkaAutoCommitEventConsumer(private val config: KafkaSinkConfigurati
         }
     }

-    private fun executeAction(action: (String, List<StreamsSinkEntity>) -> Unit, topic: String, topicRecords: Iterable<ConsumerRecord<ByteArray, ByteArray>>) {
+    private fun executeAction(action: (String, List<StreamsSinkEntity>) -> Unit, topic: String, topicRecords: Iterable<ConsumerRecord<out Any, out Any>>) {
         try {
             action(topic, convert(topicRecords))
         } catch (e: Exception) {
             errorService.report(topicRecords.map { ErrorData.from(it, e, this::class.java) })
         }
     }

-    private fun convert(topicRecords: Iterable<ConsumerRecord<ByteArray, ByteArray>>) = topicRecords
+    private fun convert(topicRecords: Iterable<ConsumerRecord<out Any, out Any>>) = topicRecords
             .map {
                 try {
                     "ok" to it.toStreamsSinkEntity()

consumer/src/main/kotlin/streams/kafka/KafkaEventSink.kt

Lines changed: 3 additions & 12 deletions

@@ -30,6 +30,9 @@ class KafkaEventSink(private val config: Config,
             "broker" to "kafka.${ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG}",
             "from" to "kafka.${ConsumerConfig.AUTO_OFFSET_RESET_CONFIG}",
             "autoCommit" to "kafka.${ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG}",
+            "keyDeserializer" to "kafka.${ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG}",
+            "valueDeserializer" to "kafka.${ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG}",
+            "schemaRegistryUrl" to "kafka.schema.registry.url",
             "groupId" to "kafka.${ConsumerConfig.GROUP_ID_CONFIG}")

     override fun getEventConsumerFactory(): StreamsEventConsumerFactory {

@@ -83,18 +86,6 @@ class KafkaEventSink(private val config: Config,
         } catch (e : UninitializedPropertyAccessException) { /* ignoring this one only */ }
     }

-    override fun getEventSinkConfigMapper(): StreamsEventSinkConfigMapper { // TODO move to the abstract class
-        return object: StreamsEventSinkConfigMapper(streamsConfigMap, mappingKeys) {
-            override fun convert(config: Map<String, String>): Map<String, String> {
-                val props = streamsConfigMap
-                        .toMutableMap()
-                props += config.mapKeys { mappingKeys.getOrDefault(it.key, it.key) }
-                return props
-            }
-
-        }
-    }
-
     private fun createJob(): Job {
         log.info("Creating Sink daemon Job")
         return GlobalScope.launch(Dispatchers.IO) { // TODO improve exception management
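
The three new mapping entries correspond to the kafka.-prefixed sink settings that enable Avro. A hedged sketch of those settings, written as a Kotlin map (the registry URL is a placeholder):

    val avroSinkSettings = mapOf(
            "kafka.key.deserializer" to "org.apache.kafka.common.serialization.ByteArrayDeserializer",
            "kafka.value.deserializer" to "io.confluent.kafka.serializers.KafkaAvroDeserializer",
            "kafka.schema.registry.url" to "http://localhost:8081" // placeholder
    )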

consumer/src/main/kotlin/streams/kafka/KafkaSinkConfiguration.kt

Lines changed: 28 additions & 10 deletions

@@ -1,21 +1,39 @@
 package streams.kafka

+import io.confluent.kafka.serializers.KafkaAvroDeserializer
 import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.clients.consumer.ConsumerConfig
 import org.apache.kafka.common.serialization.ByteArrayDeserializer
-import org.apache.kafka.common.serialization.StringDeserializer
 import org.neo4j.kernel.configuration.Config
 import streams.StreamsSinkConfiguration
 import streams.extensions.toPointCase
 import streams.serialization.JSONUtils
 import java.util.*


-
 private const val kafkaConfigPrefix = "kafka."

+private val SUPPORTED_DESERIALIZER = listOf(ByteArrayDeserializer::class.java.name, KafkaAvroDeserializer::class.java.name)
+
+private fun validateDeserializers(config: Map<String, String>) {
+    val kafkaKeyConfig = config[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG].orEmpty()
+    val kafkaValueConfig = config[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG].orEmpty()
+    val key = if (kafkaKeyConfig.isNotBlank() && !SUPPORTED_DESERIALIZER.contains(kafkaKeyConfig)) {
+        ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG
+    } else if (kafkaValueConfig.isNotBlank() && !SUPPORTED_DESERIALIZER.contains(kafkaValueConfig)) {
+        ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG
+    } else {
+        ""
+    }
+    if (key.isNotBlank()) {
+        throw RuntimeException("The property kafka.$key contains an invalid deserializer. Supported deserializers are $SUPPORTED_DESERIALIZER")
+    }
+}
+
 data class KafkaSinkConfiguration(val zookeeperConnect: String = "localhost:2181",
                                   val bootstrapServers: String = "localhost:9092",
+                                  val keyDeserializer: String = "org.apache.kafka.common.serialization.ByteArrayDeserializer",
+                                  val valueDeserializer: String = "org.apache.kafka.common.serialization.ByteArrayDeserializer",
                                   val groupId: String = "neo4j",
                                   val autoOffsetReset: String = "earliest",
                                   val streamsSinkConfiguration: StreamsSinkConfiguration = StreamsSinkConfiguration(),

@@ -34,12 +52,16 @@ data class KafkaSinkConfiguration(val zookeeperConnect: String = "localhost:2181
                 .mapKeys { it.key.substring(kafkaConfigPrefix.length) }
         val default = KafkaSinkConfiguration()

+
         val keys = JSONUtils.asMap(default).keys.map { it.toPointCase() }
+        validate(config)
         val extraProperties = config.filterKeys { !keys.contains(it) }

         val streamsSinkConfiguration = StreamsSinkConfiguration.from(cfg)

         return default.copy(zookeeperConnect = config.getOrDefault("zookeeper.connect", default.zookeeperConnect),
+                keyDeserializer = config.getOrDefault(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, default.keyDeserializer),
+                valueDeserializer = config.getOrDefault(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, default.valueDeserializer),
                 bootstrapServers = config.getOrDefault(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, default.bootstrapServers),
                 autoOffsetReset = config.getOrDefault(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, default.autoOffsetReset),
                 groupId = config.getOrDefault(ConsumerConfig.GROUP_ID_CONFIG, default.groupId),

@@ -48,6 +70,10 @@ data class KafkaSinkConfiguration(val zookeeperConnect: String = "localhost:2181
                 extraProperties = extraProperties // for what we don't provide a default configuration
         )
     }
+
+    private fun validate(config: Map<String, String>) {
+        validateDeserializers(config)
+    }
 }

 fun asProperties(): Properties {

@@ -57,14 +83,6 @@ data class KafkaSinkConfiguration(val zookeeperConnect: String = "localhost:2181
                 .mapKeys { it.key.toPointCase() }
         props.putAll(map)
         props.putAll(extraProperties)
-        props.putAll(addDeserializers()) // Fixed deserializers
-        return props
-    }
-
-    private fun addDeserializers() : Properties {
-        val props = Properties()
-        props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = ByteArrayDeserializer::class.java
-        props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = ByteArrayDeserializer::class.java
         return props
     }
 }
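
A self-contained sketch of the deserializer validation introduced above, restated as a standalone function so it can be exercised in isolation; StringDeserializer appears only as an example of a now-rejected class:

    import io.confluent.kafka.serializers.KafkaAvroDeserializer
    import org.apache.kafka.clients.consumer.ConsumerConfig
    import org.apache.kafka.common.serialization.ByteArrayDeserializer
    import org.apache.kafka.common.serialization.StringDeserializer

    private val supported = listOf(ByteArrayDeserializer::class.java.name, KafkaAvroDeserializer::class.java.name)

    // mirrors validateDeserializers: blank values fall back to the ByteArray default,
    // anything else must be one of the supported deserializers
    fun checkDeserializers(config: Map<String, String>) {
        listOf(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)
                .forEach { key ->
                    val value = config[key].orEmpty()
                    if (value.isNotBlank() && value !in supported) {
                        throw RuntimeException("The property kafka.$key contains an invalid deserializer. Supported deserializers are $supported")
                    }
                }
    }

    fun main() {
        checkDeserializers(mapOf(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to KafkaAvroDeserializer::class.java.name)) // passes
        checkDeserializers(mapOf(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java.name))      // throws
    }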
