Skip to content

Commit 3b3f910

Browse files
committed
Fix SmartMessageConverter support in batch listeners
Fixes GH-4097 Problem: When using `@KafkaListener(contentTypeConverter = "...")` with a batch listener (`batch = "true"`), the SmartMessageConverter was not being applied during message conversion, resulting in ClassCastException when trying to process messages that required conversion (e.g., byte[] to String). Root Cause: The issue stemmed from two problems: 1. `BatchMessagingMessageConverter` default constructor created an instance without a `RecordMessageConverter` (`this(null)`), preventing proper per-record conversion within batch processing. 2. `BatchMessagingMessageListenerAdapter` did not override `setMessagingConverter()` to propagate the SmartMessageConverter from the annotation configuration to the batch converter's record converter. Additionally, the initial approach of bypassing the parent's validation in `setMessagingConverter()` conflicted with the framework's design to prevent the "paradox of choice" - users should configure converters either via factory setter OR annotation attribute, not both. Solution: 1. Changed `BatchMessagingMessageConverter` default constructor to always create a `MessagingMessageConverter` by default (`this(new MessagingMessageConverter())`). This ensures batch converters always have a record converter for per-record conversion within batches, and enables annotation-only configuration without requiring users to set converters on the factory. 2. Added `setMessagingConverter()` method to `BatchMessagingMessageConverter` that propagates the SmartMessageConverter to its internal record converter when it's an instance of `MessagingMessageConverter`. 3. Overrode `setMessagingConverter()` in `BatchMessagingMessageListenerAdapter` to: - Call `super.setMessagingConverter(messageConverter)` first, which applies the same validation as the parent class (`Assert.isTrue(!this.converterSet)`) to prevent configuration conflicts between factory setter and annotation. - Propagate the SmartMessageConverter to the batch converter's record converter for proper message conversion in batch processing. This approach: - Respects the framework's validation to prevent the "paradox of choice" - Ensures both batch and per-record conversion paths work correctly - Allows users to configure converters via annotation without needing to set converters on the factory (annotation-only configuration) - Avoids the complexity and potential issues of cloning converters - Works for multiple listeners with different contentTypeConverter values Testing: Added comprehensive integration test `BatchSmartMessageConverterTests` that: - Verifies SmartMessageConverter works with batch listeners using `@KafkaListener(contentTypeConverter = "...")` - Tests multiple listeners with different converters to ensure isolation - Uses `ConcurrentKafkaListenerContainerFactory` with batch mode to verify annotation attributes propagate correctly to the record converter Signed-off-by: Jujuwryy <[email protected]>
1 parent 240254e commit 3b3f910

File tree

3 files changed

+84
-23
lines changed

3 files changed

