Skip to content

Commit a7f8b22

Browse files
Resolve listener payload type during container setup (#1548)
Resolve payload types during container initialization by analyzing @SqsListener method signatures, removing the need for type headers by default. With this change, the resolved payload type is available earlier in the flow, and components such as MessageInterceptors, ErrorHandlers, and AcknowledgementResultCallback receive the deserialized payload even when no type header is present. This removes coupling to producer-specific type names (which often differ across services) and reduces risk from trusting type headers. Supported patterns: - Simple types: MyEvent - Generic types: List<MyEvent>, Message<MyEvent>, List<Message<MyEvent>> - Explicit @payload annotations The SqsTemplate JavaType header is deprecated and ignored by default, with documented options to restore the previous behavior. Polymorphic payloads (interfaces, Object, @SqsHandler) require configuring a custom payloadTypeMapper in SqsMessagingMessageConverter for type resolution. Fixes #1546
1 parent 6ab5e41 commit a7f8b22

18 files changed

+2313
-36
lines changed

docs/src/main/asciidoc/sqs.adoc

Lines changed: 113 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -660,6 +660,18 @@ public class MyListener {
660660

661661
The `isDefault = true` parameter designates a method as the fallback handler for messages that don't match any other handler's parameter type.
662662

663+
To determine which handler method to invoke, the framework needs to know the payload type before deserialization.
664+
A custom `PayloadTypeMapper` should be configured to map incoming messages to their concrete types.
665+
See <<Custom Payload Type Mapping>> for an example.
666+
667+
[NOTE]
668+
====
669+
Since 4.0.0, the default type header sent by `SqsTemplate` is deprecated and is no longer used to deserialize the payload.
670+
A custom mapper is necessary to disambiguate between different payload types in multi-handler listeners.
671+
Alternatively, automatic inference can be disabled to restore header-based type resolution.
672+
See <<Automatic Payload Type Inference>> for more information.
673+
====
674+
663675
===== SNS Messages
664676

665677
Since 3.1.1, when receiving SNS messages through the `@SqsListener`, the message includes all attributes of the `SnsNotification`. To only receive need the `Message` part of the payload, you can utilize the `@SnsNotificationMessage` annotation.
@@ -1609,16 +1621,55 @@ NOTE: When using Spring Boot's auto-configuration, if there's a single `ObjectMa
16091621
This includes the one provided by Spring Boot's auto-configuration itself.
16101622
For configuring a different `ObjectMapper`, see <<Global Configuration for @SqsListeners>>.
16111623

1612-
For manually created `MessageListeners`, `MessageInterceptor` and `ErrorHandler` components, or more fine-grained conversion such as using `interfaces` or `inheritance` in listener methods, type mapping is required for payload deserialization.
1624+
==== Automatic Payload Type Inference
1625+
1626+
Since 4.0.0, by default, the framework automatically infers the payload type from the `@SqsListener` method signature at the `MessageSource` level.
1627+
This allows payloads to be deserialized early in the message processing flow without requiring type information in message headers.
1628+
1629+
This enables accessing the deserialized payload in components such as `MessageInterceptor`, `ErrorHandler`, and `AcknowledgementResultCallback` without type headers.
1630+
1631+
The inference supports simple types, generic types like `List<MyEvent>`, `Message<MyEvent>`, and `List<Message<MyEvent>>`.
1632+
Parameters annotated with `@Payload` are explicitly recognized as the payload parameter.
1633+
1634+
For polymorphic types (interfaces, `Object`, or `@SqsHandler` methods), a custom mapper is required.
1635+
See <<Custom Payload Type Mapping>>.
1636+
1637+
[NOTE]
1638+
====
1639+
The `JavaType` header automatically set by `SqsTemplate` is deprecated since 4.0.0 and will be removed in a future version.
1640+
When automatic payload type inference is enabled, the `JavaType` header is ignored by default.
1641+
For backward compatibility, `SqsTemplate` still sends this header, allowing applications to roll back if needed.
1642+
====
1643+
1644+
==== Customizing Type Resolution
1645+
1646+
Type resolution can be customized at two levels:
16131647

1614-
By default, the framework looks for a `MessageHeader` named `Sqs_MA_JavaType` containing the fully qualified class name (`FQCN`) for which the payload should be deserialized to.
1615-
If such header is found, the message is automatically deserialized to the provided class.
1648+
*Per-converter:* Configure `setPayloadTypeMapper` or `setPayloadTypeHeader` on a `MessagingMessageConverter`.
1649+
Any custom type mapper takes precedence over automatic inference for that converter.
1650+
Note that, by default, all containers share the same SqsMessagingMessageConverter instance.
16161651

1617-
Further configuration can be achieved by providing a configured `MessagingMessageConverter` instance in the `SqsContainerOptions`.
1652+
*Framework-wide:* Provide a custom `MethodPayloadTypeInferrer` implementation via a `SqsListenerConfigurer`.
1653+
See <<Global Configuration for @SqsListeners>>.
16181654

1619-
NOTE: If type mapping is setup or type information is added to the headers, payloads are deserialized right after the message is polled.
1620-
Otherwise, for `@SqsListener` annotated methods, payloads are deserialized right before the message is sent to the listener.
1621-
For providing custom `MessageConverter` instances to be used by `@SqsListener` methods, see <<Global Configuration for @SqsListeners>>
1655+
==== Disabling Automatic Inference
1656+
1657+
*Per-converter:* To restore header-based type mapping for specific converters, call `setPayloadTypeHeader` on the converter:
1658+
1659+
[source, java]
1660+
----
1661+
converter.setPayloadTypeHeader(SqsHeaders.SQS_DEFAULT_TYPE_HEADER);
1662+
----
1663+
1664+
*Framework-wide:* To disable automatic inference globally, set `methodPayloadTypeInferrer` to `null` via a `SqsListenerConfigurer`:
1665+
1666+
[source, java]
1667+
----
1668+
@Bean
1669+
SqsListenerConfigurer sqsListenerConfigurer() {
1670+
return registrar -> registrar.setMethodPayloadTypeInferrer(null);
1671+
}
1672+
----
16221673

16231674
==== Configuring a MessagingMessageConverter
16241675

@@ -1677,41 +1728,72 @@ messageConverter.setPayloadMessageConverter(payloadConverter);
16771728
factory.configure(options -> options.messageConverter(messageConverter));
16781729
----
16791730

1680-
==== Interfaces and Subclasses in Listener Methods
1731+
==== Custom Payload Type Mapping
16811732

1682-
Interfaces and subclasses can be used in `@SqsListener` annotated methods by configuring a `type mapper`:
1733+
When determining the payload type based on message content is necessary, a custom `payloadTypeMapper` can be configured:
16831734

16841735
[source, java]
16851736
----
1686-
messageConverter.setPayloadTypeMapper(message -> {
1687-
String eventTypeHeader = message.getHeaders().get("myEventTypeHeader", String.class);
1688-
return "eventTypeA".equals(eventTypeHeader)
1689-
? MyTypeA.class
1690-
: MyTypeB.class;
1691-
});
1737+
@Bean
1738+
SqsMessagingMessageConverter messageConverter() {
1739+
var converter = new SqsMessagingMessageConverter();
1740+
converter.setPayloadTypeMapper(message -> {
1741+
String payload = (String) message.getPayload();
1742+
if (payload.contains("\"type\":\"OrderEvent\"")) {
1743+
return OrderEvent.class;
1744+
}
1745+
if (payload.contains("\"type\":\"PaymentEvent\"")) {
1746+
return PaymentEvent.class;
1747+
}
1748+
return null;
1749+
});
1750+
return converter;
1751+
}
16921752
----
16931753

1694-
And then, in the listener method:
1754+
This enables using interfaces or subclasses in listener methods:
16951755

16961756
[source, java]
16971757
----
1698-
@SpringBootApplication
1699-
public class SqsApplication {
1758+
public interface DomainEvent {}
1759+
public record OrderEvent(String orderId, String status) implements DomainEvent {}
1760+
public record PaymentEvent(String paymentId, BigDecimal amount) implements DomainEvent {}
17001761
1701-
public static void main(String[] args) {
1702-
SpringApplication.run(SqsApplication.class, args);
1762+
@SqsListener("events-queue")
1763+
void handleEvent(DomainEvent event) {
1764+
// event will be the correct concrete type based on the mapper
1765+
}
1766+
----
1767+
1768+
The same configuration enables routing to different `@SqsHandler` methods:
1769+
1770+
[source, java]
1771+
----
1772+
@SqsListener("events-queue")
1773+
public class EventListener {
1774+
1775+
@SqsHandler
1776+
void handle(OrderEvent event) {
1777+
// handles OrderEvent
17031778
}
17041779
1705-
// Retrieve the converted payload
1706-
@SqsListener("myQueue")
1707-
public void listen(MyInterface message) {
1708-
System.out.println(message);
1780+
@SqsHandler
1781+
void handle(PaymentEvent event) {
1782+
// handles PaymentEvent
17091783
}
1784+
}
1785+
----
17101786

1711-
// Or retrieve a Message with the converted payload
1712-
@SqsListener("myOtherQueue")
1713-
public void listen(Message<MyInterface> message) {
1714-
System.out.println(message);
1787+
Or when using `Object` as the parameter type:
1788+
1789+
[source, java]
1790+
----
1791+
@SqsListener("events-queue")
1792+
void handleEvent(Object event) {
1793+
if (event instanceof OrderEvent orderEvent) {
1794+
// handle order
1795+
} else if (event instanceof PaymentEvent paymentEvent) {
1796+
// handle payment
17151797
}
17161798
}
17171799
----
@@ -1933,6 +2015,9 @@ By default, `StringMessageConverter`, `SimpleMessageConverter` and `MappingJacks
19332015

19342016
- `manageArgumentResolvers` - gives access to the list of argument resolvers that will be used to resolve the listener method arguments.
19352017
The order of resolvers is important - `PayloadMethodArgumentResolver` should generally be last since it's used as default.
2018+
- `setMethodPayloadTypeInferrer` - set the `MethodPayloadTypeInferrer` instance to be used for automatic payload type inference.
2019+
Set to `null` to disable automatic inference and rely on header-based type mapping.
2020+
See <<Automatic Payload Type Inference>> for more information.
19362021

19372022
A simple example would be:
19382023

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/ConfigUtils.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,4 +92,12 @@ public <T> ConfigUtils acceptMany(Collection<T> values, Consumer<T> consumer) {
9292
return this;
9393
}
9494

95+
public <T, V> ConfigUtils acceptManyIfNotNullAndInstance(@Nullable T value, Collection<?> values, Class<V> clazz,
96+
BiConsumer<T, V> consumer) {
97+
if (value != null) {
98+
values.forEach(v -> acceptIfInstance(v, clazz, instance -> consumer.accept(value, instance)));
99+
}
100+
return this;
101+
}
102+
95103
}

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/annotation/AbstractListenerAnnotationBeanPostProcessor.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,8 @@ public abstract class AbstractListenerAnnotationBeanPostProcessor<A extends Anno
9999
@Nullable
100100
private BeanExpressionContext expressionContext;
101101

102+
private final List<HandlerMethodArgumentResolver> argumentResolvers = new ArrayList<>();
103+
102104
@Override
103105
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
104106
Class<?> targetClass = AopUtils.getTargetClass(bean);
@@ -155,6 +157,7 @@ private Endpoint createAndConfigureEndpoint(Object bean, Method method, A annota
155157
hme.setBean(bean);
156158
hme.setMethod(method);
157159
hme.setHandlerMethodFactory(this.delegatingHandlerMethodFactory);
160+
hme.setArgumentResolvers(this.argumentResolvers);
158161
});
159162
return endpoint;
160163
}
@@ -320,6 +323,7 @@ protected void configureDefaultHandlerMethodFactory(DefaultMessageHandlerMethodF
320323
this.endpointRegistrar.getMethodArgumentResolversConsumer().accept(methodArgumentResolvers);
321324
handlerMethodFactory.setArgumentResolvers(methodArgumentResolvers);
322325
handlerMethodFactory.afterPropertiesSet();
326+
this.argumentResolvers.addAll(methodArgumentResolvers);
323327
}
324328

