
Commit 17c3096

conker84 authored and jexp committed
fixes #243: Restore the DLQ in Kafka Connect Sink (#244)
1 parent: 6917083

14 files changed (+85 additions, −57 deletions)


common/src/main/kotlin/streams/extensions/CommonExtensions.kt

Lines changed: 6 additions & 0 deletions
@@ -12,9 +12,15 @@ import org.neo4j.graphdb.Node
 import streams.serialization.JSONUtils
 import streams.service.StreamsSinkEntity
 import java.nio.ByteBuffer
+import java.util.*
 import javax.lang.model.SourceVersion
 
 fun Map<String,String>.getInt(name:String, defaultValue: Int) = this.get(name)?.toInt() ?: defaultValue
+fun Map<*, *>.asProperties() = this.let {
+    val properties = Properties()
+    properties.putAll(it)
+    properties
+}
 
 fun Node.labelNames() : List<String> {
     return this.labels.map { it.name() }
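
The new `asProperties()` extension simply copies any `Map<*, *>` into a `java.util.Properties` instance. A minimal, self-contained usage sketch (the map entries are made-up placeholders):

    import java.util.Properties

    // Mirrors the extension added above in CommonExtensions.kt.
    fun Map<*, *>.asProperties() = this.let {
        val properties = Properties()
        properties.putAll(it)
        properties
    }

    fun main() {
        // Hypothetical producer settings; any Map<*, *> works.
        val config = mapOf("bootstrap.servers" to "broker:9093", "acks" to "1")
        val props: Properties = config.asProperties()
        println(props.getProperty("bootstrap.servers")) // prints: broker:9093
    }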

common/src/main/kotlin/streams/service/errors/KafkaErrorService.kt

Lines changed: 19 additions & 11 deletions
@@ -1,28 +1,35 @@
 package streams.service.errors
 
 import org.apache.commons.lang3.exception.ExceptionUtils
+import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.clients.producer.KafkaProducer
 import org.apache.kafka.clients.producer.Producer
 import org.apache.kafka.clients.producer.ProducerConfig
 import org.apache.kafka.clients.producer.ProducerRecord
 import org.apache.kafka.common.record.RecordBatch
 import org.apache.kafka.common.serialization.ByteArraySerializer
 import org.neo4j.util.VisibleForTesting
+import streams.utils.ValidationUtils.validateConnection
 import java.util.*
 
 class KafkaErrorService(private val producer: Producer<ByteArray, ByteArray>?, private val errorConfig: ErrorConfig, private val log: (String, Exception?)->Unit): ErrorService() {
 
     constructor(config: Properties, errorConfig: ErrorConfig,
-                log: (String, Exception?) -> Unit) : this(producer(errorConfig, config), errorConfig, log)
+                log: (String, Exception?) -> Unit) : this(producer(errorConfig, config, log), errorConfig, log)
 
     companion object {
-        private fun producer(errorConfig: ErrorConfig, config: Properties) =
+        private fun producer(errorConfig: ErrorConfig, config: Properties, log: (String, Exception?) -> Unit) =
                 errorConfig.dlqTopic?.let {
-                    val props = Properties()
-                    props.putAll(config)
-                    props[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = ByteArraySerializer::class.java.name
-                    props[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = ByteArraySerializer::class.java.name
-                    KafkaProducer<ByteArray,ByteArray>(props)
+                    try {
+                        val bootstrapServers = config.getOrDefault(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "").toString()
+                        validateConnection(bootstrapServers, CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, false)
+                        config[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = ByteArraySerializer::class.java.name
+                        config[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = ByteArraySerializer::class.java.name
+                        KafkaProducer<ByteArray, ByteArray>(config)
+                    } catch (e: Exception) {
+                        log("Cannot initialize the custom DLQ because of the following exception: ", e)
+                        null
+                    }
                 }
     }
 
@@ -35,8 +42,9 @@ class KafkaErrorService(private val producer: Producer<ByteArray, ByteArray>?, p
                 errorDatas.map { it.exception }.distinct().forEach{log("Error processing ${errorDatas.size} messages",it)}
             }
         }
-        if (errorConfig.dlqTopic != null && producer != null) {
-            errorDatas.forEach { dlqData ->
+
+        errorDatas.forEach { dlqData ->
+            producer?.let {
                 try {
                     val producerRecord = if (dlqData.timestamp == RecordBatch.NO_TIMESTAMP) {
                         ProducerRecord(errorConfig.dlqTopic, null, dlqData.key, dlqData.value)
@@ -47,9 +55,9 @@ class KafkaErrorService(private val producer: Producer<ByteArray, ByteArray>?, p
                         val producerHeader = producerRecord.headers()
                         populateContextHeaders(dlqData).forEach { (key, value) -> producerHeader.add(key, value) }
                     }
-                    producer.send(producerRecord)
+                    it.send(producerRecord)
                 } catch (e: Exception) {
-                    log("Error writing to DLQ $e :${dlqData.toLogString()}",e) // todo only the first or all
+                    log("Error writing to DLQ $e: ${dlqData.toLogString()}", e) // todo only the first or all
                 }
             }
        }
    }

common/src/main/kotlin/streams/utils/ValidationUtils.kt

Lines changed: 1 addition & 1 deletion
@@ -32,7 +32,7 @@ object ValidationUtils {
         if (url.isBlank()) {
             throw RuntimeException("The `kafka.$kafkaPropertyKey` property is empty")
         } else if (checkReachable) {
-            val unreachableServers = ValidationUtils.checkServersUnreachable(url)
+            val unreachableServers = checkServersUnreachable(url)
             if (unreachableServers.isNotEmpty()) {
                 throw RuntimeException("The servers defined into the property `kafka.$kafkaPropertyKey` are not reachable: $unreachableServers")
             }
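
For context, `validateConnection` only probes the brokers when `checkReachable` is true, which is why the callers changed in this commit pass `false` to keep just the blank-check. A self-contained sketch of that flow (`checkServersUnreachable` is stubbed here; the real ValidationUtils probes each host:port in the comma-separated url):

    object ValidationSketch {
        fun checkServersUnreachable(url: String): List<String> = emptyList() // stub

        fun validateConnection(url: String, kafkaPropertyKey: String, checkReachable: Boolean = true) {
            if (url.isBlank()) {
                throw RuntimeException("The `kafka.$kafkaPropertyKey` property is empty")
            } else if (checkReachable) {
                val unreachableServers = checkServersUnreachable(url)
                if (unreachableServers.isNotEmpty()) {
                    throw RuntimeException("The servers defined into the property `kafka.$kafkaPropertyKey` are not reachable: $unreachableServers")
                }
            }
        }
    }

    fun main() {
        // checkReachable = false: only the blank-check runs, no network probe.
        ValidationSketch.validateConnection("broker:9093", "bootstrap.servers", false)
        println("validated without probing the broker")
    }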

consumer/src/main/kotlin/streams/kafka/KafkaSinkConfiguration.kt

Lines changed: 1 addition & 1 deletion
@@ -79,7 +79,7 @@ data class KafkaSinkConfiguration(val zookeeperConnect: String = "localhost:2181
 
     private fun validate(config: KafkaSinkConfiguration) {
         validateConnection(config.zookeeperConnect, "zookeeper.connect", false)
-        validateConnection(config.bootstrapServers, CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)
+        validateConnection(config.bootstrapServers, CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, false)
         val schemaRegistryUrlKey = "schema.registry.url"
         if (config.extraProperties.containsKey(schemaRegistryUrlKey)) {
             val schemaRegistryUrl = config.extraProperties.getOrDefault(schemaRegistryUrlKey, "")

consumer/src/test/kotlin/streams/kafka/KafkaSinkConfigurationTest.kt

Lines changed: 2 additions & 0 deletions
@@ -4,6 +4,7 @@ import io.confluent.kafka.serializers.KafkaAvroDeserializer
 import org.apache.kafka.clients.consumer.ConsumerConfig
 import org.apache.kafka.common.serialization.ByteArrayDeserializer
 import org.apache.kafka.common.serialization.StringDeserializer
+import org.junit.Ignore
 import org.junit.Test
 import org.neo4j.kernel.configuration.Config
 import streams.StreamsSinkConfiguration
@@ -76,6 +77,7 @@ class KafkaSinkConfigurationTest {
     }
 
     @Test(expected = RuntimeException::class)
+    @Ignore("Disabled, use Kafka to deal with availability of the configured services")
     fun `should not validate the configuration because of unreachable kafka bootstrap server`() {
         val zookeeper = "zookeeper:2181"
         val bootstrap = "bootstrap:9092"

doc/asciidoc/consumer/index.adoc

Lines changed: 19 additions & 1 deletion
@@ -483,7 +483,25 @@ Config Options
 | errors.deadletterqueue.topic.replication.factor | 3/1 | replication factor, need to set to 1 for single partition, default:3
 |===
 
-In Kafka Connect you can just configure the above properties.
+In Kafka Connect, in addition to the above properties, you also need to define the Kafka broker connection properties:
+
+|===
+| Name | mandatory | Description
+
+| kafka.bootstrap.servers | true | The Kafka broker URL. *(please see the note below)
+
+| kafka.<any_other_kafka_property> | false | You can also specify any other Kafka producer
+setting by adding the `kafka.` prefix (e.g. the configuration `acks` becomes `kafka.acks`). See the https://kafka.apache.org/documentation/#brokerconfigs[Apache Kafka documentation] for details on these settings.
+
+|===
+
+*As you may have noticed, we ask you to provide the `bootstrap.servers` property
+because the Kafka Connect Framework provides out-of-the-box support
+only for deserialization errors and message transformations
+(please see the following link for further details: https://www.confluent.io/blog/kafka-connect-deep-dive-error-handling-dead-letter-queues).
+We want to extend this feature to transient errors in order to cover 100% of failures,
+so, as suggested by Confluent, we currently need to ask for the broker location again
+until this JIRA issue is addressed: https://issues.apache.org/jira/browse/KAFKA-8597.
 
 For the Neo4j extension you prefix them with `streams.sink.` in the Neo4j configuration.
 
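
To make the new table concrete: the connector test updated below in this commit feeds exactly such properties to `Neo4jSinkConnectorConfig` as a map. A hypothetical DLQ-enabled sink configuration, sketched the same way (topic, broker, and DLQ names are placeholders):

    // Hypothetical Kafka Connect sink settings expressed as a Kotlin map, the
    // same form Neo4jSinkConnectorConfigTest uses; every value is a placeholder.
    val sinkConfig = mapOf(
            "errors.tolerance" to "all",
            "errors.deadletterqueue.topic.name" to "my-dlq-topic",
            "errors.deadletterqueue.topic.replication.factor" to "1",
            // `kafka.`-prefixed entries are forwarded to the DLQ producer:
            "kafka.bootstrap.servers" to "broker:9093",
            "kafka.acks" to "1"
    )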

kafka-connect-neo4j/src/main/kotlin/streams/kafka/connect/sink/Neo4jService.kt

Lines changed: 0 additions & 2 deletions
@@ -21,8 +21,6 @@ import streams.service.TopicType
 import streams.service.TopicTypeGroup
 import streams.utils.StreamsUtils
 import streams.utils.retryForException
-import java.lang.RuntimeException
-import java.util.concurrent.CompletionException
 import java.util.concurrent.CopyOnWriteArraySet
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.TimeoutException

kafka-connect-neo4j/src/main/kotlin/streams/kafka/connect/sink/Neo4jSinkConnectorConfig.kt

Lines changed: 6 additions & 4 deletions
@@ -10,19 +10,15 @@ import org.apache.kafka.common.config.AbstractConfig
 import org.apache.kafka.common.config.ConfigDef
 import org.apache.kafka.common.config.ConfigException
 import org.apache.kafka.connect.sink.SinkTask
-import org.neo4j.csv.reader.Magic.define
 import org.neo4j.driver.internal.async.pool.PoolSettings
 import org.neo4j.driver.v1.Config
-import streams.kafka.connect.sink.ConfigGroup.ERROR_REPORTING
 import streams.kafka.connect.utils.PropertiesUtil
 import streams.service.TopicType
 import streams.service.TopicUtils
 import streams.service.Topics
-import streams.service.errors.ErrorService
 import streams.service.sink.strategy.SourceIdIngestionStrategyConfig
 import java.io.File
 import java.net.URI
-import java.net.URISyntaxException
 import java.util.concurrent.TimeUnit
 
 enum class AuthenticationType {
@@ -71,6 +67,8 @@ class Neo4jSinkConnectorConfig(originals: Map<*, *>): AbstractConfig(config(), o
 
     val sourceIdStrategyConfig: SourceIdIngestionStrategyConfig
 
+    val kafkaBrokerProperties: Map<String, Any?>
+
     init {
         encryptionEnabled = getBoolean(ENCRYPTION_ENABLED)
         encryptionTrustStrategy = ConfigUtils
@@ -106,6 +104,10 @@ class Neo4jSinkConnectorConfig(originals: Map<*, *>): AbstractConfig(config(), o
         strategyMap = TopicUtils.toStrategyMap(topics, sourceIdStrategyConfig)
 
         parallelBatches = getBoolean(BATCH_PARALLELIZE)
+        val kafkaPrefix = "kafka."
+        kafkaBrokerProperties = originals
+                .filterKeys { it.toString().startsWith(kafkaPrefix) }
+                .mapKeys { it.key.toString().substring(kafkaPrefix.length) }
         validateAllTopics(originals)
     }
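
The `kafka.` prefix handling above is plain map manipulation, so it can be exercised in isolation; a runnable sketch with made-up entries:

    fun main() {
        val kafkaPrefix = "kafka."
        // Sample originals map; only the `kafka.`-prefixed entries survive,
        // with the prefix stripped from the key.
        val originals: Map<*, *> = mapOf(
                "neo4j.server.uri" to "bolt://localhost:7687",
                "kafka.bootstrap.servers" to "broker:9093",
                "kafka.acks" to 1
        )
        val kafkaBrokerProperties = originals
                .filterKeys { it.toString().startsWith(kafkaPrefix) }
                .mapKeys { it.key.toString().substring(kafkaPrefix.length) }
        println(kafkaBrokerProperties) // {bootstrap.servers=broker:9093, acks=1}
    }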

kafka-connect-neo4j/src/main/kotlin/streams/kafka/connect/sink/Neo4jSinkTask.kt

Lines changed: 19 additions & 19 deletions
@@ -7,7 +7,8 @@ import org.apache.kafka.connect.sink.SinkRecord
 import org.apache.kafka.connect.sink.SinkTask
 import org.slf4j.Logger
 import org.slf4j.LoggerFactory
-import streams.service.errors.*
+import streams.extensions.asProperties
+import streams.service.errors.*
 import streams.utils.StreamsUtils
 import java.lang.Exception
 import java.util.*
@@ -17,38 +18,37 @@ class Neo4jSinkTask : SinkTask() {
     private val log: Logger = LoggerFactory.getLogger(Neo4jSinkTask::class.java)
     private lateinit var config: Neo4jSinkConnectorConfig
     private lateinit var neo4jService: Neo4jService
-//    private lateinit var errorService: ErrorService
+    private lateinit var errorService: ErrorService
 
     override fun version(): String {
         return VersionUtil.version(this.javaClass as Class<*>)
     }
 
     override fun start(map: Map<String, String>) {
         this.config = Neo4jSinkConnectorConfig(map)
-
-        val kafkaConfig = Properties()
-        kafkaConfig.putAll(map)
-//        this.errorService = KafkaErrorService(kafkaConfig, ErrorService.ErrorConfig.from(kafkaConfig), log::error)
         this.neo4jService = Neo4jService(this.config)
+        this.errorService = KafkaErrorService(this.config.kafkaBrokerProperties.asProperties(),
+                ErrorService.ErrorConfig.from(map.asProperties()),
+                log::error)
     }
 
     override fun put(collection: Collection<SinkRecord>) = runBlocking(Dispatchers.IO) {
         if (collection.isEmpty()) {
             return@runBlocking
         }
-//        try {
-            val data = EventBuilder()
-                    .withBatchSize(config.batchSize)
-                    .withTopics(config.topics.allTopics())
-                    .withSinkRecords(collection)
-                    .build()
-
-            neo4jService.writeData(data)
-//        } catch(e:Exception) {
-//            errorService.report(collection.map {
-//                ErrorData(it.topic(), it.timestamp(),it.key(), it.value(), it.kafkaPartition(), it.kafkaOffset(), this::class.java,e)
-//            })
-//        }
+        try {
+            val data = EventBuilder()
+                    .withBatchSize(config.batchSize)
+                    .withTopics(config.topics.allTopics())
+                    .withSinkRecords(collection)
+                    .build()
+
+            neo4jService.writeData(data)
+        } catch(e:Exception) {
+            errorService.report(collection.map {
+                ErrorData(it.topic(), it.timestamp(),it.key(), it.value(), it.kafkaPartition(), it.kafkaOffset(), this::class.java,e)
+            })
+        }
     }
 
     override fun stop() {
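
The restored `put()` flow is: build the batch, write it to Neo4j, and on any failure report every record of the batch to the error service, which routes them to the DLQ. A simplified control-flow sketch with stand-in types (the real task uses `SinkRecord`, `EventBuilder`, `Neo4jService`, and `KafkaErrorService` as shown above):

    data class FakeRecord(val topic: String, val value: Any?)

    class FakeErrorService {
        fun report(errors: List<Pair<FakeRecord, Exception>>) =
                errors.forEach { (r, e) -> println("to DLQ: ${r.topic} -> ${e.message}") }
    }

    fun put(collection: Collection<FakeRecord>,
            write: (Collection<FakeRecord>) -> Unit,
            errorService: FakeErrorService) {
        if (collection.isEmpty()) return
        try {
            write(collection) // neo4jService.writeData(data) in the real task
        } catch (e: Exception) {
            // On failure, every record of the batch is reported, mirroring
            // errorService.report(collection.map { ErrorData(...) })
            errorService.report(collection.map { it to e })
        }
    }

    fun main() {
        val records = listOf(FakeRecord("my-topic", "{...}"))
        put(records, { throw RuntimeException("transient Neo4j failure") }, FakeErrorService())
    }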

kafka-connect-neo4j/src/test/kotlin/streams/kafka/connect/sink/Neo4jSinkConnectorConfigTest.kt

Lines changed: 6 additions & 0 deletions
@@ -1,5 +1,7 @@
 package streams.kafka.connect.sink
 
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.producer.ProducerConfig
 import org.apache.kafka.common.config.ConfigException
 import org.apache.kafka.connect.sink.SinkConnector
 import org.junit.Test
@@ -46,10 +48,14 @@ class Neo4jSinkConnectorConfigTest {
                 "${Neo4jSinkConnectorConfig.TOPIC_CYPHER_PREFIX}foo" to "CREATE (p:Person{name: event.firstName})",
                 Neo4jSinkConnectorConfig.SERVER_URI to "$a,$b", // Check for string trimming
                 Neo4jSinkConnectorConfig.BATCH_SIZE to 10,
+                "kafka.${CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG}" to "broker:9093",
+                "kafka.${ProducerConfig.ACKS_CONFIG}" to 1,
                 Neo4jSinkConnectorConfig.AUTHENTICATION_BASIC_USERNAME to "FOO",
                 Neo4jSinkConnectorConfig.AUTHENTICATION_BASIC_PASSWORD to "BAR")
         val config = Neo4jSinkConnectorConfig(originals)
 
+        assertEquals(mapOf(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG to "broker:9093",
+                ProducerConfig.ACKS_CONFIG to 1), config.kafkaBrokerProperties)
         assertEquals(originals["${Neo4jSinkConnectorConfig.TOPIC_CYPHER_PREFIX}foo"], config.topics.cypherTopics["foo"])
         assertFalse { config.encryptionEnabled }
         assertEquals(a, config.serverUri.get(0).toString())
