Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.springframework.kafka.support.converter.BatchMessagingMessageConverter;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.converter.SmartMessageConverter;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert;

Expand All @@ -55,6 +56,7 @@
* @author Venil Noronha
* @author Wang ZhiYang
* @author Sanghyeok An
* @author George Mahfoud
* @since 1.1
*/
public class BatchMessagingMessageListenerAdapter<K, V> extends MessagingMessageListenerAdapter<K, V>
Expand Down Expand Up @@ -107,6 +109,29 @@ public void setBatchToRecordAdapter(BatchToRecordAdapter<K, V> batchToRecordAdap
this.batchToRecordAdapter = batchToRecordAdapter;
}

/**
 * Set the {@link SmartMessageConverter} to use with the batch message converter.
 * <p>
 * When a {@code SmartMessageConverter} is configured via
 * {@code @KafkaListener(contentTypeConverter = "...")}, this method ensures it is
 * properly propagated to the batch converter's record converter for message conversion
 * in batch listeners.
 * <p>
 * This method must not be called after
 * {@link #setBatchMessageConverter(BatchMessageConverter)}, because it mutates the
 * internal {@code batchMessageConverter}; when a custom {@link BatchMessageConverter}
 * is provided, configure the {@code SmartMessageConverter} on that external converter
 * instead. Since {@link BatchMessagingMessageConverter} always has a default
 * {@link org.springframework.kafka.support.converter.MessagingMessageConverter},
 * the converter can be configured via the annotation without setting it on the factory.
 * @param messageConverter the converter to set
 */
@Override
public void setMessagingConverter(SmartMessageConverter messageConverter) {
	super.setMessagingConverter(messageConverter);
	// Propagate to the record converter nested inside the batch converter so that
	// individual records of a batch are converted by the same SmartMessageConverter.
	if (this.batchMessageConverter instanceof BatchMessagingMessageConverter batchConverter) {
		batchConverter.setMessagingConverter(messageConverter);
	}
}

