Skip to content

Commit 041a936

Browse files
authored
build: upgrade kafka client (#620)
1 parent d74168f commit 041a936

File tree

10 files changed

+9
-695
lines changed

10 files changed

+9
-695
lines changed

.github/workflows/ci.yml

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,16 @@ jobs:
1212
runs-on: ubuntu-latest
1313

1414
steps:
15-
- uses: actions/checkout@v2
15+
- uses: actions/checkout@v4
1616

1717
- name: Set up JDK 11
18-
uses: actions/setup-java@v1
18+
uses: actions/setup-java@v4
1919
with:
20+
distribution: temurin
2021
java-version: 11
2122

2223
- name: Cache Maven packages
23-
uses: actions/cache@v1
24+
uses: actions/cache@v4
2425
with:
2526
path: ~/.m2
2627
key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }}
Lines changed: 0 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,10 @@
11
package streams.extensions
22

3-
import org.apache.kafka.clients.consumer.ConsumerRecord
4-
import org.apache.kafka.clients.consumer.OffsetAndMetadata
5-
import org.apache.kafka.common.TopicPartition
63
import org.neo4j.driver.types.Node
74
import org.neo4j.driver.types.Relationship
8-
import streams.service.StreamsSinkEntity
9-
import streams.utils.JSONUtils
10-
import java.nio.ByteBuffer
115
import java.util.*
126
import javax.lang.model.SourceVersion
137

14-
fun Map<String,String>.getInt(name:String, defaultValue: Int) = this.get(name)?.toInt() ?: defaultValue
158
fun Map<*, *>.asProperties() = this.let {
169
val properties = Properties()
1710
properties.putAll(it)
@@ -34,10 +27,6 @@ fun Relationship.asStreamsMap(): Map<String, Any?> {
3427
return relMap
3528
}
3629

37-
fun String.toPointCase(): String {
38-
return this.split("(?<=[a-z])(?=[A-Z])".toRegex()).joinToString(separator = ".").toLowerCase()
39-
}
40-
4130
fun String.quote(): String = if (SourceVersion.isIdentifier(this)) this else "`$this`"
4231

4332
fun Map<String, Any?>.flatten(map: Map<String, Any?> = this, prefix: String = ""): Map<String, Any?> {
@@ -51,33 +40,4 @@ fun Map<String, Any?>.flatten(map: Map<String, Any?> = this, prefix: String = ""
5140
listOf(newKey to value)
5241
}
5342
}.toMap()
54-
}
55-
56-
fun ConsumerRecord<*, *>.topicPartition() = TopicPartition(this.topic(), this.partition())
57-
fun ConsumerRecord<*, *>.offsetAndMetadata(metadata: String = "") = OffsetAndMetadata(this.offset() + 1, metadata)
58-
59-
private fun convertAvroData(rawValue: Any?): Any? = when (rawValue) {
60-
is Collection<*> -> rawValue.map(::convertAvroData)
61-
is Array<*> -> if (rawValue.javaClass.componentType.isPrimitive) rawValue else rawValue.map(::convertAvroData)
62-
is Map<*, *> -> rawValue
63-
.mapKeys { it.key.toString() }
64-
.mapValues { convertAvroData(it.value) }
65-
is ByteBuffer -> rawValue.array()
66-
is CharSequence -> rawValue.toString()
67-
else -> rawValue
68-
}
69-
70-
71-
72-
private fun convertData(data: Any?, stringWhenFailure: Boolean = false): Any? {
73-
return when (data) {
74-
null -> null
75-
is ByteArray -> JSONUtils.readValue<Any>(data, stringWhenFailure)
76-
else -> if (stringWhenFailure) data.toString() else throw RuntimeException("Unsupported type ${data::class.java.name}")
77-
}
78-
}
79-
fun ConsumerRecord<*, *>.toStreamsSinkEntity(): StreamsSinkEntity {
80-
val key = convertData(this.key(), true)
81-
val value = convertData(this.value())
82-
return StreamsSinkEntity(key, value)
8343
}

common/src/main/kotlin/streams/service/errors/ErrorService.kt

