Commit 4b96362

Merge pull request #204 from conker84/issue_177
fixes #177: Create a Dead Letter Queue
2 parents fb2d19c + 2977206 commit 4b96362

File tree

8 files changed: +341 −12 lines changed

8 files changed

+341
-12
lines changed
Lines changed: 43 additions & 0 deletions
@@ -0,0 +1,43 @@
package streams.service.dlq

import org.apache.kafka.clients.consumer.ConsumerRecord


data class DLQData(val originalTopic: String,
                   val timestamp: Long,
                   val key: ByteArray,
                   val value: ByteArray,
                   val partition: String,
                   val offset: String,
                   val executingClass: Class<*>?,
                   val exception: Exception?) {

    companion object {
        fun from(consumerRecord: ConsumerRecord<String, ByteArray>, exception: Exception?, executingClass: Class<*>?): DLQData {
            return DLQData(offset = consumerRecord.offset().toString(),
                    originalTopic = consumerRecord.topic(),
                    partition = consumerRecord.partition().toString(),
                    timestamp = consumerRecord.timestamp(),
                    exception = exception,
                    executingClass = executingClass,
                    key = consumerRecord.key().toByteArray(),
                    value = consumerRecord.value())
        }
    }
}

abstract class DeadLetterQueueService(private val config: Map<String, Any>,
                                      headerPrefix: String) {

    val ERROR_HEADER_ORIG_TOPIC = headerPrefix + "topic"
    val ERROR_HEADER_ORIG_PARTITION = headerPrefix + "partition"
    val ERROR_HEADER_ORIG_OFFSET = headerPrefix + "offset"
    val ERROR_HEADER_EXECUTING_CLASS = headerPrefix + "class.name"
    val ERROR_HEADER_EXCEPTION = headerPrefix + "exception.class.name"
    val ERROR_HEADER_EXCEPTION_MESSAGE = headerPrefix + "exception.message"
    val ERROR_HEADER_EXCEPTION_STACK_TRACE = headerPrefix + "exception.stacktrace"

    abstract fun send(deadLetterQueueTopic: String, dlqData: DLQData)

    abstract fun close()
}
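
For orientation, here is a minimal hypothetical implementation of the abstract contract above. It is not part of this commit; the class name LoggingDLQService is invented, and it only logs the failure context that the Kafka-based implementation puts into record headers.

import streams.service.dlq.DLQData
import streams.service.dlq.DeadLetterQueueService

// Hypothetical sketch: the smallest possible DeadLetterQueueService.
class LoggingDLQService(config: Map<String, Any> = emptyMap(),
                        headerPrefix: String = "__streams.errors.")
    : DeadLetterQueueService(config, headerPrefix) {

    override fun send(deadLetterQueueTopic: String, dlqData: DLQData) {
        // Same context the Kafka implementation serializes into headers.
        println("[$deadLetterQueueTopic] failed record from ${dlqData.originalTopic} " +
                "partition=${dlqData.partition} offset=${dlqData.offset} " +
                "error=${dlqData.exception?.javaClass?.name}: ${dlqData.exception?.message}")
    }

    override fun close() {
        // nothing to release
    }
}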
Lines changed: 67 additions & 0 deletions
@@ -0,0 +1,67 @@
package streams.service.dlq

import org.apache.commons.lang3.exception.ExceptionUtils
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.Producer
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.record.RecordBatch
import org.apache.kafka.common.serialization.ByteArraySerializer
import org.neo4j.util.VisibleForTesting
import java.util.*

class KafkaDLQService: DeadLetterQueueService {

    private var producer: Producer<ByteArray, ByteArray>

    constructor(config: Map<String, Any>,
                headerPrefix: String): super(config, headerPrefix) { // "__connect.errors."
        val props = Properties()
        props.putAll(config)
        props[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = ByteArraySerializer::class.java.name
        props[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = ByteArraySerializer::class.java.name
        producer = KafkaProducer(props)
    }

    @VisibleForTesting
    constructor(producer: Producer<ByteArray, ByteArray>, headerPrefix: String = ""): super(emptyMap(), headerPrefix) {
        this.producer = producer
    }

    override fun send(deadLetterQueueTopic: String, dlqData: DLQData) {
        val producerRecord = if (dlqData.timestamp == RecordBatch.NO_TIMESTAMP) {
            ProducerRecord(deadLetterQueueTopic, null,
                    dlqData.key, dlqData.value)
        } else {
            ProducerRecord(deadLetterQueueTopic, null, dlqData.timestamp,
                    dlqData.key, dlqData.value)
        }
        val producerHeader = producerRecord.headers()
        populateContextHeaders(dlqData).forEach { key, value -> producerHeader.add(key, value) }
        producer.send(producerRecord)
    }

    @VisibleForTesting
    fun populateContextHeaders(dlqData: DLQData): Map<String, ByteArray> {
        val headers = mutableMapOf<String, ByteArray>()
        headers[ERROR_HEADER_ORIG_TOPIC] = dlqData.originalTopic.toByteArray()
        headers[ERROR_HEADER_ORIG_PARTITION] = dlqData.partition.toByteArray()
        headers[ERROR_HEADER_ORIG_OFFSET] = dlqData.offset.toByteArray()
        if (dlqData.executingClass != null) {
            headers[ERROR_HEADER_EXECUTING_CLASS] = dlqData.executingClass.name.toByteArray()
        }
        if (dlqData.exception != null) {
            headers[ERROR_HEADER_EXCEPTION] = dlqData.exception.javaClass.name.toByteArray()
            if (dlqData.exception.message != null) {
                headers[ERROR_HEADER_EXCEPTION_MESSAGE] = dlqData.exception.message.toString().toByteArray()
            }
            headers[ERROR_HEADER_EXCEPTION_STACK_TRACE] = ExceptionUtils.getStackTrace(dlqData.exception).toByteArray()
        }
        return headers
    }

    override fun close() {
        this.producer.close()
    }

}
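
The producer config passed to the first constructor only needs broker connection settings, since key and value serializers are forced to ByteArraySerializer. A minimal standalone usage sketch (not part of this commit; the broker address, topic names, and header prefix below are placeholder assumptions):

import streams.service.dlq.DLQData
import streams.service.dlq.KafkaDLQService

fun main() {
    // Assumed local broker; any additional producer settings can be added to this map.
    val producerConfig = mapOf<String, Any>("bootstrap.servers" to "localhost:9092")
    val dlqService = KafkaDLQService(producerConfig, "__streams.errors.")

    val dlqData = DLQData(
            originalTopic = "source-topic",
            timestamp = System.currentTimeMillis(),
            key = "some-key".toByteArray(),
            value = "unparseable-payload".toByteArray(),
            partition = "0",
            offset = "42",
            executingClass = KafkaDLQService::class.java,
            exception = RuntimeException("deserialization failed"))

    // The original key/value bytes are forwarded as-is; the failure context travels in record headers.
    dlqService.send("neo4j-dlq", dlqData)
    dlqService.close()
}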
Lines changed: 78 additions & 0 deletions
@@ -0,0 +1,78 @@
package streams.service.sink.dlq

import org.apache.commons.lang3.exception.ExceptionUtils
import org.apache.kafka.clients.producer.MockProducer
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.clients.producer.internals.FutureRecordMetadata
import org.apache.kafka.common.record.RecordBatch
import org.junit.Test
import org.mockito.ArgumentMatchers
import org.mockito.Mockito
import streams.service.dlq.DLQData
import streams.service.dlq.KafkaDLQService
import java.util.concurrent.atomic.AtomicInteger
import kotlin.test.assertEquals

class KafkaDLQServiceTest {
    @Test
    fun `should send the data to the DLQ`() {
        val producer: MockProducer<ByteArray, ByteArray> = Mockito.mock(MockProducer::class.java) as MockProducer<ByteArray, ByteArray>
        val counter = AtomicInteger(0)
        Mockito.`when`(producer.send(ArgumentMatchers.any<ProducerRecord<ByteArray, ByteArray>>())).then {
            counter.incrementAndGet()
            FutureRecordMetadata(null, 0, RecordBatch.NO_TIMESTAMP, 0L, 0, 0)
        }
        val dlqService = KafkaDLQService(producer)
        val offset = "0"
        val originalTopic = "topicName"
        val partition = "1"
        val timestamp = System.currentTimeMillis()
        val exception = RuntimeException("Test")
        val key = "KEY"
        val value = "VALUE"
        val dlqData = DLQData(offset = offset,
                originalTopic = originalTopic,
                partition = partition,
                timestamp = timestamp,
                exception = exception,
                executingClass = KafkaDLQServiceTest::class.java,
                key = key.toByteArray(),
                value = value.toByteArray())
        dlqService.send("dlqTopic", dlqData)
        assertEquals(1, counter.get())
        dlqService.close()
    }


    @Test
    fun `should create the header map`() {
        val producer: MockProducer<ByteArray, ByteArray> = Mockito.mock(MockProducer::class.java) as MockProducer<ByteArray, ByteArray>
        val dlqService = KafkaDLQService(producer)
        val offset = "0"
        val originalTopic = "topicName"
        val partition = "1"
        val timestamp = System.currentTimeMillis()
        val exception = RuntimeException("Test")
        val key = "KEY"
        val value = "VALUE"
        val dlqData = DLQData(
                offset = offset,
                originalTopic = originalTopic,
                partition = partition,
                timestamp = timestamp,
                exception = exception,
                executingClass = KafkaDLQServiceTest::class.java,
                key = key.toByteArray(),
                value = value.toByteArray()
        )
        val map = dlqService.populateContextHeaders(dlqData)
        assertEquals(String(map["topic"]!!), originalTopic)
        assertEquals(String(map["partition"]!!), partition)
        assertEquals(String(map["offset"]!!), offset)
        assertEquals(String(map["class.name"]!!), KafkaDLQServiceTest::class.java.name)
        assertEquals(String(map["exception.class.name"]!!), exception::class.java.name)
        assertEquals(String(map["exception.message"]!!), exception.message)
        assertEquals(String(map["exception.stacktrace"]!!), ExceptionUtils.getStackTrace(exception))
    }
}
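
Note that both tests construct the service through the testing constructor, whose headerPrefix defaults to an empty string, which is why the asserted keys are bare ("topic", "offset", ...). With a non-empty prefix the same keys are simply prefixed. A hypothetical follow-up test (not in this commit, and the prefix value is illustrative) that would sit inside KafkaDLQServiceTest and reuse its imports:

    @Test
    fun `should prefix the header keys`() {
        val producer: MockProducer<ByteArray, ByteArray> = Mockito.mock(MockProducer::class.java) as MockProducer<ByteArray, ByteArray>
        // Illustrative prefix; the sink in this PR passes "__streams.errors" to the real service.
        val dlqService = KafkaDLQService(producer, "__streams.errors.")
        val dlqData = DLQData(offset = "0",
                originalTopic = "topicName",
                partition = "1",
                timestamp = System.currentTimeMillis(),
                exception = RuntimeException("Test"),
                executingClass = KafkaDLQServiceTest::class.java,
                key = "KEY".toByteArray(),
                value = "VALUE".toByteArray())
        val map = dlqService.populateContextHeaders(dlqData)
        assertEquals(String(map["__streams.errors.topic"]!!), "topicName")
        assertEquals(String(map["__streams.errors.offset"]!!), "0")
    }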

consumer/src/main/kotlin/streams/StreamsEventSink.kt

Lines changed: 2 additions & 1 deletion
@@ -3,6 +3,7 @@ package streams
 import org.neo4j.kernel.configuration.Config
 import org.neo4j.kernel.internal.GraphDatabaseAPI
 import org.neo4j.logging.Log
+import streams.service.dlq.DeadLetterQueueService

 abstract class StreamsEventSink(private val config: Config,
                                 private val queryExecution: StreamsEventSinkQueryExecution,
@@ -20,7 +21,7 @@ abstract class StreamsEventSink(private val config: Config,

 }

-abstract class StreamsEventConsumer(private val log: Log) {
+abstract class StreamsEventConsumer(private val log: Log, private val dlqService: DeadLetterQueueService?) {

     abstract fun stop()

consumer/src/main/kotlin/streams/StreamsSinkConfiguration.kt

Lines changed: 4 additions & 0 deletions
@@ -16,6 +16,7 @@ data class StreamsSinkConfiguration(val enabled: Boolean = false,
                                     val proceduresEnabled: Boolean = true,
                                     val sinkPollingInterval: Long = 10000,
                                     val topics: Topics = Topics(),
+                                    val dlqTopic: String = "",
                                     val sourceIdStrategyConfig: SourceIdIngestionStrategyConfig = SourceIdIngestionStrategyConfig()) {

     companion object {
@@ -39,11 +40,14 @@ data class StreamsSinkConfiguration(val enabled: Boolean = false,
                     config.getOrDefault("sink.topic.cdc.sourceId.labelName", defaultSourceIdStrategyConfig.labelName),
                     config.getOrDefault("sink.topic.cdc.sourceId.idName", defaultSourceIdStrategyConfig.idName))

+            val dlqTopic = config.getOrDefault("sink.dlq", "")
+
             return default.copy(enabled = config.getOrDefault(StreamsSinkConfigurationConstants.ENABLED, default.enabled).toString().toBoolean(),
                     proceduresEnabled = config.getOrDefault(StreamsSinkConfigurationConstants.PROCEDURES_ENABLED, default.proceduresEnabled)
                             .toString().toBoolean(),
                     sinkPollingInterval = config.getOrDefault("sink.polling.interval", default.sinkPollingInterval).toString().toLong(),
                     topics = topics,
+                    dlqTopic = dlqTopic,
                     sourceIdStrategyConfig = sourceIdStrategyConfig)
         }
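
The DLQ is switched on purely through configuration: if the "sink.dlq" key is absent or blank, dlqTopic stays empty and no dead letter queue service is created downstream. A minimal sketch (assuming from() receives the same stripped "sink.*" keys used for the other settings in this class; the topic name is a placeholder):

// Sketch: enabling the DLQ via configuration.
val sinkConfig = StreamsSinkConfiguration.from(mapOf("sink.dlq" to "neo4j-sink-dlq"))
// In KafkaEventSink, dlqTopic.isNotBlank() is the flag that turns on the KafkaDLQService.
check(sinkConfig.dlqTopic == "neo4j-sink-dlq")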

consumer/src/main/kotlin/streams/kafka/KafkaEventSink.kt

Lines changed: 36 additions & 11 deletions
@@ -1,10 +1,7 @@
 package streams.kafka

 import kotlinx.coroutines.*
-import org.apache.kafka.clients.consumer.ConsumerConfig
-import org.apache.kafka.clients.consumer.ConsumerRecord
-import org.apache.kafka.clients.consumer.KafkaConsumer
-import org.apache.kafka.clients.consumer.OffsetAndMetadata
+import org.apache.kafka.clients.consumer.*
 import org.apache.kafka.common.TopicPartition
 import org.neo4j.kernel.configuration.Config
 import org.neo4j.kernel.internal.GraphDatabaseAPI
@@ -13,7 +10,11 @@ import streams.*
 import streams.extensions.offsetAndMetadata
 import streams.extensions.topicPartition
 import streams.serialization.JSONUtils
+import streams.service.dlq.DLQData
+import streams.service.dlq.KafkaDLQService
 import streams.utils.Neo4jUtils
+import streams.utils.StreamsUtils
+import java.util.concurrent.ConcurrentHashMap
 import java.util.concurrent.TimeUnit


@@ -41,10 +42,20 @@ class KafkaEventSink(private val config: Config,
         return object: StreamsEventConsumerFactory() {
             override fun createStreamsEventConsumer(config: Map<String, String>, log: Log): StreamsEventConsumer {
                 val kafkaConfig = KafkaSinkConfiguration.from(config)
+                val dlqService = if (kafkaConfig.streamsSinkConfiguration.dlqTopic.isNotBlank()) {
+                    val asProperties = kafkaConfig.asProperties()
+                            .mapKeys { it.key.toString() }
+                            .toMutableMap()
+                    asProperties.remove(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)
+                    asProperties.remove(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)
+                    KafkaDLQService(asProperties, "__streams.errors")
+                } else {
+                    null
+                }
                 return if (kafkaConfig.enableAutoCommit) {
-                    KafkaAutoCommitEventConsumer(kafkaConfig, log)
+                    KafkaAutoCommitEventConsumer(kafkaConfig, log, dlqService)
                 } else {
-                    KafkaManualCommitEventConsumer(kafkaConfig, log)
+                    KafkaManualCommitEventConsumer(kafkaConfig, log, dlqService)
                 }
             }
         }
@@ -153,7 +164,8 @@ data class KafkaTopicConfig(val commit: Boolean, val topicPartitionsMap: Map<Top
 }

 open class KafkaAutoCommitEventConsumer(private val config: KafkaSinkConfiguration,
-                                        private val log: Log): StreamsEventConsumer(log) {
+                                        private val log: Log,
+                                        private val dlqService: KafkaDLQService?): StreamsEventConsumer(log, dlqService) {

     private var isSeekSet = false

@@ -176,6 +188,7 @@ open class KafkaAutoCommitEventConsumer(private val config: KafkaSinkConfigurati

     override fun stop() {
         consumer.close()
+        dlqService?.close()
     }

     private fun readSimple(action: (String, List<Any>) -> Unit) {
@@ -190,7 +203,9 @@ open class KafkaAutoCommitEventConsumer(private val config: KafkaSinkConfigurati
         try {
             action(topic, convert(topicRecords))
         } catch (e: Exception) {
-            // TODO send to the DLQ
+            topicRecords
+                    .map { DLQData.from(it, e, this::class.java) }
+                    .forEach { sentToDLQ(it) }
         }
     }

@@ -199,15 +214,20 @@ open class KafkaAutoCommitEventConsumer(private val config: KafkaSinkConfigurati
                 try {
                     "ok" to JSONUtils.readValue<Any>(it.value())
                 } catch (e: Exception) {
-                    "error" to it
+                    "error" to DLQData.from(it, e, this::class.java)
                 }
             }
             .groupBy({ it.first }, { it.second })
             .let {
-                // TODO send content of the "error" key to the DLQ
+                it.getOrDefault("error", emptyList<DLQData>())
+                        .forEach { sentToDLQ(it as DLQData) }
                 it.getOrDefault("ok", emptyList())
             }

+    private fun sentToDLQ(dlqData: DLQData) {
+        dlqService?.send(config.streamsSinkConfiguration.dlqTopic, dlqData)
+    }
+
     private fun readFromPartition(config: KafkaTopicConfig, action: (String, List<Any>) -> Unit) {
         setSeek(config.topicPartitionsMap)
         val records = consumer.poll(0)
@@ -248,7 +268,10 @@ open class KafkaAutoCommitEventConsumer(private val config: KafkaSinkConfigurati
 }

 class KafkaManualCommitEventConsumer(private val config: KafkaSinkConfiguration,
-                                     private val log: Log): KafkaAutoCommitEventConsumer(config, log) {
+                                     private val log: Log,
+                                     private val dlqService: KafkaDLQService?): KafkaAutoCommitEventConsumer(config, log, dlqService) {
+
+    private val topicPartitionOffsetMap = ConcurrentHashMap<TopicPartition, OffsetAndMetadata>()

     override fun start() {
         if (topics.isEmpty()) {
@@ -295,6 +318,7 @@ class KafkaManualCommitEventConsumer(private val config: KafkaSinkConfiguration,

     override fun read(action: (String, List<Any>) -> Unit) {
         val topicMap = readSimple(action)
+        topicPartitionOffsetMap += topicMap
         commitData(true, topicMap)
     }

@@ -305,6 +329,7 @@ class KafkaManualCommitEventConsumer(private val config: KafkaSinkConfiguration,
         } else {
             readFromPartition(kafkaTopicConfig, action)
         }
+        topicPartitionOffsetMap += topicMap
         commitData(kafkaTopicConfig.commit, topicMap)
     }
 }
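
Two design points are visible in this hunk: the consumer configuration is reused for the DLQ producer after stripping the key/value deserializer entries (the producer itself forces ByteArraySerializer), and when no DLQ topic is configured dlqService is null, so the safe calls make the error path a no-op, matching the previous behaviour where the TODO simply dropped the records. A simplified, self-contained sketch of that failure path (names and structure simplified for illustration; not the literal sink code):

import org.apache.kafka.clients.consumer.ConsumerRecord
import streams.service.dlq.DLQData
import streams.service.dlq.KafkaDLQService

// Sketch: a batch that fails to be applied is re-routed record-by-record to the DLQ topic.
fun handleBatch(records: List<ConsumerRecord<String, ByteArray>>,
                dlqTopic: String,
                dlqService: KafkaDLQService?,
                apply: (List<ConsumerRecord<String, ByteArray>>) -> Unit) {
    try {
        apply(records)
    } catch (e: Exception) {
        records
                .map { DLQData.from(it, e, KafkaDLQService::class.java) }
                .forEach { dlqService?.send(dlqTopic, it) } // no-op when no DLQ is configured
    }
}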