/**
* Return the {@link BatchMessagingMessageConverter} for this listener,
* being able to convert {@link org.springframework.messaging.Message}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.springframework.kafka.support.KafkaNull;
import org.springframework.kafka.support.serializer.SerializationUtils;
import org.springframework.messaging.Message;
import org.springframework.messaging.converter.SmartMessageConverter;
import org.springframework.messaging.support.MessageBuilder;

/**
Expand All @@ -67,6 +68,7 @@
* @author Borahm Lee
* @author Artem Bilan
* @author Soby Chacko
* @author George Mahfoud
*
* @since 1.1
*/
Expand All @@ -87,10 +89,11 @@ public class BatchMessagingMessageConverter implements BatchMessageConverter {
private boolean rawRecordHeader;

/**
 * Create an instance with a default {@link MessagingMessageConverter} for record conversion.
 * @since 3.3.11
 */
public BatchMessagingMessageConverter() {
	this(new MessagingMessageConverter());
}

/**
Expand Down Expand Up @@ -142,6 +145,18 @@ public RecordMessageConverter getRecordMessageConverter() {
return this.recordConverter;
}

/**
 * Set a {@link SmartMessageConverter} to convert the record value to the desired type.
 * @param messagingConverter the converter.
 * @since 3.3.11
 */
public void setMessagingConverter(@Nullable SmartMessageConverter messagingConverter) {
	// Only the default MessagingMessageConverter supports a SmartMessageConverter;
	// a custom RecordMessageConverter supplied by the user is left untouched.
	if (this.recordConverter instanceof MessagingMessageConverter messagingRecordConverter) {
		messagingRecordConverter.setMessagingConverter(messagingConverter);
	}
}

/**
* Set to true to add the raw {@code List<ConsumerRecord<?, ?>>} as a header
* {@link KafkaHeaders#RAW_DATA}.
Expand Down Expand Up @@ -275,13 +290,14 @@ protected Object extractAndConvertValue(ConsumerRecord<?, ?> record, Type type)
* @param type the type - must be a {@link ParameterizedType} with a single generic
* type parameter.
* @param conversionFailures Conversion failures.
* @return the converted payload, potentially further processed by a {@link SmartMessageConverter}.
*/
protected @Nullable Object convert(ConsumerRecord<?, ?> record, Type type, List<ConversionException> conversionFailures) {
try {
if (this.recordConverter != null) {
Type actualType = ((ParameterizedType) type).getActualTypeArguments()[0];
Object payload = this.recordConverter
.toMessage(record, null, null, ((ParameterizedType) type).getActualTypeArguments()[0]).getPayload();
.toMessage(record, null, null, actualType).getPayload();
conversionFailures.add(null);
return payload;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,256 @@
/*
* Copyright 2016-present the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.kafka.listener;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.junit.jupiter.api.Test;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.converter.SmartMessageConverter;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;

import static org.assertj.core.api.Assertions.assertThat;

/**
* Integration tests for SmartMessageConverter support in batch listeners.
* Reproduces and verifies the fix for the issue described in GH-4097.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if I'm about to add new tests about smart conversion with the batch in the future?
Should I start a new test class since you have locked this into that specific issue?

I didn't mean to offend you.
Just questioning the reasoning behind issues numbers in the constantly evolving code.
😄

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Haha, good point. I've removed the GH-4097 reference so the test class isn't locked to a specific issue. Future SmartMessageConverter tests can use this class without issue. Thanks for the catch.

*
* @author George Mahfoud
* @since 3.3.11
*/
@SpringJUnitConfig
@DirtiesContext
@EmbeddedKafka(partitions = 1, topics = { "smartBatchTopic", "smartBatchTopic2" })
class BatchSmartMessageConverterTests {

@Autowired
private KafkaTemplate<Integer, byte[]> template;

@Autowired
private Config config;

@Test
void testContentTypeConverterWithBatchListener() throws Exception {
BatchListener listener = this.config.batchListener();
listener.reset(2);

this.template.send("smartBatchTopic", "hello".getBytes());
this.template.send("smartBatchTopic", "world".getBytes());

assertThat(listener.latch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(listener.received).hasSize(2).containsExactly("hello", "world");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

According to the containsExactly() API, the hasSize() is redundant.

}

@Test
void testMultipleListenersWithDifferentConverters() throws Exception {
BatchListener listener1 = this.config.batchListener();
BatchListener2 listener2 = this.config.batchListener2();
listener1.reset(1);
listener2.reset(1);

this.template.send("smartBatchTopic", "foo".getBytes());
this.template.send("smartBatchTopic2", "bar".getBytes());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prefer to have a neutral open source code, without foo/bar.
I understand that there is:

Cultural significance:
Their presence is a part of programming culture and history, signaling a common understanding among developers.

But that does not mean that our language should be not literary-correct to emphasize our diversion from the rest of the world.

Please, consider to use other words.
The listener1Data, listener2Data might be OK.

Thanks


assertThat(listener1.latch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(listener2.latch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(listener1.received).hasSize(1).containsExactly("foo");
assertThat(listener2.received).hasSize(1).containsExactly("BAR");
}

@Configuration
@EnableKafka
public static class Config {

@Bean
public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory(EmbeddedKafkaBroker embeddedKafka) {
ConcurrentKafkaListenerContainerFactory<Integer, byte[]> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory(embeddedKafka));
factory.setBatchListener(true);
return factory;
}

@Bean
public DefaultKafkaConsumerFactory<Integer, byte[]> consumerFactory(EmbeddedKafkaBroker embeddedKafka) {
return new DefaultKafkaConsumerFactory<>(consumerConfigs(embeddedKafka));
}

@Bean
public Map<String, Object> consumerConfigs(EmbeddedKafkaBroker embeddedKafka) {
Map<String, Object> consumerProps =
KafkaTestUtils.consumerProps(embeddedKafka, "smartBatchGroup", false);
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
return consumerProps;
}

@Bean
public KafkaTemplate<Integer, byte[]> template(EmbeddedKafkaBroker embeddedKafka) {
return new KafkaTemplate<>(producerFactory(embeddedKafka));
}

@Bean
public ProducerFactory<Integer, byte[]> producerFactory(EmbeddedKafkaBroker embeddedKafka) {
return new DefaultKafkaProducerFactory<>(producerConfigs(embeddedKafka));
}

@Bean
public Map<String, Object> producerConfigs(EmbeddedKafkaBroker embeddedKafka) {
Map<String, Object> props = KafkaTestUtils.producerProps(embeddedKafka);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
return props;
}

@Bean
public SmartMessageConverter byteArrayToStringConverter() {
return new ByteArrayConverter(bytes -> new String(bytes));
}

@Bean
public SmartMessageConverter byteArrayToUpperCaseConverter() {
return new ByteArrayConverter(bytes -> new String(bytes).toUpperCase());
}

@Bean
public BatchListener batchListener() {
return new BatchListener();
}

@Bean
public BatchListener2 batchListener2() {
return new BatchListener2();
}

}

public static class BatchListener {

private CountDownLatch latch = new CountDownLatch(2);

private final List<String> received = new ArrayList<>();

@KafkaListener(
id = "batchSmartListener",
topics = "smartBatchTopic",
groupId = "smartBatchGroup",
contentTypeConverter = "byteArrayToStringConverter",
batch = "true"
)
public void listen(List<String> messages) {
messages.forEach(message -> {
this.received.add(message);
this.latch.countDown();
});
}

void reset(int expectedCount) {
this.received.clear();
this.latch = new CountDownLatch(expectedCount);
}

}

public static class BatchListener2 {

private CountDownLatch latch = new CountDownLatch(1);

private final List<String> received = new ArrayList<>();

@KafkaListener(
id = "batchSmartListener2",
topics = "smartBatchTopic2",
groupId = "smartBatchGroup2",
contentTypeConverter = "byteArrayToUpperCaseConverter",
batch = "true"
)
public void listen(List<String> messages) {
messages.forEach(message -> {
this.received.add(message);
this.latch.countDown();
});
}

void reset(int expectedCount) {
this.received.clear();
this.latch = new CountDownLatch(expectedCount);
}

}

/**
* Simple SmartMessageConverter for testing that converts byte[] to String using a function.
*/
static class ByteArrayConverter implements SmartMessageConverter {

private final java.util.function.Function<byte[], String> converter;

ByteArrayConverter(java.util.function.Function<byte[], String> converter) {
this.converter = converter;
}

@Override
public Object fromMessage(Message<?> message, Class<?> targetClass) {
Object payload = message.getPayload();
return (payload instanceof byte[] bytes) ? this.converter.apply(bytes) : payload;
}

@Override
public Object fromMessage(Message<?> message, Class<?> targetClass, Object conversionHint) {
return fromMessage(message, targetClass);
}

@Override
public Message<?> toMessage(Object payload, MessageHeaders headers) {
return MessageBuilder.withPayload(payload).copyHeaders(headers).build();
}

@Override
public Message<?> toMessage(Object payload, MessageHeaders headers, Object conversionHint) {
return toMessage(payload, headers);
}

}
}