Skip to content

Commit eda59f7

Browse files
committed
Start building against Spring Kafka 3.0.3 snapshots
See gh-34153
1 parent cd43b4e commit eda59f7

File tree

4 files changed

+33
-23
lines changed

4 files changed

+33
-23
lines changed

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2012-2022 the original author or authors.
2+
* Copyright 2012-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -29,7 +29,8 @@
2929
import org.springframework.kafka.listener.ContainerProperties;
3030
import org.springframework.kafka.listener.RecordInterceptor;
3131
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
32-
import org.springframework.kafka.support.converter.MessageConverter;
32+
import org.springframework.kafka.support.converter.BatchMessageConverter;
33+
import org.springframework.kafka.support.converter.RecordMessageConverter;
3334
import org.springframework.kafka.transaction.KafkaAwareTransactionManager;
3435

3536
/**
@@ -43,7 +44,9 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer {
4344

4445
private KafkaProperties properties;
4546

46-
private MessageConverter messageConverter;
47+
private BatchMessageConverter batchMessageConverter;
48+
49+
private RecordMessageConverter recordMessageConverter;
4750

4851
private RecordFilterStrategy<Object, Object> recordFilterStrategy;
4952

@@ -68,11 +71,19 @@ void setKafkaProperties(KafkaProperties properties) {
6871
}
6972

7073
/**
71-
* Set the {@link MessageConverter} to use.
72-
* @param messageConverter the message converter
74+
* Set the {@link BatchMessageConverter} to use.
75+
* @param batchMessageConverter the message converter
76+
*/
77+
void setBatchMessageConverter(BatchMessageConverter batchMessageConverter) {
78+
this.batchMessageConverter = batchMessageConverter;
79+
}
80+
81+
/**
82+
* Set the {@link RecordMessageConverter} to use.
83+
* @param recordMessageConverter the message converter
7384
*/
74-
void setMessageConverter(MessageConverter messageConverter) {
75-
this.messageConverter = messageConverter;
85+
void setRecordMessageConverter(RecordMessageConverter recordMessageConverter) {
86+
this.recordMessageConverter = recordMessageConverter;
7687
}
7788

7889
/**
@@ -151,7 +162,8 @@ private void configureListenerFactory(ConcurrentKafkaListenerContainerFactory<Ob
151162
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
152163
Listener properties = this.properties.getListener();
153164
map.from(properties::getConcurrency).to(factory::setConcurrency);
154-
map.from(this.messageConverter).to(factory::setMessageConverter);
165+
map.from(this.batchMessageConverter).to(factory::setBatchMessageConverter);
166+
map.from(this.recordMessageConverter).to(factory::setRecordMessageConverter);
155167
map.from(this.recordFilterStrategy).to(factory::setRecordFilterStrategy);
156168
map.from(this.replyTemplate).to(factory::setReplyTemplate);
157169
if (properties.getType().equals(Listener.Type.BATCH)) {

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.java

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2012-2022 the original author or authors.
2+
* Copyright 2012-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -19,7 +19,6 @@
1919
import org.springframework.beans.factory.ObjectProvider;
2020
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
2121
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
22-
import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Listener.Type;
2322
import org.springframework.context.annotation.Bean;
2423
import org.springframework.context.annotation.Configuration;
2524
import org.springframework.kafka.annotation.EnableKafka;
@@ -35,7 +34,6 @@
3534
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
3635
import org.springframework.kafka.support.converter.BatchMessageConverter;
3736
import org.springframework.kafka.support.converter.BatchMessagingMessageConverter;
38-
import org.springframework.kafka.support.converter.MessageConverter;
3937
import org.springframework.kafka.support.converter.RecordMessageConverter;
4038
import org.springframework.kafka.transaction.KafkaAwareTransactionManager;
4139

@@ -51,7 +49,7 @@ class KafkaAnnotationDrivenConfiguration {
5149

5250
private final KafkaProperties properties;
5351

54-
private final RecordMessageConverter messageConverter;
52+
private final RecordMessageConverter recordMessageConverter;
5553

5654
private final RecordFilterStrategy<Object, Object> recordFilterStrategy;
5755

@@ -70,7 +68,7 @@ class KafkaAnnotationDrivenConfiguration {
7068
private final RecordInterceptor<Object, Object> recordInterceptor;
7169

7270
KafkaAnnotationDrivenConfiguration(KafkaProperties properties,
73-
ObjectProvider<RecordMessageConverter> messageConverter,
71+
ObjectProvider<RecordMessageConverter> recordMessageConverter,
7472
ObjectProvider<RecordFilterStrategy<Object, Object>> recordFilterStrategy,
7573
ObjectProvider<BatchMessageConverter> batchMessageConverter,
7674
ObjectProvider<KafkaTemplate<Object, Object>> kafkaTemplate,
@@ -80,10 +78,10 @@ class KafkaAnnotationDrivenConfiguration {
8078
ObjectProvider<AfterRollbackProcessor<Object, Object>> afterRollbackProcessor,
8179
ObjectProvider<RecordInterceptor<Object, Object>> recordInterceptor) {
8280
this.properties = properties;
83-
this.messageConverter = messageConverter.getIfUnique();
81+
this.recordMessageConverter = recordMessageConverter.getIfUnique();
8482
this.recordFilterStrategy = recordFilterStrategy.getIfUnique();
8583
this.batchMessageConverter = batchMessageConverter
86-
.getIfUnique(() -> new BatchMessagingMessageConverter(this.messageConverter));
84+
.getIfUnique(() -> new BatchMessagingMessageConverter(this.recordMessageConverter));
8785
this.kafkaTemplate = kafkaTemplate.getIfUnique();
8886
this.transactionManager = kafkaTransactionManager.getIfUnique();
8987
this.rebalanceListener = rebalanceListener.getIfUnique();
@@ -97,9 +95,8 @@ class KafkaAnnotationDrivenConfiguration {
9795
ConcurrentKafkaListenerContainerFactoryConfigurer kafkaListenerContainerFactoryConfigurer() {
9896
ConcurrentKafkaListenerContainerFactoryConfigurer configurer = new ConcurrentKafkaListenerContainerFactoryConfigurer();
9997
configurer.setKafkaProperties(this.properties);
100-
MessageConverter messageConverterToUse = (this.properties.getListener().getType().equals(Type.BATCH))
101-
? this.batchMessageConverter : this.messageConverter;
102-
configurer.setMessageConverter(messageConverterToUse);
98+
configurer.setBatchMessageConverter(this.batchMessageConverter);
99+
configurer.setRecordMessageConverter(this.recordMessageConverter);
103100
configurer.setRecordFilterStrategy(this.recordFilterStrategy);
104101
configurer.setReplyTemplate(this.kafkaTemplate);
105102
configurer.setTransactionManager(this.transactionManager);

spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -519,7 +519,7 @@ void testConcurrentKafkaListenerContainerFactoryWithCustomMessageConverter() {
519519
this.contextRunner.withUserConfiguration(MessageConverterConfiguration.class).run((context) -> {
520520
ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory = context
521521
.getBean(ConcurrentKafkaListenerContainerFactory.class);
522-
assertThat(kafkaListenerContainerFactory).hasFieldOrPropertyWithValue("messageConverter",
522+
assertThat(kafkaListenerContainerFactory).hasFieldOrPropertyWithValue("recordMessageConverter",
523523
context.getBean("myMessageConverter"));
524524
});
525525
}
@@ -531,7 +531,7 @@ void testConcurrentKafkaListenerContainerFactoryInBatchModeWithCustomMessageConv
531531
.withPropertyValues("spring.kafka.listener.type=batch").run((context) -> {
532532
ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory = context
533533
.getBean(ConcurrentKafkaListenerContainerFactory.class);
534-
assertThat(kafkaListenerContainerFactory).hasFieldOrPropertyWithValue("messageConverter",
534+
assertThat(kafkaListenerContainerFactory).hasFieldOrPropertyWithValue("batchMessageConverter",
535535
context.getBean("myBatchMessageConverter"));
536536
});
537537
}
@@ -543,7 +543,7 @@ void testConcurrentKafkaListenerContainerFactoryInBatchModeWrapsCustomMessageCon
543543
ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory = context
544544
.getBean(ConcurrentKafkaListenerContainerFactory.class);
545545
Object messageConverter = ReflectionTestUtils.getField(kafkaListenerContainerFactory,
546-
"messageConverter");
546+
"batchMessageConverter");
547547
assertThat(messageConverter).isInstanceOf(BatchMessagingMessageConverter.class);
548548
assertThat(((BatchMessageConverter) messageConverter).getRecordMessageConverter())
549549
.isSameAs(context.getBean("myMessageConverter"));
@@ -555,7 +555,8 @@ void testConcurrentKafkaListenerContainerFactoryInBatchModeWithNoMessageConverte
555555
this.contextRunner.withPropertyValues("spring.kafka.listener.type=batch").run((context) -> {
556556
ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory = context
557557
.getBean(ConcurrentKafkaListenerContainerFactory.class);
558-
Object messageConverter = ReflectionTestUtils.getField(kafkaListenerContainerFactory, "messageConverter");
558+
Object messageConverter = ReflectionTestUtils.getField(kafkaListenerContainerFactory,
559+
"batchMessageConverter");
559560
assertThat(messageConverter).isInstanceOf(BatchMessagingMessageConverter.class);
560561
assertThat(((BatchMessageConverter) messageConverter).getRecordMessageConverter()).isNull();
561562
});

spring-boot-project/spring-boot-dependencies/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1384,7 +1384,7 @@ bom {
13841384
]
13851385
}
13861386
}
1387-
library("Spring Kafka", "3.0.2") {
1387+
library("Spring Kafka", "3.0.3-SNAPSHOT") {
13881388
group("org.springframework.kafka") {
13891389
modules = [
13901390
"spring-kafka",

0 commit comments

Comments
 (0)