
Commit 19b3baf

conker84 authored and moxious committed
fixes #207: Bump kafka Version (#209)
1 parent 9c8a2ca · commit 19b3baf

4 files changed (+62 additions, −47 deletions)

common/src/test/kotlin/streams/service/sink/errors/KafkaErrorServiceTest.kt

Lines changed: 3 additions & 1 deletion
@@ -5,6 +5,8 @@ import org.apache.kafka.clients.producer.MockProducer
 import org.apache.kafka.clients.producer.ProducerRecord
 import org.apache.kafka.clients.producer.internals.FutureRecordMetadata
 import org.apache.kafka.common.record.RecordBatch
+import org.apache.kafka.common.utils.SystemTime
+import org.apache.kafka.common.utils.Time
 import org.junit.Test
 import org.mockito.ArgumentMatchers
 import org.mockito.Mockito
@@ -23,7 +25,7 @@ class KafkaErrorServiceTest {
         val counter = AtomicInteger(0)
         Mockito.`when`(producer.send(ArgumentMatchers.any<ProducerRecord<ByteArray, ByteArray>>())).then {
             counter.incrementAndGet()
-            FutureRecordMetadata(null, 0, RecordBatch.NO_TIMESTAMP, 0L, 0, 0)
+            FutureRecordMetadata(null, 0, RecordBatch.NO_TIMESTAMP, 0L, 0, 0, SystemTime())
         }
         val dlqService = KafkaErrorService(producer, ErrorService.ErrorConfig(fail=false,dlqTopic = "dlqTopic"), { s, e -> })
         dlqService.report(listOf(dlqData()))
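
Editor's note: FutureRecordMetadata is an internal producer class, and somewhere in the Kafka 2.x line its constructor gained a trailing org.apache.kafka.common.utils.Time parameter (the exact introducing version is not pinned down by this commit, so treat that as an assumption). A minimal sketch of the updated Mockito stub against the 2.3.0 client; the helper name stubDlqSend and the producer/counter parameters are hypothetical, mirroring the test above:

import java.util.concurrent.atomic.AtomicInteger
import org.apache.kafka.clients.producer.Producer
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.clients.producer.internals.FutureRecordMetadata
import org.apache.kafka.common.record.RecordBatch
import org.apache.kafka.common.utils.SystemTime
import org.mockito.ArgumentMatchers
import org.mockito.Mockito

// Hypothetical extraction of the stub shown in the diff above.
fun stubDlqSend(producer: Producer<ByteArray, ByteArray>, counter: AtomicInteger) {
    Mockito.`when`(producer.send(ArgumentMatchers.any<ProducerRecord<ByteArray, ByteArray>>())).then {
        counter.incrementAndGet()
        // The trailing SystemTime() satisfies the Time parameter added in 2.x;
        // SystemTime is the wall-clock implementation bundled with the client.
        FutureRecordMetadata(null, 0, RecordBatch.NO_TIMESTAMP, 0L, 0, 0, SystemTime())
    }
}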

consumer/src/main/kotlin/streams/kafka/KafkaAutoCommitEventConsumer.kt

Lines changed: 24 additions & 17 deletions
@@ -11,6 +11,7 @@ import streams.extensions.toStreamsSinkEntity
 import streams.extensions.topicPartition
 import streams.service.StreamsSinkEntity
 import streams.service.errors.*
+import java.time.Duration

 data class KafkaTopicConfig(val commit: Boolean, val topicPartitionsMap: Map<TopicPartition, Long>) {
     companion object {
@@ -64,15 +65,18 @@ open class KafkaAutoCommitEventConsumer(private val config: KafkaSinkConfigurati
     }

     fun readSimple(action: (String, List<StreamsSinkEntity>) -> Unit): Map<TopicPartition, OffsetAndMetadata> {
-        val records = consumer.poll(0)
-        return this.topics
-                .filter { topic -> records.records(topic).iterator().hasNext() }
-                .flatMap { topic -> records.records(topic).map { it.topicPartition() to it } }
-                .groupBy({ it.first }, { it.second })
-                .mapValues {
-                    executeAction(action, it.key.topic(), it.value)
-                    it.value.last().offsetAndMetadata()
-                }
+        val records = consumer.poll(Duration.ZERO)
+        return when (records.isEmpty) {
+            true -> emptyMap()
+            else -> this.topics
+                    .filter { topic -> records.records(topic).iterator().hasNext() }
+                    .flatMap { topic -> records.records(topic).map { it.topicPartition() to it } }
+                    .groupBy({ it.first }, { it.second })
+                    .mapValues {
+                        executeAction(action, it.key.topic(), it.value)
+                        it.value.last().offsetAndMetadata()
+                    }
+        }
     }

     private fun executeAction(action: (String, List<StreamsSinkEntity>) -> Unit, topic: String, topicRecords: Iterable<ConsumerRecord<ByteArray, ByteArray>>) {
@@ -102,14 +106,17 @@ open class KafkaAutoCommitEventConsumer(private val config: KafkaSinkConfigurati
     fun readFromPartition(kafkaTopicConfig: KafkaTopicConfig,
                           action: (String, List<StreamsSinkEntity>) -> Unit): Map<TopicPartition, OffsetAndMetadata> {
         setSeek(kafkaTopicConfig.topicPartitionsMap)
-        val records = consumer.poll(0)
-        return kafkaTopicConfig.topicPartitionsMap
-                .mapValues { records.records(it.key) }
-                .filterValues { it.isNotEmpty() }
-                .mapValues { (topic, topicRecords) ->
-                    executeAction(action, topic.topic(), topicRecords)
-                    topicRecords.last().offsetAndMetadata()
-                }
+        val records = consumer.poll(Duration.ZERO)
+        return when (records.isEmpty) {
+            true -> emptyMap()
+            else -> kafkaTopicConfig.topicPartitionsMap
+                    .mapValues { records.records(it.key) }
+                    .filterValues { it.isNotEmpty() }
+                    .mapValues { (topic, topicRecords) ->
+                        executeAction(action, topic.topic(), topicRecords)
+                        topicRecords.last().offsetAndMetadata()
+                    }
+        }
     }

     override fun read(action: (String, List<StreamsSinkEntity>) -> Unit) {
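
Editor's note: the switch from consumer.poll(0) to consumer.poll(Duration.ZERO) tracks KIP-266. Kafka 2.0 deprecated Consumer.poll(long) in favor of poll(Duration), and the new overload bounds the entire call, metadata fetches included, so a zero timeout can legitimately return an empty batch. That is why both methods now branch on records.isEmpty instead of assuming data arrived. A minimal sketch of the changed semantics, assuming a KafkaConsumer<ByteArray, ByteArray> that is already subscribed; the helper name is hypothetical:

import java.time.Duration
import org.apache.kafka.clients.consumer.KafkaConsumer

fun countPolledRecords(consumer: KafkaConsumer<ByteArray, ByteArray>): Int {
    // Kafka <= 1.x (deprecated since 2.0): consumer.poll(0) could block well
    // past the requested timeout while the client fetched cluster metadata.
    // Kafka >= 2.0: poll(Duration) caps the whole call, so an empty result
    // from a zero timeout is an expected, common outcome.
    val records = consumer.poll(Duration.ZERO)
    return if (records.isEmpty) 0 else records.count()
}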

consumer/src/test/kotlin/integrations/kafka/KafkaEventSinkDLQ.kt

Lines changed: 34 additions & 28 deletions
@@ -27,24 +27,27 @@ class KafkaEventSinkDLQ : KafkaEventSinkBase() {

         var producerRecord = ProducerRecord(topic, UUID.randomUUID().toString(), JSONUtils.writeValueAsBytes(data))
         kafkaProducer.send(producerRecord).get()
-        Assert.assertEventually(ThrowingSupplier<Boolean, Exception> {
-            val query = """
+        val dlqConsumer = createConsumer<ByteArray, ByteArray>(ByteArrayDeserializer::class.java.name,
+                ByteArrayDeserializer::class.java.name,
+                dlqTopic)
+
+        dlqConsumer.let {
+            Assert.assertEventually(ThrowingSupplier<Boolean, Exception> {
+                val query = """
                     MATCH (c:Customer)
                     RETURN count(c) AS count
                 """.trimIndent()
-            val result = db.execute(query).columnAs<Long>("count")
+                val result = db.execute(query).columnAs<Long>("count")

-            val dlqConsumer = createConsumer<ByteArray, ByteArray>(ByteArrayDeserializer::class.java.name,
-                    ByteArrayDeserializer::class.java.name,
-                    dlqTopic)
-            val records = dlqConsumer.poll(5000)
-            val record = if (records.isEmpty) null else records.records(dlqTopic).iterator().next()
-            val headers = record?.headers()?.map { it.key() to String(it.value()) }?.toMap().orEmpty()
-            val value = if (record != null) JSONUtils.readValue<Any>(record.value()!!) else emptyMap<String, Any>()
-            dlqConsumer.close()
-            !records.isEmpty && headers.size == 7 && value == data && result.hasNext() && result.next() == 0L && !result.hasNext()
-                    && headers["__streams.errors.exception.class.name"] == "org.neo4j.graphdb.QueryExecutionException"
-        }, Matchers.equalTo(true), 30, TimeUnit.SECONDS)
+                val records = dlqConsumer.poll(5000)
+                val record = if (records.isEmpty) null else records.records(dlqTopic).iterator().next()
+                val headers = record?.headers()?.map { it.key() to String(it.value()) }?.toMap().orEmpty()
+                val value = if (record != null) JSONUtils.readValue<Any>(record.value()!!) else emptyMap<String, Any>()
+                !records.isEmpty && headers.size == 7 && value == data && result.hasNext() && result.next() == 0L && !result.hasNext()
+                        && headers["__streams.errors.exception.class.name"] == "org.neo4j.graphdb.QueryExecutionException"
+            }, Matchers.equalTo(true), 30, TimeUnit.SECONDS)
+            it.close()
+        }
     }

     @Test
@@ -63,24 +66,27 @@ class KafkaEventSinkDLQ : KafkaEventSinkBase() {
         var producerRecord = ProducerRecord(topic, UUID.randomUUID().toString(),
                 data.toByteArray())
         kafkaProducer.send(producerRecord).get()
-        Assert.assertEventually(ThrowingSupplier<Boolean, Exception> {
-            val query = """
+        val dlqConsumer = createConsumer<ByteArray, ByteArray>(ByteArrayDeserializer::class.java.name,
+                ByteArrayDeserializer::class.java.name,
+                dlqTopic)
+        dlqConsumer.let {
+            Assert.assertEventually(ThrowingSupplier<Boolean, Exception> {
+                val query = """
                     MATCH (c:Customer)
                     RETURN count(c) AS count
                 """.trimIndent()
-            val result = db.execute(query).columnAs<Long>("count")
+                val result = db.execute(query).columnAs<Long>("count")
+

-            val dlqConsumer = createConsumer<ByteArray, ByteArray>(ByteArrayDeserializer::class.java.name,
-                    ByteArrayDeserializer::class.java.name,
-                    dlqTopic)
-            val records = dlqConsumer.poll(5000)
-            val record = if (records.isEmpty) null else records.records(dlqTopic).iterator().next()
-            val headers = record?.headers()?.map { it.key() to String(it.value()) }?.toMap().orEmpty()
-            val value = if (record != null) String(record.value()) else emptyMap<String, Any>()
-            dlqConsumer.close()
-            !records.isEmpty && headers.size == 7 && data == value && result.hasNext() && result.next() == 0L && !result.hasNext()
-                    && headers["__streams.errors.exception.class.name"] == "com.fasterxml.jackson.core.JsonParseException"
-        }, Matchers.equalTo(true), 30, TimeUnit.SECONDS)
+                val records = dlqConsumer.poll(5000)
+                val record = if (records.isEmpty) null else records.records(dlqTopic).iterator().next()
+                val headers = record?.headers()?.map { it.key() to String(it.value()) }?.toMap().orEmpty()
+                val value = if (record != null) String(record.value()) else emptyMap<String, Any>()
+                !records.isEmpty && headers.size == 7 && data == value && result.hasNext() && result.next() == 0L && !result.hasNext()
+                        && headers["__streams.errors.exception.class.name"] == "com.fasterxml.jackson.core.JsonParseException"
+            }, Matchers.equalTo(true), 30, TimeUnit.SECONDS)
+            it.close()
+        }
     }

 }
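
Editor's note: beyond compiling against the new client, these test changes hoist the DLQ consumer out of the retry block. The old shape created and closed a consumer on every Assert.assertEventually attempt; the new shape creates it once, reuses it across attempts, and closes it after the assertion settles. A compressed sketch of that lifecycle under stated assumptions; retryUntilTrue is a hypothetical stand-in for Assert.assertEventually, and sawDlqRecord is an illustrative name:

import java.time.Duration
import org.apache.kafka.clients.consumer.KafkaConsumer

// Hypothetical stand-in for Assert.assertEventually: retry a predicate
// with a fixed pause until it passes or the attempts run out.
fun retryUntilTrue(attempts: Int, pauseMs: Long = 1000, predicate: () -> Boolean): Boolean {
    repeat(attempts) {
        if (predicate()) return true
        Thread.sleep(pauseMs)
    }
    return false
}

fun sawDlqRecord(dlqConsumer: KafkaConsumer<ByteArray, ByteArray>, dlqTopic: String): Boolean =
    dlqConsumer.let { consumer ->
        // Create once, poll many times: re-creating the consumer inside the
        // retry loop (the old shape) forces a fresh group join per attempt.
        val found = retryUntilTrue(attempts = 30) {
            val records = consumer.poll(Duration.ofSeconds(5))
            !records.isEmpty && records.records(dlqTopic).iterator().hasNext()
        }
        consumer.close()
        found
    }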

pom.xml

Lines changed: 1 addition & 1 deletion
@@ -36,7 +36,7 @@
         <kotlin.version>1.3.0</kotlin.version>
         <kotlin.coroutines.version>1.0.0</kotlin.coroutines.version>
         <neo4j.version>3.5.2</neo4j.version>
-        <kafka.version>1.0.1</kafka.version>
+        <kafka.version>2.3.0</kafka.version>
         <jackson.version>2.9.7</jackson.version>
         <kotlin.compiler.incremental>true</kotlin.compiler.incremental>
         <neo4j.java.driver.version>1.7.5</neo4j.java.driver.version>
