Skip to content

Commit 313d106

Browse files
authored
Add Spring Integration channel adapter for SQS (#1467)
* Add Spring Integration channel adapter for SQS * The `SqsMessageHandler` is for producing Spring Integration to SQS. Uses `SqsAsyncOperations` internally. * The `SqsMessageDrivenChannelAdapter` is for consuming messages from SQS. Uses `SqsMessageListenerContainer` internally. * Added `SqsHeaders.MESSAGE_ID` constant to represent SQS send result's `messageId` attribute * Mark The `BaseSqsIntegrationTest` as `disabledWithoutDocker = true` to avoid unnecessary build failure * Add `spring-cloud-aws-starter-integration-sqs` to manage all the required dependencies for Spring Integration with SQS
1 parent 287ce48 commit 313d106

File tree

10 files changed

+677
-5
lines changed

10 files changed

+677
-5
lines changed

docs/src/main/asciidoc/sqs.adoc

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2299,3 +2299,46 @@ Sample IAM policy granting access to SQS:
22992299
"Resource": "yourARN"
23002300
}
23012301
----
2302+
2303+
=== Spring Integration Support
2304+
2305+
Starting with version 4.0, Spring Cloud AWS provides https://spring.io/projects/spring-integration[Spring Integration] channel adapters for Amazon SQS.
2306+
2307+
The `SqsMessageHandler` is for publishing a single message (or their batch) to SQS queue configured explicitly on the `SqsMessageHandler` or resolved via SpEL expression against the request message.
2308+
The logic of this `MessageHandler` is to consume Spring Integration messages from an `inputChannel` and internally it is heavily based on the `SqsAsyncOperations` mentioned above.
2309+
When the `SqsMessageHandler` is set into an `async` mode, the result of the send operation is produced as a reply message into the `outputChannel`.
2310+
For a single request, the reply message is created based on the `SendResult`.
2311+
With request message payload as a `Collection<Message<?>>`, the `SqsAsyncOperations.sendManyAsync()` is performed; the `SendResult.Batch` is produced as is as a payload of the reply message.
2312+
The minimal configuration for this channel adapter is as follows:
2313+
2314+
[source,java]
2315+
----
2316+
@Bean
2317+
@ServiceActivator(inputChannel = "sqsSendChannel")
2318+
MessageHandler sqsMessageHandler(SqsAsyncOperations sqsAsyncOperations) {
2319+
SqsMessageHandler sqsMessageHandler = new SqsMessageHandler(sqsAsyncOperations);
2320+
sqsMessageHandler.setQueue("queue1");
2321+
return sqsMessageHandler;
2322+
}
2323+
----
2324+
2325+
The `SqsMessageDrivenChannelAdapter` is for consuming messages from one or more SQS queues.
2326+
This channel adapter requires an `SqsAsyncClient` and internally is heavily based on the `SqsMessageListenerContainer` mentioned above.
2327+
The `SqsContainerOptions` can be injected for further listener container customization.
2328+
The consumed messages are then produced to the `outputChannel` for further Spring Integration processing.
2329+
If the `ListenerMode` on the `SqsContainerOptions` is set to `BATCH`, the received `Collection<Message<?>>` is wrapped into a single message to produce.
2330+
The Spring Integration https://docs.spring.io/spring-integration/reference/splitter.html[splitter] pattern could be used downstream for per-message processing.
2331+
The minimal configuration for this channel adapter is as follows:
2332+
2333+
[source,java]
2334+
----
2335+
@Bean
2336+
MessageProducer sqsMessageDrivenChannelAdapter(SqsAsyncClient sqsAsyncClient) {
2337+
SqsMessageDrivenChannelAdapter adapter = new SqsMessageDrivenChannelAdapter(sqsAsyncClient, "testQueue");
2338+
adapter.setOutputChannelName("inputChannel");
2339+
return adapter;
2340+
}
2341+
----
2342+
2343+
The Spring Integration dependency in the `spring-cloud-aws-sqs` module is `optional` to avoid unnecessary artifacts on classpath when Spring Integration is not used.
2344+
For convenience, a dedicated `spring-cloud-aws-starter-integration-sqs` is provided managing all the required dependencies for Spring Integration support with Amazon SQS.

