Skip to content

Commit 0efa355

Browse files
committed
Add stopable interface. Implement top-down stop mechanism for DefaultMessageBus. DefaultMessageBus now accepts more than one transport that can be configured per message
1 parent ba5dbff commit 0efa355

File tree

8 files changed

+66
-8
lines changed

8 files changed

+66
-8
lines changed

kcommand-core/src/main/kotlin/io/github/theunic/kcommand/core/DefaultMessageBus.kt

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package io.github.theunic.kcommand.core
22

33
import arrow.core.getOrElse
4+
import io.github.theunic.kcommand.core.transport.AggregatorTransport
45
import kotlinx.coroutines.CompletableDeferred
56
import kotlinx.coroutines.CoroutineScope
67
import kotlinx.coroutines.Dispatchers
@@ -9,8 +10,8 @@ import kotlinx.coroutines.flow.onEach
910

1011
class DefaultMessageBus<M : Any, R : Any>(
1112
middlewares: List<Middleware<M, R>> = listOf(),
12-
private val transport: Transport<M, R> = LocalTransport(),
13-
) : AbstractMessageBus<M, R>(middlewares) {
13+
private val transport: AggregatorTransport<M, R> = AggregatorTransport(),
14+
) : AbstractMessageBus<M, R>(middlewares), Stopable {
1415
init {
1516
transport
1617
.receive()
@@ -22,4 +23,8 @@ class DefaultMessageBus<M : Any, R : Any>(
2223
transport
2324
.send(message)
2425
.getOrElse { CompletableDeferred() }
26+
27+
override fun stop() {
28+
transport.stop()
29+
}
2530
}
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
package io.github.theunic.kcommand.core
2+
3+
interface Stopable {
4+
fun stop()
5+
}

kcommand-core/src/main/kotlin/io/github/theunic/kcommand/core/Transport.kt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,5 @@ import kotlinx.coroutines.flow.Flow
66

77
interface Transport<M : Any, R : Any> {
88
suspend fun send(message: M): Either<Unit, CompletableDeferred<R>>
9-
109
fun receive(): Flow<Pair<M, Either<Unit, CompletableDeferred<R>>>>
1110
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package io.github.theunic.kcommand.core.transport
2+
3+
import arrow.core.Either
4+
import io.github.theunic.kcommand.core.Stopable
5+
import io.github.theunic.kcommand.core.Transport
6+
import kotlinx.coroutines.CompletableDeferred
7+
import kotlinx.coroutines.flow.Flow
8+
import kotlinx.coroutines.flow.merge
9+
10+
class AggregatorTransport<M : Any, R : Any>(
11+
private val transports: Map<String, Transport<M, R>> = mapOf(),
12+
private val transportResolver: (M) -> String? = { _ -> null } ,
13+
) : Transport<M, R>, Stopable {
14+
15+
private val localTransport: Transport<M, R> = LocalTransport()
16+
17+
override suspend fun send(message: M): Either<Unit, CompletableDeferred<R>> {
18+
val transportName = transportResolver(message)
19+
val fallbackTransport = if (transportName == null) {
20+
localTransport
21+
} else {
22+
transports[transportName] ?: localTransport
23+
}
24+
25+
return fallbackTransport.send(message)
26+
}
27+
28+
override fun receive(): Flow<Pair<M, Either<Unit, CompletableDeferred<R>>>> {
29+
val flows = buildList {
30+
add(localTransport.receive())
31+
addAll(transports.values.map { it.receive() })
32+
}
33+
return flows.merge()
34+
}
35+
36+
override fun stop() {
37+
transports.values.forEach { tr ->
38+
if (tr is Stopable) {
39+
tr.stop()
40+
}
41+
}
42+
}
43+
}

kcommand-core/src/main/kotlin/io/github/theunic/kcommand/core/LocalTransport.kt renamed to kcommand-core/src/main/kotlin/io/github/theunic/kcommand/core/transport/LocalTransport.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
1-
package io.github.theunic.kcommand.core
1+
package io.github.theunic.kcommand.core.transport
22

33
import arrow.core.Either
4+
import io.github.theunic.kcommand.core.Transport
45
import kotlinx.coroutines.CompletableDeferred
56
import kotlinx.coroutines.flow.Flow
67
import kotlinx.coroutines.flow.MutableSharedFlow

kcommand-core/src/main/kotlin/io/github/theunic/kcommand/core/RemoteTransport.kt renamed to kcommand-core/src/main/kotlin/io/github/theunic/kcommand/core/transport/RemoteTransport.kt

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
1-
package io.github.theunic.kcommand.core
1+
package io.github.theunic.kcommand.core.transport
22

33
import arrow.core.Either
4+
import io.github.theunic.kcommand.core.Envelope
5+
import io.github.theunic.kcommand.core.MessageRegistry
6+
import io.github.theunic.kcommand.core.Transport
47
import kotlinx.serialization.json.Json
58
import kotlinx.coroutines.CompletableDeferred
69
import kotlinx.coroutines.flow.Flow

kcommand-core/src/test/kotlin/io/github/theunic/kcommand/core/RemoteTransportTest.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package io.github.theunic.kcommand.core
22

3+
import io.github.theunic.kcommand.core.transport.RemoteTransport
34
import io.kotest.core.spec.style.BehaviorSpec
45
import io.kotest.matchers.shouldBe
56
import kotlinx.coroutines.delay

kcommand-kafka-transport/src/main/kotlin/io/github/theunic/kcommand/transport/kafka/KafkaStreamsTransport.kt

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,8 @@ package io.github.theunic.kcommand.transport.kafka
22

33
import io.github.oshai.kotlinlogging.KotlinLogging
44
import io.github.theunic.kcommand.core.MessageRegistry
5-
import io.github.theunic.kcommand.core.RemoteTransport
5+
import io.github.theunic.kcommand.core.Stopable
6+
import io.github.theunic.kcommand.core.transport.RemoteTransport
67
import kotlinx.coroutines.channels.awaitClose
78
import kotlinx.coroutines.flow.Flow
89
import kotlinx.coroutines.flow.callbackFlow
@@ -34,7 +35,7 @@ suspend fun KafkaProducer<String, String>.sendAwait(record: ProducerRecord<Strin
3435
class KafkaStreamsRemoteTransport<M : Any, R : Any, TOPIC: Enum<TOPIC>>(
3536
private val config: KafkaStreamsTransportConfig<M, TOPIC>,
3637
registry: MessageRegistry<M>,
37-
) : RemoteTransport<M, R>(registry) {
38+
) : RemoteTransport<M, R>(registry), Stopable {
3839

3940
private val producer: KafkaProducer<String, String> by lazy {
4041
val props = Properties().apply {
@@ -77,7 +78,7 @@ class KafkaStreamsRemoteTransport<M : Any, R : Any, TOPIC: Enum<TOPIC>>(
7778
}
7879
}
7980

80-
fun stop() {
81+
override fun stop() {
8182
streams.close(Duration.ofSeconds(1))
8283
producer.close()
8384
}

0 commit comments

Comments
 (0)