
Commit 0b177be

Fixes #356: added a database field to the message in the DLQ (#448)
* fixes #356
* changed empty handling
1 parent 2f52b30 commit 0b177be

File tree

11 files changed · +134 additions · -20 deletions


common/src/main/kotlin/streams/service/errors/ErrorService.kt

Lines changed: 6 additions & 4 deletions
@@ -15,23 +15,25 @@ data class ErrorData(val originalTopic: String,
                      val partition: String,
                      val offset: String,
                      val executingClass: Class<*>?,
+                     val databaseName: String?,
                      val exception: Exception?) {
 
     constructor(originalTopic: String, timestamp: Long?, key: Any?, value: Any?,
-                partition: Int, offset: Long, executingClass: Class<*>?, exception: Exception?) :
-            this(originalTopic, timestamp ?: RecordBatch.NO_TIMESTAMP, toByteArray(key), toByteArray(value), partition.toString(),offset.toString(), executingClass, exception)
+                partition: Int, offset: Long, executingClass: Class<*>?, databaseName: String?, exception: Exception?) :
+            this(originalTopic, timestamp ?: RecordBatch.NO_TIMESTAMP, toByteArray(key), toByteArray(value), partition.toString(),offset.toString(), executingClass, databaseName, exception)
 
     companion object {
 
-        fun from(consumerRecord: ConsumerRecord<out Any, out Any>, exception: Exception?, executingClass: Class<*>?): ErrorData {
+        fun from(consumerRecord: ConsumerRecord<out Any, out Any>, exception: Exception?, executingClass: Class<*>?, databaseName: String?): ErrorData {
             return ErrorData(offset = consumerRecord.offset().toString(),
                     originalTopic = consumerRecord.topic(),
                     partition = consumerRecord.partition().toString(),
                     timestamp = consumerRecord.timestamp(),
                     exception = exception,
                     executingClass = executingClass,
                     key = toByteArray(consumerRecord.key()),
-                    value = toByteArray(consumerRecord.value()))
+                    value = toByteArray(consumerRecord.value()),
+                    databaseName = databaseName)
         }
 
         fun toByteArray(v:Any?) = try {
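
Read together, the hunks above thread a database name through both the data class and its factory. A minimal sketch of calling the new `from` overload (the synthetic record and values are illustrative, not part of the commit):

    import org.apache.kafka.clients.consumer.ConsumerRecord
    import streams.service.errors.ErrorData

    fun main() {
        // Synthetic record standing in for one polled from Kafka
        val record = ConsumerRecord<Any, Any>("my-topic", 0, 42L, "key", "value")
        // databaseName is the new trailing parameter of the factory
        // (in the data class it sits just before exception)
        val errorData = ErrorData.from(record, RuntimeException("boom"), null, "neo4j")
        println(errorData.databaseName) // neo4j
    }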

common/src/main/kotlin/streams/service/errors/KafkaErrorService.kt

Lines changed: 4 additions & 0 deletions
@@ -72,6 +72,10 @@ class KafkaErrorService(private val producer: Producer<ByteArray, ByteArray>?, p
                 prefix("partition") to errorData.partition.toByteArray(),
                 prefix("offset") to errorData.offset.toByteArray())
 
+        if (!errorData.databaseName.isNullOrBlank()) {
+            headers[prefix("databaseName")] = errorData.databaseName.toByteArray()
+        }
+
         if (errorData.executingClass != null) {
             headers[prefix("class.name")] = errorData.executingClass.name.toByteArray()
         }
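
The `isNullOrBlank()` guard appears to be the "changed empty handling" noted in the commit message: the header is attached only when a non-blank database name is present. A self-contained sketch of the same guard, using the `__streams.errors.` prefix seen in the tests below (the helper name is hypothetical):

    // Hypothetical helper mirroring the guard above
    fun databaseHeader(databaseName: String?): Map<String, ByteArray> =
        if (!databaseName.isNullOrBlank()) {
            // Kotlin smart-casts databaseName to non-null String inside the guard
            mapOf("__streams.errors.databaseName" to databaseName.toByteArray())
        } else {
            emptyMap()
        }

    fun main() {
        println(databaseHeader("neo4j").keys) // [__streams.errors.databaseName]
        println(databaseHeader(null).keys)    // []
        println(databaseHeader("  ").keys)    // [] (blank names are skipped too)
    }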

common/src/test/kotlin/streams/service/sink/errors/KafkaErrorServiceTest.kt

Lines changed: 3 additions & 1 deletion
@@ -59,6 +59,7 @@ class KafkaErrorServiceTest {
         val exception = RuntimeException("Test")
         val key = "KEY"
         val value = "VALUE"
+        val databaseName = "myDb"
         return ErrorData(
                 offset = offset,
                 originalTopic = originalTopic,
@@ -67,7 +68,8 @@ class KafkaErrorServiceTest {
                 exception = exception,
                 executingClass = KafkaErrorServiceTest::class.java,
                 key = key.toByteArray(),
-                value = value.toByteArray()
+                value = value.toByteArray(),
+                databaseName = databaseName
         )
     }

consumer/src/main/kotlin/streams/kafka/KafkaAutoCommitEventConsumer.kt

Lines changed: 3 additions & 2 deletions
@@ -49,7 +49,8 @@ abstract class KafkaEventConsumer(config: KafkaSinkConfiguration,
 
 open class KafkaAutoCommitEventConsumer(private val config: KafkaSinkConfiguration,
                                         private val log: Log,
-                                        val topics: Set<String>): KafkaEventConsumer(config, log, topics) {
+                                        val topics: Set<String>,
+                                        private val dbName: String): KafkaEventConsumer(config, log, topics) {
 
     private val errorService: ErrorService = KafkaErrorService(config.asProperties(),
             ErrorService.ErrorConfig.from(config.streamsSinkConfiguration.errorConfig),
@@ -93,7 +94,7 @@ open class KafkaAutoCommitEventConsumer(private val config: KafkaSinkConfigurati
         try {
             action(topic, topicRecords.map { it.toStreamsSinkEntity() })
         } catch (e: Exception) {
-            errorService.report(topicRecords.map { ErrorData.from(it, e, this::class.java) })
+            errorService.report(topicRecords.map { ErrorData.from(it, e, this::class.java, dbName) })
         }
     }

consumer/src/main/kotlin/streams/kafka/KafkaEventSink.kt

Lines changed: 10 additions & 4 deletions
@@ -15,7 +15,12 @@ import org.apache.kafka.clients.consumer.ConsumerConfig
 import org.apache.kafka.common.errors.WakeupException
 import org.neo4j.kernel.internal.GraphDatabaseAPI
 import org.neo4j.logging.Log
-import streams.*
+import streams.StreamsEventConsumer
+import streams.StreamsEventConsumerFactory
+import streams.StreamsEventSink
+import streams.StreamsEventSinkQueryExecution
+import streams.StreamsSinkConfiguration
+import streams.StreamsTopicService
 import streams.config.StreamsConfig
 import streams.events.StreamsPluginStatus
 import streams.extensions.isDefaultDb
@@ -55,12 +60,13 @@ class KafkaEventSink(private val config: Map<String, String>,
     override fun getEventConsumerFactory(): StreamsEventConsumerFactory {
         return object: StreamsEventConsumerFactory() {
             override fun createStreamsEventConsumer(config: Map<String, String>, log: Log, topics: Set<Any>): StreamsEventConsumer {
-                val kafkaConfig = KafkaSinkConfiguration.from(config, db.databaseName(), db.isDefaultDb())
+                val dbName = db.databaseName()
+                val kafkaConfig = KafkaSinkConfiguration.from(config, dbName, db.isDefaultDb())
                 val topics1 = topics as Set<String>
                 return if (kafkaConfig.enableAutoCommit) {
-                    KafkaAutoCommitEventConsumer(kafkaConfig, log, topics1)
+                    KafkaAutoCommitEventConsumer(kafkaConfig, log, topics1, dbName)
                 } else {
-                    KafkaManualCommitEventConsumer(kafkaConfig, log, topics1)
+                    KafkaManualCommitEventConsumer(kafkaConfig, log, topics1, dbName)
                 }
             }
         }

consumer/src/main/kotlin/streams/kafka/KafkaManualCommitEventConsumer.kt

Lines changed: 2 additions & 1 deletion
@@ -13,7 +13,8 @@ import java.time.Duration
 
 class KafkaManualCommitEventConsumer(config: KafkaSinkConfiguration,
                                      private val log: Log,
-                                     topics: Set<String>): KafkaAutoCommitEventConsumer(config, log, topics) {
+                                     topics: Set<String>,
+                                     dbName: String): KafkaAutoCommitEventConsumer(config, log, topics, dbName) {
 
     private val asyncCommit = config.streamsAsyncCommit

consumer/src/test/kotlin/integrations/kafka/KafkaEventSinkDLQTSE.kt

Lines changed: 6 additions & 4 deletions
@@ -28,7 +28,7 @@ class KafkaEventSinkDLQTSE : KafkaEventSinkBaseTSE() {
         db.start()
         val data = mapOf("id" to null, "name" to "Andrea", "surname" to "Santurbano")
 
-        var producerRecord = ProducerRecord(topic, UUID.randomUUID().toString(), JSONUtils.writeValueAsBytes(data))
+        val producerRecord = ProducerRecord(topic, UUID.randomUUID().toString(), JSONUtils.writeValueAsBytes(data))
         kafkaProducer.send(producerRecord).get()
         val dlqConsumer = KafkaTestUtils.createConsumer<ByteArray, ByteArray>(
                 bootstrapServers = KafkaEventSinkSuiteIT.kafka.bootstrapServers,
@@ -50,8 +50,9 @@ class KafkaEventSinkDLQTSE : KafkaEventSinkBaseTSE() {
             val value = if (record != null) JSONUtils.readValue<Any>(record.value()!!) else emptyMap<String, Any>()
             db.execute(query) {
                 val result = it.columnAs<Long>("count")
-                !records.isEmpty && headers.size == 7 && value == data && result.hasNext() && result.next() == 0L && !result.hasNext()
+                !records.isEmpty && headers.size == 8 && value == data && result.hasNext() && result.next() == 0L && !result.hasNext()
                         && headers["__streams.errors.exception.class.name"] == "org.neo4j.graphdb.QueryExecutionException"
+                        && headers["__streams.errors.databaseName"] == "neo4j"
             }
         }, Matchers.equalTo(true), 30, TimeUnit.SECONDS)
     }
@@ -70,7 +71,7 @@ class KafkaEventSinkDLQTSE : KafkaEventSinkBaseTSE() {
 
         val data = """{id: 1, "name": "Andrea", "surname": "Santurbano"}"""
 
-        var producerRecord = ProducerRecord(topic, UUID.randomUUID().toString(),
+        val producerRecord = ProducerRecord(topic, UUID.randomUUID().toString(),
                 data.toByteArray())
         kafkaProducer.send(producerRecord).get()
         val dlqConsumer = KafkaTestUtils.createConsumer<ByteArray, ByteArray>(
@@ -91,8 +92,9 @@ class KafkaEventSinkDLQTSE : KafkaEventSinkBaseTSE() {
             val record = if (records.isEmpty) null else records.records(dlqTopic).iterator().next()
             val headers = record?.headers()?.map { it.key() to String(it.value()) }?.toMap().orEmpty()
             val value = if (record != null) String(record.value()) else emptyMap<String, Any>()
-            !records.isEmpty && headers.size == 7 && data == value && count.hasNext() && count.next() == 0L && !count.hasNext()
+            !records.isEmpty && headers.size == 8 && data == value && count.hasNext() && count.next() == 0L && !count.hasNext()
                     && headers["__streams.errors.exception.class.name"] == "com.fasterxml.jackson.core.JsonParseException"
+                    && headers["__streams.errors.databaseName"] == "neo4j"
             }
         }, Matchers.equalTo(true), 30, TimeUnit.SECONDS)
     }
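
To inspect the new header outside the test harness, a sketch using a plain kafka-clients consumer; the bootstrap address, group id, and DLQ topic name are placeholders:

    import java.time.Duration
    import java.util.Properties
    import org.apache.kafka.clients.consumer.ConsumerConfig
    import org.apache.kafka.clients.consumer.KafkaConsumer
    import org.apache.kafka.common.serialization.ByteArrayDeserializer

    fun main() {
        val props = Properties().apply {
            put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // placeholder
            put(ConsumerConfig.GROUP_ID_CONFIG, "dlq-inspector")           // placeholder
            put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer::class.java.name)
            put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer::class.java.name)
            put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
        }
        KafkaConsumer<ByteArray, ByteArray>(props).use { consumer ->
            consumer.subscribe(listOf("neo4j-dlq")) // placeholder DLQ topic
            for (record in consumer.poll(Duration.ofSeconds(5))) {
                // lastHeader returns null when the header is absent (e.g. blank database name)
                val db = record.headers().lastHeader("__streams.errors.databaseName")
                println("offset=${record.offset()} databaseName=${db?.let { String(it.value()) }}")
            }
        }
    }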

consumer/src/test/kotlin/integrations/kafka/KafkaEventSinkEnterpriseTSE.kt

Lines changed: 46 additions & 3 deletions
@@ -6,6 +6,7 @@ import kotlinx.coroutines.runBlocking
 import org.apache.avro.generic.GenericRecord
 import org.apache.kafka.clients.producer.KafkaProducer
 import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.common.serialization.ByteArrayDeserializer
 import org.hamcrest.Matchers
 import org.junit.After
 import org.junit.AfterClass
@@ -17,8 +18,8 @@ import org.neo4j.driver.SessionConfig
 import org.neo4j.function.ThrowingSupplier
 import streams.Assert
 import streams.KafkaTestUtils
-import streams.MavenUtils
 import streams.Neo4jContainerExtension
+import streams.service.errors.ErrorService
 import streams.utils.JSONUtils
 import streams.utils.StreamsUtils
 import java.util.UUID
@@ -29,8 +30,10 @@ class KafkaEventSinkEnterpriseTSE {
     companion object {
 
         private var startedFromSuite = true
-        val DB_NAME_NAMES = arrayOf("foo", "bar")
-        val ALL_DBS = arrayOf("foo", "bar", "baz")
+        private val DB_NAME_NAMES = arrayOf("foo", "bar", "dlq")
+        val ALL_DBS = arrayOf("foo", "bar", "baz", "dlq")
+        private const val DLQ_ERROR_TOPIC = "dlqTopic"
+        const val DLQ_CYPHER_TOPIC = "dlqCypherTopic"
 
         @JvmStatic
         val neo4j = Neo4jContainerExtension()//.withLogging()
@@ -50,6 +53,11 @@ class KafkaEventSinkEnterpriseTSE {
             DB_NAME_NAMES.forEach { neo4j.withNeo4jConfig("streams.sink.enabled.to.$it", "true") } // we enable the sink plugin only for the instances
             neo4j.withNeo4jConfig("streams.sink.topic.cypher.enterpriseCypherTopic.to.foo", "MERGE (c:Customer_foo {id: event.id, foo: 'foo'})")
             neo4j.withNeo4jConfig("streams.sink.topic.cypher.enterpriseCypherTopic.to.bar", "MERGE (c:Customer_bar {id: event.id, bar: 'bar'})")
+            neo4j.withNeo4jConfig("streams.sink.topic.cypher.$DLQ_CYPHER_TOPIC.to.dlq", "MERGE (c:Customer_dlq {id: event.id, dlq: 'dlq'})")
+            neo4j.withNeo4jConfig("streams.sink." + ErrorService.ErrorConfig.DLQ_TOPIC, DLQ_ERROR_TOPIC)
+            neo4j.withNeo4jConfig("streams.sink." + ErrorService.ErrorConfig.DLQ_HEADERS, "true")
+            neo4j.withNeo4jConfig("streams.sink." + ErrorService.ErrorConfig.DLQ_HEADER_PREFIX, "__streams.errors.")
+            neo4j.withNeo4jConfig("streams.sink." + ErrorService.ErrorConfig.TOLERANCE, "all")
             neo4j.withDatabases(*ALL_DBS)
             neo4j.start()
             Assume.assumeTrue("Neo4j must be running", neo4j.isRunning)
@@ -126,4 +134,39 @@ class KafkaEventSinkEnterpriseTSE {
             nodes.isEmpty()
         }, Matchers.equalTo(true), 30, TimeUnit.SECONDS)
     }
+
+    @Test
+    fun `should send data to the DLQ with current databaseName because of JsonParseException`() = runBlocking {
+
+        val data = mapOf("id" to null, "name" to "Andrea", "surname" to "Santurbano")
+
+        val producerRecord = ProducerRecord(DLQ_CYPHER_TOPIC, UUID.randomUUID().toString(), JSONUtils.writeValueAsBytes(data))
+
+        // when
+        kafkaProducer.send(producerRecord).get()
+        delay(5000)
+
+        val dlqConsumer = KafkaTestUtils.createConsumer<ByteArray, ByteArray>(
+                bootstrapServers = KafkaEventSinkSuiteIT.kafka.bootstrapServers,
+                schemaRegistryUrl = KafkaEventSinkSuiteIT.schemaRegistry.getSchemaRegistryUrl(),
+                keyDeserializer = ByteArrayDeserializer::class.java.name,
+                valueDeserializer = ByteArrayDeserializer::class.java.name,
+                topics = arrayOf(DLQ_ERROR_TOPIC))
+
+        dlqConsumer.let {
+            Assert.assertEventually(ThrowingSupplier {
+                val dbName = "dlq"
+                val nodes = getData(dbName)
+                val count = nodes.size
+                val records = dlqConsumer.poll(5000)
+                val record = if (records.isEmpty) null else records.records(DLQ_ERROR_TOPIC).iterator().next()
+                val headers = record?.headers()?.map { it.key() to String(it.value()) }?.toMap().orEmpty()
+                val value = if (record != null) JSONUtils.readValue<Any>(record.value()!!) else emptyMap<String, Any>()
+                !records.isEmpty && headers.size == 8 && data == value && count == 0
+                        && headers["__streams.errors.exception.class.name"] == "org.neo4j.graphdb.QueryExecutionException"
+                        && headers["__streams.errors.databaseName"] == dbName
+            }, Matchers.equalTo(true), 30, TimeUnit.SECONDS)
+            it.close()
+        }
+    }
 }

doc/docs/modules/ROOT/pages/consumer.adoc

Lines changed: 3 additions & 0 deletions
@@ -190,6 +190,9 @@ Every published record in the `Dead Letter Queue` contains the original record `
 
 | <prefix>exception.stacktrace"
 | The exception stack trace
+
+| <prefix>databaseName"
+| The database name
 |===
 
 [[neo4j_streams_supported_deserializers]]

kafka-connect-neo4j/src/main/kotlin/streams/kafka/connect/sink/Neo4jSinkTask.kt

Lines changed: 1 addition & 1 deletion
@@ -45,7 +45,7 @@ class Neo4jSinkTask : SinkTask() {
             neo4jService.writeData(data)
         } catch(e:Exception) {
             errorService.report(collection.map {
-                ErrorData(it.topic(), it.timestamp(), it.key(), it.value(), it.kafkaPartition(), it.kafkaOffset(), this::class.java, e)
+                ErrorData(it.topic(), it.timestamp(), it.key(), it.value(), it.kafkaPartition(), it.kafkaOffset(), this::class.java, this.config.database, e)
             })
         }
     }
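
Because the call is positional, the slot of the new argument is easy to miss; the same construction with named arguments makes it explicit (an illustrative helper, not code from the commit):

    import org.apache.kafka.connect.sink.SinkRecord
    import streams.kafka.connect.sink.Neo4jSinkTask
    import streams.service.errors.ErrorData

    // Illustrative: the same ErrorData as above, with parameter names spelled out
    fun toErrorData(record: SinkRecord, database: String?, e: Exception): ErrorData =
            ErrorData(
                    originalTopic = record.topic(),
                    timestamp = record.timestamp(),
                    key = record.key(),
                    value = record.value(),
                    partition = record.kafkaPartition(),
                    offset = record.kafkaOffset(),
                    executingClass = Neo4jSinkTask::class.java,
                    databaseName = database, // the database the connector writes to
                    exception = e
            )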
