Skip to content

Commit 7f2580a

Browse files
author
Tejas Ganesh Naik
committed
Send and Receive SQS Java Follow
1 parent ea52583 commit 7f2580a

File tree

4 files changed

+472
-1
lines changed

4 files changed

+472
-1
lines changed

.doc_gen/metadata/sqs_metadata.yaml

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -866,7 +866,16 @@ sqs_Scenario_SendReceiveBatch:
866866
- python.example_code.sqs.DeleteMessageBatch
867867
- description: Use the wrapper functions to send and receive messages in batches.
868868
snippet_tags:
869-
- python.example_code.sqs.Scenario_SendReceiveBatch
869+
- python.example_code.sqs.Scenario_SendReceiveBatch
870+
Java:
871+
versions:
872+
- sdk_version: 2
873+
github: javav2/example_code/sqs
874+
sdkguide:
875+
excerpts:
876+
- description: Create functions to wrap &SQS; message functions and use them to send and receive messages in batches.
877+
snippet_tags:
878+
- sqs.java2.sendRecvBatch.main
870879
services:
871880
sqs: {CreateQueue, DeleteQueue, SendMessageBatch, ReceiveMessage, DeleteMessageBatch}
872881
sqs_GetQueueAttributes:

javav2/example_code/sqs/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ functions within the same service.
5959
- [Publish messages to queues](../../usecases/topics_and_queues/src/main/java/com/example/sns/SNSWorkflow.java)
6060
- [Use the Amazon SQS Java Messaging Library to work with the JMS interface](../sqs-jms/src/main/java/com/example/sqs/jms/stdqueue/TextMessageSender.java)
6161
- [Work with queue tags](src/main/java/com/example/sqs/TagExamples.java)
62+
- [Send and receive batches of messages](src/main/java/com/example/sqs/TagExamples.java)
6263

6364

