Skip to content

Commit a743698

Browse files
committed
Improve documentation
1 parent a9adff5 commit a743698

File tree

1 file changed

+151
-0
lines changed

1 file changed

+151
-0
lines changed

README.md

Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,157 @@ fun main() {
4545
}
4646
```
4747

48+
#### Using multiples transports
49+
50+
Sometimes, you want your DefaultMessageBus to communicate through multiple transports at once (for example, a Kafka transport for some messages and a local fallback for others). In that scenario, you can create an AggregatorTransport that internally holds references to several sub-transports. The AggregatorTransport implements the Transport interface and delegates to each sub-transport under the hood, including a local transport as a fallback if none is specified:
51+
52+
```kotlin
53+
// Example aggregator configuration
54+
import kotlinx.serialization.Serializable
55+
56+
// Suppose you have different sub-transports:
57+
val kafkaTransport = KafkaTransport()
58+
val rabbitTransport = RabbitMQTransport()
59+
60+
@Serializable
61+
sealed class MyMessage {
62+
@Serializable
63+
data class KafkaMessage(val message: String) : MyMessage()
64+
@Serializable
65+
data class RabbitMQMessage(val message: String) : MyMessage()
66+
@Serializable
67+
data class DefaultMessage(val message: String) : MyMessage()
68+
}
69+
70+
// Create an aggregator transport with them:
71+
val aggregatorTransport = AggregatorTransport(
72+
transports = mapOf(
73+
"kafka" to kafkaTransport,
74+
"rabbit" to rabbitTransport
75+
),
76+
transportResolver = { message: MyMessage ->
77+
// Decide which transport name to use (kafka? rabbit? or null for local)
78+
when (message) {
79+
is MyMessage.KafkaMessage -> "kafka"
80+
is MyMessage.RabbitMQMessage -> "rabbit"
81+
else -> null
82+
}
83+
}
84+
)
85+
86+
// Then, create the DefaultMessageBus using that aggregator:
87+
val messageBus = DefaultMessageBus<MyMessage, Int>(
88+
transport = aggregatorTransport
89+
)
90+
91+
runBlocking {
92+
// A KafkaMessage
93+
messageBus.handle(MyMessage.KafkaMessage("Hello Kafka"))
94+
// A RabbitMQMessage
95+
messageBus.handle(MyMessage.RabbitMQMessage("Hello Rabbit"))
96+
// A local message
97+
messageBus.handle(MyMessage.DefaultMessage("local processing"))
98+
}
99+
```
100+
101+
With this approach, the DefaultMessageBus sees only one Transport (the aggregator), but it can route different messages to different sub-transports based on your custom logic (annotation, message type, etc.). If neither your logic nor the map contains a match, the aggregator’s built-in local transport will handle the message
102+
103+
## Transports
104+
105+
A Transport is responsible for either sending messages somewhere (e.g., publishing them to a remote broker) or receiving them (e.g., consuming from a queue/topic) and ultimately passing them into the bus.
106+
107+
1. LocalTransport: Processes messages entirely in-process (no remote broker).
108+
2. KafkaStreamsRemoteTransport: Uses Kafka Streams to consume and/or publish messages to Kafka.
109+
3. AggregatorTransport: A composite transport that can hold multiple sub-transports (Kafka, Rabbit, local fallback, etc.) and decide at runtime which one to use.
110+
111+
Each Transport implements
112+
113+
```kotlin
114+
interface Transport<M : Any, R : Any> {
115+
suspend fun send(message: M): Either<Unit, CompletableDeferred<R>>
116+
fun receive(): Flow<Pair<M, Either<Unit, CompletableDeferred<R>>>>
117+
}
118+
```
119+
120+
* `send(message)` 👉 may push the message to a remote system or process it locally (depending on the implementation).
121+
* `reive()` 👉 provides a Flow of incoming messages from that transport (e.g., from Kafka, Rabbit, or a local channel).
122+
123+
Below are some of the provided transports (besides the AggregatorTransport, which is documented above with DefaultMessageBus):
124+
125+
### LocalTransport
126+
127+
The simplest transport is LocalTransport, which processes all messages in-process without any remote broker. It’s ideal for testing or purely local scenarios
128+
129+
```kotlin
130+
// Example usage with LocalTransport
131+
val localTransport = LocalTransport<String, Int>()
132+
133+
// Create the DefaultMessageBus specifying the local transport
134+
val messageBus = DefaultMessageBus<String, Int>(
135+
transport = localTransport
136+
)
137+
138+
// Subscribe locally
139+
messageBus.subscribe(String::class) { it.length }
140+
141+
// Dispatch a message
142+
runBlocking {
143+
val result = messageBus.handle("A local message")
144+
println(result.await()) // => 14
145+
}
146+
```
147+
148+
Since everything remains in-process, there’s no remote queue or topic to consume. This transport is perfect for simpler testing or “single JVM” usage.
149+
150+
### KafkaStreamsRemoteTransport
151+
152+
**_KafkaStreamsRemoteTransport_** leverages Kafka Streams to both publish messages to Kafka and consume them from Kafka topics
153+
154+
#### Installation
155+
156+
Kafka transport lives in it's own separate package
157+
158+
```kotlin
159+
dependencies {
160+
// ...
161+
implementation("io.github.theunic:kcommand-core")
162+
implementation("io.github.theunic:kcommand-kafka-transport")
163+
}
164+
```
165+
166+
#### Usage example
167+
168+
```kotlin
169+
// Suppose we have some config and a "MessageRegistry" for serialization
170+
val kafkaConfig = KafkaTransportConfig(
171+
applicationId = "my-kafka-streams-app",
172+
bootstrapServers = "localhost:9092",
173+
// other settings if needed
174+
)
175+
176+
val kafkaTransport = KafkaStreamsRemoteTransport<String, Int>(
177+
config = kafkaConfig
178+
// possibly pass a serializer or registry if needed
179+
)
180+
181+
// Create the DefaultMessageBus
182+
val messageBus = DefaultMessageBus<String, Int>(
183+
transport = kafkaTransport
184+
)
185+
186+
// Subscribing locally is optional, or you can rely on external handling
187+
messageBus.subscribe(String::class) { it.length }
188+
189+
// Now when you do:
190+
runBlocking {
191+
val deferred = messageBus.handle("Hello Kafka")
192+
println(deferred.await()) // logic depends on the transport's approach
193+
}
194+
195+
// Meanwhile, KafkaStreamsRemoteTransport will consume messages from the configured topics
196+
// and re-inject them into the bus, if you collect the flow or run it in parallel.
197+
```
198+
48199
## Contributing
49200

50201
Contributions of all sizes are welcome.

0 commit comments

Comments
 (0)