Skip to content

Commit 5a43d31

Browse files
committed
Fix SmartMessageConverter support in batch listeners
BatchMessagingMessageConverter was missing setMessagingConverter() method that exists in MessagingMessageConverter, causing SmartMessageConverter configured via @KafkaListener(contentTypeConverter) to be ignored in batch listeners. This inconsistency between regular and batch listeners leads to ClassCastException when byte[] values aren't converted to the expected String type, breaking the contract that SmartMessageConverter should work the same way regardless of listener type. The fix ensures SmartMessageConverter propagation works consistently by: - Adding setMessagingConverter() to BatchMessagingMessageConverter that delegates to underlying MessagingMessageConverter - Overriding setMessagingConverter() in BatchMessagingMessageListenerAdapter to propagate the converter to batch converter - Maintaining the same SmartMessageConverter behavior between regular and batch listeners Fixes GH-4097 Signed-off-by: Jujuwryy <[email protected]>
1 parent 558453b commit 5a43d31

File tree

3 files changed

+202
-10
lines changed

3 files changed

+202
-10
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/BatchMessagingMessageListenerAdapter.java

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.springframework.kafka.support.converter.BatchMessagingMessageConverter;
3333
import org.springframework.kafka.support.converter.RecordMessageConverter;
3434
import org.springframework.messaging.Message;
35+
import org.springframework.messaging.converter.SmartMessageConverter;
3536
import org.springframework.messaging.support.MessageBuilder;
3637
import org.springframework.util.Assert;
3738

