Skip to content

Commit 0019ecc

Browse files
author
Cody Thornhill
committed
Fix SqsAsyncBatchManager excessive batch flushing under heavy load (#6374)
* Synchronize replacement of scheduled flush
1 parent 884023a commit 0019ecc

File tree

7 files changed

+292
-11
lines changed

7 files changed

+292
-11
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"type": "bugfix",
3+
"category": "Amazon Simple Queue Service",
4+
"contributor": "thornhillcody",
5+
"description": "Fix SqsAsyncBatchManager excessive batch flushing under heavy load. Fixes [#6374](https://github.com/aws/aws-sdk-java-v2/issues/6374)."
6+
}

services/sqs/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,11 @@
111111
<artifactId>mockito-junit-jupiter</artifactId>
112112
<scope>test</scope>
113113
</dependency>
114+
<dependency>
115+
<groupId>com.google.guava</groupId>
116+
<artifactId>guava</artifactId>
117+
<scope>test</scope>
118+
</dependency>
114119

115120
</dependencies>
116121
</project>

services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/BatchingMap.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,8 @@ public boolean contains(String batchKey) {
5959
return batchContextMap.containsKey(batchKey);
6060
}
6161

/**
 * Atomically cancels the currently scheduled flush for the given batch key (if any) and
 * installs the supplied task as the replacement, delegating to the underlying
 * {@link RequestBatchBuffer}.
 *
 * NOTE(review): assumes {@code batchKey} is already present in {@code batchContextMap};
 * a missing key would make {@code get(batchKey)} return null and throw a
 * NullPointerException here — confirm callers always register the key first.
 */
public void cancelAndReplaceScheduledFlush(String batchKey, ScheduledFuture<?> scheduledFlush) {
    batchContextMap.get(batchKey).cancelAndReplaceScheduledFlush(scheduledFlush);
}
6565

