Skip to content

Commit 5716bf0

Browse files
filip-halembaartembilan
authored andcommitted
GH-1362: Add custom HandlerMethodArgumentResolvers
Fixes #1361 * Fix test * Change method name * Change field name * Checkstyle * Fixes after code reivew * Add comment * Fixes after code review * Added infoormation about the feature in the documentation * Small change in documentation * Fix documentation * Use only whitespaces in docs
1 parent a3de533 commit 5716bf0

File tree

4 files changed

+126
-28
lines changed

4 files changed

+126
-28
lines changed

spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java

Lines changed: 10 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2014-2019 the original author or authors.
2+
* Copyright 2014-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -78,10 +78,7 @@
7878
import org.springframework.messaging.converter.GenericMessageConverter;
7979
import org.springframework.messaging.converter.MessageConverter;
8080
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;
81-
import org.springframework.messaging.handler.annotation.support.HeaderMethodArgumentResolver;
82-
import org.springframework.messaging.handler.annotation.support.HeadersMethodArgumentResolver;
8381
import org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory;
84-
import org.springframework.messaging.handler.annotation.support.MessageMethodArgumentResolver;
8582
import org.springframework.messaging.handler.annotation.support.PayloadMethodArgumentResolver;
8683
import org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolver;
8784
import org.springframework.messaging.handler.invocation.InvocableHandlerMethod;
@@ -116,6 +113,7 @@
116113
* @author Dariusz Szablinski
117114
* @author Venil Noronha
118115
* @author Dimitri Penner
116+
* @author Filip Halemba
119117
*
120118
* @see KafkaListener
121119
* @see KafkaListenerErrorHandler
@@ -806,32 +804,20 @@ private MessageHandlerMethodFactory createDefaultMessageHandlerMethodFactory() {
806804
defaultFactory.setValidator(validator);
807805
}
808806
defaultFactory.setBeanFactory(KafkaListenerAnnotationBeanPostProcessor.this.beanFactory);
809-
810-
ConfigurableBeanFactory cbf =
811-
KafkaListenerAnnotationBeanPostProcessor.this.beanFactory instanceof ConfigurableBeanFactory ?
812-
(ConfigurableBeanFactory) KafkaListenerAnnotationBeanPostProcessor.this.beanFactory :
813-
null;
814-
815-
816807
this.defaultFormattingConversionService.addConverter(
817808
new BytesToStringConverter(KafkaListenerAnnotationBeanPostProcessor.this.charset));
818-
819809
defaultFactory.setConversionService(this.defaultFormattingConversionService);
810+
GenericMessageConverter messageConverter = new GenericMessageConverter(this.defaultFormattingConversionService);
811+
defaultFactory.setMessageConverter(messageConverter);
820812

821-
List<HandlerMethodArgumentResolver> argumentResolvers = new ArrayList<>();
822-
823-
// Annotation-based argument resolution
824-
argumentResolvers.add(new HeaderMethodArgumentResolver(this.defaultFormattingConversionService, cbf));
825-
argumentResolvers.add(new HeadersMethodArgumentResolver());
826-
827-
// Type-based argument resolution
828-
final GenericMessageConverter messageConverter =
829-
new GenericMessageConverter(this.defaultFormattingConversionService);
830-
argumentResolvers.add(new MessageMethodArgumentResolver(messageConverter));
831-
argumentResolvers.add(new KafkaNullAwarePayloadArgumentResolver(messageConverter, validator));
832-
defaultFactory.setArgumentResolvers(argumentResolvers);
813+
List<HandlerMethodArgumentResolver> customArgumentsResolver =
814+
new ArrayList<>(KafkaListenerAnnotationBeanPostProcessor.this.registrar.getCustomMethodArgumentResolvers());
815+
// Has to be at the end - look at PayloadMethodArgumentResolver documentation
816+
customArgumentsResolver.add(new KafkaNullAwarePayloadArgumentResolver(messageConverter, validator));
817+
defaultFactory.setCustomArgumentResolvers(customArgumentsResolver);
833818

834819
defaultFactory.afterPropertiesSet();
820+
835821
return defaultFactory;
836822
}
837823

spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointRegistrar.java

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2014-2019 the original author or authors.
2+
* Copyright 2014-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -17,13 +17,17 @@
1717
package org.springframework.kafka.config;
1818

1919
import java.util.ArrayList;
20+
import java.util.Arrays;
21+
import java.util.Collections;
2022
import java.util.List;
2123

