Skip to content

Commit 382c0dc

Browse files
committed
Address review feedback: Use integration test and fix adapter implementation
Integration testing revealed the root cause of GH-4097. When Spring processes a @KafkaListener with contentTypeConverter and batch="true", the framework: 1. Calls setBatchMessageConverter() on the adapter 2. This internally calls setMessageConverter() which sets converterSet=true 3. Spring then tries to apply contentTypeConverter by calling setMessagingConverter() 4. The parent's validation Assert.isTrue(!this.converterSet, ...) blocks this The unit test didn't catch this because it bypassed the adapter and Spring framework integration entirely. Changes: - BatchMessagingMessageListenerAdapter.setMessagingConverter(): Override now directly applies SmartMessageConverter to batch converter (which propagates to record converter) without calling super, bypassing the validation that doesn't apply to the batch listener workflow - BatchSmartMessageConverterTests: Replaced unit test with full integration test using @SpringJUnitConfig, @embeddedkafka, ConcurrentKafkaListenerContainerFactory, and @KafkaListener to verify the complete framework flow - Added minimal ByteArrayToStringConverter (24 lines) for testing as no existing Spring Framework converter provides simple byte[] to String conversion needed for this test scenario All tests pass and checkstyle validation successful.
1 parent aba6ce3 commit 382c0dc

File tree

3 files changed

+147
-102
lines changed

3 files changed

+147
-102
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/BatchMessagingMessageListenerAdapter.java

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
* @author Venil Noronha
5757
* @author Wang ZhiYang
5858
* @author Sanghyeok An
59+
* @author George Mahfoud
5960
* @since 1.1
6061
*/
6162
public class BatchMessagingMessageListenerAdapter<K, V> extends MessagingMessageListenerAdapter<K, V>
@@ -109,20 +110,24 @@ public void setBatchToRecordAdapter(BatchToRecordAdapter<K, V> batchToRecordAdap
109110
}
110111

