Skip to content

Commit ba5dbff

Browse files
authored
Merge pull request #12 from theUniC/feature/kafka-transport
Kafka transport
2 parents 94539ef + a3eb765 commit ba5dbff

File tree

9 files changed

+263
-3
lines changed

9 files changed

+263
-3
lines changed

gradle/libs.versions.toml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,19 @@ nmcp = "0.0.9"
77
vanniktech = "0.30.0"
88
dokka = "2.0.0"
99
kotlinxSerializationJson = "1.8.0"
10+
kafkaStreams = "3.7.2"
11+
kotestTestcontainers = "2.0.2"
12+
kotlinLogging = "7.0.4"
1013

1114
[libraries]
1215
kotlinx-coroutines = { module = "org.jetbrains.kotlinx:kotlinx-coroutines-core", version.ref = "kotlinCoroutines" }
1316
kotest-runner-junit5 = { module = "io.kotest:kotest-runner-junit5", version.ref = "kotestVersion" }
1417
kotest-assertions-core = { module = "io.kotest:kotest-assertions-core", version.ref = "kotestVersion" }
1518
arrow-core = { module = "io.arrow-kt:arrow-core", version.ref = "arrow" }
1619
kotlinx-serialization-json = { module = "org.jetbrains.kotlinx:kotlinx-serialization-json", version.ref = "kotlinxSerializationJson" }
20+
kafka-streams = { module = "org.apache.kafka:kafka-streams", version.ref = "kafkaStreams" }
21+
kotest-testcontainers = { module = "io.kotest.extensions:kotest-extensions-testcontainers", version.ref = "kotestTestcontainers" }
22+
kotlin-logging = { module = "io.github.oshai:kotlin-logging-jvm", version.ref = "kotlinLogging" }
1723

1824
[plugins]
1925
kotlin = { id = "org.jetbrains.kotlin.jvm", version.ref = "kotlin" }

kcommand-core/src/main/kotlin/io/github/theunic/kcommand/core/RemoteTransport.kt

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import kotlinx.coroutines.flow.Flow
77
import kotlinx.serialization.KSerializer
88
import kotlinx.coroutines.flow.map
99
import kotlinx.coroutines.flow.filterNotNull
10+
import kotlin.reflect.KClass
1011

