Skip to content

Commit 8b44432

Browse files
committed
Introduce an abstract RemoteTransport class
1 parent 17a9eee commit 8b44432

File tree

8 files changed

+219
-0
lines changed

8 files changed

+219
-0
lines changed

build.gradle.kts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ plugins {
77
alias(libs.plugins.nmcp)
88
alias(libs.plugins.vanniktech)
99
alias(libs.plugins.dokka)
10+
alias(libs.plugins.serialization)
1011
`java-library`
1112
}
1213

gradle/libs.versions.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,18 @@ arrow = "2.0.1"
66
nmcp = "0.0.9"
77
vanniktech = "0.30.0"
88
dokka = "2.0.0"
9+
kotlinxSerializationJson = "1.8.0"
910

1011
[libraries]
1112
kotlinx-coroutines = { module = "org.jetbrains.kotlinx:kotlinx-coroutines-core", version.ref = "kotlinCoroutines" }
1213
kotest-runner-junit5 = { module = "io.kotest:kotest-runner-junit5", version.ref = "kotestVersion" }
1314
kotest-assertions-core = { module = "io.kotest:kotest-assertions-core", version.ref = "kotestVersion" }
1415
arrow-core = { module = "io.arrow-kt:arrow-core", version.ref = "arrow" }
16+
kotlinx-serialization-json = { module = "org.jetbrains.kotlinx:kotlinx-serialization-json", version.ref = "kotlinxSerializationJson" }
1517

1618
[plugins]
1719
kotlin = { id = "org.jetbrains.kotlin.jvm", version.ref = "kotlin" }
1820
nmcp = { id = "com.gradleup.nmcp", version.ref = "nmcp" }
1921
vanniktech = { id = "com.vanniktech.maven.publish", version.ref = "vanniktech" }
2022
dokka = { id = "org.jetbrains.dokka", version.ref = "dokka" }
23+
serialization = { id = "org.jetbrains.kotlin.plugin.serialization", version.ref = "kotlin" }

