Skip to content

Commit 3fd6176

Browse files
jexpmoxious
authored andcommitted
Improved DeadletterQueue, Logging and Error Handling (#231)
* Improved DeadletterQueue, Logging and Error Handling * Handle null keys and values in ConsumerRecord for ErrorData * Addressed review comments
1 parent d7e1fa5 commit 3fd6176

File tree

26 files changed

+492
-243
lines changed

26 files changed

+492
-243
lines changed

common/src/main/kotlin/streams/extensions/CommonExtensions.kt

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package streams.extensions
22

3+
import com.fasterxml.jackson.core.JsonParseException
34
import org.apache.kafka.clients.consumer.ConsumerRecord
45
import org.apache.kafka.clients.consumer.OffsetAndMetadata
56
import org.apache.kafka.common.TopicPartition
@@ -37,5 +38,5 @@ fun <K, V> ConsumerRecord<K, V>.topicPartition() = TopicPartition(this.topic(),
3738
fun <K, V> ConsumerRecord<K, V>.offsetAndMetadata(metadata: String = "") = OffsetAndMetadata(this.offset() + 1, metadata)
3839

3940
fun ConsumerRecord<ByteArray, ByteArray>.toStreamsSinkEntity(): StreamsSinkEntity = StreamsSinkEntity(
40-
if (this.key() != null) try { JSONUtils.readValue<Any>(this.key()) } catch (e: Exception) { String(this.key()) } else null,
41-
if (this.value() != null) JSONUtils.readValue<Any>(this.value()) else null)
41+
if (this.key() == null) null else JSONUtils.readValue<Any>(this.key(), true),
42+
if (this.value() == null) null else JSONUtils.readValue<Any>(this.value()))

common/src/main/kotlin/streams/service/dlq/DeadLetterQueueService.kt

Lines changed: 0 additions & 43 deletions
This file was deleted.

common/src/main/kotlin/streams/service/dlq/KafkaDLQService.kt

Lines changed: 0 additions & 67 deletions
This file was deleted.
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
package streams.service.errors
2+
3+
import org.apache.kafka.clients.consumer.ConsumerRecord
4+
import org.apache.kafka.common.record.RecordBatch
5+
import java.lang.RuntimeException
6+
import java.util.*
7+
8+
9+
data class ErrorData(val originalTopic: String,
10+
val timestamp: Long,
11+
val key: ByteArray?,
12+
val value: ByteArray?,
13+
val partition: String,
14+
val offset: String,
15+
val executingClass: Class<*>?,
16+
val exception: Exception?) {
17+
18+
constructor(originalTopic: String, timestamp: Long?, key: Any?, value: Any?,
19+
partition: Int, offset: Long, executingClass: Class<*>?, exception: Exception?) :
20+
this(originalTopic, timestamp ?: RecordBatch.NO_TIMESTAMP, toByteArray(key), toByteArray(value), partition.toString(),offset.toString(), executingClass, exception)
21+
22+
companion object {
23+
fun from(consumerRecord: ConsumerRecord<ByteArray, ByteArray>, exception: Exception?, executingClass: Class<*>?): ErrorData {
24+
return ErrorData(offset = consumerRecord.offset().toString(),
25+
originalTopic = consumerRecord.topic(),
26+
partition = consumerRecord.partition().toString(),
27+
timestamp = consumerRecord.timestamp(),
28+
exception = exception,
29+
executingClass = executingClass,
30+
key = consumerRecord.key(),
31+
value = consumerRecord.value())
32+
}
33+
fun toByteArray(v:Any?) = try {
34+
when (v) {
35+
null -> null
36+
is ByteArray -> v
37+
else -> v.toString().toByteArray(Charsets.UTF_8)
38+
}
39+
} catch (e:Exception) {
40+
null
41+
}
42+
}
43+
fun toLogString() =
44+
"""
45+
ErrorData(originalTopic=$originalTopic, timestamp=$timestamp, partition=$partition, offset=$offset, exception=$exception, key=${key?.toString(Charsets.UTF_8)}, value=${value?.sliceArray(0..Math.min(value.size,200)-1)?.toString(Charsets.UTF_8)}, executingClass=$executingClass)
46+
""".trimIndent()
47+
48+
}
49+
50+
abstract class ErrorService(private val config: Map<String, Any> = emptyMap()) {
51+
52+
data class ErrorConfig(val fail:Boolean=false, val log:Boolean=false, val logMessages:Boolean=false,
53+
val dlqTopic:String? = null, val dlqHeaderPrefix:String = "", val dlqHeaders:Boolean = false, val dlqReplication: Int? = 3) {
54+
55+
/*
56+
https://www.confluent.io/blog/kafka-connect-deep-dive-error-handling-dead-letter-queues
57+
"errors.retry.timeout": "-1",
58+
"errors.retry.delay.max.ms": "1000",
59+
60+
"errors.tolerance": "all", "none" == fail-fast, abort sink task
61+
62+
fail-fast for configuration errors (e.g. validate cypher statements on start)
63+
errors.tolerance = all -> silently ignore all bad messages
64+
65+
org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java
66+
67+
68+
"errors.log.enable": true,
69+
"errors.deadletterqueue.context.header.enable"=true/false
70+
"errors.deadletterqueue.topic.name": "test-error-topic",
71+
"errors.deadletterqueue.topic.replication.factor": 1,
72+
"errors.log.include.messages": true,
73+
*/
74+
75+
companion object {
76+
const val TOLERANCE = "errors.tolerance"
77+
const val LOG = "errors.log.enable"
78+
const val LOG_MESSAGES = "errors.log.include.messages"
79+
const val DLQ_TOPIC = "errors.deadletterqueue.topic.name"
80+
const val DLQ_HEADERS = "errors.deadletterqueue.context.header.enable"
81+
const val DLQ_HEADER_PREFIX = "errors.deadletterqueue.context.header.prefix"
82+
const val DLQ_REPLICATION = "errors.deadletterqueue.topic.replication.factor"
83+
84+
fun from(props: Properties) = from(props.toMap() as Map<String, Any>)
85+
86+
fun boolean(v:Any?) = when (v) {
87+
null -> false
88+
"true" -> true
89+
"false" -> false
90+
is Boolean -> v
91+
else -> false
92+
}
93+
fun int(v:Any?) = when (v) {
94+
null -> 0
95+
is Int -> v
96+
is String -> v.toInt()
97+
else -> 0
98+
}
99+
100+
fun from(config: Map<String, Any?>) =
101+
ErrorConfig(
102+
fail = config.getOrDefault(TOLERANCE, "none") == "none",
103+
log = boolean(config.get(LOG)),
104+
logMessages = boolean(config.get(LOG_MESSAGES)),
105+
dlqTopic = config.get(DLQ_TOPIC) as String?,
106+
dlqHeaders = boolean(config.get(DLQ_HEADERS)),
107+
dlqHeaderPrefix = config.getOrDefault(DLQ_HEADER_PREFIX,"") as String,
108+
dlqReplication = int(config.getOrDefault(DLQ_REPLICATION, 3)))
109+
}
110+
}
111+
112+
abstract fun report(errorDatas: List<ErrorData>)
113+
114+
open fun close() {}
115+
}
116+
117+
class ProcessingError(val errorDatas: List<ErrorData>) :
118+
RuntimeException("Error processing ${errorDatas.size} messages\n"+errorDatas.map { it.toLogString() }.joinToString("\n"))
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
package streams.service.errors
2+
3+
import org.apache.commons.lang3.exception.ExceptionUtils
4+
import org.apache.kafka.clients.producer.KafkaProducer
5+
import org.apache.kafka.clients.producer.Producer
6+
import org.apache.kafka.clients.producer.ProducerConfig
7+
import org.apache.kafka.clients.producer.ProducerRecord
8+
import org.apache.kafka.common.record.RecordBatch
9+
import org.apache.kafka.common.serialization.ByteArraySerializer
10+
import org.neo4j.util.VisibleForTesting
11+
import java.util.*
12+
13+
class KafkaErrorService(private val producer: Producer<ByteArray, ByteArray>?, private val errorConfig: ErrorConfig, private val log: (String, Exception?)->Unit): ErrorService() {
14+
15+
constructor(config: Properties, errorConfig: ErrorConfig,
16+
log: (String, Exception?) -> Unit) : this(producer(errorConfig, config), errorConfig, log)
17+
18+
companion object {
19+
private fun producer(errorConfig: ErrorConfig, config: Properties) =
20+
errorConfig.dlqTopic?.let {
21+
val props = Properties()
22+
props.putAll(config)
23+
props[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = ByteArraySerializer::class.java.name
24+
props[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = ByteArraySerializer::class.java.name
25+
KafkaProducer<ByteArray,ByteArray>(props)
26+
}
27+
}
28+
29+
override fun report(errorDatas: List<ErrorData>) {
30+
if (errorConfig.fail) throw ProcessingError(errorDatas)
31+
if (errorConfig.log) {
32+
if (errorConfig.logMessages) {
33+
errorDatas.forEach{log(it.toLogString(),it.exception)}
34+
} else {
35+
errorDatas.map { it.exception }.distinct().forEach{log("Error processing ${errorDatas.size} messages",it)}
36+
}
37+
}
38+
if (errorConfig.dlqTopic != null && producer != null) {
39+
errorDatas.forEach { dlqData ->
40+
try {
41+
val producerRecord = if (dlqData.timestamp == RecordBatch.NO_TIMESTAMP) {
42+
ProducerRecord(errorConfig.dlqTopic, null, dlqData.key, dlqData.value)
43+
} else {
44+
ProducerRecord(errorConfig.dlqTopic, null, dlqData.timestamp, dlqData.key, dlqData.value)
45+
}
46+
if (errorConfig.dlqHeaders) {
47+
val producerHeader = producerRecord.headers()
48+
populateContextHeaders(dlqData).forEach { (key, value) -> producerHeader.add(key, value) }
49+
}
50+
producer.send(producerRecord)
51+
} catch (e: Exception) {
52+
log("Error writing to DLQ $e :${dlqData.toLogString()}",e) // todo only the first or all
53+
}
54+
}
55+
}
56+
}
57+
58+
@VisibleForTesting
59+
fun populateContextHeaders(errorData: ErrorData): Map<String, ByteArray> {
60+
fun prefix(suffix: String) = errorConfig.dlqHeaderPrefix + suffix
61+
62+
val headers = mutableMapOf(
63+
prefix("topic") to errorData.originalTopic.toByteArray(),
64+
prefix("partition") to errorData.partition.toByteArray(),
65+
prefix("offset") to errorData.offset.toByteArray())
66+
67+
if (errorData.executingClass != null) {
68+
headers[prefix("class.name")] = errorData.executingClass.name.toByteArray()
69+
}
70+
if (errorData.exception != null) {
71+
headers[prefix("exception.class.name")] = errorData.exception.javaClass.name.toByteArray()
72+
if (errorData.exception.message != null) {
73+
headers[prefix("exception.message")] = errorData.exception.message.toString().toByteArray()
74+
}
75+
headers[prefix("exception.stacktrace")] = ExceptionUtils.getStackTrace(errorData.exception).toByteArray()
76+
}
77+
return headers
78+
}
79+
80+
81+
override fun close() {
82+
this.producer?.close()
83+
}
84+
85+
}

common/src/main/kotlin/streams/utils/JSONUtils.kt

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package streams.serialization
22

33
import com.fasterxml.jackson.core.JsonGenerator
4+
import com.fasterxml.jackson.core.JsonParseException
45
import com.fasterxml.jackson.core.JsonProcessingException
56
import com.fasterxml.jackson.databind.*
67
import com.fasterxml.jackson.databind.module.SimpleModule
@@ -14,6 +15,7 @@ import streams.events.*
1415
import streams.utils.StreamsUtils
1516
import java.io.IOException
1617
import java.time.temporal.TemporalAccessor
18+
import kotlin.reflect.full.isSubclassOf
1719

1820
abstract class StreamsPoint { abstract val crs: String }
1921
data class StreamsPointCartesian(override val crs: String, val x: Double, val y: Double, val z: Double? = null): StreamsPoint()
@@ -112,12 +114,23 @@ object JSONUtils {
112114
return getObjectMapper().readValue(value, T::class.java)
113115
}
114116

115-
inline fun <reified T> readValue(value: Any): T {
117+
inline fun <reified T> readValue(value: Any, stringWhenFailure:Boolean = false): T {
116118
val strValue = when (value) {
117119
is String -> value
120+
is ByteArray -> String(value)
118121
else -> getObjectMapper().writeValueAsString(value)
119122
}
120-
return getObjectMapper().readValue(strValue)
123+
return try {
124+
getObjectMapper().readValue(strValue)
125+
} catch(e: JsonParseException) {
126+
if (stringWhenFailure && String::class.isSubclassOf(T::class)) {
127+
strValue.trimStart().let {
128+
if (it.get(0) == '{' || it.get(0) == '[') throw e
129+
else it as T
130+
}
131+
}
132+
else throw e
133+
}
121134
}
122135

123136
inline fun <reified T> convertValue(value: Any): T {

0 commit comments

Comments
 (0)