Skip to content

Commit 4caa14c

Browse files
authored
feat: add SNS notification resolver to SQS module (#1419)
1 parent 02e997c commit 4caa14c

File tree

12 files changed

+1162
-11
lines changed

12 files changed

+1162
-11
lines changed

docs/src/main/asciidoc/sqs.adoc

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -694,6 +694,20 @@ public void listen(@SnsNotificationSubject String subject, @SnsNotificationMessa
694694
}
695695
----
696696

697+
Since 3.4.1, the complete SNS notification can be received with all its metadata using the `SnsNotification<T>` wrapper class. This provides access to both the message payload and all the metadata such as messageId, topicArn, subject, timestamp, and message attributes.
698+
699+
[source, java]
700+
----
701+
@SqsListener("my-queue")
702+
public void listen(SnsNotification<Pojo> notification) {
703+
String messageId = notification.getMessageId();
704+
String topicArn = notification.getTopicArn();
705+
notification.getSubject().ifPresent(subject -> System.out.println("Subject: " + subject));
706+
Pojo message = notification.getMessage();
707+
System.out.println("Message from topic " + topicArn + " with ID " + messageId + ": " + message);
708+
}
709+
----
710+
697711
===== Specifying a MessageListenerContainerFactory
698712
A `MessageListenerContainerFactory` can be specified through the `factory` property.
699713
Such factory will then be used to create the container for the annotated method.
@@ -738,6 +752,8 @@ A number of possible argument types are allowed in the listener method's signatu
738752
- `Message<MyPojo>` - Provides a `Message<MyPojo>` instance with the deserialized payload and `MessageHeaders`.
739753
- `List<MyPojo>` - Enables batch mode and receives the batch that was polled from SQS.
740754
- `List<Message<MyPojo>>` - Enables batch mode and receives the batch that was polled from SQS along with headers.
755+
- `SnsNotification<MyPojo>` - Provides access to both the message payload and all the metadata of an SNS notification.
756+
See <<SNS Messages>> for more information.
741757
- `@Header(String headerName)` - provides the specified header.
742758
- `@Headers` - provides the `MessageHeaders` or a `Map<String, Object>`
743759
- `Acknowledgement` - provides methods for manually acknowledging messages for single message listeners.
@@ -1948,7 +1964,7 @@ SqsListenerObservation.Convention sqsListenerObservationConvention() {
19481964
.and("messaging.operation", "receive")
19491965
.and("custom.tag", "custom-value");
19501966
}
1951-
1967+
19521968
@Override
19531969
public KeyValues getHighCardinalityKeyValues(SqsListenerObservation.Context context) {
19541970
String paymentId = MessageHeaderUtils.getHeaderAsString(context.getMessage(), "payment-id-header-name");
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
/*
2+
* Copyright 2013-2025 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.awspring.cloud.sqs.sample;
17+
18+
import io.awspring.cloud.sqs.annotation.SqsListener;
19+
import io.awspring.cloud.sqs.operations.SqsTemplate;
20+
import io.awspring.cloud.sqs.support.converter.SnsNotification;
21+
import org.slf4j.Logger;
22+
import org.slf4j.LoggerFactory;
23+
import org.springframework.boot.ApplicationRunner;
24+
import org.springframework.context.annotation.Bean;
25+
import org.springframework.stereotype.Component;
26+
27+
/**
28+
* Sample demonstrating how to receive SNS notifications from SQS.
29+
*
30+
* @author Damien Chomat
31+
*/
32+
@Component
33+
public class SnsNotificationSample {
34+
35+
private static final Logger LOGGER = LoggerFactory.getLogger(SnsNotificationSample.class);
36+
37+
/**
38+
* Receives SNS notifications from the "sns-notification-custom-queue" SQS queue. The message payload is
39+
* automatically converted to a CustomMessage object.
40+
*
41+
* @param notification the SNS notification wrapper containing the message and metadata
42+
*/
43+
@SqsListener("sns-notification-custom-queue")
44+
public void receiveCustomMessage(SnsNotification<CustomMessage> notification) {
45+
LOGGER.info("Received SNS notification with ID: {}", notification.getMessageId());
46+
LOGGER.info("From topic: {}", notification.getTopicArn());
47+
notification.getSubject().ifPresent(subject -> LOGGER.info("Subject: {}", subject));
48+
49+
CustomMessage message = notification.getMessage();
50+
LOGGER.info("Message content: {}", message.content());
51+
LOGGER.info("Message timestamp: {}", message.timestamp());
52+
53+
LOGGER.info("Notification timestamp: {}", notification.getTimestamp());
54+
LOGGER.info("Message attributes: {}", notification.getMessageAttributes());
55+
}
56+
57+
/**
58+
* ApplicationRunner to send sample SNS messages to the queues for demonstration purposes. This simulates SNS
59+
* notifications being sent to SQS queues that are subscribed to SNS topics.
60+
*/
61+
@Bean
62+
public ApplicationRunner sendSnsNotificationMessage(SqsTemplate sqsTemplate) {
63+
return args -> {
64+
// Simulate an SNS notification for an order processing topic
65+
String orderNotificationMessage = """
66+
{
67+
"Type": "Notification",
68+
"MessageId": "order-12345-notification",
69+
"TopicArn": "arn:aws:sns:us-east-1:123456789012:order-processing-topic",
70+
"Subject": "Order Processing Update",
71+
"Message": "{\\"content\\": \\"Order #12345 has been processed successfully\\", \\"timestamp\\": 1672531200000}",
72+
"Timestamp": "2023-01-01T12:00:00Z",
73+
"MessageAttributes": {
74+
"eventType": {
75+
"Type": "String",
76+
"Value": "ORDER_PROCESSED"
77+
},
78+
"priority": {
79+
"Type": "String",
80+
"Value": "high"
81+
}
82+
}
83+
}
84+
""";
85+
86+
LOGGER.info("Sending SNS notification messages to subscribed SQS queue...");
87+
sqsTemplate.send("sns-notification-custom-queue", orderNotificationMessage);
88+
};
89+
}
90+
91+
/**
92+
* A custom message record for demonstration purposes.
93+
*/
94+
public record CustomMessage(String content, long timestamp) {
95+
}
96+
}

spring-cloud-aws-samples/spring-cloud-aws-sqs-sample/src/main/java/io/awspring/cloud/sqs/sample/SpringSqsHandlerSample.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@
2525
import org.springframework.context.annotation.Configuration;
2626

2727
/**
28-
* Sample class to demonstrate how to handle multiple message types in a single listener
29-
* with {@link SqsHandler} annotation.
28+
* Sample class to demonstrate how to handle multiple message types in a single listener with {@link SqsHandler}
29+
* annotation.
3030
*
3131
* @author José Iêdo
3232
*/
@@ -37,8 +37,12 @@ public class SpringSqsHandlerSample {
3737
public static final String QUEUE_NAME = "multi-method-queue";
3838
private static final Logger LOGGER = LoggerFactory.getLogger(SpringSqsHandlerSample.class);
3939

40-
private interface BaseMessage { }
41-
private record SampleRecord(String propertyOne, String propertyTwo) { }
40+
private interface BaseMessage {
41+
}
42+
43+
private record SampleRecord(String propertyOne, String propertyTwo) {
44+
}
45+
4246
private record AnotherSampleRecord(String propertyOne, String propertyTwo) implements BaseMessage { }
4347

4448
@SqsHandler
@@ -56,7 +60,6 @@ void handleMessage(Object message) {
5660
LOGGER.info("Received message of type Object: {}", message);
5761
}
5862

59-
6063
@Bean
6164
public ApplicationRunner sendMessageToQueueWithMultipleHandlers(SqsTemplate sqsTemplate) {
6265
return args -> {

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/annotation/SqsHandler.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,8 @@
2929
* <p>
3030
* Each payload type must have exactly one corresponding method.
3131
* <p>
32-
* If no method matches the payload type, a method marked as the default (using {@code isDefault = true}) will be invoked.
33-
* Only one method can be designated as the default.
32+
* If no method matches the payload type, a method marked as the default (using {@code isDefault = true}) will be
33+
* invoked. Only one method can be designated as the default.
3434
*
3535
* @author José Iêdo
3636
*/
@@ -41,8 +41,8 @@
4141
public @interface SqsHandler {
4242

4343
/**
44-
* Indicates whether this method should be used as the default fallback method if no other {@link SqsHandler}
45-
* method matches the payload type.
44+
* Indicates whether this method should be used as the default fallback method if no other {@link SqsHandler} method
45+
* matches the payload type.
4646
*
4747
* @return {@code true} if this is the default method, {@code false} otherwise
4848
*/

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/annotation/SqsListenerAnnotationBeanPostProcessor.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import io.awspring.cloud.sqs.support.resolver.NotificationMessageArgumentResolver;
2626
import io.awspring.cloud.sqs.support.resolver.NotificationSubjectArgumentResolver;
2727
import io.awspring.cloud.sqs.support.resolver.QueueAttributesMethodArgumentResolver;
28+
import io.awspring.cloud.sqs.support.resolver.SnsNotificationArgumentResolver;
2829
import io.awspring.cloud.sqs.support.resolver.SqsMessageMethodArgumentResolver;
2930
import io.awspring.cloud.sqs.support.resolver.VisibilityHandlerMethodArgumentResolver;
3031
import java.lang.reflect.Method;
@@ -110,6 +111,7 @@ protected Collection<HandlerMethodArgumentResolver> createAdditionalArgumentReso
110111
if (objectMapper != null) {
111112
argumentResolvers.add(new NotificationMessageArgumentResolver(messageConverter, objectMapper));
112113
argumentResolvers.add(new NotificationSubjectArgumentResolver(objectMapper));
114+
argumentResolvers.add(new SnsNotificationArgumentResolver(messageConverter, objectMapper));
113115
}
114116
return argumentResolvers;
115117
}

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/config/MultiMethodSqsEndpoint.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,8 @@ public <T> void setupContainer(MessageListenerContainer<T> container) {
140140
container.setMessageListener(createMessageListenerInstance(compositeInvocableHandler));
141141
}
142142

143-
protected <T> MessageListener<T> createMessageListenerInstance(CompositeInvocableHandler compositeInvocableHandler) {
143+
protected <T> MessageListener<T> createMessageListenerInstance(
144+
CompositeInvocableHandler compositeInvocableHandler) {
144145
return new MessagingMessageListenerAdapter<>(compositeInvocableHandler);
145146
}
146147
}

0 commit comments

Comments
 (0)