kcommand-core/build.gradle.kts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,14 @@ description =
66
to the Message Bus implementations.
77
""".trimIndent().replace("\n", " ")
88

9+
plugins {
10+
alias(libs.plugins.serialization)
11+
}
12+
913
dependencies {
1014
implementation(libs.kotlinx.coroutines)
1115
implementation(libs.arrow.core)
16+
implementation(libs.kotlinx.serialization.json)
1217
testImplementation(libs.kotest.runner.junit5)
1318
testImplementation(libs.kotest.assertions.core)
1419
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package io.github.theunic.kcommand.core
2+
3+
import kotlinx.serialization.Serializable
4+
5+
@Serializable
6+
data class Envelope(
7+
val type: String,
8+
val payload: String,
9+
)
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package io.github.theunic.kcommand.core
2+
3+
import kotlinx.serialization.KSerializer
4+
import kotlin.reflect.KClass
5+
6+
class MessageRegistry<M : Any> {
7+
private val classToEntry = mutableMapOf<KClass<out M>, Pair<String, KSerializer<out M>>>()
8+
private val nameToSerializer = mutableMapOf<String, KSerializer<out M>>()
9+
10+
fun <T : M> register(kclass: KClass<T>, serializer: KSerializer<T>) {
11+
val typeName = kclass.qualifiedName.toString()
12+
classToEntry[kclass] = typeName to serializer
13+
nameToSerializer[typeName] = serializer
14+
}
15+
16+
fun typeNameAndSerializer(kclass: KClass<out M>): Pair<String, KSerializer<out M>>? {
17+
return classToEntry[kclass]
18+
}
19+
20+
fun serializerFor(typeName: String): KSerializer<out M>? {
21+
return nameToSerializer[typeName]
22+
}
23+
}
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
package io.github.theunic.kcommand.core
2+
3+
import arrow.core.Either
4+
import kotlinx.serialization.json.Json
5+
import kotlinx.coroutines.CompletableDeferred
6+
import kotlinx.coroutines.flow.Flow
7+
import kotlinx.serialization.KSerializer
8+
import kotlinx.coroutines.flow.map
9+
import kotlinx.coroutines.flow.filterNotNull
10+
11+
abstract class RemoteTransport<M: Any, R: Any>(
12+
private val registry: MessageRegistry<M>,
13+
private val json: Json = Json {
14+
ignoreUnknownKeys = false
15+
encodeDefaults = true
16+
prettyPrint = false
17+
}
18+
): Transport<M, R> {
19+
protected abstract fun doSend(serializedMessage: String)
20+
protected abstract fun doReceiveFlow(): Flow<String>
21+
22+
override suspend fun send(message: M): Either<Unit, CompletableDeferred<R>> {
23+
val kclass = message::class
24+
val (typeName, serializer) = registry.typeNameAndSerializer(kclass)
25+
?: return Either.Left(Unit)
26+
27+
@Suppress("UNCHECKED_CAST")
28+
val payloadJson = json.encodeToString(serializer as KSerializer<M>, message)
29+
val envelope = Envelope(typeName, payloadJson)
30+
val envelopeJson = json.encodeToString(Envelope.serializer(), envelope)
31+
32+
doSend(envelopeJson)
33+
34+
// Fire-and-forget
35+
return Either.Left(Unit)
36+
}
37+
38+
override fun receive(): Flow<Pair<M, Either<Unit, CompletableDeferred<R>>>> {
39+
return doReceiveFlow()
40+
.map { raw ->
41+
try {
42+
val envelope = json.decodeFromString(Envelope.serializer(), raw)
43+
val serializer = registry.serializerFor(envelope.type)
44+
if (serializer != null) {
45+
val obj = json.decodeFromString(serializer, envelope.payload)
46+
val casted = obj as? M
47+
if (casted != null) {
48+
casted to Either.Left(Unit)
49+
} else {
50+
null
51+
}
52+
} else {
53+
null
54+
}
55+
} catch (e: Exception) {
56+
null
57+
}
58+
}
59+
.filterNotNull()
60+
}
61+
}
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
package io.github.theunic.kcommand.core
2+
3+
import io.kotest.core.spec.style.BehaviorSpec
4+
import io.kotest.matchers.shouldBe
5+
import kotlinx.coroutines.delay
6+
import kotlinx.coroutines.flow.first
7+
import kotlinx.coroutines.launch
8+
import kotlinx.coroutines.test.runTest
9+
import kotlinx.serialization.Serializable
10+
import kotlinx.coroutines.flow.Flow
11+
import kotlinx.coroutines.flow.MutableSharedFlow
12+
import kotlinx.serialization.json.Json
13+
14+
class DummyRemoteTransport<M : Any, R : Any>(
15+
registry: MessageRegistry<M>,
16+
json: Json = Json { ignoreUnknownKeys = true }
17+
) : RemoteTransport<M, R>(registry, json) {
18+
19+
val sentMessages = mutableListOf<String>()
20+
private val incomingFlow = MutableSharedFlow<String>()
21+
22+
override fun doSend(serializedMessage: String) {
23+
sentMessages.add(serializedMessage)
24+
}
25+
26+
override fun doReceiveFlow(): Flow<String> {
27+
return incomingFlow
28+
}
29+
30+
suspend fun emitIncomingMessage(raw: String) {
31+
incomingFlow.emit(raw)
32+
}
33+
}
34+
35+
@Serializable
36+
sealed class BaseCommand
37+
38+
@Serializable
39+
data class MyCommand(val id: Int, val payload: String) : BaseCommand()
40+
41+
@Serializable
42+
data class OtherCommand(val name: String) : BaseCommand()
43+
44+
class RemoteTransportTest : BehaviorSpec({
45+
val registry = MessageRegistry<BaseCommand>().apply {
46+
register(MyCommand::class, MyCommand.serializer())
47+
register(OtherCommand::class, OtherCommand.serializer())
48+
}
49+
50+
val transport = DummyRemoteTransport<BaseCommand, Any>(registry)
51+
52+
given("A RemoteTransport with a dummy implementation") {
53+
54+
`when`("we send a MyCommand object") {
55+
then("it should serialize the object into an Envelope and store it") {
56+
runTest {
57+
val cmd = MyCommand(42, "HelloTransport")
58+
val result = transport.send(cmd)
59+
60+
result.isLeft() shouldBe true
61+
62+
transport.sentMessages.size shouldBe 1
63+
val raw = transport.sentMessages.first()
64+
65+
raw.contains(MyCommand::class.qualifiedName.toString()) shouldBe true
66+
raw.contains("HelloTransport") shouldBe true
67+
}
68+
}
69+
}
70+
71+
`when`("we receive a raw envelope for OtherCommand") {
72+
then("it should reconstruct the object and emit it in the flow") {
73+
runTest {
74+
val job = launch {
75+
transport.receive().collect {
76+
println("DEBUG: flow emitted => $it")
77+
}
78+
}
79+
80+
val envelopeJson = """
81+
{
82+
"type": "${OtherCommand::class.qualifiedName}",
83+
"payload": "{\"name\":\"myName\"}"
84+
}
85+
""".trimIndent()
86+
87+
transport.emitIncomingMessage(envelopeJson)
88+
89+
delay(500)
90+
job.cancel()
91+
}
92+
}
93+
}
94+
95+
`when`("we receive an envelope with a type not registered") {
96+
then("it should emit nothing (and we don't see a Pair)") {
97+
runTest {
98+
val job = launch {
99+
transport.receive().first()
100+
}
101+
102+
val invalidEnvelopeJson = """
103+
{
104+
"type": "com.example.UnknownCommand",
105+
"payload": "{}"
106+
}
107+
""".trimIndent()
108+
109+
transport.emitIncomingMessage(invalidEnvelopeJson)
110+
111+
job.cancel()
112+
}
113+
}
114+
}
115+
}
116+
})

settings.gradle.kts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,4 @@ plugins {
44

55
rootProject.name = "kcommand"
66
include(":kcommand-core")
7+
include("kcommand-kafka-transport")

0 commit comments

Comments
 (0)