325329
protected Collection<HandlerMethodArgumentResolver> createAdditionalArgumentResolvers(

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/config/AbstractEndpoint.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package io.awspring.cloud.sqs.config;
1717

18+
import io.awspring.cloud.sqs.listener.AbstractMessageListenerContainer;
1819
import io.awspring.cloud.sqs.listener.AsyncMessageListener;
1920
import io.awspring.cloud.sqs.listener.BatchVisibility;
2021
import io.awspring.cloud.sqs.listener.ListenerMode;
@@ -23,6 +24,7 @@
2324
import io.awspring.cloud.sqs.listener.acknowledgement.BatchAcknowledgement;
2425
import io.awspring.cloud.sqs.listener.adapter.AsyncMessagingMessageListenerAdapter;
2526
import io.awspring.cloud.sqs.listener.adapter.MessagingMessageListenerAdapter;
27+
import io.awspring.cloud.sqs.support.converter.AbstractMessagingMessageConverter;
2628
import java.lang.reflect.Method;
2729
import java.util.Collection;
2830
import java.util.List;
@@ -35,6 +37,7 @@
3537
import org.springframework.core.MethodParameter;
3638
import org.springframework.lang.Nullable;
3739
import org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory;
40+
import org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolver;
3841
import org.springframework.messaging.handler.invocation.InvocableHandlerMethod;
3942
import org.springframework.util.Assert;
4043

@@ -59,6 +62,12 @@ public abstract class AbstractEndpoint implements HandlerMethodEndpoint {
5962

6063
private MessageHandlerMethodFactory handlerMethodFactory;
6164

65+
@Nullable
66+
private MethodPayloadTypeInferrer methodPayloadTypeInferrer;
67+
68+
@Nullable
69+
private List<HandlerMethodArgumentResolver> argumentResolvers;
70+
6271
protected AbstractEndpoint(Collection<String> queueNames, @Nullable String listenerContainerFactoryName,
6372
String id) {
6473
Assert.notEmpty(queueNames, "queueNames cannot be empty.");
@@ -127,6 +136,26 @@ public MessageHandlerMethodFactory getMessageHandlerMethodFactory() {
127136
return this.handlerMethodFactory;
128137
}
129138

139+
/**
140+
* Set the {@link MethodPayloadTypeInferrer} to be used for inferring payload types from method signatures.
141+
*
142+
* @param inferrer the inferrer instance, or null to disable inference.
143+
*/
144+
@Override
145+
public void setMethodPayloadTypeInferrer(@Nullable MethodPayloadTypeInferrer inferrer) {
146+
this.methodPayloadTypeInferrer = inferrer;
147+
}
148+
149+
/**
150+
* Set the argument resolvers to be used for inferring payload types if a methodPayloadTypeInferrer is set.
151+
*
152+
* @param argumentResolvers the argument resolvers, may be null.
153+
*/
154+
@Override
155+
public void setArgumentResolvers(@Nullable List<HandlerMethodArgumentResolver> argumentResolvers) {
156+
this.argumentResolvers = argumentResolvers;
157+
}
158+
130159
@Override
131160
public void configureListenerMode(Consumer<ListenerMode> consumer) {
132161
List<MethodParameter> parameters = getMethodParameters();
@@ -179,6 +208,17 @@ public <T> void setupContainer(MessageListenerContainer<T> container) {
179208
Assert.notNull(this.handlerMethodFactory, "No handlerMethodFactory has been set");
180209
InvocableHandlerMethod handlerMethod = this.handlerMethodFactory.createInvocableHandlerMethod(this.bean,
181210
this.method);
211+
212+
if (this.methodPayloadTypeInferrer != null
213+
&& container instanceof AbstractMessageListenerContainer<?, ?, ?> amlc) {
214+
Class<?> inferredType = this.methodPayloadTypeInferrer.inferPayloadType(this.method,
215+
this.argumentResolvers);
216+
if (inferredType != null) {
217+
amlc.setPayloadDeserializationType(inferredType);
218+
}
219+
disableDefaultPayloadTypeMapper(amlc);
220+
}
221+
182222
if (CompletionStage.class.isAssignableFrom(handlerMethod.getReturnType().getParameterType())) {
183223
container.setAsyncMessageListener(createAsyncMessageListenerInstance(handlerMethod));
184224
}
@@ -187,6 +227,14 @@ public <T> void setupContainer(MessageListenerContainer<T> container) {
187227
}
188228
}
189229

230+
private void disableDefaultPayloadTypeMapper(AbstractMessageListenerContainer<?, ?, ?> container) {
231+
var converter = container.getContainerOptions().getMessageConverter();
232+
if (converter instanceof AbstractMessagingMessageConverter<?> amc && amc.isUsingDefaultPayloadTypeMapper()) {
233+
// Disable JavaType header based mapping
234+
amc.setPayloadTypeMapper(msg -> null);
235+
}
236+
}
237+
190238
protected <T> MessageListener<T> createMessageListenerInstance(InvocableHandlerMethod handlerMethod) {
191239
return new MessagingMessageListenerAdapter<>(handlerMethod);
192240
}

0 commit comments

Comments
 (0)