
Commit a1ec4a0

Jujuwryy authored and artembilan committed
GH-4097: Fix SmartMessageConverter support in batch listeners
Fixes: #4097

`BatchMessagingMessageConverter` was missing the `setMessagingConverter()` method that exists in `MessagingMessageConverter`, causing a `SmartMessageConverter` configured via `@KafkaListener(contentTypeConverter)` to be ignored in batch listeners. This inconsistency between record and batch listeners leads to a `ClassCastException` when `byte[]` values aren't converted to the expected `String` type, breaking the contract that `SmartMessageConverter` should work the same way regardless of listener type.

The fix ensures `SmartMessageConverter` propagation works consistently by:
- adding `setMessagingConverter()` to `BatchMessagingMessageConverter`, delegating to the underlying `MessagingMessageConverter`
- overriding `setMessagingConverter()` in `BatchMessagingMessageListenerAdapter` to propagate the converter to the batch converter
- maintaining the same `SmartMessageConverter` behavior between record and batch listeners

Integration testing revealed the root cause of GH-4097. When Spring processes a `@KafkaListener` with `contentTypeConverter` and `batch = "true"`, the framework:
1. calls `setBatchMessageConverter()` on the adapter
2. which internally calls `setMessageConverter()`, setting `converterSet = true`
3. Spring then tries to apply `contentTypeConverter` by calling `setMessagingConverter()`
4. the parent's validation `Assert.isTrue(!this.converterSet, ...)` blocks this

The original unit test didn't catch this because it bypassed the adapter and the Spring framework integration entirely.

Changes:
- `BatchMessagingMessageListenerAdapter.setMessagingConverter()`: the override now directly applies the `SmartMessageConverter` to the batch converter (which propagates it to the record converter) without calling super, bypassing the validation that doesn't apply to the batch listener workflow
- `BatchSmartMessageConverterTests`: replaced the unit test with a full integration test using `@SpringJUnitConfig`, `@EmbeddedKafka`, `ConcurrentKafkaListenerContainerFactory`, and `@KafkaListener` to verify the complete framework flow
- added a minimal `ByteArrayToStringConverter` (24 lines) for testing, as no existing Spring Framework converter provides the simple byte[]-to-String conversion needed for this test scenario

All tests pass and checkstyle validation is successful.

- Use pattern matching for instanceof in `setMessagingConverter()` to avoid explicit casting
- Fix constructor parameter indentation to use tabs only (not mixed spaces)
- Address checkstyle violations per reviewer feedback

These changes improve code readability without affecting functionality.

* Fix remaining indentation issues in method parameters
- Fix `toMessagingMessage()` parameter indentation in `BatchMessagingMessageListenerAdapter`
- Fix `toMessage()` parameter indentation in `BatchMessagingMessageConverter`
- Use single tab indentation consistently per Spring code style

Problem:
When using `@KafkaListener(contentTypeConverter = "...")` with a batch listener (`batch = "true"`), the `SmartMessageConverter` was not applied during message conversion, resulting in a `ClassCastException` when processing messages that required conversion (e.g., byte[] to String).

Root cause:
The issue stemmed from two problems:
1. The `BatchMessagingMessageConverter` default constructor created an instance without a `RecordMessageConverter` (`this(null)`), preventing proper per-record conversion within batch processing.
2. `BatchMessagingMessageListenerAdapter` did not override `setMessagingConverter()` to propagate the `SmartMessageConverter` from the annotation configuration to the batch converter's record converter.

Additionally, the initial approach of bypassing the parent's validation in `setMessagingConverter()` conflicted with the framework's design to prevent the "paradox of choice": users should configure converters either via the factory setter or the annotation attribute, not both.

Solution:
1. Changed the `BatchMessagingMessageConverter` default constructor to always create a `MessagingMessageConverter` (`this(new MessagingMessageConverter())`). This ensures batch converters always have a record converter for per-record conversion within batches, and enables annotation-only configuration without requiring users to set converters on the factory.
2. Added a `setMessagingConverter()` method to `BatchMessagingMessageConverter` that propagates the `SmartMessageConverter` to its internal record converter when it is an instance of `MessagingMessageConverter`.
3. Overrode `setMessagingConverter()` in `BatchMessagingMessageListenerAdapter` to:
   - call `super.setMessagingConverter(messageConverter)` first, which applies the same validation as the parent class (`Assert.isTrue(!this.converterSet)`) to prevent configuration conflicts between the factory setter and the annotation
   - propagate the `SmartMessageConverter` to the batch converter's record converter for proper message conversion in batch processing

This approach:
- respects the framework's validation that prevents the "paradox of choice"
- ensures both batch and per-record conversion paths work correctly
- allows users to configure converters via the annotation without needing to set converters on the factory (annotation-only configuration)
- avoids the complexity and potential issues of cloning converters
- works for multiple listeners with different `contentTypeConverter` values

Testing:
Added a comprehensive integration test, `BatchSmartMessageConverterTests`, that:
- verifies `SmartMessageConverter` works with batch listeners using `@KafkaListener(contentTypeConverter = "...")`
- tests multiple listeners with different converters to ensure isolation
- uses `ConcurrentKafkaListenerContainerFactory` in batch mode to verify annotation attributes propagate correctly to the record converter

* Polish documentation and test code
- Improve Javadoc clarity with a technical explanation for method constraints
- Remove a redundant phrase from the `BatchMessagingMessageConverter` documentation
- Generalize the test class description for future extensibility
- Clean up test assertions by removing redundant size checks
- Use descriptive variable names in test data

* Fix Javadoc links in the `setMessagingConverter` method
- Add proper links for `SmartMessageConverter` and `BatchMessageConverter`
- Use code formatting for the `batchMessageConverter` field reference
- Simplify the `setBatchMessageConverter` method link format

Signed-off-by: Jujuwryy <[email protected]>

# Conflicts:
#	spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java
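For orientation, the failure mode described above can be reproduced with a listener along these lines. This is a minimal sketch, not code from this commit: the class name, listener id, and topic are illustrative, and `byteArrayToStringConverter` is assumed to be a `SmartMessageConverter` bean such as the `ByteArrayConverter` defined in the test added by this commit.

import java.util.List;

import org.springframework.kafka.annotation.KafkaListener;

public class OrderBatchListener {

    // Batch listener whose byte[] record values should be converted to String by the
    // SmartMessageConverter bean named in contentTypeConverter. Before this fix the
    // converter was ignored for batch listeners, so each element was still a byte[]
    // and the List<String> signature led to a ClassCastException.
    @KafkaListener(
            id = "orderBatchListener",                       // illustrative id
            topics = "orders",                               // illustrative topic
            contentTypeConverter = "byteArrayToStringConverter",
            batch = "true")
    public void listen(List<String> payloads) {
        payloads.forEach(System.out::println);
    }

}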
1 parent 14a82e5 commit a1ec4a0

File tree

3 files changed: +311 -7 lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/BatchMessagingMessageListenerAdapter.java

26 additions & 0 deletions

@@ -32,6 +32,7 @@
 import org.springframework.kafka.support.converter.RecordMessageConverter;
 import org.springframework.lang.Nullable;
 import org.springframework.messaging.Message;
+import org.springframework.messaging.converter.SmartMessageConverter;
 import org.springframework.messaging.support.MessageBuilder;
 import org.springframework.util.Assert;

@@ -55,6 +56,7 @@
  * @author Venil Noronha
  * @author Wang ZhiYang
  * @author Sanghyeok An
+ * @author George Mahfoud
  * @since 1.1
  */
 public class BatchMessagingMessageListenerAdapter<K, V> extends MessagingMessageListenerAdapter<K, V>
@@ -107,6 +109,30 @@ public void setBatchToRecordAdapter(BatchToRecordAdapter<K, V> batchToRecordAdap
        this.batchToRecordAdapter = batchToRecordAdapter;
    }

+   /**
+    * Set the {@link SmartMessageConverter} to use with the batch message converter.
+    * <p>
+    * When a {@code SmartMessageConverter} is configured via
+    * {@code @KafkaListener(contentTypeConverter = "...")}, this method ensures it is
+    * properly propagated to the batch converter's record converter for message conversion
+    * in batch listeners.
+    * <p>
+    * This method cannot be called after {@link #setBatchMessageConverter(BatchMessageConverter)}
+    * as it would cause a mutation of the internal {@code batchMessageConverter}. Instead, the
+    * {@link SmartMessageConverter} has to be provided on the external {@link BatchMessageConverter}.
+    * Since {@link BatchMessagingMessageConverter} now
+    * always has a default {@link org.springframework.kafka.support.converter.MessagingMessageConverter},
+    * users can configure the converter via the annotation without needing to set it on the factory.
+    * @param messageConverter the converter to set
+    */
+   @Override
+   public void setMessagingConverter(SmartMessageConverter messageConverter) {
+       super.setMessagingConverter(messageConverter);
+       if (this.batchMessageConverter instanceof BatchMessagingMessageConverter batchConverter) {
+           batchConverter.setMessagingConverter(messageConverter);
+       }
+   }
+
    /**
     * Return the {@link BatchMessagingMessageConverter} for this listener,
     * being able to convert {@link org.springframework.messaging.Message}.
spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java

28 additions & 7 deletions

@@ -42,6 +42,7 @@
 import org.springframework.kafka.support.serializer.SerializationUtils;
 import org.springframework.lang.Nullable;
 import org.springframework.messaging.Message;
+import org.springframework.messaging.converter.SmartMessageConverter;
 import org.springframework.messaging.support.MessageBuilder;

 /**
@@ -65,6 +66,7 @@
  * @author Hope Kim
  * @author Borahm Lee
  * @author Artem Bilan
+ * @author George Mahfoud
  *
  * @since 1.1
  */
@@ -84,10 +86,11 @@ public class BatchMessagingMessageConverter implements BatchMessageConverter {
    private boolean rawRecordHeader;

    /**
-    * Create an instance that does not convert the record values.
+    * Create an instance with a default {@link MessagingMessageConverter} for record conversion.
+    * @since 3.3.11
     */
    public BatchMessagingMessageConverter() {
-       this(null);
+       this(new MessagingMessageConverter());
    }

    /**
@@ -136,6 +139,18 @@ public RecordMessageConverter getRecordMessageConverter() {
        return this.recordConverter;
    }

+   /**
+    * Set a {@link SmartMessageConverter} to convert the record value to
+    * the desired type.
+    * @param messagingConverter the converter.
+    * @since 3.3.11
+    */
+   public void setMessagingConverter(@Nullable SmartMessageConverter messagingConverter) {
+       if (this.recordConverter instanceof MessagingMessageConverter messagingRecordConverter) {
+           messagingRecordConverter.setMessagingConverter(messagingConverter);
+       }
+   }
+
    /**
     * Set to true to add the raw {@code List<ConsumerRecord<?, ?>>} as a header
     * {@link KafkaHeaders#RAW_DATA}.
@@ -267,14 +282,20 @@ protected Object extractAndConvertValue(ConsumerRecord<?, ?> record, Type type)
     * @param type the type - must be a {@link ParameterizedType} with a single generic
     * type parameter.
     * @param conversionFailures Conversion failures.
-    * @return the converted payload.
+    * @return the converted payload, potentially further processed by a {@link SmartMessageConverter}.
     */
    protected Object convert(ConsumerRecord<?, ?> record, Type type, List<ConversionException> conversionFailures) {
        try {
-           Object payload = this.recordConverter
-                   .toMessage(record, null, null, ((ParameterizedType) type).getActualTypeArguments()[0]).getPayload();
-           conversionFailures.add(null);
-           return payload;
+           if (this.recordConverter != null) {
+               Type actualType = ((ParameterizedType) type).getActualTypeArguments()[0];
+               Object payload = this.recordConverter
+                       .toMessage(record, null, null, actualType).getPayload();
+               conversionFailures.add(null);
+               return payload;
+           }
+           else {
+               return null;
+           }
        }
        catch (ConversionException ex) {
            byte[] original = null;
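To make the constructor change above concrete: the no-arg constructor now supplies a `MessagingMessageConverter`, so a `SmartMessageConverter` set on the batch converter has a record converter to delegate to, while passing `null` explicitly keeps the old no-conversion behavior that the new guard in `convert()` tolerates. A minimal sketch, with `smartConverter` standing in for any `SmartMessageConverter`; the class and method names here are illustrative only.

import org.springframework.kafka.support.converter.BatchMessagingMessageConverter;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.messaging.converter.SmartMessageConverter;

final class BatchConverterDefaults {

    // Sketch only: the no-arg constructor now supplies a MessagingMessageConverter,
    // so the SmartMessageConverter set here is forwarded to that record converter.
    static BatchMessagingMessageConverter converting(SmartMessageConverter smartConverter) {
        BatchMessagingMessageConverter converter = new BatchMessagingMessageConverter();
        converter.setMessagingConverter(smartConverter);
        return converter;
    }

    // Passing null explicitly keeps the previous "no record conversion" behavior;
    // the null check added to convert() above covers this case.
    static BatchMessagingMessageConverter nonConverting() {
        return new BatchMessagingMessageConverter((RecordMessageConverter) null);
    }

    private BatchConverterDefaults() {
    }

}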
spring-kafka/src/test/java/org/springframework/kafka/listener/BatchSmartMessageConverterTests.java (new file)

257 additions & 0 deletions

@@ -0,0 +1,257 @@
/*
 * Copyright 2016-present the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.kafka.listener;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.junit.jupiter.api.Test;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.converter.SmartMessageConverter;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;

import static org.assertj.core.api.Assertions.assertThat;

/**
 * Integration tests for SmartMessageConverter support in batch listeners.
 *
 * @author George Mahfoud
 * @since 3.3.11
 */
@SpringJUnitConfig
@DirtiesContext
@EmbeddedKafka(partitions = 1, topics = { "smartBatchTopic", "smartBatchTopic2" })
class BatchSmartMessageConverterTests {

    @Autowired
    private KafkaTemplate<Integer, byte[]> template;

    @Autowired
    private Config config;

    @Test
    void testContentTypeConverterWithBatchListener() throws Exception {
        BatchListener listener = this.config.batchListener();
        listener.reset(2);

        this.template.send("smartBatchTopic", "hello".getBytes());
        this.template.send("smartBatchTopic", "world".getBytes());

        assertThat(listener.latch.await(10, TimeUnit.SECONDS)).isTrue();
        assertThat(listener.received).containsExactly("hello", "world");
    }

    @Test
    void testMultipleListenersWithDifferentConverters() throws Exception {
        BatchListener listener1 = this.config.batchListener();
        BatchListener2 listener2 = this.config.batchListener2();
        listener1.reset(1);
        listener2.reset(1);

        String listener1Data = "listener1Data";
        String listener2Data = "listener2Data";
        this.template.send("smartBatchTopic", listener1Data.getBytes());
        this.template.send("smartBatchTopic2", listener2Data.getBytes());

        assertThat(listener1.latch.await(10, TimeUnit.SECONDS)).isTrue();
        assertThat(listener2.latch.await(10, TimeUnit.SECONDS)).isTrue();
        assertThat(listener1.received).containsExactly(listener1Data);
        assertThat(listener2.received).containsExactly(listener2Data.toUpperCase());
    }

    @Configuration
    @EnableKafka
    public static class Config {

        @Bean
        public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory(EmbeddedKafkaBroker embeddedKafka) {
            ConcurrentKafkaListenerContainerFactory<Integer, byte[]> factory =
                    new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory(embeddedKafka));
            factory.setBatchListener(true);
            return factory;
        }

        @Bean
        public DefaultKafkaConsumerFactory<Integer, byte[]> consumerFactory(EmbeddedKafkaBroker embeddedKafka) {
            return new DefaultKafkaConsumerFactory<>(consumerConfigs(embeddedKafka));
        }

        @Bean
        public Map<String, Object> consumerConfigs(EmbeddedKafkaBroker embeddedKafka) {
            Map<String, Object> consumerProps =
                    KafkaTestUtils.consumerProps("smartBatchGroup", "false", embeddedKafka);
            consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
            consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
            return consumerProps;
        }

        @Bean
        public KafkaTemplate<Integer, byte[]> template(EmbeddedKafkaBroker embeddedKafka) {
            return new KafkaTemplate<>(producerFactory(embeddedKafka));
        }

        @Bean
        public ProducerFactory<Integer, byte[]> producerFactory(EmbeddedKafkaBroker embeddedKafka) {
            return new DefaultKafkaProducerFactory<>(producerConfigs(embeddedKafka));
        }

        @Bean
        public Map<String, Object> producerConfigs(EmbeddedKafkaBroker embeddedKafka) {
            Map<String, Object> props = KafkaTestUtils.producerProps(embeddedKafka);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
            return props;
        }

        @Bean
        public SmartMessageConverter byteArrayToStringConverter() {
            return new ByteArrayConverter(bytes -> new String(bytes));
        }

        @Bean
        public SmartMessageConverter byteArrayToUpperCaseConverter() {
            return new ByteArrayConverter(bytes -> new String(bytes).toUpperCase());
        }

        @Bean
        public BatchListener batchListener() {
            return new BatchListener();
        }

        @Bean
        public BatchListener2 batchListener2() {
            return new BatchListener2();
        }

    }

    public static class BatchListener {

        private CountDownLatch latch = new CountDownLatch(2);

        private final List<String> received = new ArrayList<>();

        @KafkaListener(
                id = "batchSmartListener",
                topics = "smartBatchTopic",
                groupId = "smartBatchGroup",
                contentTypeConverter = "byteArrayToStringConverter",
                batch = "true"
        )
        public void listen(List<String> messages) {
            messages.forEach(message -> {
                this.received.add(message);
                this.latch.countDown();
            });
        }

        void reset(int expectedCount) {
            this.received.clear();
            this.latch = new CountDownLatch(expectedCount);
        }

    }

    public static class BatchListener2 {

        private CountDownLatch latch = new CountDownLatch(1);

        private final List<String> received = new ArrayList<>();

        @KafkaListener(
                id = "batchSmartListener2",
                topics = "smartBatchTopic2",
                groupId = "smartBatchGroup2",
                contentTypeConverter = "byteArrayToUpperCaseConverter",
                batch = "true"
        )
        public void listen(List<String> messages) {
            messages.forEach(message -> {
                this.received.add(message);
                this.latch.countDown();
            });
        }

        void reset(int expectedCount) {
            this.received.clear();
            this.latch = new CountDownLatch(expectedCount);
        }

    }

    /**
     * Simple SmartMessageConverter for testing that converts byte[] to String using a function.
     */
    static class ByteArrayConverter implements SmartMessageConverter {

        private final java.util.function.Function<byte[], String> converter;

        ByteArrayConverter(java.util.function.Function<byte[], String> converter) {
            this.converter = converter;
        }

        @Override
        public Object fromMessage(Message<?> message, Class<?> targetClass) {
            Object payload = message.getPayload();
            return (payload instanceof byte[] bytes) ? this.converter.apply(bytes) : payload;
        }

        @Override
        public Object fromMessage(Message<?> message, Class<?> targetClass, Object conversionHint) {
            return fromMessage(message, targetClass);
        }

        @Override
        public Message<?> toMessage(Object payload, MessageHeaders headers) {
            return MessageBuilder.withPayload(payload).copyHeaders(headers).build();
        }

        @Override
        public Message<?> toMessage(Object payload, MessageHeaders headers, Object conversionHint) {
            return toMessage(payload, headers);
        }

    }

}
