Skip to content

Commit a3eb765

Browse files
committed
Fixes
1 parent e3633d0 commit a3eb765

File tree

2 files changed

+14
-11
lines changed

2 files changed

+14
-11
lines changed

kcommand-kafka-transport/src/main/kotlin/io/github/theunic/kcommand/transport/kafka/KafkaStreamsTransport.kt

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,8 @@ suspend fun KafkaProducer<String, String>.sendAwait(record: ProducerRecord<Strin
3131
}
3232
}
3333

34-
class KafkaStreamsRemoteTransport<M : Any, R : Any>(
35-
private val config: KafkaStreamsTransportConfig<M>,
34+
class KafkaStreamsRemoteTransport<M : Any, R : Any, TOPIC: Enum<TOPIC>>(
35+
private val config: KafkaStreamsTransportConfig<M, TOPIC>,
3636
registry: MessageRegistry<M>,
3737
) : RemoteTransport<M, R>(registry) {
3838

@@ -51,17 +51,19 @@ class KafkaStreamsRemoteTransport<M : Any, R : Any>(
5151

5252
override suspend fun doSend(kclass: KClass<out M>, serializedMessage: String) {
5353
val topic = config.topicResolver.invoke(kclass)
54-
producer.sendAwait(ProducerRecord(topic, serializedMessage))
54+
producer.sendAwait(ProducerRecord(topic.name, serializedMessage))
5555
}
5656

5757
override fun doReceiveFlow(): Flow<String> = callbackFlow {
5858
val builder = StreamsBuilder()
5959

60-
val stream: KStream<String, String> = builder.stream(config.defaultTopic)
61-
stream.foreach { _, value ->
62-
val result = trySend(value)
63-
if (result.isFailure) {
64-
logger.error(result.exceptionOrNull()) {}
60+
config.inputTopics.forEach { topic ->
61+
val stream: KStream<String, String> = builder.stream(topic.name)
62+
stream.foreach { _, value ->
63+
val result = trySend(value)
64+
if (result.isFailure) {
65+
logger.error(result.exceptionOrNull()) {}
66+
}
6567
}
6668
}
6769

kcommand-kafka-transport/src/test/kotlin/KafkaStreamsTransportTest.kt

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,13 +38,14 @@ class KafkaStreamsTransportTest : BehaviorSpec({
3838
TestMessage.serializer()
3939
)
4040

41-
val transportConfig = KafkaStreamsTransportConfig.basic<TestMessage>(
41+
val transportConfig = KafkaStreamsTransportConfig.basic<TestMessage, Topics>(
4242
applicationId = "kcommand-streams-test",
4343
bootstrapServers = kafka.bootstrapServers,
44-
topicResolver = { _ -> "defaultTopic" }
44+
inputTopics = listOf(Topics.DEFAULT),
45+
topicResolver = { _ -> Topics.DEFAULT }
4546
)
4647

47-
val transport = KafkaStreamsRemoteTransport<TestMessage, Any>(
48+
val transport = KafkaStreamsRemoteTransport<TestMessage, Any, Topics>(
4849
config = transportConfig,
4950
registry = registry,
5051
)

0 commit comments

Comments
 (0)