+84
-23
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/BatchMessagingMessageListenerAdapter.java

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -114,19 +114,21 @@ public void setBatchToRecordAdapter(BatchToRecordAdapter<K, V> batchToRecordAdap
114114
* <p>
115115
* When a {@code SmartMessageConverter} is configured via
116116
* {@code @KafkaListener(contentTypeConverter = "...")}, this method ensures it is
117-
* properly propagated to the batch converter, which will then propagate it to the
118-
* record converter for message conversion in batch listeners.
117+
* properly propagated to the batch converter's record converter for message conversion
118+
* in batch listeners.
119119
* <p>
120-
* This override does not call the parent implementation because the parent's validation
121-
* (checking {@code converterSet}) blocks setting the SmartMessageConverter after
122-
* {@code setBatchMessageConverter} has been called, which is the normal workflow for
123-
* batch listeners.
120+
* Uses the same validation as the parent class to prevent the paradox of choice:
121+
* not allowed when a custom {@link #setBatchMessageConverter(BatchMessageConverter)
122+
* batchMessageConverter} is provided. Since {@link BatchMessagingMessageConverter} now
123+
* always has a default {@link org.springframework.kafka.support.converter.MessagingMessageConverter},
124+
* users can configure the converter via the annotation without needing to set it on the factory.
124125
* @param messageConverter the converter to set
125126
*/
126127
@Override
127128
public void setMessagingConverter(SmartMessageConverter messageConverter) {
128-
if (this.batchMessageConverter instanceof BatchMessagingMessageConverter messagingConverter) {
129-
messagingConverter.setMessagingConverter(messageConverter);
129+
super.setMessagingConverter(messageConverter);
130+
if (this.batchMessageConverter instanceof BatchMessagingMessageConverter batchConverter) {
131+
batchConverter.setMessagingConverter(messageConverter);
130132
}
131133
}
132134

spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,10 +89,11 @@ public class BatchMessagingMessageConverter implements BatchMessageConverter {
8989
private boolean rawRecordHeader;
9090

9191
/**
92-
* Create an instance that does not convert the record values.
92+
* Create an instance with a default {@link MessagingMessageConverter} for record conversion.
93+
* @since 3.3.11
9394
*/
9495
public BatchMessagingMessageConverter() {
95-
this(null);
96+
this(new MessagingMessageConverter());
9697
}
9798

9899
/**

spring-kafka/src/test/java/org/springframework/kafka/listener/BatchSmartMessageConverterTests.java

Lines changed: 71 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,6 @@
4141
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
4242
import org.springframework.kafka.core.KafkaTemplate;
4343
import org.springframework.kafka.core.ProducerFactory;
44-
import org.springframework.kafka.support.converter.BatchMessagingMessageConverter;
45-
import org.springframework.kafka.support.converter.MessagingMessageConverter;
4644
import org.springframework.kafka.test.EmbeddedKafkaBroker;
4745
import org.springframework.kafka.test.context.EmbeddedKafka;
4846
import org.springframework.kafka.test.utils.KafkaTestUtils;
@@ -64,7 +62,7 @@
6462
*/
6563
@SpringJUnitConfig
6664
@DirtiesContext
67-
@EmbeddedKafka(partitions = 1, topics = { "smartBatchTopic" })
65+
@EmbeddedKafka(partitions = 1, topics = { "smartBatchTopic", "smartBatchTopic2" })
6866
class BatchSmartMessageConverterTests {
6967

7068
@Autowired
@@ -75,18 +73,32 @@ class BatchSmartMessageConverterTests {
7573

7674
@Test
7775
void testContentTypeConverterWithBatchListener() throws Exception {
78-
// Given: A batch listener with contentTypeConverter configured
7976
BatchListener listener = this.config.batchListener();
77+
listener.reset(2);
8078

81-
// When: Send byte[] messages that should be converted to String
8279
this.template.send("smartBatchTopic", "hello".getBytes());
8380
this.template.send("smartBatchTopic", "world".getBytes());
8481

85-
// Then: SmartMessageConverter should convert byte[] to String for batch listener
8682
assertThat(listener.latch.await(10, TimeUnit.SECONDS)).isTrue();
8783
assertThat(listener.received).hasSize(2).containsExactly("hello", "world");
8884
}
8985

86+
@Test
87+
void testMultipleListenersWithDifferentConverters() throws Exception {
88+
BatchListener listener1 = this.config.batchListener();
89+
BatchListener2 listener2 = this.config.batchListener2();
90+
listener1.reset(1);
91+
listener2.reset(1);
92+
93+
this.template.send("smartBatchTopic", "foo".getBytes());
94+
this.template.send("smartBatchTopic2", "bar".getBytes());
95+
96+
assertThat(listener1.latch.await(10, TimeUnit.SECONDS)).isTrue();
97+
assertThat(listener2.latch.await(10, TimeUnit.SECONDS)).isTrue();
98+
assertThat(listener1.received).hasSize(1).containsExactly("foo");
99+
assertThat(listener2.received).hasSize(1).containsExactly("BAR");
100+
}
101+
90102
@Configuration
91103
@EnableKafka
92104
public static class Config {
@@ -97,8 +109,6 @@ public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory(EmbeddedKa
97109
new ConcurrentKafkaListenerContainerFactory<>();
98110
factory.setConsumerFactory(consumerFactory(embeddedKafka));
99111
factory.setBatchListener(true);
100-
// Set up batch converter with record converter - framework will propagate SmartMessageConverter
101-
factory.setBatchMessageConverter(new BatchMessagingMessageConverter(new MessagingMessageConverter()));
102112
return factory;
103113
}
104114

@@ -136,19 +146,29 @@ public Map<String, Object> producerConfigs(EmbeddedKafkaBroker embeddedKafka) {
136146

137147
@Bean
138148
public SmartMessageConverter byteArrayToStringConverter() {
139-
return new ByteArrayToStringConverter();
149+
return new ByteArrayConverter(bytes -> new String(bytes));
150+
}
151+
152+
@Bean
153+
public SmartMessageConverter byteArrayToUpperCaseConverter() {
154+
return new ByteArrayConverter(bytes -> new String(bytes).toUpperCase());
140155
}
141156

142157
@Bean
143158
public BatchListener batchListener() {
144159
return new BatchListener();
145160
}
146161

162+
@Bean
163+
public BatchListener2 batchListener2() {
164+
return new BatchListener2();
165+
}
166+
147167
}
148168

149169
public static class BatchListener {
150170

151-
private final CountDownLatch latch = new CountDownLatch(2);
171+
private CountDownLatch latch = new CountDownLatch(2);
152172

153173
private final List<String> received = new ArrayList<>();
154174

@@ -166,17 +186,55 @@ public void listen(List<String> messages) {
166186
});
167187
}
168188

189+
void reset(int expectedCount) {
190+
this.received.clear();
191+
this.latch = new CountDownLatch(expectedCount);
192+
}
193+
194+
}
195+
196+
public static class BatchListener2 {
197+
198+
private CountDownLatch latch = new CountDownLatch(1);
199+
200+
private final List<String> received = new ArrayList<>();
201+
202+
@KafkaListener(
203+
id = "batchSmartListener2",
204+
topics = "smartBatchTopic2",
205+
groupId = "smartBatchGroup2",
206+
contentTypeConverter = "byteArrayToUpperCaseConverter",
207+
batch = "true"
208+
)
209+
public void listen(List<String> messages) {
210+
messages.forEach(message -> {
211+
this.received.add(message);
212+
this.latch.countDown();
213+
});
214+
}
215+
216+
void reset(int expectedCount) {
217+
this.received.clear();
218+
this.latch = new CountDownLatch(expectedCount);
219+
}
220+
169221
}
170222

171223
/**
172-
* Simple SmartMessageConverter for testing that converts byte[] to String.
224+
* Simple SmartMessageConverter for testing that converts byte[] to String using a function.
173225
*/
174-
static class ByteArrayToStringConverter implements SmartMessageConverter {
226+
static class ByteArrayConverter implements SmartMessageConverter {
227+
228+
private final java.util.function.Function<byte[], String> converter;
229+
230+
ByteArrayConverter(java.util.function.Function<byte[], String> converter) {
231+
this.converter = converter;
232+
}
175233

176234
@Override
177235
public Object fromMessage(Message<?> message, Class<?> targetClass) {
178236
Object payload = message.getPayload();
179-
return (payload instanceof byte[] bytes) ? new String(bytes) : payload;
237+
return (payload instanceof byte[] bytes) ? this.converter.apply(bytes) : payload;
180238
}
181239

182240
@Override

0 commit comments

Comments
 (0)