37
37
import io .awspring .cloud .sqs .listener .acknowledgement .AcknowledgementResultCallback ;
38
38
import io .awspring .cloud .sqs .listener .acknowledgement .handler .AcknowledgementHandler ;
39
39
import io .awspring .cloud .sqs .listener .acknowledgement .handler .OnSuccessAcknowledgementHandler ;
40
+ import io .awspring .cloud .sqs .operations .SqsTemplate ;
40
41
import java .time .Duration ;
41
42
import java .util .ArrayList ;
42
43
import java .util .Collection ;
48
49
import java .util .concurrent .CompletableFuture ;
49
50
import java .util .concurrent .ConcurrentHashMap ;
50
51
import java .util .concurrent .CountDownLatch ;
51
- import java .util .concurrent .ExecutionException ;
52
52
import java .util .concurrent .TimeUnit ;
53
53
import java .util .concurrent .atomic .AtomicBoolean ;
54
54
import java .util .concurrent .atomic .AtomicInteger ;
59
59
import org .slf4j .LoggerFactory ;
60
60
import org .springframework .beans .factory .SmartInitializingSingleton ;
61
61
import org .springframework .beans .factory .annotation .Autowired ;
62
- import org .springframework .beans .factory .annotation .Qualifier ;
63
62
import org .springframework .boot .test .context .SpringBootTest ;
64
63
import org .springframework .context .annotation .Bean ;
65
64
import org .springframework .context .annotation .Configuration ;
66
65
import org .springframework .context .annotation .Import ;
67
66
import org .springframework .messaging .Message ;
68
67
import org .springframework .messaging .handler .annotation .Header ;
68
+ import org .springframework .messaging .support .MessageBuilder ;
69
69
import org .springframework .util .Assert ;
70
70
import org .springframework .util .StopWatch ;
71
71
import software .amazon .awssdk .services .sqs .SqsAsyncClient ;
72
72
import software .amazon .awssdk .services .sqs .model .QueueAttributeName ;
73
- import software .amazon .awssdk .services .sqs .model .SendMessageBatchRequestEntry ;
74
- import software .amazon .awssdk .services .sqs .model .SendMessageBatchResponse ;
75
73
76
74
/**
77
75
* Integration tests for handling SQS FIFO queues.
78
76
*
79
77
* @author Tomaz Fernandes
78
+ * @author Mikhail Strokov
80
79
*/
81
80
@ SpringBootTest
82
81
class SqsFifoIntegrationTests extends BaseSqsIntegrationTest {
@@ -101,16 +100,13 @@ class SqsFifoIntegrationTests extends BaseSqsIntegrationTest {
101
100
102
101
static final String FIFO_MANUALLY_CREATE_BATCH_FACTORY_QUEUE_NAME = "fifo_manually_create_batch_factory_test_queue.fifo" ;
103
102
104
- private static final String TEST_SQS_ASYNC_CLIENT_BEAN_NAME = "testSqsAsyncClient" ;
105
-
106
103
private static final String ERROR_ON_ACK_FACTORY = "errorOnAckFactory" ;
107
104
108
105
@ Autowired
109
106
LatchContainer latchContainer ;
110
107
111
108
@ Autowired
112
- @ Qualifier (TEST_SQS_ASYNC_CLIENT_BEAN_NAME )
113
- SqsAsyncClient sqsAsyncClient ;
109
+ SqsTemplate sqsTemplate ;
114
110
115
111
@ Autowired
116
112
ObjectMapper objectMapper ;
@@ -176,6 +172,7 @@ public void afterSingletonsInstantiated() {
176
172
loadSimulator .setBound (1000 );
177
173
loadSimulator .setRandom (true );
178
174
}
175
+
179
176
}
180
177
181
178
@ Test
@@ -184,15 +181,14 @@ void receivesMessagesInOrder() throws Exception {
184
181
String messageGroupId = UUID .randomUUID ().toString ();
185
182
List <String > values = IntStream .range (0 , this .settings .messagesPerTest ).mapToObj (String ::valueOf )
186
183
.collect (toList ());
187
- String queueUrl = fetchQueueUrl (FIFO_RECEIVES_MESSAGES_IN_ORDER_QUEUE_NAME );
188
- sendMessageTo ( queueUrl , values , messageGroupId );
184
+ sqsTemplate . sendMany (FIFO_RECEIVES_MESSAGES_IN_ORDER_QUEUE_NAME ,
185
+ createMessagesFromValues ( messageGroupId , values ) );
189
186
assertThat (latchContainer .receivesMessageLatch .await (settings .latchTimeoutSeconds , TimeUnit .SECONDS )).isTrue ();
190
187
assertThat (receivesMessageInOrderListener .receivedMessages ).containsExactlyElementsOf (values );
191
188
}
192
189
193
190
@ Test
194
191
void receivesMessagesInOrderFromManyMessageGroups () throws Exception {
195
- String queueUrl = fetchQueueUrl (FIFO_RECEIVES_MESSAGE_IN_ORDER_MANY_GROUPS_QUEUE_NAME );
196
192
int messagesPerTest = Math .max (this .settings .messagesPerTest , 30 );
197
193
int numberOfMessageGroups = messagesPerTest / Math .max (this .settings .messagesPerMessageGroup , 10 );
198
194
int messagesPerMessageGroup = Math .max (messagesPerTest / numberOfMessageGroups , 1 );
@@ -206,7 +202,21 @@ void receivesMessagesInOrderFromManyMessageGroups() throws Exception {
206
202
LoadSimulator loadSimulator = new LoadSimulator ().setLoadEnabled (true ).setRandom (true ).setBound (20 );
207
203
IntStream .range (0 , messageGroups .size ()).forEach (index -> {
208
204
if (this .settings .sendMessages ) {
209
- sendMessageTo (queueUrl , values , messageGroups .get (index ));
205
+ try {
206
+ if (useLocalStackClient ) {
207
+ sqsTemplate .sendMany (FIFO_RECEIVES_MESSAGE_IN_ORDER_MANY_GROUPS_QUEUE_NAME ,
208
+ createMessagesFromValues (messageGroups .get (index ), values ));
209
+ }
210
+ else {
211
+ sqsTemplate .sendManyAsync (FIFO_RECEIVES_MESSAGE_IN_ORDER_MANY_GROUPS_QUEUE_NAME ,
212
+ createMessagesFromValues (messageGroups .get (index ), values ));
213
+ }
214
+ }
215
+ catch (Exception e ) {
216
+ logger .error ("Error sending messages to queue {}" ,
217
+ FIFO_RECEIVES_MESSAGE_IN_ORDER_MANY_GROUPS_QUEUE_NAME , e );
218
+ throw (RuntimeException ) e ;
219
+ }
210
220
}
211
221
if (index % 10 == 0 ) {
212
222
loadSimulator .runLoad ();
@@ -233,8 +243,8 @@ void stopsProcessingAfterException() throws Exception {
233
243
List <String > values = IntStream .range (0 , this .settings .messagesPerTest ).mapToObj (String ::valueOf )
234
244
.collect (toList ());
235
245
String messageGroupId = UUID .randomUUID ().toString ();
236
- String queueUrl = fetchQueueUrl (FIFO_STOPS_PROCESSING_ON_ERROR_QUEUE_NAME );
237
- sendMessageTo ( queueUrl , values , messageGroupId );
246
+ sqsTemplate . sendMany (FIFO_STOPS_PROCESSING_ON_ERROR_QUEUE_NAME ,
247
+ createMessagesFromValues ( messageGroupId , values ) );
238
248
assertThat (latchContainer .stopsProcessingOnErrorLatch1 .await (settings .latchTimeoutSeconds , TimeUnit .SECONDS ))
239
249
.isTrue ();
240
250
logger .debug ("receivedMessagesBeforeException: {}" , stopsOnErrorListener .receivedMessagesBeforeException );
@@ -263,8 +273,8 @@ void stopsProcessingAfterAckException() throws Exception {
263
273
List <String > values = IntStream .range (0 , this .settings .messagesPerTest ).mapToObj (String ::valueOf )
264
274
.collect (toList ());
265
275
String messageGroupId = UUID .randomUUID ().toString ();
266
- String queueUrl = fetchQueueUrl (FIFO_STOPS_PROCESSING_ON_ACK_ERROR_ERROR_QUEUE_NAME );
267
- sendMessageTo ( queueUrl , values , messageGroupId );
276
+ sqsTemplate . sendMany (FIFO_STOPS_PROCESSING_ON_ACK_ERROR_ERROR_QUEUE_NAME ,
277
+ createMessagesFromValues ( messageGroupId , values ) );
268
278
assertThat (latchContainer .stopsProcessingOnAckErrorLatch1 .await (settings .latchTimeoutSeconds , TimeUnit .SECONDS ))
269
279
.isTrue ();
270
280
logger .debug ("Messages consumed before error: {}" , messagesContainer .stopsProcessingOnAckErrorBeforeThrown );
@@ -289,10 +299,12 @@ void receivesBatchesManyGroups() throws Exception {
289
299
String messageGroupId1 = UUID .randomUUID ().toString ();
290
300
String messageGroupId2 = UUID .randomUUID ().toString ();
291
301
String messageGroupId3 = UUID .randomUUID ().toString ();
292
- String queueUrl = fetchQueueUrl (FIFO_RECEIVES_BATCHES_MANY_GROUPS_QUEUE_NAME );
293
- sendMessageTo (queueUrl , values , messageGroupId1 );
294
- sendMessageTo (queueUrl , values , messageGroupId2 );
295
- sendMessageTo (queueUrl , values , messageGroupId3 );
302
+ sqsTemplate .sendMany (FIFO_RECEIVES_BATCHES_MANY_GROUPS_QUEUE_NAME ,
303
+ createMessagesFromValues (messageGroupId1 , values ));
304
+ sqsTemplate .sendMany (FIFO_RECEIVES_BATCHES_MANY_GROUPS_QUEUE_NAME ,
305
+ createMessagesFromValues (messageGroupId2 , values ));
306
+ sqsTemplate .sendMany (FIFO_RECEIVES_BATCHES_MANY_GROUPS_QUEUE_NAME ,
307
+ createMessagesFromValues (messageGroupId3 , values ));
296
308
assertThat (latchContainer .receivesBatchManyGroupsLatch .await (settings .latchTimeoutSeconds , TimeUnit .SECONDS ))
297
309
.isTrue ();
298
310
assertThat (receivesBatchesFromManyGroupsListener .receivedMessages .get (messageGroupId1 ))
@@ -305,21 +317,21 @@ void receivesBatchesManyGroups() throws Exception {
305
317
306
318
@ Test
307
319
void manuallyCreatesContainer () throws Exception {
308
- String queueUrl = fetchQueueUrl (FIFO_MANUALLY_CREATE_CONTAINER_QUEUE_NAME );
309
320
List <String > values = IntStream .range (0 , this .settings .messagesPerTest ).mapToObj (String ::valueOf )
310
321
.collect (toList ());
311
- sendMessageTo (queueUrl , values , UUID .randomUUID ().toString ());
322
+ sqsTemplate .sendMany (FIFO_MANUALLY_CREATE_CONTAINER_QUEUE_NAME ,
323
+ createMessagesFromValues (UUID .randomUUID ().toString (), values ));
312
324
assertThat (latchContainer .manuallyCreatedContainerLatch .await (settings .latchTimeoutSeconds , TimeUnit .SECONDS ))
313
325
.isTrue ();
314
326
assertThat (messagesContainer .manuallyCreatedContainerMessages ).containsExactlyElementsOf (values );
315
327
}
316
328
317
329
@ Test
318
330
void manuallyCreatesBatchContainer () throws Exception {
319
- String queueUrl = fetchQueueUrl (FIFO_MANUALLY_CREATE_BATCH_CONTAINER_QUEUE_NAME );
320
331
List <String > values = IntStream .range (0 , this .settings .messagesPerTest ).mapToObj (String ::valueOf )
321
332
.collect (toList ());
322
- sendMessageTo (queueUrl , values , UUID .randomUUID ().toString ());
333
+ sqsTemplate .sendMany (FIFO_MANUALLY_CREATE_BATCH_CONTAINER_QUEUE_NAME ,
334
+ createMessagesFromValues (UUID .randomUUID ().toString (), values ));
323
335
assertThat (
324
336
latchContainer .manuallyCreatedBatchContainerLatch .await (settings .latchTimeoutSeconds , TimeUnit .SECONDS ))
325
337
.isTrue ();
@@ -328,27 +340,39 @@ void manuallyCreatesBatchContainer() throws Exception {
328
340
329
341
@ Test
330
342
void manuallyCreatesFactory () throws Exception {
331
- String queueUrl = fetchQueueUrl (FIFO_MANUALLY_CREATE_FACTORY_QUEUE_NAME );
332
343
List <String > values = IntStream .range (0 , this .settings .messagesPerTest ).mapToObj (String ::valueOf )
333
344
.collect (toList ());
334
- sendMessageTo (queueUrl , values , UUID .randomUUID ().toString ());
345
+ sqsTemplate .sendMany (FIFO_MANUALLY_CREATE_FACTORY_QUEUE_NAME ,
346
+ createMessagesFromValues (UUID .randomUUID ().toString (), values ));
335
347
assertThat (latchContainer .manuallyCreatedFactoryLatch .await (settings .latchTimeoutSeconds , TimeUnit .SECONDS ))
336
348
.isTrue ();
337
349
assertThat (messagesContainer .manuallyCreatedFactoryMessages ).containsExactlyElementsOf (values );
338
350
}
339
351
340
352
@ Test
341
353
void manuallyCreatesBatchFactory () throws Exception {
342
- String queueUrl = fetchQueueUrl (FIFO_MANUALLY_CREATE_BATCH_FACTORY_QUEUE_NAME );
343
354
List <String > values = IntStream .range (0 , this .settings .messagesPerTest ).mapToObj (String ::valueOf )
344
355
.collect (toList ());
345
- sendMessageTo (queueUrl , values , UUID .randomUUID ().toString ());
356
+ sqsTemplate .sendMany (FIFO_MANUALLY_CREATE_BATCH_FACTORY_QUEUE_NAME ,
357
+ createMessagesFromValues (UUID .randomUUID ().toString (), values ));
346
358
assertThat (
347
359
latchContainer .manuallyCreatedBatchFactoryLatch .await (settings .latchTimeoutSeconds , TimeUnit .SECONDS ))
348
360
.isTrue ();
349
361
assertThat (messagesContainer .manuallyCreatedBatchFactoryMessages ).containsExactlyElementsOf (values );
350
362
}
351
363
364
+ private Message <String > createMessage (String body , String messageGroupId ) {
365
+ return MessageBuilder .withPayload (body )
366
+ .setHeader (SqsHeaders .MessageSystemAttributes .SQS_MESSAGE_GROUP_ID_HEADER , messageGroupId )
367
+ .setHeader (SqsHeaders .MessageSystemAttributes .SQS_MESSAGE_DEDUPLICATION_ID_HEADER ,
368
+ UUID .randomUUID ().toString ())
369
+ .build ();
370
+ }
371
+
372
+ private List <Message <String >> createMessagesFromValues (String messageGroupId , List <String > values ) {
373
+ return values .stream ().map (value -> createMessage (value , messageGroupId )).toList ();
374
+ }
375
+
352
376
static class ReceivesMessageInOrderListener {
353
377
354
378
List <String > receivedMessages = Collections .synchronizedList (new ArrayList <>());
@@ -466,57 +490,7 @@ void listen(List<Message<String>> messages) {
466
490
messages .forEach (msg -> latchContainer .receivesBatchManyGroupsLatch .countDown ());
467
491
logger .trace ("Finished processing messages {} for group id {}" , values , messageGroupId );
468
492
}
469
- }
470
493
471
- private void sendMessageTo (String queueUrl , List <String > messageBodies , String messageGroupId ) {
472
- try {
473
- if (useLocalStackClient ) {
474
- sendManyTo (queueUrl , messageBodies , messageGroupId ).join ();
475
- }
476
- else {
477
- sendManyTo (queueUrl , messageBodies , messageGroupId );
478
- }
479
- }
480
- catch (Exception e ) {
481
- logger .error ("Error sending messages to queue {}" , queueUrl , e );
482
- throw (RuntimeException ) e ;
483
- }
484
- }
485
-
486
- private CompletableFuture <Void > sendManyTo (String queueUrl , List <String > messageBodies , String messageGroupId ) {
487
- return IntStream .range (0 , (int ) Math .ceil (messageBodies .size () / 10. ))
488
- .mapToObj (index -> messageBodies .subList (index * 10 , Math .min ((index + 1 ) * 10 , messageBodies .size ())))
489
- .reduce (CompletableFuture .completedFuture (null ), (previousFuture , messages ) -> previousFuture
490
- .thenCompose (theVoid -> doSendMessageTo (queueUrl , messages , messageGroupId ).thenRun (() -> {
491
- })), (a , b ) -> a );
492
- }
493
-
494
- AtomicInteger messagesSent = new AtomicInteger ();
495
-
496
- private CompletableFuture <SendMessageBatchResponse > doSendMessageTo (String queueUrl , List <String > messageBodies ,
497
- String messageGroupId ) {
498
- return sqsAsyncClient .sendMessageBatch (req -> req
499
- .entries (messageBodies .stream ().map (body -> createEntry (body , messageGroupId )).collect (toList ()))
500
- .queueUrl (queueUrl ).build ()).whenComplete ((v , t ) -> {
501
- if (t != null ) {
502
- logger .error ("Error sending messages" , t );
503
- }
504
- else {
505
- int sent = messagesSent .addAndGet (messageBodies .size ());
506
- if (sent % 1000 == 0 ) {
507
- logger .debug ("Sent {} messages to queue {}" , sent , queueUrl );
508
- }
509
- }
510
- });
511
- }
512
-
513
- private SendMessageBatchRequestEntry createEntry (String body , String messageGroupId ) {
514
- return SendMessageBatchRequestEntry .builder ().messageBody (body ).id (UUID .randomUUID ().toString ())
515
- .messageGroupId (messageGroupId ).messageDeduplicationId (UUID .randomUUID ().toString ()).build ();
516
- }
517
-
518
- private String fetchQueueUrl (String receivesMessageQueueName ) throws InterruptedException , ExecutionException {
519
- return this .sqsAsyncClient .getQueueUrl (req -> req .queueName (receivesMessageQueueName )).get ().queueUrl ();
520
494
}
521
495
522
496
static class LatchContainer {
@@ -799,9 +773,10 @@ ObjectMapper objectMapper() {
799
773
return new ObjectMapper ();
800
774
}
801
775
802
- @ Bean (name = TEST_SQS_ASYNC_CLIENT_BEAN_NAME )
803
- SqsAsyncClient sqsAsyncClientProducer () {
804
- return BaseSqsIntegrationTest .createHighThroughputAsyncClient ();
776
+ @ Bean
777
+ SqsTemplate sqsTemplate () {
778
+ return SqsTemplate .builder ().sqsAsyncClient (BaseSqsIntegrationTest .createHighThroughputAsyncClient ())
779
+ .build ();
805
780
}
806
781
807
782
}
0 commit comments