6465
<!--custom.examples.start-->
Lines changed: 333 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,333 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package com.example.sqs;
5+
6+
// snippet-start:[sqs.java2.sendRecvBatch.main]
7+
// snippet-start:[sqs.java2.sendRecvBatch.import]
8+
import software.amazon.awssdk.regions.Region;
9+
import software.amazon.awssdk.services.sqs.SqsClient;
10+
import software.amazon.awssdk.services.sqs.model.*;
11+
12+
import java.io.IOException;
13+
import java.nio.file.Files;
14+
import java.nio.file.Path;
15+
import java.nio.file.Paths;
16+
import java.util.ArrayList;
17+
import java.util.HashMap;
18+
import java.util.List;
19+
import java.util.Map;
20+
import java.util.logging.Level;
21+
import java.util.logging.Logger;
22+
// snippet-end:[sqs.java2.sendRecvBatch.import]
23+
24+
25+
/**
26+
* Before running this Java V2 code example, set up your development
27+
* environment, including your credentials.
28+
*
29+
* For more information, see the following documentation topic:
30+
*
31+
* https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/get-started.html
32+
*/
33+
34+
/**
35+
* This code demonstrates basic message operations in Amazon Simple Queue Service (Amazon SQS).
36+
*/
37+
38+
public class SendRecvBatch {
39+
private static final Logger logger = Logger.getLogger(SendRecvBatch.class.getName());
40+
static {
41+
Logger rootLogger = Logger.getLogger("");
42+
rootLogger.setLevel(Level.WARNING);
43+
}
44+
private static final SqsClient sqsClient = SqsClient.builder()
45+
.region(Region.US_WEST_2)
46+
.build();
47+
48+
// snippet-start:[sqs.java2.sendRecvBatch.sendBatch]
49+
/**
50+
* Send a batch of messages in a single request to an SQS queue.
51+
* This request may return overall success even when some messages were not sent.
52+
* The caller must inspect the Successful and Failed lists in the response and
53+
* resend any failed messages.
54+
*
55+
* @param queueUrl The URL of the queue to receive the messages.
56+
* @param messages The messages to send to the queue. Each message contains a body and attributes.
57+
* @return The response from SQS that contains the list of successful and failed messages.
58+
*/
59+
public static SendMessageBatchResponse sendMessages(
60+
String queueUrl, List<MessageEntry> messages) {
61+
62+
try {
63+
List<SendMessageBatchRequestEntry> entries = new ArrayList<>();
64+
65+
for (int i = 0; i < messages.size(); i++) {
66+
MessageEntry message = messages.get(i);
67+
entries.add(SendMessageBatchRequestEntry.builder()
68+
.id(String.valueOf(i))
69+
.messageBody(message.getBody())
70+
.messageAttributes(message.getAttributes())
71+
.build());
72+
}
73+
74+
SendMessageBatchRequest sendBatchRequest = SendMessageBatchRequest.builder()
75+
.queueUrl(queueUrl)
76+
.entries(entries)
77+
.build();
78+
79+
SendMessageBatchResponse response = sqsClient.sendMessageBatch(sendBatchRequest);
80+
81+
if (!response.successful().isEmpty()) {
82+
for (SendMessageBatchResultEntry resultEntry : response.successful()) {
83+
logger.info("Message sent: " + resultEntry.messageId() + ": " +
84+
messages.get(Integer.parseInt(resultEntry.id())).getBody());
85+
}
86+
}
87+
88+
if (!response.failed().isEmpty()) {
89+
for (BatchResultErrorEntry errorEntry : response.failed()) {
90+
logger.warning("Failed to send: " + errorEntry.id() + ": " +
91+
messages.get(Integer.parseInt(errorEntry.id())).getBody());
92+
}
93+
}
94+
95+
return response;
96+
97+
} catch (SqsException e) {
98+
logger.log(Level.SEVERE, "Send messages failed to queue: " + queueUrl, e);
99+
throw e;
100+
}
101+
}
102+
// snippet-end:[sqs.java2.sendRecvBatch.sendBatch]
103+
104+
// snippet-start:[sqs.java2.sendRecvBatch.recvBatch]
105+
/**
106+
* Receive a batch of messages in a single request from an SQS queue.
107+
*
108+
* @param queueUrl The URL of the queue from which to receive messages.
109+
* @param maxNumber The maximum number of messages to receive. The actual number
110+
* of messages received might be less.
111+
* @param waitTime The maximum time to wait (in seconds) before returning. When
112+
* this number is greater than zero, long polling is used. This
113+
* can result in reduced costs and fewer false empty responses.
114+
* @return The list of Message objects received. These each contain the body
115+
* of the message and metadata and custom attributes.
116+
*/
117+
public static List<Message> receiveMessages(String queueUrl, int maxNumber, int waitTime) {
118+
try {
119+
ReceiveMessageRequest receiveRequest = ReceiveMessageRequest.builder()
120+
.queueUrl(queueUrl)
121+
.maxNumberOfMessages(maxNumber)
122+
.waitTimeSeconds(waitTime)
123+
.messageAttributeNames("All")
124+
.build();
125+
126+
List<Message> messages = sqsClient.receiveMessage(receiveRequest).messages();
127+
128+
for (Message message : messages) {
129+
logger.info("Received message: " + message.messageId() + ": " + message.body());
130+
}
131+
132+
return messages;
133+
134+
} catch (SqsException e) {
135+
logger.log(Level.SEVERE, "Couldn't receive messages from queue: " + queueUrl, e);
136+
throw e;
137+
}
138+
}
139+
// snippet-end:[sqs.java2.sendRecvBatch.recvBatch]
140+
141+
// snippet-start:[sqs.java2.sendRecvBatch.delBatch]
142+
/**
143+
* Delete a batch of messages from a queue in a single request.
144+
*
145+
* @param queueUrl The URL of the queue from which to delete the messages.
146+
* @param messages The list of messages to delete.
147+
* @return The response from SQS that contains the list of successful and failed
148+
* message deletions.
149+
*/
150+
public static DeleteMessageBatchResponse deleteMessages(String queueUrl, List<Message> messages) {
151+
try {
152+
List<DeleteMessageBatchRequestEntry> entries = new ArrayList<>();
153+
154+
for (int i = 0; i < messages.size(); i++) {
155+
entries.add(DeleteMessageBatchRequestEntry.builder()
156+
.id(String.valueOf(i))
157+
.receiptHandle(messages.get(i).receiptHandle())
158+
.build());
159+
}
160+
161+
DeleteMessageBatchRequest deleteRequest = DeleteMessageBatchRequest.builder()
162+
.queueUrl(queueUrl)
163+
.entries(entries)
164+
.build();
165+
166+
DeleteMessageBatchResponse response = sqsClient.deleteMessageBatch(deleteRequest);
167+
168+
if (!response.successful().isEmpty()) {
169+
for (DeleteMessageBatchResultEntry resultEntry : response.successful()) {
170+
logger.info("Deleted " + messages.get(Integer.parseInt(resultEntry.id())).receiptHandle());
171+
}
172+
}
173+
174+
if (!response.failed().isEmpty()) {
175+
for (BatchResultErrorEntry errorEntry : response.failed()) {
176+
logger.warning("Could not delete " + messages.get(Integer.parseInt(errorEntry.id())).receiptHandle());
177+
}
178+
}
179+
180+
return response;
181+
182+
} catch (SqsException e) {
183+
logger.log(Level.SEVERE, "Couldn't delete messages from queue " + queueUrl, e);
184+
throw e;
185+
}
186+
}
187+
// snippet-end:[sqs.java2.sendRecvBatch.delBatch]
188+
189+
// snippet-start:[sqs.java2.sendRecvBatch.scenario]
190+
/**
191+
* Helper class to represent a message with body and attributes.
192+
*/
193+
public static class MessageEntry {
194+
private final String body;
195+
private final Map<String, MessageAttributeValue> attributes;
196+
197+
public MessageEntry(String body, Map<String, MessageAttributeValue> attributes) {
198+
this.body = body;
199+
this.attributes = attributes != null ? attributes : new HashMap<>();
200+
}
201+
202+
public String getBody() {
203+
return body;
204+
}
205+
206+
public Map<String, MessageAttributeValue> getAttributes() {
207+
return attributes;
208+
}
209+
}
210+
211+
/**
212+
* Shows how to:
213+
* * Read the lines from this Java file and send the lines in
214+
* batches of 10 as messages to a queue.
215+
* * Receive the messages in batches until the queue is empty.
216+
* * Reassemble the lines of the file and verify they match the original file.
217+
*/
218+
public static void usageDemo() {
219+
System.out.println("-".repeat(88));
220+
System.out.println("Welcome to the Amazon Simple Queue Service (Amazon SQS) demo!");
221+
System.out.println("-".repeat(88));
222+
223+
// Create a queue for the demo
224+
String queueName = "sqs-usage-demo-message-wrapper-"+System.currentTimeMillis();
225+
CreateQueueRequest createRequest = CreateQueueRequest.builder()
226+
.queueName(queueName)
227+
.build();
228+
String queueUrl = sqsClient.createQueue(createRequest).queueUrl();
229+
System.out.println("Created queue: " + queueUrl);
230+
231+
try {
232+
// Read the lines from this Java file
233+
Path projectRoot = Paths.get(System.getProperty("user.dir"));
234+
Path filePath = projectRoot.resolve("src/main/java/com/example/sqs/SendRecvBatch.java");
235+
List<String> lines = Files.readAllLines(filePath);
236+
237+
238+
// Send file lines in batches
239+
int batchSize = 10;
240+
System.out.println("Sending file lines in batches of " + batchSize + " as messages.");
241+
242+
for (int i = 0; i < lines.size(); i += batchSize) {
243+
List<MessageEntry> messageBatch = new ArrayList<>();
244+
245+
for (int j = i; j < Math.min(i + batchSize, lines.size()); j++) {
246+
String line = lines.get(j);
247+
if (line == null || line.trim().isEmpty()) {
248+
continue; // Skip empty lines
249+
}
250+
251+
Map<String, MessageAttributeValue> attributes = new HashMap<>();
252+
attributes.put("path", MessageAttributeValue.builder()
253+
.dataType("String")
254+
.stringValue(filePath.toString())
255+
.build());
256+
attributes.put("line", MessageAttributeValue.builder()
257+
.dataType("String")
258+
.stringValue(String.valueOf(j))
259+
.build());
260+
261+
messageBatch.add(new MessageEntry(lines.get(j), attributes));
262+
}
263+
264+
sendMessages(queueUrl, messageBatch);
265+
System.out.print(".");
266+
System.out.flush();
267+
}
268+
269+
System.out.println("\nDone. Sent " + lines.size() + " messages.");
270+
271+
// Receive and process messages
272+
System.out.println("Receiving, handling, and deleting messages in batches of " + batchSize + ".");
273+
String[] receivedLines = new String[lines.size()];
274+
boolean moreMessages = true;
275+
276+
while (moreMessages) {
277+
List<Message> receivedMessages = receiveMessages(queueUrl, batchSize, 5);
278+
System.out.print(".");
279+
System.out.flush();
280+
281+
for (Message message : receivedMessages) {
282+
int lineNumber = Integer.parseInt(message.messageAttributes().get("line").stringValue());
283+
receivedLines[lineNumber] = message.body();
284+
}
285+
286+
if (!receivedMessages.isEmpty()) {
287+
deleteMessages(queueUrl, receivedMessages);
288+
} else {
289+
moreMessages = false;
290+
}
291+
}
292+
293+
System.out.println("\nDone.");
294+
295+
// Verify all lines were received correctly
296+
boolean allLinesMatch = true;
297+
for (int i = 0; i < lines.size(); i++) {
298+
String originalLine = lines.get(i);
299+
String receivedLine = receivedLines[i] == null ? "" : receivedLines[i];
300+
301+
if (!originalLine.equals(receivedLine)) {
302+
allLinesMatch = false;
303+
break;
304+
}
305+
}
306+
307+
if (allLinesMatch) {
308+
System.out.println("Successfully reassembled all file lines!");
309+
} else {
310+
System.out.println("Uh oh, some lines were missed!");
311+
}
312+
313+
} catch (IOException e) {
314+
logger.log(Level.SEVERE, "Error reading file", e);
315+
} finally {
316+
// Clean up by deleting the queue
317+
DeleteQueueRequest deleteQueueRequest = DeleteQueueRequest.builder()
318+
.queueUrl(queueUrl)
319+
.build();
320+
sqsClient.deleteQueue(deleteQueueRequest);
321+
System.out.println("Deleted queue: " + queueUrl);
322+
}
323+
324+
System.out.println("Thanks for watching!");
325+
System.out.println("-".repeat(88));
326+
}
327+
328+
public static void main(String[] args) {
329+
usageDemo();
330+
}
331+
}
332+
// snippet-end:[sqs.java2.sendRecvBatch.scenario]
333+
// snippet-end:[sqs.java2.sendRecvBatch.main]

0 commit comments

Comments
 (0)