@@ -80,7 +81,7 @@ public BatchMessagingMessageListenerAdapter(@Nullable Object bean, @Nullable Met
8081
* @param errorHandler the error handler.
8182
*/
8283
public BatchMessagingMessageListenerAdapter(@Nullable Object bean, @Nullable Method method,
83-
@Nullable KafkaListenerErrorHandler errorHandler) {
84+
@Nullable KafkaListenerErrorHandler errorHandler) {
8485

8586
super(bean, method, errorHandler);
8687
}
@@ -107,6 +108,24 @@ public void setBatchToRecordAdapter(BatchToRecordAdapter<K, V> batchToRecordAdap
107108
this.batchToRecordAdapter = batchToRecordAdapter;
108109
}
109110

111+
/**
112+
* Set the {@link SmartMessageConverter} to use with both the default record converter
113+
* and the batch message converter.
114+
* <p>
115+
* When a {@code SmartMessageConverter} is configured via
116+
* {@code @KafkaListener(contentTypeConverter = "...")}, this method ensures it is
117+
* properly propagated to both the record converter (via the parent class) and the
118+
* batch converter to support message conversion in batch listeners.
119+
* @param messageConverter the converter to set
120+
*/
121+
@Override
122+
public void setMessagingConverter(SmartMessageConverter messageConverter) {
123+
super.setMessagingConverter(messageConverter);
124+
if (this.batchMessageConverter instanceof BatchMessagingMessageConverter batchConverter) {
125+
batchConverter.setMessagingConverter(messageConverter);
126+
}
127+
}
128+
110129
/**
111130
* Return the {@link BatchMessagingMessageConverter} for this listener,
112131
* being able to convert {@link org.springframework.messaging.Message}.
@@ -124,7 +143,7 @@ public boolean wantsPollResult() {
124143

125144
@Override
126145
public void onMessage(ConsumerRecords<K, V> records, @Nullable Acknowledgment acknowledgment,
127-
Consumer<K, V> consumer) {
146+
Consumer<K, V> consumer) {
128147
invoke(records, acknowledgment, consumer, NULL_MESSAGE);
129148
}
130149

@@ -139,7 +158,7 @@ public void onMessage(ConsumerRecords<K, V> records, @Nullable Acknowledgment ac
139158
*/
140159
@Override
141160
public void onMessage(List<ConsumerRecord<K, V>> records, @Nullable Acknowledgment acknowledgment,
142-
@Nullable Consumer<?, ?> consumer) {
161+
@Nullable Consumer<?, ?> consumer) {
143162

144163
Message<?> message;
145164
if (!isConsumerRecordList()) {
@@ -170,7 +189,7 @@ public void onMessage(List<ConsumerRecord<K, V>> records, @Nullable Acknowledgme
170189

171190
@SuppressWarnings({ "unchecked", "rawtypes" })
172191
protected Message<?> toMessagingMessage(List records, @Nullable Acknowledgment acknowledgment,
173-
@Nullable Consumer<?, ?> consumer) {
192+
@Nullable Consumer<?, ?> consumer) {
174193

175194
return getBatchMessageConverter().toMessage(records, acknowledgment, consumer, getType());
176195
}

spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import org.springframework.kafka.support.KafkaNull;
4444
import org.springframework.kafka.support.serializer.SerializationUtils;
4545
import org.springframework.messaging.Message;
46+
import org.springframework.messaging.converter.SmartMessageConverter;
4647
import org.springframework.messaging.support.MessageBuilder;
4748

4849
/**
@@ -78,6 +79,8 @@ public class BatchMessagingMessageConverter implements BatchMessageConverter {
7879
@Nullable
7980
private final RecordMessageConverter recordConverter;
8081

82+
private @Nullable SmartMessageConverter messagingConverter;
83+
8184
private boolean generateMessageId = false;
8285

8386
private boolean generateTimestamp = false;
@@ -142,6 +145,20 @@ public RecordMessageConverter getRecordMessageConverter() {
142145
return this.recordConverter;
143146
}
144147

148+
/**
149+
* Set a spring-messaging {@link SmartMessageConverter} to convert the record value to
150+
* the desired type.
151+
* @param messagingConverter the converter.
152+
* @since 3.3.11
153+
*/
154+
public void setMessagingConverter(@Nullable SmartMessageConverter messagingConverter) {
155+
this.messagingConverter = messagingConverter;
156+
157+
if (this.recordConverter instanceof MessagingMessageConverter messagingRecordConverter) {
158+
messagingRecordConverter.setMessagingConverter(messagingConverter);
159+
}
160+
}
161+
145162
/**
146163
* Set to true to add the raw {@code List<ConsumerRecord<?, ?>>} as a header
147164
* {@link KafkaHeaders#RAW_DATA}.
@@ -154,7 +171,7 @@ public void setRawRecordHeader(boolean rawRecordHeader) {
154171

155172
@Override // NOSONAR
156173
public Message<?> toMessage(List<ConsumerRecord<?, ?>> records, @Nullable Acknowledgment acknowledgment,
157-
@Nullable Consumer<?, ?> consumer, Type type) {
174+
@Nullable Consumer<?, ?> consumer, Type type) {
158175

159176
KafkaMessageHeaders kafkaMessageHeaders =
160177
new KafkaMessageHeaders(this.generateMessageId, this.generateTimestamp);
@@ -208,7 +225,7 @@ public Message<?> toMessage(List<ConsumerRecord<?, ?>> records, @Nullable Acknow
208225
}
209226

210227
private void addToRawHeaders(Map<String, Object> rawHeaders, List<Map<String, Object>> convertedHeaders,
211-
List<Headers> natives, List<ConsumerRecord<?, ?>> raws, List<ConversionException> conversionFailures) {
228+
List<Headers> natives, List<ConsumerRecord<?, ?>> raws, List<ConversionException> conversionFailures) {
212229

213230
if (this.headerMapper != null) {
214231
rawHeaders.put(KafkaHeaders.BATCH_CONVERTED_HEADERS, convertedHeaders);
@@ -223,8 +240,8 @@ private void addToRawHeaders(Map<String, Object> rawHeaders, List<Map<String, Ob
223240
}
224241

225242
private void addRecordInfo(ConsumerRecord<?, ?> record, Type type, List<Object> payloads, List<Object> keys,
226-
List<String> topics, List<Integer> partitions, List<Long> offsets, List<String> timestampTypes,
227-
List<Long> timestamps, List<ConversionException> conversionFailures) {
243+
List<String> topics, List<Integer> partitions, List<Long> offsets, List<String> timestampTypes,
244+
List<Long> timestamps, List<ConversionException> conversionFailures) {
228245

229246
payloads.add(obtainPayload(type, record, conversionFailures));
230247
keys.add(record.key());
@@ -275,13 +292,14 @@ protected Object extractAndConvertValue(ConsumerRecord<?, ?> record, Type type)
275292
* @param type the type - must be a {@link ParameterizedType} with a single generic
276293
* type parameter.
277294
* @param conversionFailures Conversion failures.
278-
* @return the converted payload.
295+
* @return the converted payload, potentially further processed by a {@link SmartMessageConverter}.
279296
*/
280297
protected @Nullable Object convert(ConsumerRecord<?, ?> record, Type type, List<ConversionException> conversionFailures) {
281298
try {
282299
if (this.recordConverter != null) {
300+
Type actualType = ((ParameterizedType) type).getActualTypeArguments()[0];
283301
Object payload = this.recordConverter
284-
.toMessage(record, null, null, ((ParameterizedType) type).getActualTypeArguments()[0]).getPayload();
302+
.toMessage(record, null, null, actualType).getPayload();
285303
conversionFailures.add(null);
286304
return payload;
287305
}
Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
/*
2+
* Copyright 2016-present the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.listener;
18+
19+
import java.lang.reflect.Type;
20+
import java.util.Arrays;
21+
import java.util.List;
22+
23+
import org.apache.kafka.clients.consumer.ConsumerRecord;
24+
import org.junit.jupiter.api.Test;
25+
26+
import org.springframework.kafka.support.converter.BatchMessagingMessageConverter;
27+
import org.springframework.kafka.support.converter.MessagingMessageConverter;
28+
import org.springframework.messaging.Message;
29+
import org.springframework.messaging.MessageHeaders;
30+
import org.springframework.messaging.converter.SmartMessageConverter;
31+
import org.springframework.messaging.support.MessageBuilder;
32+
33+
import static org.assertj.core.api.Assertions.assertThat;
34+
35+
/**
36+
* Tests for SmartMessageConverter support in batch listeners.
37+
* Reproduces the issue described in GH-4097.
38+
*
39+
* @author Jujuwryy
40+
* @since 3.3.11
41+
*/
42+
class BatchSmartMessageConverterTests {
43+
44+
@Test
45+
void testSmartMessageConverterWorksInBatchConversion() {
46+
// Given: A BatchMessagingMessageConverter with a record converter and SmartMessageConverter
47+
MessagingMessageConverter recordConverter = new MessagingMessageConverter();
48+
BatchMessagingMessageConverter batchConverter = new BatchMessagingMessageConverter(recordConverter);
49+
50+
// Set up SmartMessageConverter that converts byte[] to String
51+
TestStringMessageConverter smartConverter = new TestStringMessageConverter();
52+
batchConverter.setMessagingConverter(smartConverter);
53+
54+
// Create test records with byte[] values that need conversion to String
55+
List<ConsumerRecord<?, ?>> records = Arrays.asList(
56+
new ConsumerRecord<>("topic", 0, 0, "key", "hello".getBytes()),
57+
new ConsumerRecord<>("topic", 0, 1, "key", "world".getBytes())
58+
);
59+
60+
// When: Convert batch with List<String> target type
61+
Type targetType = new TestParameterizedType(List.class, new Type[]{String.class});
62+
Message<?> result = batchConverter.toMessage(records, null, null, targetType);
63+
64+
// Then: Verify the SmartMessageConverter was applied and byte[] was converted to String
65+
assertThat(result).isNotNull();
66+
assertThat(result.getPayload()).isInstanceOf(List.class);
67+
68+
List<?> payloads = (List<?>) result.getPayload();
69+
assertThat(payloads).hasSize(2);
70+
assertThat(payloads.get(0)).isEqualTo("hello");
71+
assertThat(payloads.get(1)).isEqualTo("world");
72+
}
73+
74+
@Test
75+
void testBatchConversionWithoutSmartMessageConverter() {
76+
// Given: A BatchMessagingMessageConverter without SmartMessageConverter
77+
MessagingMessageConverter recordConverter = new MessagingMessageConverter();
78+
BatchMessagingMessageConverter batchConverter = new BatchMessagingMessageConverter(recordConverter);
79+
80+
// Create test records with byte[] values
81+
List<ConsumerRecord<?, ?>> records = Arrays.asList(
82+
new ConsumerRecord<>("topic", 0, 0, "key", "test".getBytes())
83+
);
84+
85+
// When: Convert batch
86+
Type targetType = new TestParameterizedType(List.class, new Type[]{String.class});
87+
Message<?> result = batchConverter.toMessage(records, null, null, targetType);
88+
89+
// Then: Should work but payloads remain as byte[]
90+
assertThat(result).isNotNull();
91+
List<?> payloads = (List<?>) result.getPayload();
92+
assertThat(payloads.get(0)).isInstanceOf(byte[].class);
93+
}
94+
95+
/**
96+
* Test SmartMessageConverter that converts byte[] to String.
97+
*/
98+
static class TestStringMessageConverter implements SmartMessageConverter {
99+
100+
@Override
101+
public Object fromMessage(Message<?> message, Class<?> targetClass) {
102+
return convertPayload(message.getPayload());
103+
}
104+
105+
@Override
106+
public Object fromMessage(Message<?> message, Class<?> targetClass, Object conversionHint) {
107+
return convertPayload(message.getPayload());
108+
}
109+
110+
@Override
111+
public Message<?> toMessage(Object payload, MessageHeaders headers) {
112+
return MessageBuilder.withPayload(payload).copyHeaders(headers).build();
113+
}
114+
115+
@Override
116+
public Message<?> toMessage(Object payload, MessageHeaders headers, Object conversionHint) {
117+
return toMessage(payload, headers);
118+
}
119+
120+
private Object convertPayload(Object payload) {
121+
// Convert byte[] to String - this is the core functionality being tested
122+
if (payload instanceof byte[] bytes) {
123+
return new String(bytes);
124+
}
125+
return payload;
126+
}
127+
}
128+
129+
/**
130+
* Helper class for creating parameterized types for testing.
131+
*/
132+
static class TestParameterizedType implements java.lang.reflect.ParameterizedType {
133+
134+
private final Type rawType;
135+
136+
private final Type[] typeArguments;
137+
138+
TestParameterizedType(Type rawType, Type[] typeArguments) {
139+
this.rawType = rawType;
140+
this.typeArguments = typeArguments;
141+
}
142+
143+
public Type[] getActualTypeArguments() {
144+
return typeArguments;
145+
}
146+
147+
public Type getRawType() {
148+
return rawType;
149+
}
150+
151+
public Type getOwnerType() {
152+
return null;
153+
}
154+
}
155+
}

0 commit comments

Comments
 (0)