Skip to content

Commit 6885c34

Browse files
committed
Upgrade to Spring Kafka 3.0.3
Closes gh-34354
1 parent fcf75fd commit 6885c34

File tree

4 files changed

+31
-21
lines changed

4 files changed

+31
-21
lines changed

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@
3030
import org.springframework.kafka.listener.ContainerProperties;
3131
import org.springframework.kafka.listener.RecordInterceptor;
3232
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
33-
import org.springframework.kafka.support.converter.MessageConverter;
33+
import org.springframework.kafka.support.converter.BatchMessageConverter;
34+
import org.springframework.kafka.support.converter.RecordMessageConverter;
3435
import org.springframework.kafka.transaction.KafkaAwareTransactionManager;
3536

3637
/**
@@ -45,7 +46,9 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer {
4546

4647
private KafkaProperties properties;
4748

48-
private MessageConverter messageConverter;
49+
private BatchMessageConverter batchMessageConverter;
50+
51+
private RecordMessageConverter recordMessageConverter;
4952

5053
private RecordFilterStrategy<Object, Object> recordFilterStrategy;
5154

@@ -72,11 +75,19 @@ void setKafkaProperties(KafkaProperties properties) {
7275
}
7376

7477
/**
75-
* Set the {@link MessageConverter} to use.
76-
* @param messageConverter the message converter
78+
* Set the {@link BatchMessageConverter} to use.
79+
* @param batchMessageConverter the message converter
80+
*/
81+
void setBatchMessageConverter(BatchMessageConverter batchMessageConverter) {
82+
this.batchMessageConverter = batchMessageConverter;
83+
}
84+
85+
/**
86+
* Set the {@link RecordMessageConverter} to use.
87+
* @param recordMessageConverter the message converter
7788
*/
78-
void setMessageConverter(MessageConverter messageConverter) {
79-
this.messageConverter = messageConverter;
89+
void setRecordMessageConverter(RecordMessageConverter recordMessageConverter) {
90+
this.recordMessageConverter = recordMessageConverter;
8091
}
8192

8293
/**
@@ -164,7 +175,8 @@ private void configureListenerFactory(ConcurrentKafkaListenerContainerFactory<Ob
164175
Listener properties = this.properties.getListener();
165176
map.from(properties::getConcurrency).to(factory::setConcurrency);
166177
map.from(properties::isAutoStartup).to(factory::setAutoStartup);
167-
map.from(this.messageConverter).to(factory::setMessageConverter);
178+
map.from(this.batchMessageConverter).to(factory::setBatchMessageConverter);
179+
map.from(this.recordMessageConverter).to(factory::setRecordMessageConverter);
168180
map.from(this.recordFilterStrategy).to(factory::setRecordFilterStrategy);
169181
map.from(this.replyTemplate).to(factory::setReplyTemplate);
170182
if (properties.getType().equals(Listener.Type.BATCH)) {

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.java

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import org.springframework.beans.factory.ObjectProvider;
2020
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
2121
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
22-
import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Listener.Type;
2322
import org.springframework.context.annotation.Bean;
2423
import org.springframework.context.annotation.Configuration;
2524
import org.springframework.kafka.annotation.EnableKafka;
@@ -38,7 +37,6 @@
3837
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
3938
import org.springframework.kafka.support.converter.BatchMessageConverter;
4039
import org.springframework.kafka.support.converter.BatchMessagingMessageConverter;
41-
import org.springframework.kafka.support.converter.MessageConverter;
4240
import org.springframework.kafka.support.converter.RecordMessageConverter;
4341
import org.springframework.kafka.transaction.KafkaAwareTransactionManager;
4442

@@ -55,7 +53,7 @@ class KafkaAnnotationDrivenConfiguration {
5553

5654
private final KafkaProperties properties;
5755

58-
private final RecordMessageConverter messageConverter;
56+
private final RecordMessageConverter recordMessageConverter;
5957

6058
private final RecordFilterStrategy<Object, Object> recordFilterStrategy;
6159

@@ -76,7 +74,7 @@ class KafkaAnnotationDrivenConfiguration {
7674
private final BatchInterceptor<Object, Object> batchInterceptor;
7775

7876
KafkaAnnotationDrivenConfiguration(KafkaProperties properties,
79-
ObjectProvider<RecordMessageConverter> messageConverter,
77+
ObjectProvider<RecordMessageConverter> recordMessageConverter,
8078
ObjectProvider<RecordFilterStrategy<Object, Object>> recordFilterStrategy,
8179
ObjectProvider<BatchMessageConverter> batchMessageConverter,
8280
ObjectProvider<KafkaTemplate<Object, Object>> kafkaTemplate,
@@ -87,10 +85,10 @@ class KafkaAnnotationDrivenConfiguration {
8785
ObjectProvider<RecordInterceptor<Object, Object>> recordInterceptor,
8886
ObjectProvider<BatchInterceptor<Object, Object>> batchInterceptor) {
8987
this.properties = properties;
90-
this.messageConverter = messageConverter.getIfUnique();
88+
this.recordMessageConverter = recordMessageConverter.getIfUnique();
9189
this.recordFilterStrategy = recordFilterStrategy.getIfUnique();
9290
this.batchMessageConverter = batchMessageConverter
93-
.getIfUnique(() -> new BatchMessagingMessageConverter(this.messageConverter));
91+
.getIfUnique(() -> new BatchMessagingMessageConverter(this.recordMessageConverter));
9492
this.kafkaTemplate = kafkaTemplate.getIfUnique();
9593
this.transactionManager = kafkaTransactionManager.getIfUnique();
9694
this.rebalanceListener = rebalanceListener.getIfUnique();
@@ -105,9 +103,8 @@ class KafkaAnnotationDrivenConfiguration {
105103
ConcurrentKafkaListenerContainerFactoryConfigurer kafkaListenerContainerFactoryConfigurer() {
106104
ConcurrentKafkaListenerContainerFactoryConfigurer configurer = new ConcurrentKafkaListenerContainerFactoryConfigurer();
107105
configurer.setKafkaProperties(this.properties);
108-
MessageConverter messageConverterToUse = (this.properties.getListener().getType().equals(Type.BATCH))
109-
? this.batchMessageConverter : this.messageConverter;
110-
configurer.setMessageConverter(messageConverterToUse);
106+
configurer.setBatchMessageConverter(this.batchMessageConverter);
107+
configurer.setRecordMessageConverter(this.recordMessageConverter);
111108
configurer.setRecordFilterStrategy(this.recordFilterStrategy);
112109
configurer.setReplyTemplate(this.kafkaTemplate);
113110
configurer.setTransactionManager(this.transactionManager);

spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -551,7 +551,7 @@ void testConcurrentKafkaListenerContainerFactoryWithCustomMessageConverter() {
551551
this.contextRunner.withUserConfiguration(MessageConverterConfiguration.class).run((context) -> {
552552
ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory = context
553553
.getBean(ConcurrentKafkaListenerContainerFactory.class);
554-
assertThat(kafkaListenerContainerFactory).hasFieldOrPropertyWithValue("messageConverter",
554+
assertThat(kafkaListenerContainerFactory).hasFieldOrPropertyWithValue("recordMessageConverter",
555555
context.getBean("myMessageConverter"));
556556
});
557557
}
@@ -564,7 +564,7 @@ void testConcurrentKafkaListenerContainerFactoryInBatchModeWithCustomMessageConv
564564
.run((context) -> {
565565
ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory = context
566566
.getBean(ConcurrentKafkaListenerContainerFactory.class);
567-
assertThat(kafkaListenerContainerFactory).hasFieldOrPropertyWithValue("messageConverter",
567+
assertThat(kafkaListenerContainerFactory).hasFieldOrPropertyWithValue("batchMessageConverter",
568568
context.getBean("myBatchMessageConverter"));
569569
});
570570
}
@@ -577,7 +577,7 @@ void testConcurrentKafkaListenerContainerFactoryInBatchModeWrapsCustomMessageCon
577577
ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory = context
578578
.getBean(ConcurrentKafkaListenerContainerFactory.class);
579579
Object messageConverter = ReflectionTestUtils.getField(kafkaListenerContainerFactory,
580-
"messageConverter");
580+
"batchMessageConverter");
581581
assertThat(messageConverter).isInstanceOf(BatchMessagingMessageConverter.class);
582582
assertThat(((BatchMessageConverter) messageConverter).getRecordMessageConverter())
583583
.isSameAs(context.getBean("myMessageConverter"));
@@ -589,7 +589,8 @@ void testConcurrentKafkaListenerContainerFactoryInBatchModeWithNoMessageConverte
589589
this.contextRunner.withPropertyValues("spring.kafka.listener.type=batch").run((context) -> {
590590
ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory = context
591591
.getBean(ConcurrentKafkaListenerContainerFactory.class);
592-
Object messageConverter = ReflectionTestUtils.getField(kafkaListenerContainerFactory, "messageConverter");
592+
Object messageConverter = ReflectionTestUtils.getField(kafkaListenerContainerFactory,
593+
"batchMessageConverter");
593594
assertThat(messageConverter).isInstanceOf(BatchMessagingMessageConverter.class);
594595
assertThat(((BatchMessageConverter) messageConverter).getRecordMessageConverter()).isNull();
595596
});

spring-boot-project/spring-boot-dependencies/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1371,7 +1371,7 @@ bom {
13711371
]
13721372
}
13731373
}
1374-
library("Spring Kafka", "3.0.2") {
1374+
library("Spring Kafka", "3.0.3") {
13751375
group("org.springframework.kafka") {
13761376
modules = [
13771377
"spring-kafka",

0 commit comments

Comments
 (0)