Skip to content

Commit 71c632b

Browse files
authored
feat(large-messages): Add function interface for Large Messages Utility (#2257)
* Add functional API for LargeMessages utility. * Add E2E tests for functional large messages. * Use format specifiers for logging in LargeMessageE2ET.java.
1 parent f543b68 commit 71c632b

File tree

15 files changed

+810
-105
lines changed

15 files changed

+810
-105
lines changed
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
2+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
3+
<modelVersion>4.0.0</modelVersion>
4+
5+
<parent>
6+
<groupId>software.amazon.lambda</groupId>
7+
<artifactId>e2e-test-handlers-parent</artifactId>
8+
<version>2.5.0</version>
9+
</parent>
10+
11+
<artifactId>e2e-test-handler-largemessage-functional</artifactId>
12+
<packaging>jar</packaging>
13+
<name>E2E test handler – Large message functional</name>
14+
15+
<dependencies>
16+
<dependency>
17+
<groupId>software.amazon.awssdk</groupId>
18+
<artifactId>dynamodb</artifactId>
19+
</dependency>
20+
<dependency>
21+
<groupId>software.amazon.lambda</groupId>
22+
<artifactId>powertools-large-messages</artifactId>
23+
</dependency>
24+
<dependency>
25+
<groupId>software.amazon.lambda</groupId>
26+
<artifactId>powertools-logging-log4j</artifactId>
27+
</dependency>
28+
<dependency>
29+
<groupId>software.amazon.lambda</groupId>
30+
<artifactId>powertools-logging</artifactId>
31+
</dependency>
32+
<dependency>
33+
<groupId>com.amazonaws</groupId>
34+
<artifactId>aws-lambda-java-events</artifactId>
35+
</dependency>
36+
</dependencies>
37+
38+
<build>
39+
<plugins>
40+
<plugin>
41+
<groupId>org.apache.maven.plugins</groupId>
42+
<artifactId>maven-shade-plugin</artifactId>
43+
</plugin>
44+
</plugins>
45+
</build>
46+
</project>
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
/*
2+
* Copyright 2023 Amazon.com, Inc. or its affiliates.
3+
* Licensed under the Apache License, Version 2.0 (the
4+
* "License"); you may not use this file except in compliance
5+
* with the License. You may obtain a copy of the License at
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
* Unless required by applicable law or agreed to in writing, software
8+
* distributed under the License is distributed on an "AS IS" BASIS,
9+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
* See the License for the specific language governing permissions and
11+
* limitations under the License.
12+
*
13+
*/
14+
15+
package software.amazon.lambda.powertools.e2e;
16+
17+
import static software.amazon.lambda.powertools.logging.PowertoolsLogging.withLogging;
18+
19+
import java.nio.charset.StandardCharsets;
20+
import java.util.HashMap;
21+
import java.util.Map;
22+
23+
import com.amazonaws.services.lambda.runtime.Context;
24+
import com.amazonaws.services.lambda.runtime.RequestHandler;
25+
import com.amazonaws.services.lambda.runtime.events.SQSBatchResponse;
26+
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
27+
import com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage;
28+
29+
import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient;
30+
import software.amazon.awssdk.regions.Region;
31+
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
32+
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
33+
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
34+
import software.amazon.awssdk.utils.BinaryUtils;
35+
import software.amazon.awssdk.utils.Md5Utils;
36+
import software.amazon.lambda.powertools.largemessages.LargeMessages;
37+
38+
public class Function implements RequestHandler<SQSEvent, SQSBatchResponse> {
39+
40+
private static final String TABLE_FOR_ASYNC_TESTS = System.getenv("TABLE_FOR_ASYNC_TESTS");
41+
private DynamoDbClient client;
42+
43+
public Function() {
44+
if (client == null) {
45+
client = DynamoDbClient.builder()
46+
.httpClient(UrlConnectionHttpClient.builder().build())
47+
.region(Region.of(System.getenv("AWS_REGION")))
48+
.build();
49+
}
50+
}
51+
52+
public SQSBatchResponse handleRequest(SQSEvent event, Context context) {
53+
return withLogging(context, () -> {
54+
for (SQSMessage message : event.getRecords()) {
55+
LargeMessages.processLargeMessage(message, msg -> processRawMessage(msg, context));
56+
}
57+
return SQSBatchResponse.builder().build();
58+
});
59+
}
60+
61+
private Void processRawMessage(SQSMessage sqsMessage, Context context) {
62+
String bodyMD5 = md5(sqsMessage.getBody());
63+
if (!sqsMessage.getMd5OfBody().equals(bodyMD5)) {
64+
throw new SecurityException(
65+
String.format("message digest does not match, expected %s, got %s", sqsMessage.getMd5OfBody(),
66+
bodyMD5));
67+
}
68+
69+
Map<String, AttributeValue> item = new HashMap<>();
70+
item.put("functionName", AttributeValue.builder().s(context.getFunctionName()).build());
71+
item.put("id", AttributeValue.builder().s(sqsMessage.getMessageId()).build());
72+
item.put("bodyMD5", AttributeValue.builder().s(bodyMD5).build());
73+
item.put("bodySize",
74+
AttributeValue.builder().n(String.valueOf(sqsMessage.getBody().getBytes(StandardCharsets.UTF_8).length))
75+
.build());
76+
77+
client.putItem(PutItemRequest.builder().tableName(TABLE_FOR_ASYNC_TESTS).item(item).build());
78+
79+
return null;
80+
}
81+
82+
private String md5(String message) {
83+
return BinaryUtils.toHex(Md5Utils.computeMD5Hash(message.getBytes(StandardCharsets.UTF_8)));
84+
}
85+
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<Configuration>
3+
<Appenders>
4+
<Console name="JsonAppender" target="SYSTEM_OUT">
5+
<JsonTemplateLayout eventTemplateUri="classpath:LambdaJsonLayout.json" />
6+
</Console>
7+
</Appenders>
8+
<Loggers>
9+
<Root level="INFO">
10+
<AppenderRef ref="JsonAppender"/>
11+
</Root>
12+
<Logger name="JsonLogger" level="INFO" additivity="false">
13+
<AppenderRef ref="JsonAppender"/>
14+
</Logger>
15+
</Loggers>
16+
</Configuration>

powertools-e2e-tests/handlers/pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
<modules>
2828
<module>batch</module>
2929
<module>largemessage</module>
30+
<module>largemessage-functional</module>
3031
<module>largemessage_idempotent</module>
3132
<module>logging-log4j</module>
3233
<module>logging-logback</module>

powertools-e2e-tests/src/test/java/software/amazon/lambda/powertools/LargeMessageE2ET.java

Lines changed: 43 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,10 @@
1616
import org.apache.commons.io.IOUtils;
1717
import org.junit.jupiter.api.AfterAll;
1818
import org.junit.jupiter.api.AfterEach;
19-
import org.junit.jupiter.api.BeforeAll;
20-
import org.junit.jupiter.api.Test;
19+
import org.junit.jupiter.api.TestInstance;
2120
import org.junit.jupiter.api.Timeout;
21+
import org.junit.jupiter.params.ParameterizedTest;
22+
import org.junit.jupiter.params.provider.ValueSource;
2223
import org.slf4j.Logger;
2324
import org.slf4j.LoggerFactory;
2425

@@ -40,6 +41,7 @@
4041
import software.amazon.lambda.powertools.testutils.Infrastructure;
4142
import software.amazon.lambda.powertools.testutils.RetryUtils;
4243

44+
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
4345
class LargeMessageE2ET {
4446

4547
private static final Logger LOG = LoggerFactory.getLogger(LargeMessageE2ET.class);
@@ -55,45 +57,59 @@ class LargeMessageE2ET {
5557
.region(region)
5658
.build();
5759

58-
private static Infrastructure infrastructure;
59-
private static String functionName;
60-
private static String bucketName;
61-
private static String queueUrl;
62-
private static String tableName;
60+
private Infrastructure infrastructure;
61+
private String functionName;
62+
private String bucketName;
63+
private String queueUrl;
64+
private String tableName;
6365
private String messageId;
66+
private String currentPathToFunction;
67+
68+
private void setupInfrastructure(String pathToFunction) {
69+
// Do not re-deploy the same function
70+
if (pathToFunction.equals(currentPathToFunction)) {
71+
return;
72+
}
73+
74+
// Destroy any existing infrastructure before re-deploying
75+
if (infrastructure != null) {
76+
infrastructure.destroy();
77+
}
6478

65-
@BeforeAll
66-
@Timeout(value = 5, unit = TimeUnit.MINUTES)
67-
static void setup() {
6879
String random = UUID.randomUUID().toString().substring(0, 6);
6980
bucketName = "largemessagebucket" + random;
7081
String queueName = "largemessagequeue" + random;
7182

7283
infrastructure = Infrastructure.builder()
73-
.testName(LargeMessageE2ET.class.getSimpleName())
84+
.testName(LargeMessageE2ET.class.getSimpleName() + "-" + pathToFunction)
7485
.queue(queueName)
7586
.largeMessagesBucket(bucketName)
76-
.pathToFunction("largemessage")
87+
.pathToFunction(pathToFunction)
7788
.timeoutInSeconds(60)
7889
.build();
7990

8091
Map<String, String> outputs = infrastructure.deploy();
8192
functionName = outputs.get(FUNCTION_NAME_OUTPUT);
8293
queueUrl = outputs.get("QueueURL");
8394
tableName = outputs.get("TableNameForAsyncTests");
95+
currentPathToFunction = pathToFunction;
8496

85-
LOG.info("Testing '" + LargeMessageE2ET.class.getSimpleName() + "'");
97+
LOG.info("Testing '{}' with {}", LargeMessageE2ET.class.getSimpleName(), pathToFunction);
8698
}
8799

88100
@AfterAll
89-
static void tearDown() {
101+
void cleanup() {
90102
if (infrastructure != null) {
91103
infrastructure.destroy();
92104
}
93105
}
94106

95107
@AfterEach
96-
void reset() {
108+
void tearDown() {
109+
reset();
110+
}
111+
112+
private void reset() {
97113
if (messageId != null) {
98114
Map<String, AttributeValue> itemToDelete = new HashMap<>();
99115
itemToDelete.put("functionName", AttributeValue.builder().s(functionName).build());
@@ -103,8 +119,12 @@ void reset() {
103119
}
104120
}
105121

106-
@Test
107-
void bigSQSMessageOffloadedToS3_shouldLoadFromS3() throws IOException {
122+
@ParameterizedTest
123+
@ValueSource(strings = { "largemessage", "largemessage-functional" })
124+
@Timeout(value = 5, unit = TimeUnit.MINUTES)
125+
void bigSQSMessageOffloadedToS3_shouldLoadFromS3(String pathToFunction) throws IOException {
126+
setupInfrastructure(pathToFunction);
127+
108128
// GIVEN
109129
final ExtendedClientConfiguration extendedClientConfig = new ExtendedClientConfiguration()
110130
.withPayloadSupportEnabled(s3Client, bucketName);
@@ -146,8 +166,12 @@ void bigSQSMessageOffloadedToS3_shouldLoadFromS3() throws IOException {
146166
assertThat(items.get(0).get("bodyMD5").s()).isEqualTo("22bde5e7b05fa80bc7be45bdd4bc6c75");
147167
}
148168

149-
@Test
150-
void smallSQSMessage_shouldNotReadFromS3() {
169+
@ParameterizedTest
170+
@ValueSource(strings = { "largemessage", "largemessage-functional" })
171+
@Timeout(value = 5, unit = TimeUnit.MINUTES)
172+
void smallSQSMessage_shouldNotReadFromS3(String pathToFunction) {
173+
setupInfrastructure(pathToFunction);
174+
151175
// GIVEN
152176
final ExtendedClientConfiguration extendedClientConfig = new ExtendedClientConfiguration()
153177
.withPayloadSupportEnabled(s3Client, bucketName);

powertools-large-messages/pom.xml

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,11 @@
1414
-->
1515

1616
<project xmlns="http://maven.apache.org/POM/4.0.0"
17-
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
18-
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
17+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
18+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
1919
<modelVersion>4.0.0</modelVersion>
2020

21-
<description>A suite of utilities for AWS Lambda Functions that makes handling large messages in SQS and SNS easier.</description>
21+
<description>A suite of utilities for AWS Lambda Functions that makes handling large messages in SQS and SNS easier.</description>
2222

2323
<parent>
2424
<groupId>software.amazon.lambda</groupId>
@@ -41,6 +41,13 @@
4141
<groupId>software.amazon.lambda</groupId>
4242
<artifactId>powertools-common</artifactId>
4343
</dependency>
44+
<dependency>
45+
<groupId>software.amazon.lambda</groupId>
46+
<artifactId>powertools-common</artifactId>
47+
<version>${project.version}</version>
48+
<type>test-jar</type>
49+
<scope>test</scope>
50+
</dependency>
4451
<dependency>
4552
<groupId>com.amazonaws</groupId>
4653
<artifactId>aws-lambda-java-events</artifactId>
@@ -102,6 +109,11 @@
102109
<artifactId>mockito-core</artifactId>
103110
<scope>test</scope>
104111
</dependency>
112+
<dependency>
113+
<groupId>org.mockito</groupId>
114+
<artifactId>mockito-junit-jupiter</artifactId>
115+
<scope>test</scope>
116+
</dependency>
105117
<dependency>
106118
<groupId>org.slf4j</groupId>
107119
<artifactId>slf4j-simple</artifactId>

powertools-large-messages/src/main/java/software/amazon/lambda/powertools/largemessages/LargeMessage.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,15 @@
2020
import java.lang.annotation.Target;
2121

2222
/**
23-
* <p>Use this annotation to handle large messages (> 256 KB) from SQS or SNS.
23+
* <p>Use this annotation to handle large messages (> 1 MB) from SQS or SNS.
2424
* When large messages are sent to an SQS Queue or SNS Topic, they are offloaded to S3 and only a reference is passed in the message/record.</p>
2525
*
2626
* <p>{@code @LargeMessage} automatically retrieves and deletes messages
2727
* which have been offloaded to S3 using the {@code amazon-sqs-java-extended-client-lib} or {@code amazon-sns-java-extended-client-lib}
2828
* client libraries.</p>
2929
*
30-
* <p>This version of the {@code @LargeMessage} is compatible with version
31-
* 1.1.0+ of {@code amazon-sqs-java-extended-client-lib} / {@code amazon-sns-java-extended-client-lib}.</p>
30+
* <p>This version of the {@code @LargeMessage} is compatible with version 1.1.0+ and 2.0.0+
31+
* of {@code amazon-sqs-java-extended-client-lib} / {@code amazon-sns-java-extended-client-lib}.</p>
3232
* <br/>
3333
* <p>Put this annotation on a method where the first parameter is either a {@link com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage} or {@link com.amazonaws.services.lambda.runtime.events.SNSEvent.SNSRecord}.
3434
* <br/>
@@ -54,9 +54,11 @@
5454
* </pre>
5555
* </p>
5656
*
57-
* <p><b>Note 1</b>: Retrieving payloads and deleting objects from S3 will increase the duration of the
57+
* <p><b>Note 1</b>: The message object (SQSMessage or SNSRecord) is modified in-place to avoid duplicating
58+
* the large blob in memory. The message body will be replaced with the S3 object content.</p>
59+
* <p><b>Note 2</b>: Retrieving payloads and deleting objects from S3 will increase the duration of the
5860
* Lambda function.</p>
59-
* <p><b>Note 2</b>: Make sure to configure your function with enough memory to be able to retrieve S3 objects.</p>
61+
* <p><b>Note 3</b>: Make sure to configure your function with enough memory to be able to retrieve S3 objects.</p>
6062
*/
6163
@Retention(RetentionPolicy.RUNTIME)
6264
@Target(ElementType.METHOD)

0 commit comments

Comments
 (0)