1+ /*
2+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+ *
4+ * Licensed under the Apache License, Version 2.0 (the "License").
5+ * You may not use this file except in compliance with the License.
6+ * A copy of the License is located at
7+ *
8+ * http://aws.amazon.com/apache2.0
9+ *
10+ * or in the "license" file accompanying this file. This file is distributed
11+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+ * express or implied. See the License for the specific language governing
13+ * permissions and limitations under the License.
14+ */
15+
16+ package software .amazon .awssdk .services .sqs .batchmanager ;
17+
18+ import static com .github .tomakehurst .wiremock .client .WireMock .*;
19+ import static com .github .tomakehurst .wiremock .core .WireMockConfiguration .wireMockConfig ;
20+ import static org .assertj .core .api .Assertions .assertThat ;
21+
22+ import com .github .tomakehurst .wiremock .junit5 .WireMockExtension ;
23+ import com .github .tomakehurst .wiremock .verification .LoggedRequest ;
24+ import com .google .common .util .concurrent .RateLimiter ;
25+ import java .net .URI ;
26+ import java .time .Duration ;
27+ import java .util .List ;
28+ import java .util .concurrent .ExecutorService ;
29+ import java .util .concurrent .Executors ;
30+ import java .util .concurrent .TimeUnit ;
31+ import java .util .stream .Collectors ;
32+ import org .junit .jupiter .api .AfterEach ;
33+ import org .junit .jupiter .api .BeforeEach ;
34+ import org .junit .jupiter .api .Test ;
35+ import org .junit .jupiter .api .extension .RegisterExtension ;
36+ import software .amazon .awssdk .auth .credentials .AwsBasicCredentials ;
37+ import software .amazon .awssdk .auth .credentials .StaticCredentialsProvider ;
38+ import software .amazon .awssdk .services .sqs .SqsAsyncClient ;
39+
40+
41+ /**
42+ * Tests the batching efficiency of {@link SqsAsyncBatchManager} under various load scenarios.
43+ */
44+ public class BatchingEfficiencyUnderLoadTest {
45+
46+ private static final String QUEUE_URL = "https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue" ;
47+ private static final int CONCURRENT_THREADS = 50 ;
48+ private static final int MAX_BATCH_SIZE = 10 ;
49+ private static final int SEND_FREQUENCY_MILLIS = 5 ;
50+
51+ @ RegisterExtension
52+ static WireMockExtension wireMock = WireMockExtension .newInstance ()
53+ .options (wireMockConfig ().dynamicPort ())
54+ .configureStaticDsl (true )
55+ .build ();
56+
57+ private SqsAsyncClient client ;
58+ private SqsAsyncBatchManager batchManager ;
59+
60+ @ BeforeEach
61+ void setUp () {
62+ client = SqsAsyncClient .builder ()
63+ .endpointOverride (URI .create ("http://localhost:" + wireMock .getPort ()))
64+ .checksumValidationEnabled (false )
65+ .credentialsProvider (StaticCredentialsProvider .create (
66+ AwsBasicCredentials .create ("key" , "secret" )))
67+ .build ();
68+
69+ batchManager = SqsAsyncBatchManager .builder ()
70+ .client (client )
71+ .scheduledExecutor (Executors .newScheduledThreadPool (10 ))
72+ .overrideConfiguration (config -> config
73+ .sendRequestFrequency (Duration .ofMillis (SEND_FREQUENCY_MILLIS ))
74+ .maxBatchSize (MAX_BATCH_SIZE ))
75+ .build ();
76+ }
77+
78+ @ AfterEach
79+ void tearDown () {
80+ batchManager .close ();
81+ client .close ();
82+ }
83+
84+ /**
85+ * Test runs heavy load and expects average batch sizes to be close to max.
86+ */
87+ @ Test
88+ void sendMessage_whenHighLoadScenario_shouldEfficientlyBatchMessages () throws Exception {
89+ int expectedBatchSize = 25 ; // more than double the actual max of 10
90+ int rateLimit = 1000 / SEND_FREQUENCY_MILLIS * expectedBatchSize ;
91+ int messageCount = rateLimit * 2 ; // run it for 2 seconds
92+ runThroughputTest (messageCount , rateLimit );
93+
94+ // Then: Verify messages were efficiently batched
95+ List <LoggedRequest > batchRequests = findAll (postRequestedFor (anyUrl ()));
96+
97+ // Calculate batching metrics
98+ List <Integer > batchSizes = batchRequests .stream ()
99+ .map (req -> req .getBodyAsString ().split ("\" Id\" " ).length - 1 )
100+ .collect (Collectors .toList ());
101+
102+ double avgBatchSize = batchSizes .stream ()
103+ .mapToInt (Integer ::intValue )
104+ .average ()
105+ .orElse (0 );
106+
107+ double fullBatchRatio = batchSizes .stream ()
108+ .filter (size -> size >= 9 )
109+ .count () / (double ) batchSizes .size ();
110+
111+ // Assert efficient batching
112+ assertThat (avgBatchSize )
113+ .as ("Average batch size" )
114+ .isGreaterThan (8.0 );
115+
116+
117+ assertThat (fullBatchRatio )
118+ .as ("Ratio of nearly full batches (9-10 messages)" )
119+ .isGreaterThan (0.8 );
120+
121+ assertThat ((double )batchRequests .size ())
122+ .as ("Total batch requests for %d messages" , messageCount )
123+ .isLessThan (messageCount / 5d );
124+ }
125+
126+ /**
127+ * Test runs a load that should cause an average batch size of 5.
128+ */
129+ @ Test
130+ void sendMessage_whenMediumLoadScenario_shouldCreateHalfSizeBatches () throws Exception {
131+ int expectedBatchSize = 5 ;
132+ int rateLimit = 1000 / SEND_FREQUENCY_MILLIS * expectedBatchSize ;
133+ int messageCount = rateLimit * 2 ; // run it for 2 seconds
134+ runThroughputTest (messageCount , rateLimit );
135+
136+ // Then: Verify batches were roughly half max size
137+ List <LoggedRequest > batchRequests = findAll (postRequestedFor (anyUrl ()));
138+
139+ // Calculate batching metrics
140+ List <Integer > batchSizes = batchRequests .stream ()
141+ .map (req -> req .getBodyAsString ().split ("\" Id\" " ).length - 1 )
142+ .collect (Collectors .toList ());
143+
144+ double avgBatchSize = batchSizes .stream ()
145+ .mapToInt (Integer ::intValue )
146+ .average ()
147+ .orElse (0 );
148+
149+ // Assert batch expected range
150+ assertThat (avgBatchSize )
151+ .as ("Average batch size" )
152+ .isLessThan (7.0 )
153+ .isGreaterThan (3.0 );
154+
155+ assertThat ((double )batchRequests .size ())
156+ .as ("Total batch requests for %d messages" , messageCount )
157+ .isLessThan (messageCount / 3d );
158+ }
159+
160+ @ Test
161+ void sendMessage_whenLowLoadScenario_shouldCreateSmallBatches () throws Exception {
162+ int expectedBatchSize = 1 ;
163+ int rateLimit = 1000 / SEND_FREQUENCY_MILLIS * expectedBatchSize ;
164+ int messageCount = rateLimit * 2 ; // run it for 2 seconds
165+ runThroughputTest (messageCount , rateLimit );
166+
167+ // Then: Verify batches were roughly half max size
168+ List <LoggedRequest > batchRequests = findAll (postRequestedFor (anyUrl ()));
169+
170+ // Calculate batching metrics
171+ List <Integer > batchSizes = batchRequests .stream ()
172+ .map (req -> req .getBodyAsString ().split ("\" Id\" " ).length - 1 )
173+ .collect (Collectors .toList ());
174+
175+ double avgBatchSize = batchSizes .stream ()
176+ .mapToInt (Integer ::intValue )
177+ .average ()
178+ .orElse (0 );
179+
180+ // Assert batch expected range
181+ assertThat (avgBatchSize )
182+ .as ("Average batch size" )
183+ .isLessThan (2.0 );
184+
185+ assertThat ((double )batchRequests .size ())
186+ .as ("Total batch requests for %d messages" , messageCount )
187+ .isGreaterThan (messageCount * .5 );
188+ }
189+
190+ private void runThroughputTest (int messageCount , int rateLimit ) throws InterruptedException {
191+ // Given: SQS returns success for batch requests
192+ stubFor (post (anyUrl ())
193+ .willReturn (aResponse ()
194+ .withStatus (200 )
195+ .withBody ("{\" Successful\" : []}" )));
196+
197+ // When: Send rateLimit messages per second concurrently (using 50 threads)
198+ ExecutorService executor = Executors .newFixedThreadPool (CONCURRENT_THREADS );
199+
200+ // Rate limit to spread it out over a couple seconds; enough time to make
201+ // any orphaned scheduled flushes obvious.
202+ RateLimiter rateLimiter = RateLimiter .create (rateLimit );
203+
204+ for (int i = 0 ; i < messageCount ; i ++) {
205+ String messageBody = String .valueOf (i );
206+ rateLimiter .acquire ();
207+ executor .execute (() -> {
208+ try {
209+ batchManager .sendMessage (builder ->
210+ builder .queueUrl (QUEUE_URL )
211+ .messageBody (messageBody ));
212+ } catch (Exception ignored ) {
213+ // Test will fail on assertions if messages aren't sent
214+ }
215+ });
216+ }
217+
218+ executor .shutdown ();
219+ executor .awaitTermination (10 , TimeUnit .SECONDS );
220+ }
221+ }
0 commit comments