2224
import org.springframework.beans.factory.BeanFactory;
2325
import org.springframework.beans.factory.BeanFactoryAware;
2426
import org.springframework.beans.factory.InitializingBean;
27+
import org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor;
2528
import org.springframework.lang.Nullable;
2629
import org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory;
30+
import org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolver;
2731
import org.springframework.util.Assert;
2832
import org.springframework.validation.Validator;
2933

@@ -35,13 +39,16 @@
3539
* @author Juergen Hoeller
3640
* @author Artem Bilan
3741
* @author Gary Russell
42+
* @author Filip Halemba
3843
*
3944
* @see org.springframework.kafka.annotation.KafkaListenerConfigurer
4045
*/
4146
public class KafkaListenerEndpointRegistrar implements BeanFactoryAware, InitializingBean {
4247

4348
private final List<KafkaListenerEndpointDescriptor> endpointDescriptors = new ArrayList<>();
4449

50+
private List<HandlerMethodArgumentResolver> customMethodArgumentResolvers = new ArrayList<>();
51+
4552
private KafkaListenerEndpointRegistry endpointRegistry;
4653

4754
private MessageHandlerMethodFactory messageHandlerMethodFactory;
@@ -74,6 +81,25 @@ public KafkaListenerEndpointRegistry getEndpointRegistry() {
7481
return this.endpointRegistry;
7582
}
7683

84+
/**
85+
* Return the list of {@link HandlerMethodArgumentResolver}.
86+
* @return the list of {@link HandlerMethodArgumentResolver}.
87+
* @since 2.4.2
88+
*/
89+
public List<HandlerMethodArgumentResolver> getCustomMethodArgumentResolvers() {
90+
return Collections.unmodifiableList(this.customMethodArgumentResolvers);
91+
}
92+
93+
/**
94+
* Add custom methods arguments resolvers to {@link KafkaListenerAnnotationBeanPostProcessor}
95+
* Default empty list.
96+
* @param methodArgumentResolvers the methodArgumentResolvers to assign.
97+
* @since 2.4.2
98+
*/
99+
public void setCustomMethodArgumentResolvers(HandlerMethodArgumentResolver... methodArgumentResolvers) {
100+
this.customMethodArgumentResolvers = Arrays.asList(methodArgumentResolvers);
101+
}
102+
77103
/**
78104
* Set the {@link MessageHandlerMethodFactory} to use to configure the message
79105
* listener responsible to serve an endpoint detected by this processor.

spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java

Lines changed: 54 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2019 the original author or authors.
2+
* Copyright 2016-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -68,6 +68,7 @@
6868
import org.springframework.context.annotation.Primary;
6969
import org.springframework.context.event.EventListener;
7070
import org.springframework.context.support.PropertySourcesPlaceholderConfigurer;
71+
import org.springframework.core.MethodParameter;
7172
import org.springframework.core.convert.converter.Converter;
7273
import org.springframework.data.web.JsonPath;
7374
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
@@ -121,6 +122,7 @@
121122
import org.springframework.messaging.handler.annotation.Header;
122123
import org.springframework.messaging.handler.annotation.Payload;
123124
import org.springframework.messaging.handler.annotation.SendTo;
125+
import org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolver;
124126
import org.springframework.messaging.support.GenericMessage;
125127
import org.springframework.messaging.support.MessageBuilder;
126128
import org.springframework.retry.support.RetryTemplate;
@@ -156,7 +158,7 @@
156158
"annotated25", "annotated25reply1", "annotated25reply2", "annotated26", "annotated27", "annotated28",
157159
"annotated29", "annotated30", "annotated30reply", "annotated31", "annotated32", "annotated33",
158160
"annotated34", "annotated35", "annotated36", "annotated37", "foo", "manualStart", "seekOnIdle",
159-
"annotated38", "annotated38reply" })
161+
"annotated38", "annotated38reply", "annotated39"})
160162
public class EnableKafkaIntegrationTests {
161163

162164
private static final String DEFAULT_TEST_GROUP_ID = "testAnnot";
@@ -841,6 +843,14 @@ public void testReplyingBatchListenerReturnCollection() {
841843
consumer.close();
842844
}
843845

846+
@Test
847+
public void testCustomMethodArgumentResovlerListener() throws InterruptedException {
848+
template.send("annotated39", "foo");
849+
assertThat(this.listener.customMethodArgumentResolverLatch.await(30, TimeUnit.SECONDS)).isTrue();
850+
assertThat(this.listener.customMethodArgument.body).isEqualTo("foo");
851+
assertThat(this.listener.customMethodArgument.topic).isEqualTo("annotated39");
852+
}
853+
844854
@Configuration
845855
@EnableKafka
846856
@EnableTransactionManagement(proxyTargetClass = true)
@@ -1466,8 +1476,26 @@ public boolean supports(Class<?> clazz) {
14661476
}
14671477

14681478
});
1469-
}
1479+
registrar.setCustomMethodArgumentResolvers(
1480+
new HandlerMethodArgumentResolver() {
1481+
1482+
@Override
1483+
public boolean supportsParameter(MethodParameter parameter) {
1484+
return CustomMethodArgument.class.isAssignableFrom(parameter.getParameterType());
1485+
}
1486+
1487+
@Override
1488+
public Object resolveArgument(MethodParameter parameter, Message<?> message) {
1489+
return new CustomMethodArgument(
1490+
(String) message.getPayload(),
1491+
message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC, String.class)
1492+
);
1493+
}
1494+
1495+
}
1496+
);
14701497

1498+
}
14711499

14721500
@Bean
14731501
public KafkaListenerErrorHandler consumeMultiMethodException(MultiListenerBean listener) {
@@ -1540,6 +1568,8 @@ static class Listener implements ConsumerSeekAware {
15401568

15411569
final CountDownLatch projectionLatch = new CountDownLatch(1);
15421570

1571+
final CountDownLatch customMethodArgumentResolverLatch = new CountDownLatch(1);
1572+
15431573
volatile Integer partition;
15441574

15451575
volatile ConsumerRecord<?, ?> capturedRecord;
@@ -1584,6 +1614,8 @@ static class Listener implements ConsumerSeekAware {
15841614

15851615
volatile String name;
15861616

1617+
volatile CustomMethodArgument customMethodArgument;
1618+
15871619
@KafkaListener(id = "manualStart", topics = "manualStart",
15881620
containerFactory = "kafkaAutoStartFalseListenerContainerFactory")
15891621
public void manualStart(String foo) {
@@ -1876,6 +1908,12 @@ public void projectionListener(ProjectionSample sample) {
18761908
this.projectionLatch.countDown();
18771909
}
18781910

1911+
@KafkaListener(id = "customMethodArgumentResolver", topics = "annotated39")
1912+
public void customMethodArgumentResolverListener(String data, CustomMethodArgument customMethodArgument) {
1913+
this.customMethodArgument = customMethodArgument;
1914+
this.customMethodArgumentResolverLatch.countDown();
1915+
}
1916+
18791917
@Override
18801918
public void registerSeekCallback(ConsumerSeekCallback callback) {
18811919
this.seekCallBack.set(callback);
@@ -2192,5 +2230,18 @@ interface ProjectionSample {
21922230

21932231
}
21942232

2233+
static class CustomMethodArgument {
2234+
2235+
final String body;
2236+
2237+
final String topic;
2238+
2239+
CustomMethodArgument(String body, String topic) {
2240+
this.body = body;
2241+
this.topic = topic;
2242+
}
2243+
2244+
}
2245+
21952246

21962247
}

src/reference/asciidoc/kafka.adoc

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3157,6 +3157,41 @@ This lets you further customize listener deserialization without changing the de
31573157

31583158
IMPORTANT: Setting a custom `MessageHandlerMethodFactory` on the `KafkaListenerEndpointRegistrar` through a `KafkaListenerConfigurer` bean disables this feature.
31593159

3160+
===== Adding custom `HandlerMethodArgumentResolver` to `@KafkaListener`
3161+
3162+
Starting with version 2.4.2 you are able to add your own `HandlerMethodArgumentResolver` and resolve custom method parameters.
3163+
All you need is to implement `KafkaListenerConfigurer` and use method `setCustomMethodArgumentResolvers()` from class `KafkaListenerEndpointRegistrar`.
3164+
3165+
====
3166+
[source, java]
3167+
----
3168+
@Configuration
3169+
class CustomKafkaConfig implements KafkaListenerConfigurer {
3170+
3171+
@Override
3172+
public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
3173+
registrar.setCustomMethodArgumentResolvers(
3174+
new HandlerMethodArgumentResolver() {
3175+
3176+
@Override
3177+
public boolean supportsParameter(MethodParameter parameter) {
3178+
return CustomMethodArgument.class.isAssignableFrom(parameter.getParameterType());
3179+
}
3180+
3181+
@Override
3182+
public Object resolveArgument(MethodParameter parameter, Message<?> message) {
3183+
return new CustomMethodArgument(
3184+
message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC, String.class)
3185+
);
3186+
}
3187+
}
3188+
);
3189+
}
3190+
3191+
}
3192+
----
3193+
====
3194+
31603195
[[headers]]
31613196
==== Message Headers
31623197

0 commit comments

Comments
 (0)