Skip to content

Commit da9dc28

Browse files
garyrussellartembilan
authored andcommitted
GH-1225: @kl - skip conversion when not needed
Resolves #1225 When no argument is sourced from a `Message<?>` (no `@Headers`, `@Payload`, etc - only `ConsumerRecord`, `Acknowledgment` or `Consumer`) then don't convert. * Fix logging - don't use supplier with is...().
1 parent f86110e commit da9dc28

File tree

4 files changed

+101
-32
lines changed

4 files changed

+101
-32
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/BatchMessagingMessageListenerAdapter.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import org.springframework.kafka.listener.KafkaListenerErrorHandler;
2929
import org.springframework.kafka.listener.ListenerExecutionFailedException;
3030
import org.springframework.kafka.support.Acknowledgment;
31-
import org.springframework.kafka.support.KafkaNull;
3231
import org.springframework.kafka.support.converter.BatchMessageConverter;
3332
import org.springframework.kafka.support.converter.BatchMessagingMessageConverter;
3433
import org.springframework.messaging.Message;
@@ -59,8 +58,6 @@
5958
public class BatchMessagingMessageListenerAdapter<K, V> extends MessagingMessageListenerAdapter<K, V>
6059
implements BatchAcknowledgingConsumerAwareMessageListener<K, V> {
6160

62-
private static final Message<KafkaNull> NULL_MESSAGE = new GenericMessage<>(KafkaNull.INSTANCE);
63-
6461
private BatchMessageConverter batchMessageConverter = new BatchMessagingMessageConverter();
6562

6663
private KafkaListenerErrorHandler errorHandler;

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java

Lines changed: 46 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import org.springframework.kafka.listener.ListenerExecutionFailedException;
5050
import org.springframework.kafka.support.Acknowledgment;
5151
import org.springframework.kafka.support.KafkaHeaders;
52+
import org.springframework.kafka.support.KafkaNull;
5253
import org.springframework.kafka.support.KafkaUtils;
5354
import org.springframework.kafka.support.converter.MessagingMessageConverter;
5455
import org.springframework.kafka.support.converter.RecordMessageConverter;
@@ -59,6 +60,7 @@
5960
import org.springframework.messaging.converter.MessageConversionException;
6061
import org.springframework.messaging.handler.annotation.Header;
6162
import org.springframework.messaging.handler.annotation.Payload;
63+
import org.springframework.messaging.support.GenericMessage;
6264
import org.springframework.messaging.support.MessageBuilder;
6365
import org.springframework.util.Assert;
6466
import org.springframework.util.ObjectUtils;
@@ -83,6 +85,11 @@ public abstract class MessagingMessageListenerAdapter<K, V> implements ConsumerS
8385

8486
private static final ParserContext PARSER_CONTEXT = new TemplateParserContext("!{", "}");
8587

88+
/**
89+
* Message used when no conversion is needed.
90+
*/
91+
protected static final Message<KafkaNull> NULL_MESSAGE = new GenericMessage<>(KafkaNull.INSTANCE); // NOSONAR
92+
8693
private final Object bean;
8794

8895
protected final LogAccessor logger = new LogAccessor(LogFactory.getLog(getClass())); //NOSONAR
@@ -99,6 +106,8 @@ public abstract class MessagingMessageListenerAdapter<K, V> implements ConsumerS
99106

100107
private boolean isMessageList;
101108

109+
private boolean conversionNeeded = true;
110+
102111
private RecordMessageConverter messageConverter = new MessagingMessageConverter();
103112

104113
private Type fallbackType = Object.class;
@@ -174,6 +183,10 @@ public boolean isConsumerRecords() {
174183
return this.isConsumerRecords;
175184
}
176185

186+
public boolean isConversionNeeded() {
187+
return this.conversionNeeded;
188+
}
189+
177190
/**
178191
* Set the topic to which to send any result from the method invocation.
179192
* May be a SpEL expression {@code !{...}} evaluated at runtime.
@@ -481,14 +494,26 @@ protected Type determineInferredType(Method method) { // NOSONAR complexity
481494

482495
Type genericParameterType = null;
483496
int allowedBatchParameters = 1;
497+
int notConvertibleParameters = 0;
484498

485499
for (int i = 0; i < method.getParameterCount(); i++) {
486500
MethodParameter methodParameter = new MethodParameter(method, i);
487501
/*
488502
* We're looking for a single non-annotated parameter, or one annotated with @Payload.
489-
* We ignore parameters with type Message because they are not involved with conversion.
503+
* We ignore parameters with type Message, Consumer, Ack, ConsumerRecord because they
504+
* are not involved with conversion.
490505
*/
491-
if (eligibleParameter(methodParameter)
506+
Type parameterType = methodParameter.getGenericParameterType();
507+
boolean isNotConvertible = parameterIsType(parameterType, ConsumerRecord.class);
508+
boolean isAck = parameterIsType(parameterType, Acknowledgment.class);
509+
this.hasAckParameter |= isAck;
510+
isNotConvertible |= isAck;
511+
boolean isConsumer = parameterIsType(parameterType, Consumer.class);
512+
isNotConvertible |= isConsumer;
513+
if (isNotConvertible) {
514+
notConvertibleParameters++;
515+
}
516+
if (!isNotConvertible && !isMessageWithNoTypeInfo(parameterType)
492517
&& (methodParameter.getParameterAnnotations().length == 0
493518
|| methodParameter.hasParameterAnnotation(Payload.class))) {
494519
if (genericParameterType == null) {
@@ -500,8 +525,7 @@ protected Type determineInferredType(Method method) { // NOSONAR complexity
500525
break;
501526
}
502527
}
503-
else if (methodParameter.getGenericParameterType().equals(Acknowledgment.class)) {
504-
this.hasAckParameter = true;
528+
else if (isAck) {
505529
allowedBatchParameters++;
506530
}
507531
else if (methodParameter.hasParameterAnnotation(Header.class)) {
@@ -511,11 +535,10 @@ else if (methodParameter.hasParameterAnnotation(Header.class)) {
511535
}
512536
}
513537
else {
514-
if (methodParameter.getGenericParameterType().equals(Consumer.class)) {
538+
if (isConsumer) {
515539
allowedBatchParameters++;
516540
}
517541
else {
518-
Type parameterType = methodParameter.getGenericParameterType();
519542
if (parameterType instanceof ParameterizedType
520543
&& ((ParameterizedType) parameterType).getRawType().equals(Consumer.class)) {
521544
allowedBatchParameters++;
@@ -524,6 +547,9 @@ else if (methodParameter.hasParameterAnnotation(Header.class)) {
524547
}
525548
}
526549

550+
if (notConvertibleParameters == method.getParameterCount()) {
551+
this.conversionNeeded = false;
552+
}
527553
boolean validParametersForBatch = method.getGenericParameterTypes().length <= allowedBatchParameters;
528554

529555
if (!validParametersForBatch) {
@@ -587,27 +613,26 @@ private boolean isWildCardWithUpperBound(Type paramType) {
587613
&& ((WildcardType) paramType).getUpperBounds().length > 0;
588614
}
589615

590-
/*
591-
* Don't consider parameter types that are available after conversion.
592-
* Acknowledgment, ConsumerRecord, Consumer, ConsumerRecord<...>, Consumer<...>, and Message<?>.
593-
*/
594-
private boolean eligibleParameter(MethodParameter methodParameter) {
595-
Type parameterType = methodParameter.getGenericParameterType();
596-
if (parameterType.equals(Acknowledgment.class) || parameterType.equals(ConsumerRecord.class)
597-
|| parameterType.equals(Consumer.class)) {
598-
return false;
599-
}
616+
private boolean isMessageWithNoTypeInfo(Type parameterType) {
600617
if (parameterType instanceof ParameterizedType) {
601618
ParameterizedType parameterizedType = (ParameterizedType) parameterType;
602619
Type rawType = parameterizedType.getRawType();
603-
if (rawType.equals(ConsumerRecord.class) || rawType.equals(Consumer.class)) {
604-
return false;
620+
if (rawType.equals(Message.class)) {
621+
return parameterizedType.getActualTypeArguments()[0] instanceof WildcardType;
605622
}
606-
else if (rawType.equals(Message.class)) {
607-
return !(parameterizedType.getActualTypeArguments()[0] instanceof WildcardType);
623+
}
624+
return parameterType.equals(Message.class); // could be Message without a generic type
625+
}
626+
627+
private boolean parameterIsType(Type parameterType, Type type) {
628+
if (parameterType instanceof ParameterizedType) {
629+
ParameterizedType parameterizedType = (ParameterizedType) parameterType;
630+
Type rawType = parameterizedType.getRawType();
631+
if (rawType.equals(type)) {
632+
return true;
608633
}
609634
}
610-
return !parameterType.equals(Message.class); // could be Message without a generic type
635+
return parameterType.equals(type);
611636
}
612637

613638
/**

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/RecordMessagingMessageListenerAdapter.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.springframework.kafka.listener.ListenerExecutionFailedException;
2727
import org.springframework.kafka.support.Acknowledgment;
2828
import org.springframework.messaging.Message;
29+
import org.springframework.messaging.support.GenericMessage;
2930

3031

3132
/**
@@ -71,8 +72,16 @@ public RecordMessagingMessageListenerAdapter(Object bean, Method method, KafkaLi
7172
*/
7273
@Override
7374
public void onMessage(ConsumerRecord<K, V> record, Acknowledgment acknowledgment, Consumer<?, ?> consumer) {
74-
Message<?> message = toMessagingMessage(record, acknowledgment, consumer);
75-
logger.debug(() -> "Processing [" + message + "]");
75+
Message<?> message;
76+
if (isConversionNeeded()) {
77+
message = toMessagingMessage(record, acknowledgment, consumer);
78+
}
79+
else {
80+
message = NULL_MESSAGE;
81+
}
82+
if (logger.isDebugEnabled()) {
83+
logger.debug("Processing [" + message + "]");
84+
}
7685
try {
7786
Object result = invokeHandler(record, acknowledgment, message, consumer);
7887
if (result != null) {
@@ -82,6 +91,9 @@ public void onMessage(ConsumerRecord<K, V> record, Acknowledgment acknowledgment
8291
catch (ListenerExecutionFailedException e) { // NOSONAR ex flow control
8392
if (this.errorHandler != null) {
8493
try {
94+
if (message.equals(NULL_MESSAGE)) {
95+
message = new GenericMessage<>(record);
96+
}
8597
Object result = this.errorHandler.handleError(message, e, consumer);
8698
if (result != null) {
8799
handleResult(result, record, message);

spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java

Lines changed: 41 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import static org.mockito.Mockito.mock;
2828
import static org.mockito.Mockito.spy;
2929

30+
import java.lang.reflect.Type;
3031
import java.util.Collection;
3132
import java.util.Collections;
3233
import java.util.HashMap;
@@ -53,6 +54,7 @@
5354
import org.apache.kafka.clients.consumer.ConsumerRecords;
5455
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
5556
import org.apache.kafka.clients.producer.ProducerConfig;
57+
import org.apache.kafka.clients.producer.ProducerRecord;
5658
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
5759
import org.apache.kafka.common.serialization.ByteArraySerializer;
5860
import org.junit.jupiter.api.Test;
@@ -102,6 +104,7 @@
102104
import org.springframework.kafka.support.converter.Jackson2JavaTypeMapper.TypePrecedence;
103105
import org.springframework.kafka.support.converter.JsonMessageConverter;
104106
import org.springframework.kafka.support.converter.ProjectingMessageConverter;
107+
import org.springframework.kafka.support.converter.RecordMessageConverter;
105108
import org.springframework.kafka.support.converter.StringJsonMessageConverter;
106109
import org.springframework.kafka.test.EmbeddedKafkaBroker;
107110
import org.springframework.kafka.test.context.EmbeddedKafka;
@@ -246,13 +249,13 @@ public void testSimple() throws Exception {
246249
template.send("annotated3", 0, "foo");
247250
template.flush();
248251
assertThat(this.listener.latch3.await(60, TimeUnit.SECONDS)).isTrue();
249-
assertThat(this.listener.record.value()).isEqualTo("foo");
252+
assertThat(this.listener.capturedRecord.value()).isEqualTo("foo");
250253
assertThat(this.config.listen3Exception).isNotNull();
251254

252255
template.send("annotated4", 0, "foo");
253256
template.flush();
254257
assertThat(this.listener.latch4.await(60, TimeUnit.SECONDS)).isTrue();
255-
assertThat(this.listener.record.value()).isEqualTo("foo");
258+
assertThat(this.listener.capturedRecord.value()).isEqualTo("foo");
256259
assertThat(this.listener.ack).isNotNull();
257260
assertThat(this.listener.eventLatch.await(60, TimeUnit.SECONDS)).isTrue();
258261
assertThat(this.listener.event.getListenerId().startsWith("qux-"));
@@ -847,6 +850,37 @@ public ChainedKafkaTransactionManager<Integer, String> cktm() {
847850
return factory;
848851
}
849852

853+
@Bean
854+
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
855+
factoryWithBadConverter() {
856+
857+
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
858+
new ConcurrentKafkaListenerContainerFactory<>();
859+
factory.setConsumerFactory(consumerFactory());
860+
factory.setRecordFilterStrategy(recordFilter());
861+
factory.setReplyTemplate(partitionZeroReplyingTemplate());
862+
factory.setErrorHandler((ConsumerAwareErrorHandler) (t, d, c) -> {
863+
this.globalErrorThrowable = t;
864+
c.seek(new org.apache.kafka.common.TopicPartition(d.topic(), d.partition()), d.offset());
865+
});
866+
factory.getContainerProperties().setMicrometerTags(Collections.singletonMap("extraTag", "foo"));
867+
factory.setMessageConverter(new RecordMessageConverter() {
868+
869+
@Override
870+
public Message<?> toMessage(ConsumerRecord<?, ?> record, Acknowledgment acknowledgment,
871+
Consumer<?, ?> consumer, Type payloadType) {
872+
873+
throw new UnsupportedOperationException();
874+
}
875+
876+
@Override
877+
public ProducerRecord<?, ?> fromMessage(Message<?> message, String defaultTopic) {
878+
throw new UnsupportedOperationException(); }
879+
880+
});
881+
return factory;
882+
}
883+
850884
@Bean
851885
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
852886
withNoReplyTemplateContainerFactory() {
@@ -1425,7 +1459,7 @@ static class Listener implements ConsumerSeekAware {
14251459

14261460
volatile Integer partition;
14271461

1428-
volatile ConsumerRecord<?, ?> record;
1462+
volatile ConsumerRecord<?, ?> capturedRecord;
14291463

14301464
volatile Acknowledgment ack;
14311465

@@ -1502,12 +1536,13 @@ public void listen2(@Payload String foo,
15021536
private final AtomicBoolean reposition3 = new AtomicBoolean();
15031537

15041538
@KafkaListener(id = "baz", topicPartitions = @TopicPartition(topic = "${topicThree:annotated3}",
1505-
partitions = "${zero:0}"), errorHandler = "listen3ErrorHandler")
1539+
partitions = "${zero:0}"), errorHandler = "listen3ErrorHandler",
1540+
containerFactory = "factoryWithBadConverter")
15061541
public void listen3(ConsumerRecord<?, ?> record) {
15071542
if (this.reposition3.compareAndSet(false, true)) {
15081543
throw new RuntimeException("reposition");
15091544
}
1510-
this.record = record;
1545+
this.capturedRecord = record;
15111546
this.latch3.countDown();
15121547
}
15131548

@@ -1537,7 +1572,7 @@ public void eventHandler(ListenerContainerIdleEvent event) {
15371572
relativeToCurrent = "${zzz:true}"))
15381573
}, clientIdPrefix = "${foo.xxx:clientIdViaAnnotation}")
15391574
public void listen5(ConsumerRecord<?, ?> record) {
1540-
this.record = record;
1575+
this.capturedRecord = record;
15411576
this.latch5.countDown();
15421577
}
15431578

0 commit comments

Comments
 (0)