Skip to content

Commit 4c2da2e

Browse files
authored
Add Spring Integration channel adapters for SNS (#1473)
* Add Spring Integration channel adapters for SNS * Add `spring-integration-http` as an optional dep into `spring-cloud-aws-sns`. * Add `SnsInboundChannelAdapter` to consume SNS notification over HTTP(S) * Add `SnsMessageHandler` to publish to an SNS topic. * The `SnsHeaderMapper` and `SnsBodyBuilder` are supporting classes for various SNS publish scenarios * The `SnsRequestFailureException` is a generic exception with a message a and request context to throw when publication fails * Add extra useful `SnsHeaders` constant for some Spring Integration message headers * Since `SnsMessageHandler` is based on the `SnsAsyncClient`, introduce an `SnsAsyncTopicArnResolver` for such an async use-case * Add `spring-cloud-aws-starter-integration-sns` * Document this new feature * Add missed `spring-cloud-aws-starter-integration-sqs` module int the root pom * * Fix typos in docs and Javadocs for SI SNS * Simplify `try..catch` logic in the `SnsMessageHandler` and add missed `currentThread().interrupt()` * Add `.fifo` logic to the `SnsAsyncTopicArnResolver`
1 parent da4b734 commit 4c2da2e

File tree

15 files changed

+1431
-0
lines changed

15 files changed

+1431
-0
lines changed

docs/src/main/asciidoc/sns.adoc

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -296,3 +296,93 @@ Sample IAM policy granting access to SNS:
296296
]
297297
}
298298
----
299+
300+
=== Spring Integration Support
301+
302+
Starting with version 4.0, Spring Cloud AWS provides https://spring.io/projects/spring-integration[Spring Integration] channel adapters for Amazon SNS.
303+
304+
The `SnsInboundChannelAdapter` is an extension of `HttpRequestHandlingMessagingGateway` and must be as a part of Spring MVC application.
305+
Its URL must be used from the AWS Management Console to add this endpoint as a subscriber to the SNS Topic.
306+
However, before receiving any notification itself, this HTTP endpoint must confirm the subscription.
307+
308+
See `SnsInboundChannelAdapter` JavaDocs for more information.
309+
310+
An important option of this adapter to consider is `handleNotificationStatus`.
311+
This `boolean` flag indicates if the adapter should send `SubscriptionConfirmation/UnsubscribeConfirmation` message to the `output-channel` or not.
312+
If that is a case, the `SnsHeaders.NOTIFICATION_STATUS_HEADER` message header is present in the message with the `NotificationStatus` object, which can be used in the downstream flow to confirm subscription or not.
313+
Or "re-confirm" it in the case of `UnsubscribeConfirmation` message.
314+
315+
In addition, the `SnsHeaders.SNS_MESSAGE_TYPE_HEADER` message header is represented to simplify a routing in the downstream flow.
316+
317+
The Java Configuration is pretty simple:
318+
319+
[source,java]
320+
----
321+
@SpringBootApplication
322+
public static class MyConfiguration {
323+
324+
@Autowired
325+
private SnsClient amazonSns;
326+
327+
@Bean
328+
public PollableChannel inputChannel() {
329+
return new QueueChannel();
330+
}
331+
332+
@Bean
333+
public HttpRequestHandler sqsMessageDrivenChannelAdapter(PollableChannel inputChannel) {
334+
SnsInboundChannelAdapter adapter = new SnsInboundChannelAdapter(this.amazonSns, "/mySampleTopic");
335+
adapter.setRequestChannel(inputChannel);
336+
adapter.setHandleNotificationStatus(true);
337+
return adapter;
338+
}
339+
}
340+
----
341+
342+
Note: by default, the message `payload` is a `Map` converted from the received Topic JSON message.
343+
For the convenience a `payload-expression` is provided with the `Message` as a root object of the evaluation context.
344+
Hence, even some HTTP headers, populated by the `DefaultHttpHeaderMapper`, are available for the evaluation context.
345+
346+
347+
The `SnsMessageHandler` is a simple one-way Outbound Channel Adapter to send Topic Notification using `SnsAsyncClient` service.
348+
349+
This Channel Adapter (`MessageHandler`) accepts these options:
350+
351+
- `topic-arn` (`topic-arn-expression`) - the SNS Topic to send notification for.
352+
- `subject` (`subject-expression`) - the SNS Notification Subject;
353+
- `body-expression` - the SpEL expression to evaluate the `message` property for the `software.amazon.awssdk.services.sns.model.PublishRequest`.
354+
- `resource-id-resolver` - a `ResourceIdResolver` bean reference to resolve logical topic names to physical resource ids;
355+
356+
See `SnsMessageHandler` JavaDocs for more information.
357+
358+
The Java Config looks like:
359+
360+
[source,java]
361+
----
362+
@Bean
363+
public MessageHandler snsMessageHandler(SnsAsyncClient amazonSns) {
364+
SnsMessageHandler handler = new SnsMessageHandler(amazonSns);
365+
handler.setTopicArn("arn:aws:sns:eu-west:123456789012:test");
366+
String bodyExpression = "T(SnsBodyBuilder).withDefault(payload).forProtocols(payload.substring(0, 140), 'sms')";
367+
handler.setBodyExpression(spelExpressionParser.parseExpression(bodyExpression));
368+
369+
// message-group ID and deduplication ID are used for FIFO topics
370+
handler.setMessageGroupId("foo-messages");
371+
String deduplicationExpression = "headers.id";
372+
handler.setMessageDeduplicationIdExpression(new FunctionExpression<Message<?>>(m -> m.getHeaders().get(MessageHeaders.ID)));
373+
return handler;
374+
}
375+
----
376+
377+
NOTE: the `bodyExpression` can be evaluated to a `io.awspring.cloud.sns.integration.SnsBodyBuilder` allowing the configuration of a `json` `messageStructure` for the `PublishRequest` and provide separate messages for different protocols.
378+
The same `SnsBodyBuilder` rule is applied for the raw `payload` if the `bodyExpression` hasn't been configured.
379+
380+
NOTE: if the `payload` of `requestMessage` is a `software.amazon.awssdk.services.sns.model.PublishRequest` already, the `SnsMessageHandler` doesn't do anything with it, and it is sent as-is.
381+
382+
The `SnsMessageHandler` can be configured with the `HeaderMapper` to map message headers to the SNS message attributes.
383+
See `SnsHeaderMapper` implementation for more information and also consult with https://docs.aws.amazon.com/sns/latest/dg/SNSMessageAttributes.html[Amazon SNS Message Attributes] about value types and restrictions.
384+
385+
The `SnsMessageHandler` supports sending to SNS FIFO topics using the `messageGroupId`/`messageGroupIdExpression` and `messageDeduplicationIdExpression` properties.
386+
387+
The Spring Integration dependency in the `spring-cloud-aws-sns` module is `optional` to avoid unnecessary artifacts on classpath when Spring Integration is not used.
388+
For convenience, a dedicated `spring-cloud-aws-starter-integration-sns` is provided managing all the required dependencies for Spring Integration support with Amazon SNS.