111112
/**
112-
* Set the {@link SmartMessageConverter} to use with both the default record converter
113-
* and the batch message converter.
113+
* Set the {@link SmartMessageConverter} to use with the batch message converter.
114114
* <p>
115115
* When a {@code SmartMessageConverter} is configured via
116116
* {@code @KafkaListener(contentTypeConverter = "...")}, this method ensures it is
117-
* properly propagated to both the record converter (via the parent class) and the
118-
* batch converter to support message conversion in batch listeners.
117+
* properly propagated to the batch converter, which will then propagate it to the
118+
* record converter for message conversion in batch listeners.
119+
* <p>
120+
* This override does not call the parent implementation because the parent's validation
121+
* (checking {@code converterSet}) blocks setting the SmartMessageConverter after
122+
* {@code setBatchMessageConverter} has been called, which is the normal workflow for
123+
* batch listeners.
119124
* @param messageConverter the converter to set
120125
*/
121126
@Override
122127
public void setMessagingConverter(SmartMessageConverter messageConverter) {
123-
super.setMessagingConverter(messageConverter);
124-
if (this.batchMessageConverter instanceof BatchMessagingMessageConverter batchConverter) {
125-
batchConverter.setMessagingConverter(messageConverter);
128+
if (this.batchMessageConverter instanceof BatchMessagingMessageConverter) {
129+
((BatchMessagingMessageConverter) this.batchMessageConverter)
130+
.setMessagingConverter(messageConverter);
126131
}
127132
}
128133

spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@
6868
* @author Borahm Lee
6969
* @author Artem Bilan
7070
* @author Soby Chacko
71+
* @author George Mahfoud
7172
*
7273
* @since 1.1
7374
*/
@@ -79,8 +80,6 @@ public class BatchMessagingMessageConverter implements BatchMessageConverter {
7980
@Nullable
8081
private final RecordMessageConverter recordConverter;
8182

82-
private @Nullable SmartMessageConverter messagingConverter;
83-
8483
private boolean generateMessageId = false;
8584

8685
private boolean generateTimestamp = false;
@@ -152,8 +151,6 @@ public RecordMessageConverter getRecordMessageConverter() {
152151
* @since 3.3.11
153152
*/
154153
public void setMessagingConverter(@Nullable SmartMessageConverter messagingConverter) {
155-
this.messagingConverter = messagingConverter;
156-
157154
if (this.recordConverter instanceof MessagingMessageConverter messagingRecordConverter) {
158155
messagingRecordConverter.setMessagingConverter(messagingConverter);
159156
}

spring-kafka/src/test/java/org/springframework/kafka/listener/BatchSmartMessageConverterTests.java

Lines changed: 134 additions & 91 deletions
Original file line numberDiff line numberDiff line change
@@ -16,140 +16,183 @@
1616

1717
package org.springframework.kafka.listener;
1818

19-
import java.lang.reflect.Type;
20-
import java.util.Arrays;
19+
import java.util.ArrayList;
2120
import java.util.List;
22-
23-
import org.apache.kafka.clients.consumer.ConsumerRecord;
21+
import java.util.Map;
22+
import java.util.concurrent.CountDownLatch;
23+
import java.util.concurrent.TimeUnit;
24+
25+
import org.apache.kafka.clients.consumer.ConsumerConfig;
26+
import org.apache.kafka.clients.producer.ProducerConfig;
27+
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
28+
import org.apache.kafka.common.serialization.ByteArraySerializer;
29+
import org.apache.kafka.common.serialization.IntegerDeserializer;
30+
import org.apache.kafka.common.serialization.IntegerSerializer;
2431
import org.junit.jupiter.api.Test;
2532

33+
import org.springframework.beans.factory.annotation.Autowired;
34+
import org.springframework.context.annotation.Bean;
35+
import org.springframework.context.annotation.Configuration;
36+
import org.springframework.kafka.annotation.EnableKafka;
37+
import org.springframework.kafka.annotation.KafkaListener;
38+
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
39+
import org.springframework.kafka.config.KafkaListenerContainerFactory;
40+
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
41+
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
42+
import org.springframework.kafka.core.KafkaTemplate;
43+
import org.springframework.kafka.core.ProducerFactory;
2644
import org.springframework.kafka.support.converter.BatchMessagingMessageConverter;
2745
import org.springframework.kafka.support.converter.MessagingMessageConverter;
46+
import org.springframework.kafka.test.EmbeddedKafkaBroker;
47+
import org.springframework.kafka.test.context.EmbeddedKafka;
48+
import org.springframework.kafka.test.utils.KafkaTestUtils;
2849
import org.springframework.messaging.Message;
2950
import org.springframework.messaging.MessageHeaders;
3051
import org.springframework.messaging.converter.SmartMessageConverter;
3152
import org.springframework.messaging.support.MessageBuilder;
53+
import org.springframework.test.annotation.DirtiesContext;
54+
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
3255

3356
import static org.assertj.core.api.Assertions.assertThat;
3457

3558
/**
36-
* Tests for SmartMessageConverter support in batch listeners.
37-
* Reproduces the issue described in GH-4097.
59+
* Integration tests for SmartMessageConverter support in batch listeners.
60+
* Reproduces and verifies the fix for the issue described in GH-4097.
3861
*
39-
* @author Jujuwryy
62+
* @author George Mahfoud
4063
* @since 3.3.11
4164
*/
65+
@SpringJUnitConfig
66+
@DirtiesContext
67+
@EmbeddedKafka(partitions = 1, topics = { "smartBatchTopic" })
4268
class BatchSmartMessageConverterTests {
4369

44-
@Test
45-
void testSmartMessageConverterWorksInBatchConversion() {
46-
// Given: A BatchMessagingMessageConverter with a record converter and SmartMessageConverter
47-
MessagingMessageConverter recordConverter = new MessagingMessageConverter();
48-
BatchMessagingMessageConverter batchConverter = new BatchMessagingMessageConverter(recordConverter);
49-
50-
// Set up SmartMessageConverter that converts byte[] to String
51-
TestStringMessageConverter smartConverter = new TestStringMessageConverter();
52-
batchConverter.setMessagingConverter(smartConverter);
53-
54-
// Create test records with byte[] values that need conversion to String
55-
List<ConsumerRecord<?, ?>> records = Arrays.asList(
56-
new ConsumerRecord<>("topic", 0, 0, "key", "hello".getBytes()),
57-
new ConsumerRecord<>("topic", 0, 1, "key", "world".getBytes())
58-
);
59-
60-
// When: Convert batch with List<String> target type
61-
Type targetType = new TestParameterizedType(List.class, new Type[]{String.class});
62-
Message<?> result = batchConverter.toMessage(records, null, null, targetType);
63-
64-
// Then: Verify the SmartMessageConverter was applied and byte[] was converted to String
65-
assertThat(result).isNotNull();
66-
assertThat(result.getPayload()).isInstanceOf(List.class);
67-
68-
List<?> payloads = (List<?>) result.getPayload();
69-
assertThat(payloads).hasSize(2);
70-
assertThat(payloads.get(0)).isEqualTo("hello");
71-
assertThat(payloads.get(1)).isEqualTo("world");
72-
}
70+
@Autowired
71+
private KafkaTemplate<Integer, byte[]> template;
72+
73+
@Autowired
74+
private Config config;
7375

7476
@Test
75-
void testBatchConversionWithoutSmartMessageConverter() {
76-
// Given: A BatchMessagingMessageConverter without SmartMessageConverter
77-
MessagingMessageConverter recordConverter = new MessagingMessageConverter();
78-
BatchMessagingMessageConverter batchConverter = new BatchMessagingMessageConverter(recordConverter);
79-
80-
// Create test records with byte[] values
81-
List<ConsumerRecord<?, ?>> records = Arrays.asList(
82-
new ConsumerRecord<>("topic", 0, 0, "key", "test".getBytes())
83-
);
84-
85-
// When: Convert batch
86-
Type targetType = new TestParameterizedType(List.class, new Type[]{String.class});
87-
Message<?> result = batchConverter.toMessage(records, null, null, targetType);
88-
89-
// Then: Should work but payloads remain as byte[]
90-
assertThat(result).isNotNull();
91-
List<?> payloads = (List<?>) result.getPayload();
92-
assertThat(payloads.get(0)).isInstanceOf(byte[].class);
77+
void testContentTypeConverterWithBatchListener() throws Exception {
78+
// Given: A batch listener with contentTypeConverter configured
79+
BatchListener listener = this.config.batchListener();
80+
81+
// When: Send byte[] messages that should be converted to String
82+
this.template.send("smartBatchTopic", "hello".getBytes());
83+
this.template.send("smartBatchTopic", "world".getBytes());
84+
85+
// Then: SmartMessageConverter should convert byte[] to String for batch listener
86+
assertThat(listener.latch.await(10, TimeUnit.SECONDS)).isTrue();
87+
assertThat(listener.received).hasSize(2).containsExactly("hello", "world");
9388
}
9489

95-
/**
96-
* Test SmartMessageConverter that converts byte[] to String.
97-
*/
98-
static class TestStringMessageConverter implements SmartMessageConverter {
90+
@Configuration
91+
@EnableKafka
92+
public static class Config {
93+
94+
@Bean
95+
public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory(EmbeddedKafkaBroker embeddedKafka) {
96+
ConcurrentKafkaListenerContainerFactory<Integer, byte[]> factory =
97+
new ConcurrentKafkaListenerContainerFactory<>();
98+
factory.setConsumerFactory(consumerFactory(embeddedKafka));
99+
factory.setBatchListener(true);
100+
// Set up batch converter with record converter - framework will propagate SmartMessageConverter
101+
factory.setBatchMessageConverter(new BatchMessagingMessageConverter(new MessagingMessageConverter()));
102+
return factory;
103+
}
99104

100-
@Override
101-
public Object fromMessage(Message<?> message, Class<?> targetClass) {
102-
return convertPayload(message.getPayload());
105+
@Bean
106+
public DefaultKafkaConsumerFactory<Integer, byte[]> consumerFactory(EmbeddedKafkaBroker embeddedKafka) {
107+
return new DefaultKafkaConsumerFactory<>(consumerConfigs(embeddedKafka));
103108
}
104109

105-
@Override
106-
public Object fromMessage(Message<?> message, Class<?> targetClass, Object conversionHint) {
107-
return convertPayload(message.getPayload());
110+
@Bean
111+
public Map<String, Object> consumerConfigs(EmbeddedKafkaBroker embeddedKafka) {
112+
Map<String, Object> consumerProps =
113+
KafkaTestUtils.consumerProps(embeddedKafka, "smartBatchGroup", false);
114+
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
115+
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
116+
return consumerProps;
108117
}
109118

110-
@Override
111-
public Message<?> toMessage(Object payload, MessageHeaders headers) {
112-
return MessageBuilder.withPayload(payload).copyHeaders(headers).build();
119+
@Bean
120+
public KafkaTemplate<Integer, byte[]> template(EmbeddedKafkaBroker embeddedKafka) {
121+
return new KafkaTemplate<>(producerFactory(embeddedKafka));
113122
}
114123

115-
@Override
116-
public Message<?> toMessage(Object payload, MessageHeaders headers, Object conversionHint) {
117-
return toMessage(payload, headers);
124+
@Bean
125+
public ProducerFactory<Integer, byte[]> producerFactory(EmbeddedKafkaBroker embeddedKafka) {
126+
return new DefaultKafkaProducerFactory<>(producerConfigs(embeddedKafka));
127+
}
128+
129+
@Bean
130+
public Map<String, Object> producerConfigs(EmbeddedKafkaBroker embeddedKafka) {
131+
Map<String, Object> props = KafkaTestUtils.producerProps(embeddedKafka);
132+
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
133+
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
134+
return props;
135+
}
136+
137+
@Bean
138+
public SmartMessageConverter byteArrayToStringConverter() {
139+
return new ByteArrayToStringConverter();
118140
}
119141

120-
private Object convertPayload(Object payload) {
121-
// Convert byte[] to String - this is the core functionality being tested
122-
if (payload instanceof byte[] bytes) {
123-
return new String(bytes);
124-
}
125-
return payload;
142+
@Bean
143+
public BatchListener batchListener() {
144+
return new BatchListener();
126145
}
146+
127147
}
128148

129-
/**
130-
* Helper class for creating parameterized types for testing.
131-
*/
132-
static class TestParameterizedType implements java.lang.reflect.ParameterizedType {
149+
public static class BatchListener {
150+
151+
private final CountDownLatch latch = new CountDownLatch(2);
133152

134-
private final Type rawType;
153+
private final List<String> received = new ArrayList<>();
135154

136-
private final Type[] typeArguments;
155+
@KafkaListener(
156+
id = "batchSmartListener",
157+
topics = "smartBatchTopic",
158+
groupId = "smartBatchGroup",
159+
contentTypeConverter = "byteArrayToStringConverter",
160+
batch = "true"
161+
)
162+
public void listen(List<String> messages) {
163+
messages.forEach(message -> {
164+
this.received.add(message);
165+
this.latch.countDown();
166+
});
167+
}
168+
169+
}
170+
171+
/**
172+
* Simple SmartMessageConverter for testing that converts byte[] to String.
173+
*/
174+
static class ByteArrayToStringConverter implements SmartMessageConverter {
137175

138-
TestParameterizedType(Type rawType, Type[] typeArguments) {
139-
this.rawType = rawType;
140-
this.typeArguments = typeArguments;
176+
@Override
177+
public Object fromMessage(Message<?> message, Class<?> targetClass) {
178+
Object payload = message.getPayload();
179+
return (payload instanceof byte[] bytes) ? new String(bytes) : payload;
141180
}
142181

143-
public Type[] getActualTypeArguments() {
144-
return typeArguments;
182+
@Override
183+
public Object fromMessage(Message<?> message, Class<?> targetClass, Object conversionHint) {
184+
return fromMessage(message, targetClass);
145185
}
146186

147-
public Type getRawType() {
148-
return rawType;
187+
@Override
188+
public Message<?> toMessage(Object payload, MessageHeaders headers) {
189+
return MessageBuilder.withPayload(payload).copyHeaders(headers).build();
149190
}
150191

151-
public Type getOwnerType() {
152-
return null;
192+
@Override
193+
public Message<?> toMessage(Object payload, MessageHeaders headers, Object conversionHint) {
194+
return toMessage(payload, headers);
153195
}
196+
154197
}
155198
}

0 commit comments

Comments
 (0)