
Commit 3aa0daa

conker84 authored and jexp committed
follow-up: Kafka-Connect improvements (#135)
1 parent 96b3f45 commit 3aa0daa

5 files changed: 128 additions & 36 deletions
kafka-connect-neo4j/src/main/kotlin/streams/kafka/connect/sink/EventBuilder.kt

Lines changed: 45 additions & 0 deletions
@@ -0,0 +1,45 @@
package streams.kafka.connect.sink

import org.apache.kafka.connect.sink.SinkRecord
import org.slf4j.Logger
import org.slf4j.LoggerFactory

class EventBuilder {
    private val log: Logger = LoggerFactory.getLogger(EventBuilder::class.java)

    private var batchSize: Int? = null
    private lateinit var topics: Set<String>
    private lateinit var sinkRecords: Collection<SinkRecord>

    fun withBatchSize(batchSize: Int): EventBuilder {
        this.batchSize = batchSize
        return this
    }

    fun withTopics(topics: Set<String>): EventBuilder {
        this.topics = topics
        return this
    }

    fun withSinkRecords(sinkRecords: Collection<SinkRecord>): EventBuilder {
        this.sinkRecords = sinkRecords
        return this
    }

    fun build(): Map<String, List<List<SinkRecord>>> { // Map<Topic, List<List<SinkRecord>>>
        return this.sinkRecords
                .groupBy { it.topic() }
                .filterKeys { topic ->
                    val isValidTopic = topics.contains(topic)
                    if (!isValidTopic && log.isDebugEnabled) {
                        log.debug("Topic $topic not present")
                    }
                    isValidTopic
                }
                .mapValues { it ->
                    val value = it.value
                    value.chunked(this.batchSize!!)
                }
    }
}
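For reference, a minimal usage sketch of the new builder (the batch size and topic names below are illustrative assumptions, not values from this commit): it groups a task's SinkRecords by topic, drops topics that are not in the configured set, and splits each topic's records into batches of at most batchSize.

package streams.kafka.connect.sink

import org.apache.kafka.connect.sink.SinkRecord

// Hypothetical helper showing how EventBuilder is meant to be driven:
// group records by topic, keep only whitelisted topics, chunk into batches.
fun groupIntoBatches(records: Collection<SinkRecord>): Map<String, List<List<SinkRecord>>> =
        EventBuilder()
                .withBatchSize(10)                    // illustrative batch size
                .withTopics(setOf("neotopic", "foo")) // illustrative topic whitelist
                .withSinkRecords(records)
                .build()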

kafka-connect-neo4j/src/main/kotlin/streams/kafka/connect/sink/Neo4jService.kt

Lines changed: 12 additions & 5 deletions
@@ -5,17 +5,21 @@ import kotlinx.coroutines.channels.ticker
 import kotlinx.coroutines.coroutineScope
 import kotlinx.coroutines.selects.whileSelect
 import org.apache.kafka.common.config.ConfigException
+import org.apache.kafka.connect.sink.SinkRecord
 import org.neo4j.driver.v1.AuthTokens
 import org.neo4j.driver.v1.Config
 import org.neo4j.driver.v1.Driver
 import org.neo4j.driver.v1.GraphDatabase
 import org.slf4j.Logger
 import org.slf4j.LoggerFactory
+import streams.utils.StreamsUtils
 import java.util.concurrent.TimeUnit


 class Neo4jService(private val config: Neo4jSinkConnectorConfig) {

+    private val converter = ValueConverter()
+
     private val log: Logger = LoggerFactory.getLogger(Neo4jService::class.java)

     private val driver: Driver
@@ -59,8 +63,10 @@ class Neo4jService(private val config: Neo4jSinkConnectorConfig) {
         driver.close()
     }

-    private fun write(query: String, data: Map<String, Any>) {
+    private fun write(topic: String, records: List<SinkRecord>) {
         val session = driver.session()
+        val query = "${StreamsUtils.UNWIND} ${config.topicMap[topic]}"
+        val data = mapOf<String, Any>("events" to records.map { converter.convert(it.value()) })
         session.writeTransaction {
             try {
                 it.run(query, data)
@@ -79,12 +85,13 @@ class Neo4jService(private val config: Neo4jSinkConnectorConfig) {
         session.close()
     }

-    suspend fun writeData(data: Map<String, List<List<Map<String, Any>>>>) = coroutineScope {
+    suspend fun writeData(data: Map<String, List<List<SinkRecord>>>) = coroutineScope {
         val timeout = config.batchTimeout
         val ticker = ticker(timeout)
-        val deferredList = data.flatMap { (query, events) ->
-            events.flatMap { it.map { async { write(query, it) } } }
-        }
+        val deferredList = data
+                .flatMap { (topic, records) ->
+                    records.map { async { write(topic, it) } }
+                }
         whileSelect {
             ticker.onReceive {
                 if (log.isDebugEnabled) {
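For context, write(topic, records) now assembles its Cypher per topic: StreamsUtils.UNWIND supplies the UNWIND prefix, config.topicMap[topic] supplies that topic's configured Cypher fragment, and ValueConverter turns each record value into the "events" parameter. A hedged sketch of the resulting query shape; both strings below are assumed examples, not values taken from this commit:

// Hedged sketch of the per-topic query that write(topic, records) builds.
// The prefix and fragment are illustrative assumptions: the real prefix comes
// from StreamsUtils.UNWIND and the fragment from the connector's topic map.
fun buildTopicQuery(topicFragment: String): String {
    val unwindPrefix = "UNWIND {events} AS event" // assumed shape of StreamsUtils.UNWIND
    return "$unwindPrefix $topicFragment"
}

fun main() {
    // e.g. a per-topic fragment configured for a hypothetical Person topic
    println(buildTopicQuery("MERGE (p:Person {name: event.firstName, surname: event.lastName})"))
}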

kafka-connect-neo4j/src/main/kotlin/streams/kafka/connect/sink/Neo4jSinkTask.kt

Lines changed: 5 additions & 28 deletions
@@ -15,8 +15,6 @@ class Neo4jSinkTask : SinkTask() {
     private lateinit var config: Neo4jSinkConnectorConfig
     private lateinit var neo4jService: Neo4jService

-    private val converter = ValueConverter()
-
     override fun version(): String {
         return VersionUtil.version(this.javaClass as Class<*>)
     }
@@ -31,33 +29,12 @@ class Neo4jSinkTask : SinkTask() {
             return@runBlocking
         }

-        val mapTopicRecords = collection.groupBy { it.topic() }
-
-        if (log.isDebugEnabled) {
-            mapTopicRecords.forEach { topic, records -> log.debug("For topic: $topic record size is ${records.size}") }
-        }
-
         // TODO define a retry policy in that case we must throw `RetriableException`
-        val data = mapTopicRecords
-                .filterKeys { topic ->
-                    val isValidTopic = config.topicMap.containsKey(topic)
-                    if (!isValidTopic && log.isDebugEnabled) {
-                        log.debug("Topic $topic not present")
-                    }
-                    isValidTopic
-                }
-                .mapKeys { it -> "${StreamsUtils.UNWIND} ${config.topicMap[it.key]}" }
-                .mapValues { it ->
-                    val value = it.value
-                    val chunks = value.chunked(config.batchSize)
-                    if (log.isDebugEnabled) {
-                        log.debug("Data chunked in ${chunks.size} chunks")
-                    }
-                    val result = chunks.map {
-                        it.map { mapOf("events" to converter.convert(it.value())) }
-                    }
-                    result
-                }
+        val data = EventBuilder()
+                .withBatchSize(config.batchSize)
+                .withTopics(config.topicMap.keys)
+                .withSinkRecords(collection)
+                .build()
         neo4jService.writeData(data)

     }
kafka-connect-neo4j/src/test/kotlin/streams/kafka/connect/sink/EventBuilderTest.kt

Lines changed: 63 additions & 0 deletions
@@ -0,0 +1,63 @@
package streams.kafka.connect.sink

import org.apache.kafka.connect.data.Schema
import org.apache.kafka.connect.data.SchemaBuilder
import org.apache.kafka.connect.data.Struct
import org.apache.kafka.connect.data.Timestamp
import org.apache.kafka.connect.sink.SinkRecord
import org.junit.Test
import java.util.*
import kotlin.test.assertEquals

class EventBuilderTest {
    private val PERSON_SCHEMA = SchemaBuilder.struct().name("com.example.Person")
            .field("firstName", Schema.STRING_SCHEMA)
            .field("lastName", Schema.STRING_SCHEMA)
            .field("age", Schema.OPTIONAL_INT32_SCHEMA)
            .field("bool", Schema.OPTIONAL_BOOLEAN_SCHEMA)
            .field("short", Schema.OPTIONAL_INT16_SCHEMA)
            .field("byte", Schema.OPTIONAL_INT8_SCHEMA)
            .field("long", Schema.OPTIONAL_INT64_SCHEMA)
            .field("float", Schema.OPTIONAL_FLOAT32_SCHEMA)
            .field("double", Schema.OPTIONAL_FLOAT64_SCHEMA)
            .field("modified", Timestamp.SCHEMA)
            .build()

    @Test
    fun `should create event map properly`() {
        // Given
        val firstTopic = "neotopic"
        val secondTopic = "foo"
        val batchSize = 2
        val struct = Struct(PERSON_SCHEMA)
                .put("firstName", "Alex")
                .put("lastName", "Smith")
                .put("bool", true)
                .put("short", 1234.toShort())
                .put("byte", (-32).toByte())
                .put("long", 12425436L)
                .put("float", 2356.3.toFloat())
                .put("double", -2436546.56457)
                .put("age", 21)
                .put("modified", Date(1474661402123L))
        val input = listOf(SinkRecord(firstTopic, 1, null, null, PERSON_SCHEMA, struct, 42),
                SinkRecord(firstTopic, 1, null, null, PERSON_SCHEMA, struct, 42),
                SinkRecord(firstTopic, 1, null, null, PERSON_SCHEMA, struct, 43),
                SinkRecord(firstTopic, 1, null, null, PERSON_SCHEMA, struct, 44),
                SinkRecord(firstTopic, 1, null, null, PERSON_SCHEMA, struct, 45),
                SinkRecord(secondTopic, 1, null, null, PERSON_SCHEMA, struct, 43)) // 5 records for topic "neotopic", 1 for topic "foo"
        val topics = setOf(firstTopic, secondTopic)

        // When
        val data = EventBuilder()
                .withBatchSize(batchSize)
                .withTopics(topics)
                .withSinkRecords(input)
                .build()

        // Then
        assertEquals(topics, data.keys)
        assertEquals(3, data[firstTopic]!!.size) // n° of chunks for "neotopic"
        assertEquals(1, data[secondTopic]!!.size) // n° of chunks for "foo"
    }
}
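The expected counts in the assertions follow from Kotlin's chunked: with batchSize = 2, the five "neotopic" records split into ceil(5/2) = 3 batches (sizes 2, 2 and 1) and the single "foo" record forms one batch. A standalone sketch of that arithmetic:

fun main() {
    val batchSize = 2
    val neotopicRecords = List(5) { "record-$it" } // stand-ins for the five "neotopic" SinkRecords above
    val chunks = neotopicRecords.chunked(batchSize)
    println(chunks.size)            // 3 chunks for "neotopic"
    println(chunks.map { it.size }) // [2, 2, 1]
    println(listOf("record-5").chunked(batchSize).size) // 1 chunk for "foo"
}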

kafka-connect-neo4j/src/test/kotlin/streams/kafka/connect/sink/Neo4jSinkTaskTest.kt

Lines changed: 3 additions & 3 deletions
@@ -86,9 +86,9 @@ class Neo4jSinkTaskTest: EasyMockSupport() {
         task.start(props)
         val input = listOf(SinkRecord(firstTopic, 1, null, null, PERSON_SCHEMA, struct, 42),
                 SinkRecord(firstTopic, 1, null, null, PERSON_SCHEMA, struct, 42),
-                SinkRecord(firstTopic, 1, null, null, PERSON_SCHEMA, struct, 42),
-                SinkRecord(firstTopic, 1, null, null, PERSON_SCHEMA, struct, 42),
-                SinkRecord(firstTopic, 1, null, null, PERSON_SCHEMA, struct, 42),
+                SinkRecord(firstTopic, 1, null, null, PERSON_SCHEMA, struct, 43),
+                SinkRecord(firstTopic, 1, null, null, PERSON_SCHEMA, struct, 44),
+                SinkRecord(firstTopic, 1, null, null, PERSON_SCHEMA, struct, 45),
                 SinkRecord(secondTopic, 1, null, null, PERSON_SCHEMA, struct, 43))
         task.put(input)
         db.graph().beginTx().use {