pom.xml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,9 @@
5555
<module>spring-cloud-aws-starters/spring-cloud-aws-starter-secrets-manager</module>
5656
<module>spring-cloud-aws-starters/spring-cloud-aws-starter-ses</module>
5757
<module>spring-cloud-aws-starters/spring-cloud-aws-starter-sns</module>
58+
<module>spring-cloud-aws-starters/spring-cloud-aws-starter-integration-sns</module>
5859
<module>spring-cloud-aws-starters/spring-cloud-aws-starter-sqs</module>
60+
<module>spring-cloud-aws-starters/spring-cloud-aws-starter-integration-sqs</module>
5961
<module>spring-cloud-aws-samples</module>
6062
<module>spring-cloud-aws-test</module>
6163
<module>spring-cloud-aws-modulith</module>

spring-cloud-aws-dependencies/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,12 @@
208208
<version>${project.version}</version>
209209
</dependency>
210210

211+
<dependency>
212+
<groupId>io.awspring.cloud</groupId>
213+
<artifactId>spring-cloud-aws-starter-integration-sns</artifactId>
214+
<version>${project.version}</version>
215+
</dependency>
216+
211217
<dependency>
212218
<groupId>io.awspring.cloud</groupId>
213219
<artifactId>spring-cloud-aws-starter-sqs</artifactId>

spring-cloud-aws-sns/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,11 @@
3636
<artifactId>spring-web</artifactId>
3737
<optional>true</optional>
3838
</dependency>
39+
<dependency>
40+
<groupId>org.springframework.integration</groupId>
41+
<artifactId>spring-integration-http</artifactId>
42+
<optional>true</optional>
43+
</dependency>
3944
<dependency>
4045
<groupId>org.springframework</groupId>
4146
<artifactId>spring-webmvc</artifactId>
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* Copyright 2013-2025 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.awspring.cloud.sns.core;
17+
18+
import java.util.Map;
19+
import org.springframework.util.Assert;
20+
import software.amazon.awssdk.arns.Arn;
21+
import software.amazon.awssdk.services.sns.SnsAsyncClient;
22+
import software.amazon.awssdk.services.sns.model.CreateTopicRequest;
23+
24+
/**
25+
* A {@link TopicArnResolver} implementation to determine topic ARN by name against an {@link SnsAsyncClient}.
26+
*
27+
* @author Artem Bilan
28+
*
29+
* @since 4.0
30+
*/
31+
public class SnsAsyncTopicArnResolver implements TopicArnResolver {
32+
33+
private final SnsAsyncClient snsClient;
34+
35+
public SnsAsyncTopicArnResolver(SnsAsyncClient snsClient) {
36+
Assert.notNull(snsClient, "snsClient is required");
37+
this.snsClient = snsClient;
38+
}
39+
40+
/**
41+
* Resolve topic ARN by topic name. If topicName is already an ARN, it returns {@link Arn}. If topicName is just a
42+
* string with a topic name, it attempts to create a topic, or if the topic already exists, just returns its ARN.
43+
*/
44+
@Override
45+
public Arn resolveTopicArn(String topicName) {
46+
Assert.notNull(topicName, "topicName must not be null");
47+
if (topicName.toLowerCase().startsWith("arn:")) {
48+
return Arn.fromString(topicName);
49+
}
50+
else {
51+
CreateTopicRequest.Builder builder = CreateTopicRequest.builder().name(topicName);
52+
53+
// fix for https://github.com/awspring/spring-cloud-aws/issues/707
54+
if (topicName.endsWith(".fifo")) {
55+
builder.attributes(Map.of("FifoTopic", "true"));
56+
}
57+
58+
// if the topic exists, createTopic returns a successful response with the topic arn
59+
return Arn.fromString(this.snsClient.createTopic(builder.build()).join().topicArn());
60+
}
61+
}
62+
63+
}

spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/SnsHeaders.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,17 +15,26 @@
1515
*/
1616
package io.awspring.cloud.sns.core;
1717

18+
import io.awspring.cloud.sns.handlers.NotificationStatus;
19+
import io.awspring.cloud.sns.integration.SnsInboundChannelAdapter;
1820
import org.springframework.messaging.Message;
1921
import software.amazon.awssdk.services.sns.model.PublishRequest;
22+
import software.amazon.awssdk.services.sns.model.PublishResponse;
2023

2124
/**
2225
* SNS specific headers that can be applied to Spring Messaging {@link Message}.
2326
*
2427
* @author Matej Nedic
28+
* @author Artem Bilan
2529
* @since 3.0.0
2630
*/
2731
public final class SnsHeaders {
2832

33+
/**
34+
* SNS Headers prefix to be used by all headers added by the framework.
35+
*/
36+
public static final String SNS_HEADER_PREFIX = "Sns_";
37+
2938
/**
3039
* Notification subject. The value of this header is set to {@link PublishRequest#subject()}.
3140
*/
@@ -43,6 +52,29 @@ public final class SnsHeaders {
4352
*/
4453
public static final String MESSAGE_DEDUPLICATION_ID_HEADER = "message-deduplication-id";
4554

55+
/**
56+
* Topic ARN header where the message has been published. The value of this header is set from
57+
* {@link PublishRequest#topicArn()}.
58+
*/
59+
public static final String TOPIC_HEADER = SNS_HEADER_PREFIX + "topicArn";
60+
61+
/**
62+
* Message id header as a unique identifier assigned to the published message, or from a received message. The value
63+
* of this header is set from {@link PublishResponse#messageId()}.
64+
*/
65+
public static final String MESSAGE_ID_HEADER = SNS_HEADER_PREFIX + "messageId";
66+
67+
/**
68+
* The {@link NotificationStatus} header for manual confirmation on reception. The value of this header is set from
69+
* {@link SnsInboundChannelAdapter}.
70+
*/
71+
public static final String NOTIFICATION_STATUS_HEADER = SNS_HEADER_PREFIX + "notificationStatus";
72+
73+
/**
74+
* The {@value SNS_MESSAGE_TYPE_HEADER} header for the received SNS message type.
75+
*/
76+
public static final String SNS_MESSAGE_TYPE_HEADER = SNS_HEADER_PREFIX + "messageType";
77+
4678
private SnsHeaders() {
4779

4880
}
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* Copyright 2013-2025 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.awspring.cloud.sns.integration;
17+
18+
import java.util.HashMap;
19+
import java.util.Map;
20+
import org.springframework.util.Assert;
21+
22+
/**
23+
* A utility class to simplify an SNS Message body building. Can be used from the
24+
* {@code SnsMessageHandler#bodyExpression} definition or directly in case of manual
25+
* {@link software.amazon.awssdk.services.sns.model.PublishRequest} building.
26+
*
27+
* @author Artem Bilan
28+
*
29+
* @since 4.0
30+
*/
31+
public final class SnsBodyBuilder {
32+
33+
private final Map<String, String> snsMessage = new HashMap<>();
34+
35+
private SnsBodyBuilder(String defaultMessage) {
36+
Assert.hasText(defaultMessage, "defaultMessage must not be empty.");
37+
this.snsMessage.put("default", defaultMessage);
38+
}
39+
40+
public SnsBodyBuilder forProtocols(String message, String... protocols) {
41+
Assert.hasText(message, "message must not be empty.");
42+
Assert.notEmpty(protocols, "protocols must not be empty.");
43+
for (String protocol : protocols) {
44+
Assert.hasText(protocol, "protocols must not contain empty elements.");
45+
this.snsMessage.put(protocol, message);
46+
}
47+
return this;
48+
}
49+
50+
public String build() {
51+
StringBuilder stringBuilder = new StringBuilder("{");
52+
for (Map.Entry<String, String> entry : this.snsMessage.entrySet()) {
53+
stringBuilder.append("\"").append(entry.getKey()).append("\":\"")
54+
.append(entry.getValue().replaceAll("\"", "\\\\\"")).append("\",");
55+
}
56+
return stringBuilder.substring(0, stringBuilder.length() - 1) + "}";
57+
}
58+
59+
public static SnsBodyBuilder withDefault(String defaultMessage) {
60+
return new SnsBodyBuilder(defaultMessage);
61+
}
62+
63+
}

0 commit comments

Comments
 (0)