1112
abstract class RemoteTransport<M: Any, R: Any>(
1213
private val registry: MessageRegistry<M>,
@@ -16,7 +17,7 @@ abstract class RemoteTransport<M: Any, R: Any>(
1617
prettyPrint = false
1718
}
1819
): Transport<M, R> {
19-
protected abstract fun doSend(serializedMessage: String)
20+
protected abstract suspend fun doSend(kclass: KClass<out M>, serializedMessage: String)
2021
protected abstract fun doReceiveFlow(): Flow<String>
2122

2223
override suspend fun send(message: M): Either<Unit, CompletableDeferred<R>> {
@@ -29,7 +30,7 @@ abstract class RemoteTransport<M: Any, R: Any>(
2930
val envelope = Envelope(typeName, payloadJson)
3031
val envelopeJson = json.encodeToString(Envelope.serializer(), envelope)
3132

32-
doSend(envelopeJson)
33+
doSend(kclass, envelopeJson)
3334

3435
// Fire-and-forget
3536
return Either.Left(Unit)

kcommand-core/src/test/kotlin/io/github/theunic/kcommand/core/RemoteTransportTest.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import kotlinx.serialization.Serializable
1010
import kotlinx.coroutines.flow.Flow
1111
import kotlinx.coroutines.flow.MutableSharedFlow
1212
import kotlinx.serialization.json.Json
13+
import kotlin.reflect.KClass
1314

1415
class DummyRemoteTransport<M : Any, R : Any>(
1516
registry: MessageRegistry<M>,
@@ -19,7 +20,7 @@ class DummyRemoteTransport<M : Any, R : Any>(
1920
val sentMessages = mutableListOf<String>()
2021
private val incomingFlow = MutableSharedFlow<String>()
2122

22-
override fun doSend(serializedMessage: String) {
23+
override suspend fun doSend(kclass: KClass<out M>, serializedMessage: String) {
2324
sentMessages.add(serializedMessage)
2425
}
2526

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
description =
2+
"""
3+
This library offers a scalable Message Bus implementation, enabling loose coupling for seamless system integration.
4+
It emphasizes the use of middlewares for customized message handling, ensuring flexibility in distributed architectures.
5+
Its user-friendly design allows developers to focus more on core functionalities, entrusting communication orchestration
6+
to the Message Bus implementations.
7+
""".trimIndent().replace("\n", " ")
8+
9+
plugins {
10+
alias(libs.plugins.serialization)
11+
}
12+
13+
dependencies {
14+
implementation(libs.kotlinx.coroutines)
15+
implementation(libs.arrow.core)
16+
implementation(libs.kafka.streams)
17+
implementation(libs.kotlin.logging)
18+
implementation(project(":kcommand-core"))
19+
20+
testImplementation(libs.kotest.runner.junit5)
21+
testImplementation(libs.kotest.assertions.core)
22+
testImplementation(libs.kotest.testcontainers)
23+
testImplementation(libs.kotlinx.serialization.json)
24+
}
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
package io.github.theunic.kcommand.transport.kafka
2+
3+
import io.github.oshai.kotlinlogging.KotlinLogging
4+
import io.github.theunic.kcommand.core.MessageRegistry
5+
import io.github.theunic.kcommand.core.RemoteTransport
6+
import kotlinx.coroutines.channels.awaitClose
7+
import kotlinx.coroutines.flow.Flow
8+
import kotlinx.coroutines.flow.callbackFlow
9+
import kotlinx.coroutines.suspendCancellableCoroutine
10+
import org.apache.kafka.clients.producer.KafkaProducer
11+
import org.apache.kafka.clients.producer.ProducerConfig
12+
import org.apache.kafka.clients.producer.ProducerRecord
13+
import org.apache.kafka.clients.producer.RecordMetadata
14+
import org.apache.kafka.streams.KafkaStreams
15+
import org.apache.kafka.streams.StreamsBuilder
16+
import org.apache.kafka.streams.kstream.KStream
17+
import java.time.Duration
18+
import java.util.Properties
19+
import kotlin.coroutines.resume
20+
import kotlin.coroutines.resumeWithException
21+
import kotlin.reflect.KClass
22+
23+
suspend fun KafkaProducer<String, String>.sendAwait(record: ProducerRecord<String, String>): RecordMetadata =
24+
suspendCancellableCoroutine { cont ->
25+
send(record) { metadata, exception ->
26+
if (exception == null) {
27+
cont.resume(metadata)
28+
} else {
29+
cont.resumeWithException(exception)
30+
}
31+
}
32+
}
33+
34+
class KafkaStreamsRemoteTransport<M : Any, R : Any, TOPIC: Enum<TOPIC>>(
35+
private val config: KafkaStreamsTransportConfig<M, TOPIC>,
36+
registry: MessageRegistry<M>,
37+
) : RemoteTransport<M, R>(registry) {
38+
39+
private val producer: KafkaProducer<String, String> by lazy {
40+
val props = Properties().apply {
41+
putAll(config.streamsProperties)
42+
put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
43+
put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
44+
}
45+
KafkaProducer<String, String>(props)
46+
}
47+
48+
private val logger = KotlinLogging.logger {}
49+
50+
private lateinit var streams: KafkaStreams
51+
52+
override suspend fun doSend(kclass: KClass<out M>, serializedMessage: String) {
53+
val topic = config.topicResolver.invoke(kclass)
54+
producer.sendAwait(ProducerRecord(topic.name, serializedMessage))
55+
}
56+
57+
override fun doReceiveFlow(): Flow<String> = callbackFlow {
58+
val builder = StreamsBuilder()
59+
60+
config.inputTopics.forEach { topic ->
61+
val stream: KStream<String, String> = builder.stream(topic.name)
62+
stream.foreach { _, value ->
63+
val result = trySend(value)
64+
if (result.isFailure) {
65+
logger.error(result.exceptionOrNull()) {}
66+
}
67+
}
68+
}
69+
70+
val topology = builder.build()
71+
streams = KafkaStreams(topology, config.streamsProperties)
72+
streams.start()
73+
74+
awaitClose {
75+
streams.close()
76+
producer.close()
77+
}
78+
}
79+
80+
fun stop() {
81+
streams.close(Duration.ofSeconds(1))
82+
producer.close()
83+
}
84+
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
package io.github.theunic.kcommand.transport.kafka
2+
3+
import org.apache.kafka.clients.consumer.ConsumerConfig
4+
import org.apache.kafka.common.serialization.Serdes
5+
import org.apache.kafka.streams.StreamsConfig
6+
import java.util.Properties
7+
import kotlin.reflect.KClass
8+
9+
data class KafkaStreamsTransportConfig<M: Any, TOPIC: Enum<TOPIC>>(
10+
val inputTopics: List<TOPIC>,
11+
val topicResolver: (KClass<out M>) -> TOPIC,
12+
val streamsProperties: Properties
13+
) {
14+
companion object {
15+
/**
16+
* Helper to create a basic config object.
17+
* @param applicationId The application.id for Kafka Streams.
18+
* @param bootstrapServers Kafka bootstrap addresses.
19+
*/
20+
fun <M: Any, TOPIC: Enum<TOPIC>> basic(
21+
applicationId: String,
22+
bootstrapServers: String,
23+
inputTopics: List<TOPIC>,
24+
topicResolver: (KClass<out M>) -> TOPIC
25+
): KafkaStreamsTransportConfig<M, TOPIC> {
26+
val props = Properties().apply {
27+
put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId)
28+
put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
29+
put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
30+
put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde::class.java.name)
31+
put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde::class.java.name)
32+
}
33+
34+
return KafkaStreamsTransportConfig(
35+
streamsProperties = props,
36+
topicResolver = topicResolver,
37+
inputTopics = inputTopics,
38+
)
39+
}
40+
}
41+
}
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
package io.github.theunic.kcommand.transport.kafka
2+
3+
import io.github.theunic.kcommand.core.MessageRegistry
4+
import io.kotest.common.ExperimentalKotest
5+
import io.kotest.core.extensions.install
6+
import io.kotest.core.spec.style.BehaviorSpec
7+
import io.kotest.extensions.testcontainers.ContainerExtension
8+
import io.kotest.matchers.collections.shouldContain
9+
import io.kotest.matchers.shouldBe
10+
import kotlinx.coroutines.cancel
11+
import kotlinx.coroutines.delay
12+
import kotlinx.coroutines.flow.first
13+
import kotlinx.coroutines.launch
14+
import kotlinx.coroutines.test.runTest
15+
import org.apache.kafka.clients.admin.KafkaAdminClient
16+
import org.apache.kafka.clients.admin.NewTopic
17+
import org.testcontainers.containers.KafkaContainer
18+
import org.testcontainers.utility.DockerImageName
19+
import java.util.Properties
20+
21+
enum class Topics(val topicName: String) {
22+
DEFAULT("testTopic")
23+
}
24+
25+
@OptIn(ExperimentalKotest::class)
26+
class KafkaStreamsTransportTest : BehaviorSpec({
27+
context("The KafkaStreamsTransport should send and receive messages using Kafka streams") {
28+
given("A Kafka cluster") {
29+
val kafka = install(ContainerExtension(KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.8.1"))))
30+
31+
beforeSpec {
32+
kafka.start()
33+
}
34+
35+
val registry = MessageRegistry<TestMessage>()
36+
registry.register(
37+
TestMessage::class,
38+
TestMessage.serializer()
39+
)
40+
41+
val transportConfig = KafkaStreamsTransportConfig.basic<TestMessage, Topics>(
42+
applicationId = "kcommand-streams-test",
43+
bootstrapServers = kafka.bootstrapServers,
44+
inputTopics = listOf(Topics.DEFAULT),
45+
topicResolver = { _ -> Topics.DEFAULT }
46+
)
47+
48+
val transport = KafkaStreamsRemoteTransport<TestMessage, Any, Topics>(
49+
config = transportConfig,
50+
registry = registry,
51+
)
52+
53+
val properties = Properties()
54+
properties["bootstrap.servers"] = kafka.bootstrapServers
55+
properties["connections.max.idle.ms"] = 10000
56+
properties["request.timeout.ms"] = 5000
57+
val adminClient = KafkaAdminClient.create(properties)
58+
adminClient.createTopics(listOf(NewTopic("testTopic", 1, 1)))
59+
60+
afterSpec {
61+
kafka.stop()
62+
transport.stop()
63+
}
64+
65+
`when`("the cluster is inspected") {
66+
val topics = adminClient.listTopics()
67+
topics.names().whenComplete { topicNames, _ ->
68+
topicNames shouldContain "testTopic"
69+
}
70+
}
71+
72+
given("A KafkaStreamsRemoteTransport with a real Kafka container") {
73+
`when`("calling send(...)") {
74+
then("the message should be received in the flow") {
75+
runTest {
76+
val job = launch {
77+
val (msg, either) = transport.receive().first()
78+
msg.content shouldBe "Hello from Testcontainers"
79+
either.isLeft() shouldBe true
80+
this.cancel()
81+
}
82+
83+
// Wait for the topology to start
84+
delay(2000)
85+
86+
val result = transport.send(TestMessage("Hello from Testcontainers"))
87+
result.isLeft() shouldBe true
88+
89+
job.join()
90+
}
91+
}
92+
}
93+
}
94+
}
95+
}
96+
})
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
package io.github.theunic.kcommand.transport.kafka
2+
3+
import kotlinx.serialization.Serializable
4+
5+
@Serializable
6+
data class TestMessage(val content: String)

settings.gradle.kts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,4 @@ plugins {
44

55
rootProject.name = "kcommand"
66
include(":kcommand-core")
7+
include(":kcommand-kafka-transport")

0 commit comments

Comments
 (0)