6666
public void forEach(BiConsumer<String, RequestBatchBuffer<RequestT, ResponseT>> action) {

services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchBuffer.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
@SdkInternalApi
3030
public final class RequestBatchBuffer<RequestT, ResponseT> {
3131
private final Object flushLock = new Object();
32+
private final Object scheduledFlushLock = new Object();
3233

3334
private final Map<String, BatchingExecutionContext<RequestT, ResponseT>> idToBatchContext;
3435
private final int maxBatchItems;
@@ -144,12 +145,20 @@ private String nextBatchEntry() {
144145
return Integer.toString(nextBatchEntry++);
145146
}
146147

147-
public void putScheduledFlush(ScheduledFuture<?> scheduledFlush) {
148-
this.scheduledFlush = scheduledFlush;
148+
public void cancelAndReplaceScheduledFlush(ScheduledFuture<?> scheduledFlush) {
149+
// Locking the cancellation and replacement of the scheduledFlush ensures that there is only one active.
150+
synchronized (scheduledFlushLock) {
151+
if (this.scheduledFlush != null) {
152+
cancelScheduledFlush();
153+
}
154+
this.scheduledFlush = scheduledFlush;
155+
}
149156
}
150157

151158
public void cancelScheduledFlush() {
152-
scheduledFlush.cancel(false);
159+
synchronized (scheduledFlushLock) {
160+
scheduledFlush.cancel(false);
161+
}
153162
}
154163

155164
public Collection<CompletableFuture<ResponseT>> responses() {

services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchManager.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -107,10 +107,9 @@ protected abstract CompletableFuture<BatchResponseT> batchAndSend(List<Identifia
107107

108108
private void manualFlushBuffer(String batchKey,
                               Map<String, BatchingExecutionContext<RequestT, ResponseT>> flushableRequests) {
    // Flush the accumulated requests first, then atomically cancel the pending scheduled
    // flush and install a fresh one. Doing the cancel-and-replace as a single call (rather
    // than a separate cancel before the flush and put after it) guarantees at most one
    // scheduled flush is ever active for this batch key, even under concurrent manual
    // flushes.
    flushBuffer(batchKey, flushableRequests);
    requestsAndResponsesMaps.cancelAndReplaceScheduledFlush(batchKey,
                                                            scheduleBufferFlush(batchKey,
                                                                                sendRequestFrequency.toMillis(),
                                                                                scheduledExecutor));
}
Lines changed: 221 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,221 @@
/*
 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License").
 * You may not use this file except in compliance with the License.
 * A copy of the License is located at
 *
 *  http://aws.amazon.com/apache2.0
 *
 * or in the "license" file accompanying this file. This file is distributed
 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 * express or implied. See the License for the specific language governing
 * permissions and limitations under the License.
 */

package software.amazon.awssdk.services.sqs.batchmanager;

import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
import static com.github.tomakehurst.wiremock.client.WireMock.anyUrl;
import static com.github.tomakehurst.wiremock.client.WireMock.findAll;
import static com.github.tomakehurst.wiremock.client.WireMock.post;
import static com.github.tomakehurst.wiremock.client.WireMock.postRequestedFor;
import static com.github.tomakehurst.wiremock.client.WireMock.stubFor;
import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.wireMockConfig;
import static org.assertj.core.api.Assertions.assertThat;

import com.github.tomakehurst.wiremock.junit5.WireMockExtension;
import com.github.tomakehurst.wiremock.verification.LoggedRequest;
import com.google.common.util.concurrent.RateLimiter;
import java.net.URI;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;


/**
 * Tests the batching efficiency of {@link SqsAsyncBatchManager} under various load scenarios.
 *
 * Each scenario pushes messages at a fixed rate through the batch manager against a WireMock
 * SQS endpoint, then inspects the recorded batch requests to check that the manager is
 * batching messages rather than flushing prematurely (the bug this guards against: orphaned
 * scheduled flushes causing many tiny batches under heavy load).
 */
public class BatchingEfficiencyUnderLoadTest {

    private static final String QUEUE_URL = "https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue";
    private static final int CONCURRENT_THREADS = 50;
    private static final int MAX_BATCH_SIZE = 10;
    private static final int SEND_FREQUENCY_MILLIS = 5;

    @RegisterExtension
    static WireMockExtension wireMock = WireMockExtension.newInstance()
                                                         .options(wireMockConfig().dynamicPort())
                                                         .configureStaticDsl(true)
                                                         .build();

    private SqsAsyncClient client;
    private SqsAsyncBatchManager batchManager;
    // Held as a field so it can be shut down in tearDown; previously the pool was created
    // inline and leaked ten threads per test.
    private ScheduledExecutorService scheduledExecutor;

    @BeforeEach
    void setUp() {
        client = SqsAsyncClient.builder()
                               .endpointOverride(URI.create("http://localhost:" + wireMock.getPort()))
                               .checksumValidationEnabled(false)
                               .credentialsProvider(StaticCredentialsProvider.create(
                                   AwsBasicCredentials.create("key", "secret")))
                               .build();

        scheduledExecutor = Executors.newScheduledThreadPool(10);
        batchManager = SqsAsyncBatchManager.builder()
                                           .client(client)
                                           .scheduledExecutor(scheduledExecutor)
                                           .overrideConfiguration(config -> config
                                               .sendRequestFrequency(Duration.ofMillis(SEND_FREQUENCY_MILLIS))
                                               .maxBatchSize(MAX_BATCH_SIZE))
                                           .build();
    }

    @AfterEach
    void tearDown() {
        batchManager.close();
        client.close();
        // User-supplied scheduled executors are not closed by the batch manager.
        scheduledExecutor.shutdownNow();
    }

    /**
     * Test runs heavy load and expects average batch sizes to be close to max.
     */
    @Test
    void shouldEfficientlyBatchMessagesUnderHighLoad() throws Exception {
        int expectedBatchSize = 25; // more than double the actual max of 10
        int rateLimit = 1000 / SEND_FREQUENCY_MILLIS * expectedBatchSize;
        int messageCount = rateLimit * 2; // run it for 2 seconds
        runThroughputTest(messageCount, rateLimit);

        // Then: Verify messages were efficiently batched
        List<Integer> batchSizes = recordedBatchSizes();

        double fullBatchRatio = batchSizes.stream()
                                          .filter(size -> size >= 9)
                                          .count() / (double) batchSizes.size();

        // Assert efficient batching
        assertThat(averageBatchSize(batchSizes))
            .as("Average batch size")
            .isGreaterThan(8.0);

        assertThat(fullBatchRatio)
            .as("Ratio of nearly full batches (9-10 messages)")
            .isGreaterThan(0.8);

        assertThat((double) batchSizes.size())
            .as("Total batch requests for %d messages", messageCount)
            .isLessThan(messageCount / 5d);
    }

    /**
     * Test runs a load that should cause an average batch size of 5.
     */
    @Test
    void shouldMakeHalfBatches() throws Exception {
        int expectedBatchSize = 5;
        int rateLimit = 1000 / SEND_FREQUENCY_MILLIS * expectedBatchSize;
        int messageCount = rateLimit * 2; // run it for 2 seconds
        runThroughputTest(messageCount, rateLimit);

        // Then: Verify batches were roughly half max size
        List<Integer> batchSizes = recordedBatchSizes();

        assertThat(averageBatchSize(batchSizes))
            .as("Average batch size")
            .isLessThan(7.0)
            .isGreaterThan(3.0);

        assertThat((double) batchSizes.size())
            .as("Total batch requests for %d messages", messageCount)
            .isLessThan(messageCount / 3d);
    }

    /**
     * Test runs a light load that should produce mostly single-message batches.
     */
    @Test
    void shouldMakeSmallBatches() throws Exception {
        int expectedBatchSize = 1;
        int rateLimit = 1000 / SEND_FREQUENCY_MILLIS * expectedBatchSize;
        int messageCount = rateLimit * 2; // run it for 2 seconds
        runThroughputTest(messageCount, rateLimit);

        // Then: Verify batches stayed small (load is too light to fill batches)
        List<Integer> batchSizes = recordedBatchSizes();

        assertThat(averageBatchSize(batchSizes))
            .as("Average batch size")
            .isLessThan(2.0);

        assertThat((double) batchSizes.size())
            .as("Total batch requests for %d messages", messageCount)
            .isGreaterThan(messageCount * .5);
    }

    /**
     * Collects every batch request WireMock recorded and counts the messages in each by
     * counting the "Id" entries in the JSON body.
     */
    private List<Integer> recordedBatchSizes() {
        List<LoggedRequest> batchRequests = findAll(postRequestedFor(anyUrl()));
        return batchRequests.stream()
                            .map(req -> req.getBodyAsString().split("\"Id\"").length - 1)
                            .collect(Collectors.toList());
    }

    /** Average of the recorded batch sizes; 0 when no requests were captured. */
    private static double averageBatchSize(List<Integer> batchSizes) {
        return batchSizes.stream()
                         .mapToInt(Integer::intValue)
                         .average()
                         .orElse(0);
    }

    /**
     * Sends {@code messageCount} messages through the batch manager at {@code rateLimit}
     * messages per second using {@value CONCURRENT_THREADS} producer threads.
     */
    private void runThroughputTest(int messageCount, int rateLimit) throws InterruptedException {
        // Given: SQS returns success for batch requests
        stubFor(post(anyUrl())
                    .willReturn(aResponse()
                                    .withStatus(200)
                                    .withBody("{\"Successful\": []}")));

        // When: Send rateLimit messages per second concurrently (using 50 threads)
        ExecutorService executor = Executors.newFixedThreadPool(CONCURRENT_THREADS);

        // Rate limit to spread it out over a couple seconds; enough time to make
        // any orphaned scheduled flushes obvious.
        RateLimiter rateLimiter = RateLimiter.create(rateLimit);

        try {
            for (int i = 0; i < messageCount; i++) {
                String messageBody = String.valueOf(i);
                rateLimiter.acquire();
                executor.execute(() -> {
                    try {
                        batchManager.sendMessage(builder ->
                                                     builder.queueUrl(QUEUE_URL)
                                                            .messageBody(messageBody));
                    } catch (Exception ignored) {
                        // Test will fail on assertions if messages aren't sent
                    }
                });
            }
        } finally {
            executor.shutdown();
        }
        // Fail loudly if the producers could not finish; previously the return value was
        // ignored and a timeout would silently truncate the load.
        assertThat(executor.awaitTermination(10, TimeUnit.SECONDS))
            .as("Producer threads terminated within the timeout")
            .isTrue();
    }
}

services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/RequestBatchBufferTest.java

Lines changed: 44 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -81,11 +81,12 @@ void whenMaxBufferSizeReachedThenThrowException() {
8181
}
8282

8383
@Test
84-
void whenPutScheduledFlushThenFlushIsSet() {
84+
void whenCancelAndReplaceScheduledFlushThenFlushIsSetAndOldFlushIsCanceled() {
8585
batchBuffer = new RequestBatchBuffer<>(scheduledFlush, 10, MAX_SEND_MESSAGE_PAYLOAD_SIZE_BYTES, maxBufferSize);
8686
ScheduledFuture<?> newScheduledFlush = mock(ScheduledFuture.class);
87-
batchBuffer.putScheduledFlush(newScheduledFlush);
87+
batchBuffer.cancelAndReplaceScheduledFlush(newScheduledFlush);
8888
assertNotNull(newScheduledFlush);
89+
verify(scheduledFlush).cancel(false);
8990
}
9091

9192
@Test
@@ -188,6 +189,46 @@ void testFlushWhenCumulativePayloadExceedsMaxSize() {
188189
}
189190

190191

192+
@Test
193+
void whenSequentialCancelAndReplaceScheduledFlushThenEachPreviousFlushIsCanceled() {
194+
batchBuffer = new RequestBatchBuffer<>(scheduledFlush, 10, MAX_SEND_MESSAGE_PAYLOAD_SIZE_BYTES, maxBufferSize);
195+
196+
// Create a sequence of mock scheduled futures
197+
ScheduledFuture<?> flush1 = mock(ScheduledFuture.class);
198+
ScheduledFuture<?> flush2 = mock(ScheduledFuture.class);
199+
ScheduledFuture<?> flush3 = mock(ScheduledFuture.class);
200+
201+
// First replacement - should cancel the initial scheduledFlush
202+
batchBuffer.cancelAndReplaceScheduledFlush(flush1);
203+
verify(scheduledFlush, times(1)).cancel(false);
204+
205+
// Second replacement - should cancel flush1
206+
batchBuffer.cancelAndReplaceScheduledFlush(flush2);
207+
verify(flush1, times(1)).cancel(false);
208+
209+
// Verify flush2 has not been canceled (it's the current one)
210+
verify(flush2, never()).cancel(false);
211+
212+
// Verify buffer is still functional
213+
CompletableFuture<String> response = new CompletableFuture<>();
214+
batchBuffer.put("test-request", response);
215+
assertEquals(1, batchBuffer.responses().size());
216+
}
217+
218+
@Test
219+
void whenCancelAndReplaceScheduledFlushWithNullInitialFlushThenNoExceptionThrown() {
220+
// Create buffer with null initial flush
221+
batchBuffer = new RequestBatchBuffer<>(null, 10, MAX_SEND_MESSAGE_PAYLOAD_SIZE_BYTES, maxBufferSize);
222+
223+
ScheduledFuture<?> newFlush = mock(ScheduledFuture.class);
224+
225+
// Should not throw exception when initial flush is null
226+
assertDoesNotThrow(() -> batchBuffer.cancelAndReplaceScheduledFlush(newFlush));
227+
228+
// Verify newFlush is not canceled (it's the current one)
229+
verify(newFlush, never()).cancel(false);
230+
}
231+
191232
private String createLargeString(char ch, int length) {
192233
StringBuilder sb = new StringBuilder(length);
193234
for (int i = 0; i < length; i++) {
@@ -198,4 +239,4 @@ private String createLargeString(char ch, int length) {
198239

199240

200241

201-
}
242+
}

0 commit comments

Comments
 (0)