Skip to content

Commit e3633d0

Browse files
committed
First iteration
1 parent 94539ef commit e3633d0

File tree

9 files changed

+260
-3
lines changed

9 files changed

+260
-3
lines changed

gradle/libs.versions.toml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,19 @@ nmcp = "0.0.9"
77
vanniktech = "0.30.0"
88
dokka = "2.0.0"
99
kotlinxSerializationJson = "1.8.0"
10+
kafkaStreams = "3.7.2"
11+
kotestTestcontainers = "2.0.2"
12+
kotlinLogging = "7.0.4"
1013

1114
[libraries]
1215
kotlinx-coroutines = { module = "org.jetbrains.kotlinx:kotlinx-coroutines-core", version.ref = "kotlinCoroutines" }
1316
kotest-runner-junit5 = { module = "io.kotest:kotest-runner-junit5", version.ref = "kotestVersion" }
1417
kotest-assertions-core = { module = "io.kotest:kotest-assertions-core", version.ref = "kotestVersion" }
1518
arrow-core = { module = "io.arrow-kt:arrow-core", version.ref = "arrow" }
1619
kotlinx-serialization-json = { module = "org.jetbrains.kotlinx:kotlinx-serialization-json", version.ref = "kotlinxSerializationJson" }
20+
kafka-streams = { module = "org.apache.kafka:kafka-streams", version.ref = "kafkaStreams" }
21+
kotest-testcontainers = { module = "io.kotest.extensions:kotest-extensions-testcontainers", version.ref = "kotestTestcontainers" }
22+
kotlin-logging = { module = "io.github.oshai:kotlin-logging-jvm", version.ref = "kotlinLogging" }
1723

1824
[plugins]
1925
kotlin = { id = "org.jetbrains.kotlin.jvm", version.ref = "kotlin" }

kcommand-core/src/main/kotlin/io/github/theunic/kcommand/core/RemoteTransport.kt

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import kotlinx.coroutines.flow.Flow
77
import kotlinx.serialization.KSerializer
88
import kotlinx.coroutines.flow.map
99
import kotlinx.coroutines.flow.filterNotNull
10+
import kotlin.reflect.KClass
1011

