Skip to content

Commit 746d4c5

Browse files
author
Cody Thornhill
committed
Fix SqsAsyncBatchManager excessive batch flushing under heavy load (#6374)
* Synchronize replacement of scheduled flush
1 parent 884023a commit 746d4c5

File tree

7 files changed

+252
-9
lines changed

7 files changed

+252
-9
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"type": "bugfix",
3+
"category": "Amazon Simple Queue Service",
4+
"contributor": "thornhillcody",
5+
"description": "Fix SqsAsyncBatchManager excessive batch flushing under heavy load. Fixes [#6374](https://github.com/aws/aws-sdk-java-v2/issues/6374)."
6+
}

services/sqs/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,11 @@
111111
<artifactId>mockito-junit-jupiter</artifactId>
112112
<scope>test</scope>
113113
</dependency>
114+
<dependency>
115+
<groupId>com.google.guava</groupId>
116+
<artifactId>guava</artifactId>
117+
<scope>test</scope>
118+
</dependency>
114119

115120
</dependencies>
116121
</project>

services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/BatchingMap.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,8 @@ public boolean contains(String batchKey) {
5959
return batchContextMap.containsKey(batchKey);
6060
}
6161

62-
public void putScheduledFlush(String batchKey, ScheduledFuture<?> scheduledFlush) {
63-
batchContextMap.get(batchKey).putScheduledFlush(scheduledFlush);
62+
public void cancelAndReplaceScheduledFlush(String batchKey, ScheduledFuture<?> scheduledFlush) {
63+
batchContextMap.get(batchKey).cancelAndReplaceScheduledFlush(scheduledFlush);
6464
}
6565

6666
public void forEach(BiConsumer<String, RequestBatchBuffer<RequestT, ResponseT>> action) {

services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchBuffer.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
@SdkInternalApi
3030
public final class RequestBatchBuffer<RequestT, ResponseT> {
3131
private final Object flushLock = new Object();
32+
private final Object scheduledFlushLock = new Object();
3233

3334
private final Map<String, BatchingExecutionContext<RequestT, ResponseT>> idToBatchContext;
3435
private final int maxBatchItems;
@@ -144,12 +145,20 @@ private String nextBatchEntry() {
144145
return Integer.toString(nextBatchEntry++);
145146
}
146147

147-
public void putScheduledFlush(ScheduledFuture<?> scheduledFlush) {
148-
this.scheduledFlush = scheduledFlush;
148+
public void cancelAndReplaceScheduledFlush(ScheduledFuture<?> scheduledFlush) {
149+
// Cancelling and replacing the scheduledFlush under a single lock ensures that only one scheduled flush is active at a time.
150+
synchronized (scheduledFlushLock) {
151+
if (this.scheduledFlush != null) {
152+
cancelScheduledFlush();
153+
}
154+
this.scheduledFlush = scheduledFlush;
155+
}
149156
}
150157

151158
public void cancelScheduledFlush() {
152-
scheduledFlush.cancel(false);
159+
synchronized (scheduledFlushLock) {
160+
scheduledFlush.cancel(false);
161+
}
153162
}
154163

155164
public Collection<CompletableFuture<ResponseT>> responses() {

services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchManager.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -109,8 +109,8 @@ private void manualFlushBuffer(String batchKey,
109109
Map<String, BatchingExecutionContext<RequestT, ResponseT>> flushableRequests) {
110110
requestsAndResponsesMaps.cancelScheduledFlush(batchKey);
111111
flushBuffer(batchKey, flushableRequests);
112-
requestsAndResponsesMaps.putScheduledFlush(batchKey,
113-
scheduleBufferFlush(batchKey,
112+
requestsAndResponsesMaps.cancelAndReplaceScheduledFlush(batchKey,
113+
scheduleBufferFlush(batchKey,
114114
sendRequestFrequency.toMillis(),
115115
scheduledExecutor));
116116
}
Lines changed: 222 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,222 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.services.sqs.batchmanager;
17+
18+
import static com.github.tomakehurst.wiremock.client.WireMock.*;
19+
import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.wireMockConfig;
20+
import static org.assertj.core.api.Assertions.assertThat;
21+
22+
import com.github.tomakehurst.wiremock.junit5.WireMockExtension;
23+
import com.github.tomakehurst.wiremock.verification.LoggedRequest;
24+
import com.google.common.util.concurrent.RateLimiter;
25+
import java.net.URI;
26+
import java.time.Duration;
27+
import java.util.List;
28+
import java.util.concurrent.CountDownLatch;
29+
import java.util.concurrent.ExecutorService;
30+
import java.util.concurrent.Executors;
31+
import java.util.concurrent.TimeUnit;
32+
import java.util.stream.Collectors;
33+
import org.junit.jupiter.api.AfterEach;
34+
import org.junit.jupiter.api.BeforeEach;
35+
import org.junit.jupiter.api.Test;
36+
import org.junit.jupiter.api.extension.RegisterExtension;
37+
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
38+
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
39+
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
40+
41+
42+
/**
43+
* Tests the batching efficiency of {@link SqsAsyncBatchManager} under various load scenarios.
44+
*/
45+
public class BatchingEfficiencyUnderLoadTest {
46+
47+
private static final String QUEUE_URL = "https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue";
48+
private static final int CONCURRENT_THREADS = 50;
49+
private static final int MAX_BATCH_SIZE = 10;
50+
private static final int SEND_FREQUENCY_MILLIS = 5;
51+
52+
@RegisterExtension
53+
static WireMockExtension wireMock = WireMockExtension.newInstance()
54+
.options(wireMockConfig().dynamicPort())
55+
.configureStaticDsl(true)
56+
.build();
57+
58+
private SqsAsyncClient client;
59+
private SqsAsyncBatchManager batchManager;
60+
61+
@BeforeEach
62+
void setUp() {
63+
client = SqsAsyncClient.builder()
64+
.endpointOverride(URI.create("http://localhost:" + wireMock.getPort()))
65+
.checksumValidationEnabled(false)
66+
.credentialsProvider(StaticCredentialsProvider.create(
67+
AwsBasicCredentials.create("key", "secret")))
68+
.build();
69+
70+
batchManager = SqsAsyncBatchManager.builder()
71+
.client(client)
72+
.scheduledExecutor(Executors.newScheduledThreadPool(10))
73+
.overrideConfiguration(config -> config
74+
.sendRequestFrequency(Duration.ofMillis(SEND_FREQUENCY_MILLIS))
75+
.maxBatchSize(MAX_BATCH_SIZE))
76+
.build();
77+
}
78+
79+
@AfterEach
80+
void tearDown() {
81+
batchManager.close();
82+
client.close();
83+
}
84+
85+
/**
86+
* Test runs heavy load and expects average batch sizes to be close to max.
87+
*/
88+
@Test
89+
void shouldEfficientlyBatchMessagesUnderHighLoad() throws Exception {
90+
int expectedBatchSize = 25; // more than double the actual max of 10
91+
int rateLimit = 1000 / SEND_FREQUENCY_MILLIS * expectedBatchSize;
92+
int messageCount = rateLimit * 2; // run it for 2 seconds
93+
runThroughputTest(messageCount, rateLimit);
94+
95+
// Then: Verify messages were efficiently batched
96+
List<LoggedRequest> batchRequests = findAll(postRequestedFor(anyUrl()));
97+
98+
// Calculate batching metrics
99+
List<Integer> batchSizes = batchRequests.stream()
100+
.map(req -> req.getBodyAsString().split("\"Id\"").length - 1)
101+
.collect(Collectors.toList());
102+
103+
double avgBatchSize = batchSizes.stream()
104+
.mapToInt(Integer::intValue)
105+
.average()
106+
.orElse(0);
107+
108+
double fullBatchRatio = batchSizes.stream()
109+
.filter(size -> size >= 9)
110+
.count() / (double) batchSizes.size();
111+
112+
// Assert efficient batching
113+
assertThat(avgBatchSize)
114+
.as("Average batch size")
115+
.isGreaterThan(8.0);
116+
117+
118+
assertThat(fullBatchRatio)
119+
.as("Ratio of nearly full batches (9-10 messages)")
120+
.isGreaterThan(0.8);
121+
122+
assertThat((double)batchRequests.size())
123+
.as("Total batch requests for %d messages", messageCount)
124+
.isLessThan(messageCount / 5d);
125+
}
126+
127+
/**
128+
* Test runs a load that should cause an average batch size of 5.
129+
*/
130+
@Test
131+
void shouldMakeHalfBatches() throws Exception {
132+
int expectedBatchSize = 5;
133+
int rateLimit = 1000 / SEND_FREQUENCY_MILLIS * expectedBatchSize;
134+
int messageCount = rateLimit * 2; // run it for 2 seconds
135+
runThroughputTest(messageCount, rateLimit);
136+
137+
// Then: Verify batches were roughly half max size
138+
List<LoggedRequest> batchRequests = findAll(postRequestedFor(anyUrl()));
139+
140+
// Calculate batching metrics
141+
List<Integer> batchSizes = batchRequests.stream()
142+
.map(req -> req.getBodyAsString().split("\"Id\"").length - 1)
143+
.collect(Collectors.toList());
144+
145+
double avgBatchSize = batchSizes.stream()
146+
.mapToInt(Integer::intValue)
147+
.average()
148+
.orElse(0);
149+
150+
// Assert batch expected range
151+
assertThat(avgBatchSize)
152+
.as("Average batch size")
153+
.isLessThan(7.0)
154+
.isGreaterThan(3.0);
155+
156+
assertThat((double)batchRequests.size())
157+
.as("Total batch requests for %d messages", messageCount)
158+
.isLessThan(messageCount / 3d);
159+
}
160+
161+
@Test
162+
void shouldMakeSmallBatches() throws Exception {
163+
int expectedBatchSize = 1;
164+
int rateLimit = 1000 / SEND_FREQUENCY_MILLIS * expectedBatchSize;
165+
int messageCount = rateLimit * 2; // run it for 2 seconds
166+
runThroughputTest(messageCount, rateLimit);
167+
168+
// Then: Verify batches stayed small (roughly one message each)
169+
List<LoggedRequest> batchRequests = findAll(postRequestedFor(anyUrl()));
170+
171+
// Calculate batching metrics
172+
List<Integer> batchSizes = batchRequests.stream()
173+
.map(req -> req.getBodyAsString().split("\"Id\"").length - 1)
174+
.collect(Collectors.toList());
175+
176+
double avgBatchSize = batchSizes.stream()
177+
.mapToInt(Integer::intValue)
178+
.average()
179+
.orElse(0);
180+
181+
// Assert batch expected range
182+
assertThat(avgBatchSize)
183+
.as("Average batch size")
184+
.isLessThan(2.0);
185+
186+
assertThat((double)batchRequests.size())
187+
.as("Total batch requests for %d messages", messageCount)
188+
.isGreaterThan(messageCount * .5);
189+
}
190+
191+
private void runThroughputTest(int messageCount, int rateLimit) throws InterruptedException {
192+
// Given: SQS returns success for batch requests
193+
stubFor(post(anyUrl())
194+
.willReturn(aResponse()
195+
.withStatus(200)
196+
.withBody("{\"Successful\": []}")));
197+
198+
// When: Send rateLimit messages per second concurrently (using 50 threads)
199+
ExecutorService executor = Executors.newFixedThreadPool(CONCURRENT_THREADS);
200+
201+
// Rate limit to spread the sends out over a couple of seconds; enough time to make
202+
// any orphaned scheduled flushes obvious.
203+
RateLimiter rateLimiter = RateLimiter.create(rateLimit);
204+
205+
for (int i = 0; i < messageCount; i++) {
206+
String messageBody = String.valueOf(i);
207+
rateLimiter.acquire();
208+
executor.execute(() -> {
209+
try {
210+
batchManager.sendMessage(builder ->
211+
builder.queueUrl(QUEUE_URL)
212+
.messageBody(messageBody));
213+
} catch (Exception ignored) {
214+
// Test will fail on assertions if messages aren't sent
215+
}
216+
});
217+
}
218+
219+
executor.shutdown();
220+
executor.awaitTermination(10, TimeUnit.SECONDS);
221+
}
222+
}

services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/RequestBatchBufferTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,11 +81,12 @@ void whenMaxBufferSizeReachedThenThrowException() {
8181
}
8282

8383
@Test
84-
void whenPutScheduledFlushThenFlushIsSet() {
84+
void whenCancelAndReplaceScheduledFlushThenFlushIsSetAndOldFlushIsCanceled() {
8585
batchBuffer = new RequestBatchBuffer<>(scheduledFlush, 10, MAX_SEND_MESSAGE_PAYLOAD_SIZE_BYTES, maxBufferSize);
8686
ScheduledFuture<?> newScheduledFlush = mock(ScheduledFuture.class);
87-
batchBuffer.putScheduledFlush(newScheduledFlush);
87+
batchBuffer.cancelAndReplaceScheduledFlush(newScheduledFlush);
8888
assertNotNull(newScheduledFlush);
89+
verify(scheduledFlush).cancel(false);
8990
}
9091

9192
@Test

0 commit comments

Comments
 (0)