
Commit 986be24

conker84 authored and mneedham committed
fixes #186: Kafka event sink with manual commit fails with multiple topic subscriptions
1 parent 5d635b8 commit 986be24
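
The change addresses the failure behind #186: with several subscribed topics, a single poll can return records for only a subset of them, and the manual-commit consumer called `last()` on each per-topic record list unconditionally, which throws `NoSuchElementException` for topics that received nothing in that batch. Below is a minimal sketch of that read path and the guard this commit adds; it is illustrative only, assuming a plain `KafkaConsumer<String, ByteArray>` and made-up names (`readBatch`, `action`) rather than the project's own classes.

import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.kafka.clients.consumer.KafkaConsumer
import java.time.Duration

// Illustrative sketch, not the project's code: why the manual-commit read path
// failed with multiple topic subscriptions, and the guard added by this commit.
fun readBatch(consumer: KafkaConsumer<String, ByteArray>,
              topics: List<String>,
              action: (topic: String, values: List<ByteArray>) -> Unit) {
    val records: ConsumerRecords<String, ByteArray> = consumer.poll(Duration.ofMillis(1000))
    if (records.isEmpty) return
    topics.forEach { topic ->
        // records.records(topic) may be empty for topics that got no data in this poll
        val topicRecords = records.records(topic)
        if (!topicRecords.iterator().hasNext()) {
            return@forEach // skip empty topics; without this, last() below throws
        }
        // previously this ran unconditionally and threw NoSuchElementException
        val lastRecord = topicRecords.last()
        action(topic, topicRecords.map { it.value() })
        // the manual-commit consumer then commits an OffsetAndMetadata built from
        // lastRecord, as shown in the KafkaManualCommitEventConsumer hunk below
    }
}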

File tree

2 files changed: +73 −9 lines changed


consumer/src/main/kotlin/streams/kafka/KafkaEventSink.kt

Lines changed: 16 additions & 9 deletions
@@ -79,7 +79,7 @@ class KafkaEventSink(private val config: Config,
 
     private fun createJob(): Job {
         log.info("Creating Sink daemon Job")
-        return GlobalScope.launch(Dispatchers.IO) {
+        return GlobalScope.launch(Dispatchers.IO) { // TODO improve exception management
             try {
                 while (isActive) {
                     eventConsumer.read { topic, data ->
@@ -153,7 +153,11 @@ open class KafkaAutoCommitEventConsumer(private val config: KafkaSinkConfigurati
         if (!records.isEmpty) {
             try {
                 this.topics.forEach { topic ->
-                    action(topic, records.records(topic).map { JSONUtils.readValue<Any>(it.value()) })
+                    val topicRecords = records.records(topic)
+                    if (!topicRecords.iterator().hasNext()) {
+                        return@forEach
+                    }
+                    action(topic, topicRecords.map { JSONUtils.readValue<Any>(it.value()) })
                 }
             } catch (e: Exception) {
                 // TODO add dead letter queue
@@ -186,10 +190,10 @@ open class KafkaAutoCommitEventConsumer(private val config: KafkaSinkConfigurati
     fun toConsumerRecordsMap(topicPartitionsMap: Map<TopicPartition, Long>,
                              records: ConsumerRecords<String, ByteArray>)
            : Map<TopicPartition, List<ConsumerRecord<String, ByteArray>>> = topicPartitionsMap
-            .mapValues {
-                records.records(it.key)
-            }
-            .filterValues { it.isNotEmpty() }
+        .mapValues {
+            records.records(it.key)
+        }
+        .filterValues { it.isNotEmpty() }
 
     fun setSeek(topicPartitionsMap: Map<TopicPartition, Long>) {
         if (isSeekSet) {
@@ -226,6 +230,9 @@ class KafkaManualCommitEventConsumer(private val config: KafkaSinkConfiguration,
         if (!records.isEmpty) {
             this.topics.forEach { topic ->
                 val topicRecords = records.records(topic)
+                if (!topicRecords.iterator().hasNext()) {
+                    return@forEach
+                }
                 val lastRecord = topicRecords.last()
                 val offsetAndMetadata = OffsetAndMetadata(lastRecord.offset(), "")
                 val topicPartition = TopicPartition(lastRecord.topic(), lastRecord.partition())
@@ -291,7 +298,7 @@ class StreamsConsumerRebalanceListener(private val topicPartitionOffsetMap: Map<
                 .map {
                     val offset = consumer.position(it)
                     if (log.isDebugEnabled) {
-                        log.debug("for topic ${it.topic()} partition ${it.partition()}, the last saved offset is: $offset")
+                        log.debug("onPartitionsRevoked: for topic ${it.topic()} partition ${it.partition()}, the last saved offset is: $offset")
                     }
                     it to OffsetAndMetadata(offset, "")
                 }
@@ -301,9 +308,9 @@ class StreamsConsumerRebalanceListener(private val topicPartitionOffsetMap: Map<
 
     override fun onPartitionsAssigned(partitions: Collection<TopicPartition>) {
         for (partition in partitions) {
-            val offset = topicPartitionOffsetMap[partition]?.offset()
+            val offset = (topicPartitionOffsetMap[partition] ?: consumer.committed(partition))?.offset()
             if (log.isDebugEnabled) {
-                log.debug("for ${partition.topic()} partition ${partition.partition()}, the retrieved offset is: $offset")
+                log.debug("onPartitionsAssigned: for ${partition.topic()} partition ${partition.partition()}, the retrieved offset is: $offset")
             }
             if (offset == null) {
                 when (autoOffsetReset) {
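
Related to the same issue, `onPartitionsAssigned` now falls back to the broker-side committed offset when the locally saved offset map has no entry for an assigned partition, and only applies `auto.offset.reset` when neither source has one. A minimal sketch of that resolution order, using illustrative names (`resolveStartingOffset`, `savedOffsets`) that are not part of the project:

import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.TopicPartition

// Sketch of the offset-resolution order used after this commit:
// saved offset -> broker committed offset -> auto.offset.reset policy.
fun resolveStartingOffset(consumer: Consumer<String, ByteArray>,
                          savedOffsets: Map<TopicPartition, OffsetAndMetadata>,
                          partition: TopicPartition,
                          autoOffsetReset: String) {
    val offset = (savedOffsets[partition] ?: consumer.committed(partition))?.offset()
    when {
        offset != null -> consumer.seek(partition, offset)
        autoOffsetReset == "earliest" -> consumer.seekToBeginning(listOf(partition))
        autoOffsetReset == "latest" -> consumer.seekToEnd(listOf(partition))
        else -> throw RuntimeException("No offset for $partition and no valid auto.offset.reset policy")
    }
}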

consumer/src/test/kotlin/integrations/KafkaEventSinkIT.kt

Lines changed: 57 additions & 0 deletions
@@ -401,6 +401,63 @@ class KafkaEventSinkIT {
         }, equalTo(true), 30, TimeUnit.SECONDS)
     }
 
+    @Test
+    fun `should fix issue 186 with auto commit false`() {
+        val product = "product" to "MERGE (p:Product {id: event.id}) ON CREATE SET p.name = event.name"
+        val customer = "customer" to "MERGE (c:Customer {id: event.id}) ON CREATE SET c.name = event.name"
+        val bought = "bought" to """
+            MERGE (c:Customer {id: event.id})
+            MERGE (p:Product {id: event.id})
+            MERGE (c)-[:BOUGHT]->(p)
+        """.trimIndent()
+        graphDatabaseBuilder.setConfig("streams.sink.topic.cypher.${product.first}", product.second)
+        graphDatabaseBuilder.setConfig("streams.sink.topic.cypher.${customer.first}", customer.second)
+        graphDatabaseBuilder.setConfig("streams.sink.topic.cypher.${bought.first}", bought.second)
+        graphDatabaseBuilder.setConfig("kafka.${ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG}", "false")
+        db = graphDatabaseBuilder.newGraphDatabase() as GraphDatabaseAPI
+
+        val props = mapOf("id" to 1, "name" to "My Awesome Product")
+        var producerRecord = ProducerRecord(product.first, UUID.randomUUID().toString(),
+                JSONUtils.writeValueAsBytes(props))
+        kafkaProducer.send(producerRecord).get()
+        assertEventually(ThrowingSupplier<Boolean, Exception> {
+            val query = """
+                MATCH (p:Product)
+                WHERE properties(p) = {props}
+                RETURN count(p) AS count
+            """.trimIndent()
+            val result = db.execute(query, mapOf("props" to props)).columnAs<Long>("count")
+            result.hasNext() && result.next() == 1L && !result.hasNext()
+        }, equalTo(true), 30, TimeUnit.SECONDS)
+    }
 
+    @Test
+    fun `should fix issue 186 with auto commit true`() {
+        val product = "product" to "MERGE (p:Product {id: event.id}) ON CREATE SET p.name = event.name"
+        val customer = "customer" to "MERGE (c:Customer {id: event.id}) ON CREATE SET c.name = event.name"
+        val bought = "bought" to """
+            MERGE (c:Customer {id: event.id})
+            MERGE (p:Product {id: event.id})
+            MERGE (c)-[:BOUGHT]->(p)
+        """.trimIndent()
+        graphDatabaseBuilder.setConfig("streams.sink.topic.cypher.${product.first}", product.second)
+        graphDatabaseBuilder.setConfig("streams.sink.topic.cypher.${customer.first}", customer.second)
+        graphDatabaseBuilder.setConfig("streams.sink.topic.cypher.${bought.first}", bought.second)
+        db = graphDatabaseBuilder.newGraphDatabase() as GraphDatabaseAPI
+
+        val props = mapOf("id" to 1, "name" to "My Awesome Product")
+        var producerRecord = ProducerRecord(product.first, UUID.randomUUID().toString(),
+                JSONUtils.writeValueAsBytes(props))
+        kafkaProducer.send(producerRecord).get()
+        assertEventually(ThrowingSupplier<Boolean, Exception> {
+            val query = """
+                MATCH (p:Product)
+                WHERE properties(p) = {props}
+                RETURN count(p) AS count
+            """.trimIndent()
+            val result = db.execute(query, mapOf("props" to props)).columnAs<Long>("count")
+            result.hasNext() && result.next() == 1L && !result.hasNext()
+        }, equalTo(true), 30, TimeUnit.SECONDS)
+    }
 
 }