spring-cloud-aws-dependencies/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,12 @@
214214
<version>${project.version}</version>
215215
</dependency>
216216

217+
<dependency>
218+
<groupId>io.awspring.cloud</groupId>
219+
<artifactId>spring-cloud-aws-starter-integration-sqs</artifactId>
220+
<version>${project.version}</version>
221+
</dependency>
222+
217223
<dependency>
218224
<groupId>io.awspring.cloud</groupId>
219225
<artifactId>spring-cloud-aws-test</artifactId>

spring-cloud-aws-sqs/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,11 @@
3030
<groupId>org.springframework</groupId>
3131
<artifactId>spring-context</artifactId>
3232
</dependency>
33+
<dependency>
34+
<groupId>org.springframework.integration</groupId>
35+
<artifactId>spring-integration-core</artifactId>
36+
<optional>true</optional>
37+
</dependency>
3338
<dependency>
3439
<groupId>org.springframework.retry</groupId>
3540
<artifactId>spring-retry</artifactId>
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
/*
2+
* Copyright 2025-2025 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.awspring.cloud.sqs.integration;
18+
19+
import java.util.Arrays;
20+
import java.util.Collection;
21+
22+
import io.awspring.cloud.sqs.config.SqsMessageListenerContainerFactory;
23+
import io.awspring.cloud.sqs.listener.MessageListener;
24+
import io.awspring.cloud.sqs.listener.SqsContainerOptions;
25+
import io.awspring.cloud.sqs.listener.SqsMessageListenerContainer;
26+
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
27+
28+
import org.springframework.integration.endpoint.MessageProducerSupport;
29+
import org.springframework.integration.support.management.IntegrationManagedResource;
30+
import org.springframework.jmx.export.annotation.ManagedAttribute;
31+
import org.springframework.jmx.export.annotation.ManagedResource;
32+
import org.springframework.messaging.Message;
33+
import org.springframework.messaging.support.GenericMessage;
34+
import org.springframework.util.Assert;
35+
36+
/**
37+
* The {@link MessageProducerSupport} implementation for the Amazon SQS
38+
* {@code receiveMessage}. Works in 'listener' manner and delegates hard work to the
39+
* {@link SqsMessageListenerContainer}.
40+
*
41+
* @author Artem Bilan
42+
* @author Patrick Fitzsimons
43+
*
44+
* @since 4.0
45+
*
46+
* @see SqsMessageListenerContainerFactory
47+
* @see MessageListener
48+
*/
49+
@ManagedResource
50+
@IntegrationManagedResource
51+
public class SqsMessageDrivenChannelAdapter extends MessageProducerSupport {
52+
53+
private final SqsMessageListenerContainerFactory.Builder<Object> sqsMessageListenerContainerFactory =
54+
SqsMessageListenerContainerFactory.builder();
55+
56+
private final String[] queues;
57+
58+
private SqsContainerOptions sqsContainerOptions;
59+
60+
private SqsMessageListenerContainer<?> listenerContainer;
61+
62+
public SqsMessageDrivenChannelAdapter(SqsAsyncClient amazonSqs, String... queues) {
63+
Assert.noNullElements(queues, "'queues' must not be empty");
64+
this.sqsMessageListenerContainerFactory.sqsAsyncClient(amazonSqs);
65+
this.queues = Arrays.copyOf(queues, queues.length);
66+
}
67+
68+
public void setSqsContainerOptions(SqsContainerOptions sqsContainerOptions) {
69+
this.sqsContainerOptions = sqsContainerOptions;
70+
}
71+
72+
@Override
73+
protected void onInit() {
74+
super.onInit();
75+
if (this.sqsContainerOptions != null) {
76+
this.sqsMessageListenerContainerFactory.configure(sqsContainerOptionsBuilder ->
77+
sqsContainerOptionsBuilder.fromBuilder(this.sqsContainerOptions.toBuilder()));
78+
}
79+
this.sqsMessageListenerContainerFactory.messageListener(new IntegrationMessageListener());
80+
this.listenerContainer = this.sqsMessageListenerContainerFactory.build().createContainer(this.queues);
81+
}
82+
83+
@Override
84+
public String getComponentType() {
85+
return "aws:sqs-message-driven-channel-adapter";
86+
}
87+
88+
@Override
89+
protected void doStart() {
90+
this.listenerContainer.start();
91+
}
92+
93+
@Override
94+
protected void doStop() {
95+
this.listenerContainer.stop();
96+
}
97+
98+
@ManagedAttribute
99+
public String[] getQueues() {
100+
return Arrays.copyOf(this.queues, this.queues.length);
101+
}
102+
103+
private class IntegrationMessageListener implements MessageListener<Object> {
104+
105+
IntegrationMessageListener() {
106+
}
107+
108+
@Override
109+
public void onMessage(Message<Object> message) {
110+
sendMessage(message);
111+
}
112+
113+
@Override
114+
public void onMessage(Collection<Message<Object>> messages) {
115+
onMessage(new GenericMessage<>(messages));
116+
}
117+
118+
}
119+
120+
}
Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
/*
2+
* Copyright 2025-2025 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.awspring.cloud.sqs.integration;
18+
19+
import java.util.Collection;
20+
import java.util.concurrent.CompletableFuture;
21+
import java.util.concurrent.ExecutionException;
22+
import java.util.concurrent.TimeUnit;
23+
import java.util.concurrent.TimeoutException;
24+
25+
import io.awspring.cloud.sqs.listener.SqsHeaders;
26+
import io.awspring.cloud.sqs.operations.SendResult;
27+
import io.awspring.cloud.sqs.operations.SqsAsyncOperations;
28+
29+
import org.springframework.expression.EvaluationContext;
30+
import org.springframework.expression.Expression;
31+
import org.springframework.expression.common.LiteralExpression;
32+
import org.springframework.integration.MessageTimeoutException;
33+
import org.springframework.integration.expression.ExpressionUtils;
34+
import org.springframework.integration.expression.ValueExpression;
35+
import org.springframework.integration.handler.AbstractMessageProducingHandler;
36+
import org.springframework.messaging.Message;
37+
import org.springframework.util.Assert;
38+
39+
/**
40+
* The {@link AbstractMessageProducingHandler} implementation for the Amazon SQS.
41+
* All the logic based on the {@link SqsAsyncOperations#sendAsync(String, Message)}
42+
* or {@link SqsAsyncOperations#sendManyAsync(String, Collection)} if the request message's payload
43+
* is a collection of {@link Message} instances.
44+
* <p>
45+
* All the SQS-specific message attributes have to be provided in the respective message headers
46+
* via {@link SqsHeaders.MessageSystemAttributes} constant values or with the {@link SqsAsyncOperations}.
47+
* <p>
48+
* This {@link AbstractMessageProducingHandler} produces a reply only in the {@link #isAsync()} mode.
49+
* For a single request message the {@link SendResult} is converted to the reply message with respective headers.
50+
* The {@link SendResult.Batch} is sent as a reply message's payload as is.
51+
*
52+
* @author Artem Bilan
53+
*
54+
* @since 4.0
55+
*
56+
* @see SqsAsyncOperations#sendAsync
57+
* @see SqsAsyncOperations#sendManyAsync
58+
* @see SqsHeaders.MessageSystemAttributes
59+
*/
60+
public class SqsMessageHandler extends AbstractMessageProducingHandler {
61+
62+
public static final long DEFAULT_SEND_TIMEOUT = 10000;
63+
64+
private final SqsAsyncOperations sqsAsyncOperations;
65+
66+
private Expression queueExpression;
67+
68+
private EvaluationContext evaluationContext;
69+
70+
private Expression sendTimeoutExpression = new ValueExpression<>(DEFAULT_SEND_TIMEOUT);
71+
72+
public SqsMessageHandler(SqsAsyncOperations sqsAsyncOperations) {
73+
this.sqsAsyncOperations = sqsAsyncOperations;
74+
}
75+
76+
public void setQueue(String queue) {
77+
setQueueExpression(new LiteralExpression(queue));
78+
}
79+
80+
public void setQueueExpressionString(String queueExpression) {
81+
setQueueExpression(EXPRESSION_PARSER.parseExpression(queueExpression));
82+
}
83+
84+
public void setQueueExpression(Expression queueExpression) {
85+
this.queueExpression = queueExpression;
86+
}
87+
88+
public void setSendTimeout(long sendTimeout) {
89+
setSendTimeoutExpression(new ValueExpression<>(sendTimeout));
90+
}
91+
92+
public void setSendTimeoutExpressionString(String sendTimeoutExpression) {
93+
setSendTimeoutExpression(EXPRESSION_PARSER.parseExpression(sendTimeoutExpression));
94+
}
95+
96+
public void setSendTimeoutExpression(Expression sendTimeoutExpression) {
97+
Assert.notNull(sendTimeoutExpression, "'sendTimeoutExpression' must not be null");
98+
this.sendTimeoutExpression = sendTimeoutExpression;
99+
}
100+
101+
@Override
102+
protected void onInit() {
103+
Assert.notNull(this.queueExpression, "The SQS queue must be provided.");
104+
super.onInit();
105+
this.evaluationContext = ExpressionUtils.createStandardEvaluationContext(getBeanFactory());
106+
}
107+
108+
@Override
109+
protected boolean shouldCopyRequestHeaders() {
110+
return false;
111+
}
112+
113+
@Override
114+
@SuppressWarnings("unchecked")
115+
protected void handleMessageInternal(Message<?> message) {
116+
String queueName = this.queueExpression.getValue(this.evaluationContext, message, String.class);
117+
Assert.hasText(queueName, "The 'queueExpression' must not evaluate to empty String.");
118+
CompletableFuture<?> resultFuture;
119+
if (message.getPayload() instanceof Collection<?> collection) {
120+
Assert.notEmpty(collection, "The payload with a collection of messages must not be empty.");
121+
Object next = collection.iterator().next();
122+
Assert.isInstanceOf(Message.class, next,
123+
"The payload with a collection of messages must contain 'Message' instances only.");
124+
Collection<Message<Object>> messages = (Collection<Message<Object>>) collection;
125+
126+
resultFuture = this.sqsAsyncOperations.sendManyAsync(queueName, messages)
127+
.thenApply((batchResult) -> getMessageBuilderFactory().withPayload(batchResult).build());
128+
}
129+
else {
130+
resultFuture = this.sqsAsyncOperations.sendAsync(queueName, message)
131+
.thenApply((sendResult) ->
132+
getMessageBuilderFactory()
133+
.fromMessage(sendResult.message())
134+
.setHeader(SqsHeaders.SQS_QUEUE_NAME_HEADER, sendResult.endpoint())
135+
.setHeader(SqsHeaders.MessageSystemAttributes.MESSAGE_ID, sendResult.messageId())
136+
.copyHeaders(sendResult.additionalInformation())
137+
.build());
138+
}
139+
140+
if (isAsync()) {
141+
sendOutputs(resultFuture, message);
142+
return;
143+
}
144+
145+
Long sendTimeout = this.sendTimeoutExpression.getValue(this.evaluationContext, message, Long.class);
146+
if (sendTimeout == null || sendTimeout < 0) {
147+
try {
148+
resultFuture.get();
149+
}
150+
catch (InterruptedException | ExecutionException ex) {
151+
throw new IllegalStateException(ex);
152+
}
153+
}
154+
else {
155+
try {
156+
resultFuture.get(sendTimeout, TimeUnit.MILLISECONDS);
157+
}
158+
catch (TimeoutException te) {
159+
throw new MessageTimeoutException(message, "Timeout waiting for response from Amazon SQS", te);
160+
}
161+
catch (InterruptedException ex) {
162+
Thread.currentThread().interrupt();
163+
throw new IllegalStateException(ex);
164+
}
165+
catch (ExecutionException ex) {
166+
throw new IllegalStateException(ex);
167+
}
168+
}
169+
}
170+
171+
}

0 commit comments

Comments
 (0)