diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/BatchMessagingMessageListenerAdapter.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/BatchMessagingMessageListenerAdapter.java index 1d0979d7a2..77ceb1b523 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/BatchMessagingMessageListenerAdapter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/BatchMessagingMessageListenerAdapter.java @@ -32,6 +32,7 @@ import org.springframework.kafka.support.converter.BatchMessagingMessageConverter; import org.springframework.kafka.support.converter.RecordMessageConverter; import org.springframework.messaging.Message; +import org.springframework.messaging.converter.SmartMessageConverter; import org.springframework.messaging.support.MessageBuilder; import org.springframework.util.Assert; @@ -55,6 +56,7 @@ * @author Venil Noronha * @author Wang ZhiYang * @author Sanghyeok An + * @author George Mahfoud * @since 1.1 */ public class BatchMessagingMessageListenerAdapter extends MessagingMessageListenerAdapter @@ -107,6 +109,30 @@ public void setBatchToRecordAdapter(BatchToRecordAdapter batchToRecordAdap this.batchToRecordAdapter = batchToRecordAdapter; } + /** + * Set the {@link SmartMessageConverter} to use with the batch message converter. + *

+ * When a {@code SmartMessageConverter} is configured via + * {@code @KafkaListener(contentTypeConverter = "...")}, this method ensures it is + * properly propagated to the batch converter's record converter for message conversion + * in batch listeners. + *

+ * This method cannot be called after {@link #setBatchMessageConverter(BatchMessageConverter)} + * as it would cause a mutation of the internal {@code batchMessageConverter}. Instead, the + * {@link SmartMessageConverter} has to be provided on the external {@link BatchMessageConverter}. + * Since {@link BatchMessagingMessageConverter} now + * always has a default {@link org.springframework.kafka.support.converter.MessagingMessageConverter}, + * users can configure the converter via the annotation without needing to set it on the factory. + * @param messageConverter the converter to set + */ + @Override + public void setMessagingConverter(SmartMessageConverter messageConverter) { + super.setMessagingConverter(messageConverter); + if (this.batchMessageConverter instanceof BatchMessagingMessageConverter batchConverter) { + batchConverter.setMessagingConverter(messageConverter); + } + } + /** * Return the {@link BatchMessagingMessageConverter} for this listener, * being able to convert {@link org.springframework.messaging.Message}. diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java index 3a1e74f580..ec37d82075 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java @@ -43,6 +43,7 @@ import org.springframework.kafka.support.KafkaNull; import org.springframework.kafka.support.serializer.SerializationUtils; import org.springframework.messaging.Message; +import org.springframework.messaging.converter.SmartMessageConverter; import org.springframework.messaging.support.MessageBuilder; /** @@ -67,6 +68,7 @@ * @author Borahm Lee * @author Artem Bilan * @author Soby Chacko + * @author George Mahfoud * * @since 1.1 */ @@ -87,10 +89,11 @@ public class BatchMessagingMessageConverter implements BatchMessageConverter { private boolean rawRecordHeader; /** - * Create an instance that does not convert the record values. + * Create an instance with a default {@link MessagingMessageConverter} for record conversion. + * @since 3.3.11 */ public BatchMessagingMessageConverter() { - this(null); + this(new MessagingMessageConverter()); } /** @@ -142,6 +145,18 @@ public RecordMessageConverter getRecordMessageConverter() { return this.recordConverter; } + /** + * Set a {@link SmartMessageConverter} to convert the record value to + * the desired type. + * @param messagingConverter the converter. + * @since 3.3.11 + */ + public void setMessagingConverter(@Nullable SmartMessageConverter messagingConverter) { + if (this.recordConverter instanceof MessagingMessageConverter messagingRecordConverter) { + messagingRecordConverter.setMessagingConverter(messagingConverter); + } + } + /** * Set to true to add the raw {@code List>} as a header * {@link KafkaHeaders#RAW_DATA}. @@ -275,13 +290,14 @@ protected Object extractAndConvertValue(ConsumerRecord record, Type type) * @param type the type - must be a {@link ParameterizedType} with a single generic * type parameter. * @param conversionFailures Conversion failures. - * @return the converted payload. + * @return the converted payload, potentially further processed by a {@link SmartMessageConverter}. */ protected @Nullable Object convert(ConsumerRecord record, Type type, List conversionFailures) { try { if (this.recordConverter != null) { + Type actualType = ((ParameterizedType) type).getActualTypeArguments()[0]; Object payload = this.recordConverter - .toMessage(record, null, null, ((ParameterizedType) type).getActualTypeArguments()[0]).getPayload(); + .toMessage(record, null, null, actualType).getPayload(); conversionFailures.add(null); return payload; } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/BatchSmartMessageConverterTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/BatchSmartMessageConverterTests.java new file mode 100644 index 0000000000..779c607a73 --- /dev/null +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/BatchSmartMessageConverterTests.java @@ -0,0 +1,257 @@ +/* + * Copyright 2016-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.listener; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.junit.jupiter.api.Test; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.annotation.EnableKafka; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.config.KafkaListenerContainerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.test.EmbeddedKafkaBroker; +import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.kafka.test.utils.KafkaTestUtils; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHeaders; +import org.springframework.messaging.converter.SmartMessageConverter; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Integration tests for SmartMessageConverter support in batch listeners. + * + * @author George Mahfoud + * @since 3.3.11 + */ +@SpringJUnitConfig +@DirtiesContext +@EmbeddedKafka(partitions = 1, topics = { "smartBatchTopic", "smartBatchTopic2" }) +class BatchSmartMessageConverterTests { + + @Autowired + private KafkaTemplate template; + + @Autowired + private Config config; + + @Test + void testContentTypeConverterWithBatchListener() throws Exception { + BatchListener listener = this.config.batchListener(); + listener.reset(2); + + this.template.send("smartBatchTopic", "hello".getBytes()); + this.template.send("smartBatchTopic", "world".getBytes()); + + assertThat(listener.latch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(listener.received).containsExactly("hello", "world"); + } + + @Test + void testMultipleListenersWithDifferentConverters() throws Exception { + BatchListener listener1 = this.config.batchListener(); + BatchListener2 listener2 = this.config.batchListener2(); + listener1.reset(1); + listener2.reset(1); + + String listener1Data = "listener1Data"; + String listener2Data = "listener2Data"; + this.template.send("smartBatchTopic", listener1Data.getBytes()); + this.template.send("smartBatchTopic2", listener2Data.getBytes()); + + assertThat(listener1.latch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(listener2.latch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(listener1.received).containsExactly(listener1Data); + assertThat(listener2.received).containsExactly(listener2Data.toUpperCase()); + } + + @Configuration + @EnableKafka + public static class Config { + + @Bean + public KafkaListenerContainerFactory kafkaListenerContainerFactory(EmbeddedKafkaBroker embeddedKafka) { + ConcurrentKafkaListenerContainerFactory factory = + new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(consumerFactory(embeddedKafka)); + factory.setBatchListener(true); + return factory; + } + + @Bean + public DefaultKafkaConsumerFactory consumerFactory(EmbeddedKafkaBroker embeddedKafka) { + return new DefaultKafkaConsumerFactory<>(consumerConfigs(embeddedKafka)); + } + + @Bean + public Map consumerConfigs(EmbeddedKafkaBroker embeddedKafka) { + Map consumerProps = + KafkaTestUtils.consumerProps(embeddedKafka, "smartBatchGroup", false); + consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); + consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class); + return consumerProps; + } + + @Bean + public KafkaTemplate template(EmbeddedKafkaBroker embeddedKafka) { + return new KafkaTemplate<>(producerFactory(embeddedKafka)); + } + + @Bean + public ProducerFactory producerFactory(EmbeddedKafkaBroker embeddedKafka) { + return new DefaultKafkaProducerFactory<>(producerConfigs(embeddedKafka)); + } + + @Bean + public Map producerConfigs(EmbeddedKafkaBroker embeddedKafka) { + Map props = KafkaTestUtils.producerProps(embeddedKafka); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); + return props; + } + + @Bean + public SmartMessageConverter byteArrayToStringConverter() { + return new ByteArrayConverter(bytes -> new String(bytes)); + } + + @Bean + public SmartMessageConverter byteArrayToUpperCaseConverter() { + return new ByteArrayConverter(bytes -> new String(bytes).toUpperCase()); + } + + @Bean + public BatchListener batchListener() { + return new BatchListener(); + } + + @Bean + public BatchListener2 batchListener2() { + return new BatchListener2(); + } + + } + + public static class BatchListener { + + private CountDownLatch latch = new CountDownLatch(2); + + private final List received = new ArrayList<>(); + + @KafkaListener( + id = "batchSmartListener", + topics = "smartBatchTopic", + groupId = "smartBatchGroup", + contentTypeConverter = "byteArrayToStringConverter", + batch = "true" + ) + public void listen(List messages) { + messages.forEach(message -> { + this.received.add(message); + this.latch.countDown(); + }); + } + + void reset(int expectedCount) { + this.received.clear(); + this.latch = new CountDownLatch(expectedCount); + } + + } + + public static class BatchListener2 { + + private CountDownLatch latch = new CountDownLatch(1); + + private final List received = new ArrayList<>(); + + @KafkaListener( + id = "batchSmartListener2", + topics = "smartBatchTopic2", + groupId = "smartBatchGroup2", + contentTypeConverter = "byteArrayToUpperCaseConverter", + batch = "true" + ) + public void listen(List messages) { + messages.forEach(message -> { + this.received.add(message); + this.latch.countDown(); + }); + } + + void reset(int expectedCount) { + this.received.clear(); + this.latch = new CountDownLatch(expectedCount); + } + + } + + /** + * Simple SmartMessageConverter for testing that converts byte[] to String using a function. + */ + static class ByteArrayConverter implements SmartMessageConverter { + + private final java.util.function.Function converter; + + ByteArrayConverter(java.util.function.Function converter) { + this.converter = converter; + } + + @Override + public Object fromMessage(Message message, Class targetClass) { + Object payload = message.getPayload(); + return (payload instanceof byte[] bytes) ? this.converter.apply(bytes) : payload; + } + + @Override + public Object fromMessage(Message message, Class targetClass, Object conversionHint) { + return fromMessage(message, targetClass); + } + + @Override + public Message toMessage(Object payload, MessageHeaders headers) { + return MessageBuilder.withPayload(payload).copyHeaders(headers).build(); + } + + @Override + public Message toMessage(Object payload, MessageHeaders headers, Object conversionHint) { + return toMessage(payload, headers); + } + + } +}