33import static org .assertj .core .api .Assertions .assertThat ;
44import static software .amazon .lambda .powertools .testutils .Infrastructure .FUNCTION_NAME_OUTPUT ;
55
6- import com .amazon .sqs .javamessaging .AmazonSQSExtendedClient ;
7- import com .amazon .sqs .javamessaging .ExtendedClientConfiguration ;
86import java .io .IOException ;
97import java .io .InputStream ;
108import java .nio .charset .StandardCharsets ;
1412import java .util .Map ;
1513import java .util .UUID ;
1614import java .util .concurrent .TimeUnit ;
15+
1716import org .apache .commons .io .IOUtils ;
1817import org .junit .jupiter .api .AfterAll ;
1918import org .junit .jupiter .api .AfterEach ;
2221import org .junit .jupiter .api .Timeout ;
2322import org .slf4j .Logger ;
2423import org .slf4j .LoggerFactory ;
24+
25+ import com .amazon .sqs .javamessaging .AmazonSQSExtendedClient ;
26+ import com .amazon .sqs .javamessaging .ExtendedClientConfiguration ;
27+
2528import software .amazon .awssdk .http .SdkHttpClient ;
2629import software .amazon .awssdk .http .urlconnection .UrlConnectionHttpClient ;
2730import software .amazon .awssdk .regions .Region ;
@@ -101,21 +104,20 @@ public void reset() {
101104 @ Test
102105 public void bigSQSMessageOffloadedToS3_shouldLoadFromS3 () throws IOException , InterruptedException {
103106 // given
104- final ExtendedClientConfiguration extendedClientConfig =
105- new ExtendedClientConfiguration ()
106- .withPayloadSupportEnabled (s3Client , bucketName );
107- AmazonSQSExtendedClient client =
108- new AmazonSQSExtendedClient (SqsClient .builder ().httpClient (httpClient ).build (), extendedClientConfig );
109- InputStream inputStream = this .getClass ().getResourceAsStream ("/large_sqs_message.txt" );
110- String bigMessage = IOUtils .toString (inputStream , StandardCharsets .UTF_8 );
111-
112- // when
113- client .sendMessage (SendMessageRequest
114- .builder ()
115- .queueUrl (queueUrl )
116- .messageBody (bigMessage )
117- .build ());
118-
107+ final ExtendedClientConfiguration extendedClientConfig = new ExtendedClientConfiguration ()
108+ .withPayloadSupportEnabled (s3Client , bucketName );
109+ try (AmazonSQSExtendedClient client = new AmazonSQSExtendedClient (
110+ SqsClient .builder ().region (region ).httpClient (httpClient ).build (), extendedClientConfig )) {
111+ InputStream inputStream = this .getClass ().getResourceAsStream ("/large_sqs_message.txt" );
112+ String bigMessage = IOUtils .toString (inputStream , StandardCharsets .UTF_8 );
113+
114+ // when
115+ client .sendMessage (SendMessageRequest
116+ .builder ()
117+ .queueUrl (queueUrl )
118+ .messageBody (bigMessage )
119+ .build ());
120+ }
119121 Thread .sleep (30000 ); // wait for function to be executed
120122
121123 // then
@@ -137,36 +139,37 @@ public void bigSQSMessageOffloadedToS3_shouldLoadFromS3() throws IOException, In
137139 @ Test
138140 public void smallSQSMessage_shouldNotReadFromS3 () throws IOException , InterruptedException {
139141 // given
140- final ExtendedClientConfiguration extendedClientConfig =
141- new ExtendedClientConfiguration ()
142- .withPayloadSupportEnabled (s3Client , bucketName );
143- AmazonSQSExtendedClient client =
144- new AmazonSQSExtendedClient (SqsClient .builder ().httpClient (httpClient ).build (), extendedClientConfig );
145- String message = "Hello World" ;
146-
147- // when
148- client .sendMessage (SendMessageRequest
149- .builder ()
150- .queueUrl (queueUrl )
151- .messageBody (message )
152- .build ());
153-
154- Thread .sleep (30000 ); // wait for function to be executed
155-
156- // then
157- QueryRequest request = QueryRequest
158- .builder ()
159- .tableName (tableName )
160- .keyConditionExpression ("functionName = :func" )
161- .expressionAttributeValues (
162- Collections .singletonMap (":func" , AttributeValue .builder ().s (functionName ).build ()))
163- .build ();
164- QueryResponse response = dynamoDbClient .query (request );
165- List <Map <String , AttributeValue >> items = response .items ();
166- assertThat (items ).hasSize (1 );
167- messageId = items .get (0 ).get ("id" ).s ();
168- assertThat (Integer .valueOf (items .get (0 ).get ("bodySize" ).n ())).isEqualTo (
169- message .getBytes (StandardCharsets .UTF_8 ).length );
170- assertThat (items .get (0 ).get ("bodyMD5" ).s ()).isEqualTo ("b10a8db164e0754105b7a99be72e3fe5" );
142+ final ExtendedClientConfiguration extendedClientConfig = new ExtendedClientConfiguration ()
143+ .withPayloadSupportEnabled (s3Client , bucketName );
144+ try (AmazonSQSExtendedClient client = new AmazonSQSExtendedClient (
145+ SqsClient .builder ().region (region ).httpClient (httpClient ).build (),
146+ extendedClientConfig )) {
147+ String message = "Hello World" ;
148+
149+ // when
150+ client .sendMessage (SendMessageRequest
151+ .builder ()
152+ .queueUrl (queueUrl )
153+ .messageBody (message )
154+ .build ());
155+
156+ Thread .sleep (30000 ); // wait for function to be executed
157+
158+ // then
159+ QueryRequest request = QueryRequest
160+ .builder ()
161+ .tableName (tableName )
162+ .keyConditionExpression ("functionName = :func" )
163+ .expressionAttributeValues (
164+ Collections .singletonMap (":func" , AttributeValue .builder ().s (functionName ).build ()))
165+ .build ();
166+ QueryResponse response = dynamoDbClient .query (request );
167+ List <Map <String , AttributeValue >> items = response .items ();
168+ assertThat (items ).hasSize (1 );
169+ messageId = items .get (0 ).get ("id" ).s ();
170+ assertThat (Integer .valueOf (items .get (0 ).get ("bodySize" ).n ())).isEqualTo (
171+ message .getBytes (StandardCharsets .UTF_8 ).length );
172+ assertThat (items .get (0 ).get ("bodyMD5" ).s ()).isEqualTo ("b10a8db164e0754105b7a99be72e3fe5" );
173+ }
171174 }
172175}
0 commit comments