1112
abstract class RemoteTransport<M: Any, R: Any>(
1213
private val registry: MessageRegistry<M>,
@@ -16,7 +17,7 @@ abstract class RemoteTransport<M: Any, R: Any>(
1617
prettyPrint = false
1718
}
1819
): Transport<M, R> {
19-
protected abstract fun doSend(serializedMessage: String)
20+
protected abstract suspend fun doSend(kclass: KClass<out M>, serializedMessage: String)
2021
protected abstract fun doReceiveFlow(): Flow<String>
2122

2223
override suspend fun send(message: M): Either<Unit, CompletableDeferred<R>> {
@@ -29,7 +30,7 @@ abstract class RemoteTransport<M: Any, R: Any>(
2930
val envelope = Envelope(typeName, payloadJson)
3031
val envelopeJson = json.encodeToString(Envelope.serializer(), envelope)
3132

32-
doSend(envelopeJson)
33+
doSend(kclass, envelopeJson)
3334

3435
// Fire-and-forget
3536
return Either.Left(Unit)

kcommand-core/src/test/kotlin/io/github/theunic/kcommand/core/RemoteTransportTest.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import kotlinx.serialization.Serializable
1010
import kotlinx.coroutines.flow.Flow
1111
import kotlinx.coroutines.flow.MutableSharedFlow
1212
import kotlinx.serialization.json.Json
13+
import kotlin.reflect.KClass
1314

1415
class DummyRemoteTransport<M : Any, R : Any>(
1516
registry: MessageRegistry<M>,
@@ -19,7 +20,7 @@ class DummyRemoteTransport<M : Any, R : Any>(
1920
val sentMessages = mutableListOf<String>()
2021
private val incomingFlow = MutableSharedFlow<String>()
2122

22-
override fun doSend(serializedMessage: String) {
23+
override suspend fun doSend(kclass: KClass<out M>, serializedMessage: String) {
2324
sentMessages.add(serializedMessage)
2425
}
2526

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
description =
2+
"""
3+
This library offers a scalable Message Bus implementation, enabling loose coupling for seamless system integration.
4+
It emphasizes the use of middlewares for customized message handling, ensuring flexibility in distributed architectures.
5+
Its user-friendly design allows developers to focus more on core functionalities, entrusting communication orchestration
6+
to the Message Bus implementations.
7+
""".trimIndent().replace("\n", " ")
8+
9+
plugins {
10+
alias(libs.plugins.serialization)
11+
}
12+
13+
dependencies {
14+
implementation(libs.kotlinx.coroutines)
15+
implementation(libs.arrow.core)
16+
implementation(libs.kafka.streams)
17+
implementation(libs.kotlin.logging)
18+
implementation(project(":kcommand-core"))
19+
20+
testImplementation(libs.kotest.runner.junit5)
21+
testImplementation(libs.kotest.assertions.core)
22+
testImplementation(libs.kotest.testcontainers)
23+
testImplementation(libs.kotlinx.serialization.json)
24+
}
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
package io.github.theunic.kcommand.transport.kafka
2+
3+
import io.github.oshai.kotlinlogging.KotlinLogging
4+
import io.github.theunic.kcommand.core.MessageRegistry
5+
import io.github.theunic.kcommand.core.RemoteTransport
6+
import kotlinx.coroutines.channels.awaitClose
7+
import kotlinx.coroutines.flow.Flow
8+
import kotlinx.coroutines.flow.callbackFlow
9+
import kotlinx.coroutines.suspendCancellableCoroutine
10+
import org.apache.kafka.clients.producer.KafkaProducer
11+
import org.apache.kafka.clients.producer.ProducerConfig
12+
import org.apache.kafka.clients.producer.ProducerRecord
13+
import org.apache.kafka.clients.producer.RecordMetadata
14+
import org.apache.kafka.streams.KafkaStreams
15+
import org.apache.kafka.streams.StreamsBuilder
16+
import org.apache.kafka.streams.kstream.KStream
17+
import java.time.Duration
18+
import java.util.Properties
19+
import kotlin.coroutines.resume
20+
import kotlin.coroutines.resumeWithException
21+
import kotlin.reflect.KClass
22+
23+
suspend fun KafkaProducer<String, String>.sendAwait(record: ProducerRecord<String, String>): RecordMetadata =
24+
suspendCancellableCoroutine { cont ->
25+
send(record) { metadata, exception ->
26+
if (exception == null) {
27+
cont.resume(metadata)
28+
} else {
29+
cont.resumeWithException(exception)
30+
}
31+
}
32+
}
33+
34+
class KafkaStreamsRemoteTransport<M : Any, R : Any>(
35+
private val config: KafkaStreamsTransportConfig<M>,
36+
registry: MessageRegistry<M>,
37+
) : RemoteTransport<M, R>(registry) {
38+
39+
private val producer: KafkaProducer<String, String> by lazy {
40+
val props = Properties().apply {
41+
putAll(config.streamsProperties)
42+
put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
43+
put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
44+
}
45+
KafkaProducer<String, String>(props)
46+
}
47+
48+
private val logger = KotlinLogging.logger {}
49+
50+
private lateinit var streams: KafkaStreams
51+
52+
override suspend fun doSend(kclass: KClass<out M>, serializedMessage: String) {
53+
val topic = config.topicResolver.invoke(kclass)
54+
producer.sendAwait(ProducerRecord(topic, serializedMessage))
55+
}
56+
57+
override fun doReceiveFlow(): Flow<String> = callbackFlow {
58+
val builder = StreamsBuilder()
59+
60+
val stream: KStream<String, String> = builder.stream(config.defaultTopic)
61+
stream.foreach { _, value ->
62+
val result = trySend(value)
63+
if (result.isFailure) {
64+
logger.error(result.exceptionOrNull()) {}
65+
}
66+
}
67+
68+
val topology = builder.build()
69+
streams = KafkaStreams(topology, config.streamsProperties)
70+
streams.start()
71+
72+
awaitClose {
73+
streams.close()
74+
producer.close()
75+
}
76+
}
77+
78+
fun stop() {
79+
streams.close(Duration.ofSeconds(1))
80+
producer.close()
81+
}
82+
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
package io.github.theunic.kcommand.transport.kafka
2+
3+
import org.apache.kafka.clients.consumer.ConsumerConfig
4+
import org.apache.kafka.common.serialization.Serdes
5+
import org.apache.kafka.streams.StreamsConfig
6+
import java.util.Properties
7+
import kotlin.reflect.KClass
8+
9+
data class KafkaStreamsTransportConfig<M: Any, TOPIC: Enum<TOPIC>>(
10+
val inputTopics: List<TOPIC>,
11+
val topicResolver: (KClass<out M>) -> TOPIC,
12+
val streamsProperties: Properties
13+
) {
14+
companion object {
15+
/**
16+
* Helper to create a basic config object.
17+
* @param applicationId The application.id for Kafka Streams.
18+
* @param bootstrapServers Kafka bootstrap addresses.
19+
*/
20+
fun <M: Any, TOPIC: Enum<TOPIC>> basic(
21+
applicationId: String,
22+
bootstrapServers: String,
23+
inputTopics: List<TOPIC>,
24+
topicResolver: (KClass<out M>) -> TOPIC
25+
): KafkaStreamsTransportConfig<M, TOPIC> {
26+
val props = Properties().apply {
27+
put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId)
28+
put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
29+
put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
30+
put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde::class.java.name)
31+
put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde::class.java.name)
32+
}
33+
34+
return KafkaStreamsTransportConfig(
35+
streamsProperties = props,
36+
topicResolver = topicResolver,
37+
inputTopics = inputTopics,
38+
)
39+
}
40+
}
41+
}
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
package io.github.theunic.kcommand.transport.kafka
2+
3+
import io.github.theunic.kcommand.core.MessageRegistry
4+
import io.kotest.common.ExperimentalKotest
5+
import io.kotest.core.extensions.install
6+
import io.kotest.core.spec.style.BehaviorSpec
7+
import io.kotest.extensions.testcontainers.ContainerExtension
8+
import io.kotest.matchers.collections.shouldContain
9+
import io.kotest.matchers.shouldBe
10+
import kotlinx.coroutines.cancel
11+
import kotlinx.coroutines.delay
12+
import kotlinx.coroutines.flow.first
13+
import kotlinx.coroutines.launch
14+
import kotlinx.coroutines.test.runTest
15+
import org.apache.kafka.clients.admin.KafkaAdminClient
16+
import org.apache.kafka.clients.admin.NewTopic
17+
import org.testcontainers.containers.KafkaContainer
18+
import org.testcontainers.utility.DockerImageName
19+
import java.util.Properties
20+
21+
enum class Topics(val topicName: String) {
22+
DEFAULT("testTopic")
23+
}
24+
25+
@OptIn(ExperimentalKotest::class)
26+
class KafkaStreamsTransportTest : BehaviorSpec({
27+
context("The KafkaStreamsTransport should send and receive messages using Kafka streams") {
28+
given("A Kafka cluster") {
29+
val kafka = install(ContainerExtension(KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.8.1"))))
30+
31+
beforeSpec {
32+
kafka.start()
33+
}
34+
35+
val registry = MessageRegistry<TestMessage>()
36+
registry.register(
37+
TestMessage::class,
38+
TestMessage.serializer()
39+
)
40+
41+
val transportConfig = KafkaStreamsTransportConfig.basic<TestMessage>(
42+
applicationId = "kcommand-streams-test",
43+
bootstrapServers = kafka.bootstrapServers,
44+
topicResolver = { _ -> "defaultTopic" }
45+
)
46+
47+
val transport = KafkaStreamsRemoteTransport<TestMessage, Any>(
48+
config = transportConfig,
49+
registry = registry,
50+
)
51+
52+
val properties = Properties()
53+
properties["bootstrap.servers"] = kafka.bootstrapServers
54+
properties["connections.max.idle.ms"] = 10000
55+
properties["request.timeout.ms"] = 5000
56+
val adminClient = KafkaAdminClient.create(properties)
57+
adminClient.createTopics(listOf(NewTopic("testTopic", 1, 1)))
58+
59+
afterSpec {
60+
kafka.stop()
61+
transport.stop()
62+
}
63+
64+
`when`("the cluster is inspected") {
65+
val topics = adminClient.listTopics()
66+
topics.names().whenComplete { topicNames, _ ->
67+
topicNames shouldContain "testTopic"
68+
}
69+
}
70+
71+
given("A KafkaStreamsRemoteTransport with a real Kafka container") {
72+
`when`("calling send(...)") {
73+
then("the message should be received in the flow") {
74+
runTest {
75+
val job = launch {
76+
val (msg, either) = transport.receive().first()
77+
msg.content shouldBe "Hello from Testcontainers"
78+
either.isLeft() shouldBe true
79+
this.cancel()
80+
}
81+
82+
// Wait for the topology to start
83+
delay(2000)
84+
85+
val result = transport.send(TestMessage("Hello from Testcontainers"))
86+
result.isLeft() shouldBe true
87+
88+
job.join()
89+
}
90+
}
91+
}
92+
}
93+
}
94+
}
95+
})
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
package io.github.theunic.kcommand.transport.kafka
2+
3+
import kotlinx.serialization.Serializable
4+
5+
@Serializable
6+
data class TestMessage(val content: String)

settings.gradle.kts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,4 @@ plugins {
44

55
rootProject.name = "kcommand"
66
include(":kcommand-core")
7+
include(":kcommand-kafka-transport")

0 commit comments

Comments
 (0)