1
+ /*
2
+ * Copyright 2013-2023 the original author or authors.
3
+ *
4
+ * Licensed under the Apache License, Version 2.0 (the "License");
5
+ * you may not use this file except in compliance with the License.
6
+ * You may obtain a copy of the License at
7
+ *
8
+ * https://www.apache.org/licenses/LICENSE-2.0
9
+ *
10
+ * Unless required by applicable law or agreed to in writing, software
11
+ * distributed under the License is distributed on an "AS IS" BASIS,
12
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
+ * See the License for the specific language governing permissions and
14
+ * limitations under the License.
15
+ */
16
+ package io .awspring .cloud .sqs .integration ;
17
+
18
+ import io .awspring .cloud .sqs .operations .BatchingSqsClientAdapter ;
19
+ import org .junit .jupiter .api .Test ;
20
+ import org .springframework .beans .factory .annotation .Autowired ;
21
+ import org .springframework .boot .test .context .SpringBootTest ;
22
+ import org .springframework .context .annotation .Bean ;
23
+ import org .springframework .context .annotation .Configuration ;
24
+ import software .amazon .awssdk .services .sqs .SqsAsyncClient ;
25
+ import software .amazon .awssdk .services .sqs .batchmanager .SqsAsyncBatchManager ;
26
+ import software .amazon .awssdk .services .sqs .model .*;
27
+
28
+ import java .time .Duration ;
29
+ import java .util .UUID ;
30
+ import java .util .concurrent .CompletableFuture ;
31
+ import java .util .concurrent .Executors ;
32
+
33
+ import static org .assertj .core .api .Assertions .assertThat ;
34
+
35
+ /**
36
+ * Integration tests for the Sqs Batching Client Adapter.
37
+ *
38
+ * @author khc41
39
+ */
40
+ @ SpringBootTest
41
+ public class BatchingSqsClientAdapterIntegrationTests extends BaseSqsIntegrationTest {
42
+
43
+ private static final String BASE_QUEUE_NAME = "batching-test-queue" ;
44
+
45
+ @ Autowired
46
+ private SqsAsyncClient asyncClient ;
47
+
48
+ @ Test
49
+ void shouldReturnCorrectServiceName () {
50
+ try (BatchingSqsClientAdapter adapter = createBatchingAdapter ()) {
51
+ String serviceName = adapter .serviceName ();
52
+ assertThat (serviceName ).isEqualTo (SqsAsyncClient .SERVICE_NAME );
53
+ }
54
+ }
55
+
56
+ @ Test
57
+ void shouldSendMessageThroughBatchManager () {
58
+ String queueName = createUniqueQueueName ();
59
+ createQueue (this .asyncClient , queueName ).join ();
60
+
61
+ try (BatchingSqsClientAdapter adapter = createBatchingAdapter ()) {
62
+ String messageBody = "Test message for batching" ;
63
+ SendMessageRequest request = SendMessageRequest .builder ()
64
+ .queueUrl (queueName )
65
+ .messageBody (messageBody )
66
+ .build ();
67
+
68
+ SendMessageResponse response = adapter .sendMessage (request ).join ();
69
+
70
+ assertThat (response .messageId ()).isNotNull ();
71
+
72
+ ReceiveMessageResponse received = this .asyncClient .receiveMessage (ReceiveMessageRequest .builder ()
73
+ .queueUrl (queueName )
74
+ .maxNumberOfMessages (1 )
75
+ .build ()).join ();
76
+
77
+ assertThat (received .messages ()).hasSize (1 );
78
+ assertThat (received .messages ().get (0 ).body ()).isEqualTo (messageBody );
79
+ }
80
+ }
81
+
82
+ @ Test
83
+ void shouldSendMessageWithConsumer () {
84
+ String queueName = createUniqueQueueName ();
85
+ createQueue (this .asyncClient , queueName ).join ();
86
+
87
+ try (BatchingSqsClientAdapter adapter = createBatchingAdapter ()) {
88
+ String messageBody = "Test message with consumer" ;
89
+
90
+ SendMessageResponse response = adapter .sendMessage (builder ->
91
+ builder .queueUrl (queueName ).messageBody (messageBody )).join ();
92
+
93
+ assertThat (response .messageId ()).isNotNull ();
94
+
95
+ ReceiveMessageResponse received = this .asyncClient .receiveMessage (ReceiveMessageRequest .builder ()
96
+ .queueUrl (queueName )
97
+ .maxNumberOfMessages (1 )
98
+ .build ()).join ();
99
+
100
+ assertThat (received .messages ()).hasSize (1 );
101
+ assertThat (received .messages ().get (0 ).body ()).isEqualTo (messageBody );
102
+ }
103
+ }
104
+
105
+ @ Test
106
+ void shouldReceiveMessageThroughBatchManager () throws InterruptedException {
107
+ String queueName = createUniqueQueueName ();
108
+ createQueue (this .asyncClient , queueName ).join ();
109
+
110
+ try (BatchingSqsClientAdapter adapter = createBatchingAdapter ()) {
111
+ String messageBody = "Test message for receiving" ;
112
+ this .asyncClient .sendMessage (SendMessageRequest .builder ()
113
+ .queueUrl (queueName )
114
+ .messageBody (messageBody )
115
+ .build ()).join ();
116
+
117
+ Thread .sleep (200 );
118
+
119
+ ReceiveMessageResponse response = adapter .receiveMessage (ReceiveMessageRequest .builder ()
120
+ .queueUrl (queueName )
121
+ .maxNumberOfMessages (1 )
122
+ .build ()).join ();
123
+
124
+ assertThat (response .messages ()).hasSize (1 );
125
+ assertThat (response .messages ().get (0 ).body ()).isEqualTo (messageBody );
126
+ }
127
+ }
128
+
129
+ @ Test
130
+ void shouldReceiveMessageWithConsumer () throws InterruptedException {
131
+ String queueName = createUniqueQueueName ();
132
+ createQueue (this .asyncClient , queueName ).join ();
133
+
134
+ try (BatchingSqsClientAdapter adapter = createBatchingAdapter ()) {
135
+ String messageBody = "Test message for receiving with consumer" ;
136
+ this .asyncClient .sendMessage (SendMessageRequest .builder ()
137
+ .queueUrl (queueName )
138
+ .messageBody (messageBody )
139
+ .build ()).join ();
140
+
141
+ Thread .sleep (200 );
142
+
143
+ ReceiveMessageResponse response = adapter .receiveMessage (builder ->
144
+ builder .queueUrl (queueName ).maxNumberOfMessages (1 )).join ();
145
+
146
+ assertThat (response .messages ()).hasSize (1 );
147
+ assertThat (response .messages ().get (0 ).body ()).isEqualTo (messageBody );
148
+ }
149
+ }
150
+
151
+ @ Test
152
+ void shouldDeleteMessageThroughBatchManager () {
153
+ String queueName = createUniqueQueueName ();
154
+ createQueue (this .asyncClient , queueName ).join ();
155
+
156
+ try (BatchingSqsClientAdapter adapter = createBatchingAdapter ()) {
157
+ String messageBody = "Test message for deletion" ;
158
+ this .asyncClient .sendMessage (SendMessageRequest .builder ()
159
+ .queueUrl (queueName )
160
+ .messageBody (messageBody )
161
+ .build ()).join ();
162
+
163
+ ReceiveMessageResponse received = this .asyncClient .receiveMessage (ReceiveMessageRequest .builder ()
164
+ .queueUrl (queueName )
165
+ .maxNumberOfMessages (1 )
166
+ .build ()).join ();
167
+
168
+ assertThat (received .messages ()).hasSize (1 );
169
+ String receiptHandle = received .messages ().get (0 ).receiptHandle ();
170
+
171
+ DeleteMessageResponse deleteResponse = adapter .deleteMessage (DeleteMessageRequest .builder ()
172
+ .queueUrl (queueName )
173
+ .receiptHandle (receiptHandle )
174
+ .build ()).join ();
175
+
176
+ assertThat (deleteResponse ).isNotNull ();
177
+
178
+ ReceiveMessageResponse afterDelete = this .asyncClient .receiveMessage (ReceiveMessageRequest .builder ()
179
+ .queueUrl (queueName )
180
+ .maxNumberOfMessages (1 )
181
+ .waitTimeSeconds (1 )
182
+ .build ()).join ();
183
+
184
+ assertThat (afterDelete .messages ()).isEmpty ();
185
+ }
186
+ }
187
+
188
+ @ Test
189
+ void shouldDeleteMessageWithConsumer () {
190
+ String queueName = createUniqueQueueName ();
191
+ createQueue (this .asyncClient , queueName ).join ();
192
+
193
+ try (BatchingSqsClientAdapter adapter = createBatchingAdapter ()) {
194
+ String messageBody = "Test message for deletion with consumer" ;
195
+ this .asyncClient .sendMessage (SendMessageRequest .builder ()
196
+ .queueUrl (queueName )
197
+ .messageBody (messageBody )
198
+ .build ()).join ();
199
+
200
+ ReceiveMessageResponse received = this .asyncClient .receiveMessage (ReceiveMessageRequest .builder ()
201
+ .queueUrl (queueName )
202
+ .maxNumberOfMessages (1 )
203
+ .build ()).join ();
204
+
205
+ String receiptHandle = received .messages ().get (0 ).receiptHandle ();
206
+
207
+ DeleteMessageResponse deleteResponse = adapter .deleteMessage (builder ->
208
+ builder .queueUrl (queueName ).receiptHandle (receiptHandle )).join ();
209
+
210
+ assertThat (deleteResponse ).isNotNull ();
211
+ }
212
+ }
213
+
214
+ @ Test
215
+ void shouldChangeMessageVisibilityThroughBatchManager () {
216
+ String queueName = createUniqueQueueName ();
217
+ createQueue (this .asyncClient , queueName ).join ();
218
+
219
+ try (BatchingSqsClientAdapter adapter = createBatchingAdapter ()) {
220
+ String messageBody = "Test message for visibility change" ;
221
+ this .asyncClient .sendMessage (SendMessageRequest .builder ()
222
+ .queueUrl (queueName )
223
+ .messageBody (messageBody )
224
+ .build ()).join ();
225
+
226
+ ReceiveMessageResponse received = this .asyncClient .receiveMessage (ReceiveMessageRequest .builder ()
227
+ .queueUrl (queueName )
228
+ .maxNumberOfMessages (1 )
229
+ .visibilityTimeout (5 )
230
+ .build ()).join ();
231
+
232
+ String receiptHandle = received .messages ().get (0 ).receiptHandle ();
233
+
234
+ ChangeMessageVisibilityResponse response = adapter .changeMessageVisibility (
235
+ ChangeMessageVisibilityRequest .builder ()
236
+ .queueUrl (queueName )
237
+ .receiptHandle (receiptHandle )
238
+ .visibilityTimeout (30 )
239
+ .build ()).join ();
240
+
241
+ assertThat (response ).isNotNull ();
242
+ }
243
+ }
244
+
245
+ @ Test
246
+ void shouldChangeMessageVisibilityWithConsumer () {
247
+ String queueName = createUniqueQueueName ();
248
+ createQueue (this .asyncClient , queueName ).join ();
249
+
250
+ try (BatchingSqsClientAdapter adapter = createBatchingAdapter ()) {
251
+ String messageBody = "Test message for visibility change with consumer" ;
252
+ this .asyncClient .sendMessage (SendMessageRequest .builder ()
253
+ .queueUrl (queueName )
254
+ .messageBody (messageBody )
255
+ .build ()).join ();
256
+
257
+ ReceiveMessageResponse received = this .asyncClient .receiveMessage (ReceiveMessageRequest .builder ()
258
+ .queueUrl (queueName )
259
+ .maxNumberOfMessages (1 )
260
+ .visibilityTimeout (5 )
261
+ .build ()).join ();
262
+
263
+ String receiptHandle = received .messages ().get (0 ).receiptHandle ();
264
+
265
+ ChangeMessageVisibilityResponse response = adapter .changeMessageVisibility (builder ->
266
+ builder .queueUrl (queueName ).receiptHandle (receiptHandle ).visibilityTimeout (30 )).join ();
267
+
268
+ assertThat (response ).isNotNull ();
269
+ }
270
+ }
271
+
272
+ @ Test
273
+ void shouldHandleBatchingEfficiently () throws InterruptedException {
274
+ String queueName = createUniqueQueueName ();
275
+ createQueue (this .asyncClient , queueName ).join ();
276
+
277
+ try (BatchingSqsClientAdapter adapter = createBatchingAdapter ()) {
278
+ int messageCount = 5 ;
279
+ String messageBodyPrefix = "Batch test message " ;
280
+
281
+ CompletableFuture <SendMessageResponse >[] futures = new CompletableFuture [messageCount ];
282
+
283
+ for (int i = 0 ; i < messageCount ; i ++) {
284
+ futures [i ] = adapter .sendMessage (SendMessageRequest .builder ()
285
+ .queueUrl (queueName )
286
+ .messageBody (messageBodyPrefix + i )
287
+ .build ());
288
+ }
289
+
290
+ CompletableFuture .allOf (futures ).join ();
291
+
292
+ for (CompletableFuture <SendMessageResponse > future : futures ) {
293
+ assertThat (future .join ().messageId ()).isNotNull ();
294
+ }
295
+
296
+ Thread .sleep (200 );
297
+
298
+ ReceiveMessageResponse received = this .asyncClient .receiveMessage (ReceiveMessageRequest .builder ()
299
+ .queueUrl (queueName )
300
+ .maxNumberOfMessages (10 )
301
+ .build ()).join ();
302
+
303
+ assertThat (received .messages ()).hasSize (messageCount );
304
+ }
305
+ }
306
+
307
+ private String createUniqueQueueName () {
308
+ return BASE_QUEUE_NAME + "-" + UUID .randomUUID ().toString ().substring (0 , 8 );
309
+ }
310
+
311
+ private BatchingSqsClientAdapter createBatchingAdapter () {
312
+ SqsAsyncBatchManager batchManager = SqsAsyncBatchManager .builder ()
313
+ .client (this .asyncClient )
314
+ .scheduledExecutor (Executors .newScheduledThreadPool (2 ))
315
+ .overrideConfiguration (builder -> builder
316
+ .maxBatchSize (10 )
317
+ .sendRequestFrequency (Duration .ofMillis (100 )))
318
+ .build ();
319
+
320
+ return new BatchingSqsClientAdapter (batchManager );
321
+ }
322
+
323
+ @ Configuration
324
+ static class SQSConfiguration {
325
+
326
+ @ Bean
327
+ SqsAsyncClient client () {
328
+ return createAsyncClient ();
329
+ }
330
+ }
331
+ }
0 commit comments