Lines changed: 2 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,7 @@
11
package streams.service.errors
22

3-
import org.apache.kafka.clients.consumer.ConsumerRecord
4-
import org.apache.kafka.common.record.RecordBatch
53
import java.util.Properties
64

7-
85
data class ErrorData(val originalTopic: String,
96
val timestamp: Long,
107
val key: ByteArray?,
@@ -17,21 +14,11 @@ data class ErrorData(val originalTopic: String,
1714

1815
constructor(originalTopic: String, timestamp: Long?, key: Any?, value: Any?,
1916
partition: Int, offset: Long, executingClass: Class<*>?, databaseName: String?, exception: Exception?) :
20-
this(originalTopic, timestamp ?: RecordBatch.NO_TIMESTAMP, toByteArray(key), toByteArray(value), partition.toString(),offset.toString(), executingClass, databaseName, exception)
17+
this(originalTopic, timestamp ?: NO_TIMESTAMP, toByteArray(key), toByteArray(value), partition.toString(),offset.toString(), executingClass, databaseName, exception)
2118

2219
companion object {
2320

24-
fun from(consumerRecord: ConsumerRecord<out Any, out Any>, exception: Exception?, executingClass: Class<*>?, databaseName: String?): ErrorData {
25-
return ErrorData(offset = consumerRecord.offset().toString(),
26-
originalTopic = consumerRecord.topic(),
27-
partition = consumerRecord.partition().toString(),
28-
timestamp = consumerRecord.timestamp(),
29-
exception = exception,
30-
executingClass = executingClass,
31-
key = toByteArray(consumerRecord.key()),
32-
value = toByteArray(consumerRecord.value()),
33-
databaseName = databaseName)
34-
}
21+
private const val NO_TIMESTAMP: Long = -1L
3522

3623
fun toByteArray(v:Any?) = try {
3724
when (v) {

common/src/main/kotlin/streams/utils/KafkaValidationUtils.kt

Lines changed: 0 additions & 69 deletions
This file was deleted.

common/src/test/kotlin/streams/service/sink/errors/KafkaErrorServiceTest.kt

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import org.apache.kafka.clients.producer.ProducerRecord
66
import org.apache.kafka.clients.producer.internals.FutureRecordMetadata
77
import org.apache.kafka.common.record.RecordBatch
88
import org.apache.kafka.common.utils.SystemTime
9-
import org.apache.kafka.common.utils.Time
109
import org.junit.Test
1110
import org.mockito.ArgumentMatchers
1211
import org.mockito.Mockito
@@ -25,7 +24,7 @@ class KafkaErrorServiceTest {
2524
val counter = AtomicInteger(0)
2625
Mockito.`when`(producer.send(ArgumentMatchers.any<ProducerRecord<ByteArray, ByteArray>>())).then {
2726
counter.incrementAndGet()
28-
FutureRecordMetadata(null, 0, RecordBatch.NO_TIMESTAMP, 0L, 0, 0, SystemTime())
27+
FutureRecordMetadata(null, 0, RecordBatch.NO_TIMESTAMP, 0, 0, SystemTime())
2928
}
3029
val dlqService = KafkaErrorService(producer, ErrorService.ErrorConfig(fail=false,dlqTopic = "dlqTopic"), { s, e -> })
3130
dlqService.report(listOf(dlqData()))

pom.xml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,8 @@
5252
<kotlin.version>1.9.20</kotlin.version>
5353
<kotlin.coroutines.version>1.7.3</kotlin.coroutines.version>
5454
<neo4j.version>4.4.27</neo4j.version>
55-
<kafka.version>2.6.3</kafka.version>
55+
<!-- Version 3.4 is the minimal version introducing version for CVE-2023-25194 -->
56+
<kafka.version>3.4.1</kafka.version>
5657
<jackson.version>2.15.2</jackson.version>
5758
<kotlin.compiler.incremental>true</kotlin.compiler.incremental>
5859
<neo4j.java.driver.version>4.4.12</neo4j.java.driver.version>

0 commit comments

Comments
 (0)