55
66// snippet-start:[sqs.java2.sendRecvBatch.main]
77// snippet-start:[sqs.java2.sendRecvBatch.import]
8-
98import org .slf4j .Logger ;
109import org .slf4j .LoggerFactory ;
1110import software .amazon .awssdk .services .sqs .SqsClient ;
2524import software .amazon .awssdk .services .sqs .model .SendMessageBatchResultEntry ;
2625import software .amazon .awssdk .services .sqs .model .SqsException ;
2726
27+ import java .io .BufferedReader ;
2828import java .io .IOException ;
29- import java .nio .file .Files ;
30- import java .nio .file .Path ;
31- import java .nio .file .Paths ;
29+ import java .io .InputStream ;
30+ import java .io .InputStreamReader ;
3231import java .util .ArrayList ;
3332import java .util .HashMap ;
3433import java .util .List ;
@@ -55,24 +54,7 @@ public class SendRecvBatch {
5554
5655
5756 public static void main (String [] args ) {
58- String queueName = "testQueue" + System .currentTimeMillis ();
59- String queueUrl = createQueue (queueName );
60-
61- // Send messages to the queue
62- List <MessageEntry > messages = new ArrayList <>();
63- messages .add (new MessageEntry ("Hello world!" , null ));
64- messages .add (new MessageEntry ("Hello world 2!" , null ));
65- messages .add (new MessageEntry ("Hello world 3!" , null ));
66- SendMessageBatchResponse response = sendMessages (queueUrl , messages );
67-
68- // Receive messages from the queue
69- List <Message > receivedMessages = receiveMessages (queueUrl , 10 , 10 );
70-
71- // Delete messages from the queue
72- deleteMessages (queueUrl , receivedMessages );
73-
74- // Delete the queue
75- deleteQueue (queueUrl );
57+ usageDemo ();
7658 }
7759 // snippet-start:[sqs.java2.sendRecvBatch.sendBatch]
7860 /**
@@ -109,22 +91,22 @@ public static SendMessageBatchResponse sendMessages(
10991
11092 if (!response .successful ().isEmpty ()) {
11193 for (SendMessageBatchResultEntry resultEntry : response .successful ()) {
112- LOGGER .info ("Message sent: " + resultEntry .messageId () + ": " +
94+ LOGGER .info ("Message sent: {}: {}" , resultEntry .messageId (),
11395 messages .get (Integer .parseInt (resultEntry .id ())).getBody ());
11496 }
11597 }
11698
11799 if (!response .failed ().isEmpty ()) {
118100 for (BatchResultErrorEntry errorEntry : response .failed ()) {
119- LOGGER .warn ("Failed to send: " + errorEntry .id () + ": " +
101+ LOGGER .warn ("Failed to send: {}: {}" , errorEntry .id (),
120102 messages .get (Integer .parseInt (errorEntry .id ())).getBody ());
121103 }
122104 }
123105
124106 return response ;
125107
126108 } catch (SqsException e ) {
127- LOGGER .error ("Send messages failed to queue: " + queueUrl , e );
109+ LOGGER .error ("Send messages failed to queue: {}" , queueUrl , e );
128110 throw e ;
129111 }
130112 }
@@ -135,8 +117,8 @@ public static SendMessageBatchResponse sendMessages(
135117 * Receive a batch of messages in a single request from an SQS queue.
136118 *
137119 * @param queueUrl The URL of the queue from which to receive messages.
138- * @param maxNumber The maximum number of messages to receive. The actual number
139- * of messages received might be less.
120+ * @param maxNumber The maximum number of messages to receive (capped at 10 by SQS).
121+ * The actual number of messages received might be less.
140122 * @param waitTime The maximum time to wait (in seconds) before returning. When
141123 * this number is greater than zero, long polling is used. This
142124 * can result in reduced costs and fewer false empty responses.
@@ -155,13 +137,13 @@ public static List<Message> receiveMessages(String queueUrl, int maxNumber, int
155137 List <Message > messages = sqsClient .receiveMessage (receiveRequest ).messages ();
156138
157139 for (Message message : messages ) {
158- LOGGER .info ("Received message: " + message .messageId () + ": " + message .body ());
140+ LOGGER .info ("Received message: {}: {}" , message .messageId (), message .body ());
159141 }
160142
161143 return messages ;
162144
163145 } catch (SqsException e ) {
164- LOGGER .error ("Couldn't receive messages from queue: " + queueUrl , e );
146+ LOGGER .error ("Couldn't receive messages from queue: {}" , queueUrl , e );
165147 throw e ;
166148 }
167149 }
@@ -196,20 +178,20 @@ public static DeleteMessageBatchResponse deleteMessages(String queueUrl, List<Me
196178
197179 if (!response .successful ().isEmpty ()) {
198180 for (DeleteMessageBatchResultEntry resultEntry : response .successful ()) {
199- LOGGER .info ("Deleted " + messages .get (Integer .parseInt (resultEntry .id ())).receiptHandle ());
181+ LOGGER .info ("Deleted {}" , messages .get (Integer .parseInt (resultEntry .id ())).receiptHandle ());
200182 }
201183 }
202184
203185 if (!response .failed ().isEmpty ()) {
204186 for (BatchResultErrorEntry errorEntry : response .failed ()) {
205- LOGGER .warn ("Could not delete " + messages .get (Integer .parseInt (errorEntry .id ())).receiptHandle ());
187+ LOGGER .warn ("Could not delete {}" , messages .get (Integer .parseInt (errorEntry .id ())).receiptHandle ());
206188 }
207189 }
208190
209191 return response ;
210192
211193 } catch (SqsException e ) {
212- LOGGER .error ("Couldn't delete messages from queue " + queueUrl , e );
194+ LOGGER .error ("Couldn't delete messages from queue {}" , queueUrl , e );
213195 throw e ;
214196 }
215197 }
@@ -239,133 +221,121 @@ public Map<String, MessageAttributeValue> getAttributes() {
239221
240222 /**
241223 * Shows how to:
242- * * Read the lines from this Java file and send the lines in
224+ * * Read the lines from a file and send the lines in
243225 * batches of 10 as messages to a queue.
244226 * * Receive the messages in batches until the queue is empty.
245227 * * Reassemble the lines of the file and verify they match the original file.
246228 */
247229 public static void usageDemo () {
248- System .out .println ("-" .repeat (88 ));
249- System .out .println ("Welcome to the Amazon Simple Queue Service (Amazon SQS) demo!" );
250- System .out .println ("-" .repeat (88 ));
251-
252- // Create a queue for the demo.
253- String queueName = "sqs-usage-demo-message-wrapper-" +System .currentTimeMillis ();
254- CreateQueueRequest createRequest = CreateQueueRequest .builder ()
255- .queueName (queueName )
256- .build ();
257- String queueUrl = sqsClient .createQueue (createRequest ).queueUrl ();
258- System .out .println ("Created queue: " + queueUrl );
230+ LOGGER .info ("-" .repeat (88 ));
231+ LOGGER .info ("Welcome to the Amazon Simple Queue Service (Amazon SQS) demo!" );
232+ LOGGER .info ("-" .repeat (88 ));
259233
234+ String queueUrl = null ;
260235 try {
261- // Read the lines from this Java file.
262- Path projectRoot = Paths .get (System .getProperty ("user.dir" ));
263- Path filePath = projectRoot .resolve ("src/main/java/com/example/sqs/SendRecvBatch.java" );
264- List <String > lines = Files .readAllLines (filePath );
236+ // Create a queue for the demo.
237+ String queueName = "sqs-usage-demo-message-wrapper-" + System .currentTimeMillis ();
238+ CreateQueueRequest createRequest = CreateQueueRequest .builder ()
239+ .queueName (queueName )
240+ .build ();
241+ queueUrl = sqsClient .createQueue (createRequest ).queueUrl ();
242+ LOGGER .info ("Created queue: {}" , queueUrl );
243+
244+ try (InputStream inputStream = SendRecvBatch .class .getResourceAsStream ("/log4j2.xml" );
245+ BufferedReader reader = new BufferedReader (new InputStreamReader (inputStream ))) {
246+
247+ List <String > lines = reader .lines ().toList ();
248+
249+ // Send file lines in batches.
250+ int batchSize = 10 ;
251+ LOGGER .info ("Sending file lines in batches of {} as messages." , batchSize );
252+
253+ for (int i = 0 ; i < lines .size (); i += batchSize ) {
254+ List <MessageEntry > messageBatch = new ArrayList <>();
255+
256+ for (int j = i ; j < Math .min (i + batchSize , lines .size ()); j ++) {
257+ String line = lines .get (j );
258+ if (line == null || line .trim ().isEmpty ()) {
259+ continue ; // Skip empty lines.
260+ }
261+
262+ Map <String , MessageAttributeValue > attributes = new HashMap <>();
263+ attributes .put ("line" , MessageAttributeValue .builder ()
264+ .dataType ("String" )
265+ .stringValue (String .valueOf (j ))
266+ .build ());
267+
268+ messageBatch .add (new MessageEntry (lines .get (j ), attributes ));
269+ }
265270
271+ sendMessages (queueUrl , messageBatch );
272+ System .out .print ("." );
273+ System .out .flush ();
274+ }
275+
276+ LOGGER .info ("\n Done. Sent {} messages." , lines .size ());
266277
267- // Send file lines in batches.
268- int batchSize = 10 ;
269- System .out .println ("Sending file lines in batches of " + batchSize + " as messages." );
278+ // Receive and process messages.
279+ LOGGER .info ("Receiving, handling, and deleting messages in batches of {}." , batchSize );
280+ String [] receivedLines = new String [lines .size ()];
281+ boolean moreMessages = true ;
270282
271- for ( int i = 0 ; i < lines . size (); i += batchSize ) {
272- List <MessageEntry > messageBatch = new ArrayList <>( );
283+ while ( moreMessages ) {
284+ List <Message > receivedMessages = receiveMessages ( queueUrl , batchSize , 5 );
273285
274- for (int j = i ; j < Math .min (i + batchSize , lines .size ()); j ++) {
275- String line = lines .get (j );
276- if (line == null || line .trim ().isEmpty ()) {
277- continue ; // Skip empty lines.
286+ for (Message message : receivedMessages ) {
287+ int lineNumber = Integer .parseInt (message .messageAttributes ().get ("line" ).stringValue ());
288+ receivedLines [lineNumber ] = message .body ();
278289 }
279290
280- Map <String , MessageAttributeValue > attributes = new HashMap <>();
281- attributes .put ("path" , MessageAttributeValue .builder ()
282- .dataType ("String" )
283- .stringValue (filePath .toString ())
284- .build ());
285- attributes .put ("line" , MessageAttributeValue .builder ()
286- .dataType ("String" )
287- .stringValue (String .valueOf (j ))
288- .build ());
289-
290- messageBatch .add (new MessageEntry (lines .get (j ), attributes ));
291+ if (!receivedMessages .isEmpty ()) {
292+ deleteMessages (queueUrl , receivedMessages );
293+ } else {
294+ moreMessages = false ;
295+ }
291296 }
292297
293- sendMessages (queueUrl , messageBatch );
294- System .out .print ("." );
295- System .out .flush ();
296- }
298+ LOGGER .info ("\n Done." );
297299
298- System .out .println ("\n Done. Sent " + lines .size () + " messages." );
300+ // Verify that all lines were received correctly.
301+ boolean allLinesMatch = true ;
302+ for (int i = 0 ; i < lines .size (); i ++) {
303+ String originalLine = lines .get (i );
304+ String receivedLine = receivedLines [i ] == null ? "" : receivedLines [i ];
299305
300- // Receive and process messages.
301- System .out .println ("Receiving, handling, and deleting messages in batches of " + batchSize + "." );
302- String [] receivedLines = new String [lines .size ()];
303- boolean moreMessages = true ;
304-
305- while (moreMessages ) {
306- List <Message > receivedMessages = receiveMessages (queueUrl , batchSize , 5 );
307- System .out .print ("." );
308- System .out .flush ();
309-
310- for (Message message : receivedMessages ) {
311- int lineNumber = Integer .parseInt (message .messageAttributes ().get ("line" ).stringValue ());
312- receivedLines [lineNumber ] = message .body ();
306+ if (!originalLine .equals (receivedLine )) {
307+ allLinesMatch = false ;
308+ break ;
309+ }
313310 }
314311
315- if (! receivedMessages . isEmpty () ) {
316- deleteMessages ( queueUrl , receivedMessages );
312+ if (allLinesMatch ) {
313+ LOGGER . info ( "Successfully reassembled all file lines!" );
317314 } else {
318- moreMessages = false ;
315+ LOGGER . info ( "Uh oh, some lines were missed!" ) ;
319316 }
320317 }
321-
322- System .out .println ("\n Done." );
323-
324- // Verify all lines were received correctly.
325- boolean allLinesMatch = true ;
326- for (int i = 0 ; i < lines .size (); i ++) {
327- String originalLine = lines .get (i );
328- String receivedLine = receivedLines [i ] == null ? "" : receivedLines [i ];
329-
330- if (!originalLine .equals (receivedLine )) {
331- allLinesMatch = false ;
332- break ;
318+ } catch (SqsException e ) {
319+ LOGGER .error ("SQS operation failed" , e );
320+ } catch (RuntimeException | IOException e ) {
321+ LOGGER .error ("Unexpected runtime error during demo" , e );
322+ } finally {
323+ // Clean up by deleting the queue if it was created.
324+ if (queueUrl != null ) {
325+ try {
326+ DeleteQueueRequest deleteQueueRequest = DeleteQueueRequest .builder ()
327+ .queueUrl (queueUrl )
328+ .build ();
329+ sqsClient .deleteQueue (deleteQueueRequest );
330+ LOGGER .info ("Deleted queue: {}" , queueUrl );
331+ } catch (SqsException e ) {
332+ LOGGER .error ("Failed to delete queue: {}" , queueUrl , e );
333333 }
334334 }
335-
336- if (allLinesMatch ) {
337- System .out .println ("Successfully reassembled all file lines!" );
338- } else {
339- System .out .println ("Uh oh, some lines were missed!" );
340- }
341-
342- } catch (IOException e ) {
343- LOGGER .error ("Error reading file" , e );
344- } finally {
345- // Clean up by deleting the queue.
346- DeleteQueueRequest deleteQueueRequest = DeleteQueueRequest .builder ()
347- .queueUrl (queueUrl )
348- .build ();
349- sqsClient .deleteQueue (deleteQueueRequest );
350- System .out .println ("Deleted queue: " + queueUrl );
351335 }
352336
353- System .out .println ("Thanks for watching!" );
354- System .out .println ("-" .repeat (88 ));
355- }
356-
357- private static String createQueue (String queueName ) {
358- CreateQueueRequest createRequest = CreateQueueRequest .builder ()
359- .queueName (queueName )
360- .build ();
361- return sqsClient .createQueue (createRequest ).queueUrl ();
362- }
363-
364- private static void deleteQueue (String queueUrl ) {
365- DeleteQueueRequest deleteQueueRequest = DeleteQueueRequest .builder ()
366- .queueUrl (queueUrl )
367- .build ();
368- sqsClient .deleteQueue (deleteQueueRequest );
337+ LOGGER .info ("Thanks for watching!" );
338+ LOGGER .info ("-" .repeat (88 ));
369339 }
370340 }
371341// snippet-end:[sqs.java2.sendRecvBatch.scenario]
0 commit comments