@@ -6,6 +6,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig
66import org.apache.kafka.clients.producer.ProducerConfig
77import org.apache.kafka.common.serialization.StringDeserializer
88import org.apache.kafka.common.serialization.StringSerializer
9+ import org.springframework.beans.factory.annotation.Value
910import org.springframework.context.annotation.Bean
1011import org.springframework.context.annotation.Configuration
1112import org.springframework.context.annotation.Profile
@@ -24,6 +25,12 @@ import org.springframework.kafka.support.serializer.JsonSerializer
2425@EnableKafka
2526class KafkaConfig {
2627
28+ @Value(" \$ {spring.kafka.bootstrap-servers}" )
29+ private lateinit var bootstrapServer: String
30+
31+ @Value(" \$ {spring.kafka.consumer.group-id}" )
32+ private lateinit var groupId: String
33+
2734 @Bean
2835 fun kafkaListenerContainerFactory (consumerFactory : ConsumerFactory <String , JsonNode >) =
2936 ConcurrentKafkaListenerContainerFactory <String , JsonNode >().also { it.consumerFactory = consumerFactory }
@@ -32,6 +39,8 @@ class KafkaConfig {
3239 fun consumerFactory () = DefaultKafkaConsumerFactory <String , JsonNode >(consumerProps)
3340
3441 val consumerProps = mapOf (
42+ ConsumerConfig .BOOTSTRAP_SERVERS_CONFIG to bootstrapServer,
43+ ConsumerConfig .GROUP_ID_CONFIG to groupId,
3544 ConsumerConfig .KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer ::class .java,
3645 ConsumerConfig .VALUE_DESERIALIZER_CLASS_CONFIG to JsonDeserializer ::class .java,
3746 JsonDeserializer .USE_TYPE_INFO_HEADERS to false ,
@@ -44,6 +53,7 @@ class KafkaConfig {
4453 fun producerFactory () = DefaultKafkaProducerFactory <String , KafkaMessage >(senderProps)
4554
4655 val senderProps = mapOf (
56+ ConsumerConfig .BOOTSTRAP_SERVERS_CONFIG to bootstrapServer,
4757 ProducerConfig .LINGER_MS_CONFIG to 10 ,
4858 ProducerConfig .KEY_SERIALIZER_CLASS_CONFIG to StringSerializer ::class .java,
4959 ProducerConfig .VALUE_SERIALIZER_CLASS_CONFIG to JsonSerializer ::class .java
0 commit comments