Skip to content

Commit b7fd563

Browse files
committed
Allow for deep kafka connection customization
1 parent a743698 commit b7fd563

File tree

1 file changed

+4
-1
lines changed

1 file changed

+4
-1
lines changed

kcommand-kafka-transport/src/main/kotlin/io/github/theunic/kcommand/transport/kafka/KafkaStreamsTransportConfig.kt

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ data class KafkaStreamsTransportConfig<M : Any, TOPIC : Enum<TOPIC>>(
2222
bootstrapServers: String,
2323
inputTopics: List<TOPIC>,
2424
topicResolver: (KClass<out M>) -> TOPIC,
25+
propertiesModifier: (Properties) -> Properties = { it },
2526
): KafkaStreamsTransportConfig<M, TOPIC> {
2627
val props =
2728
Properties().apply {
@@ -32,8 +33,10 @@ data class KafkaStreamsTransportConfig<M : Any, TOPIC : Enum<TOPIC>>(
3233
put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde::class.java.name)
3334
}
3435

36+
val finalProps = propertiesModifier.invoke(props)
37+
3538
return KafkaStreamsTransportConfig(
36-
streamsProperties = props,
39+
streamsProperties = finalProps,
3740
topicResolver = topicResolver,
3841
inputTopics = inputTopics,
3942
)

0 commit comments

Comments
 (0)