Skip to content

Commit cb7726d

Browse files
author
Cody Thornhill
committed
Fix SqsAsyncBatchManager excessive batch flushing under heavy load (#6374)
* Synchronize replacement of scheduled flush
1 parent 1c0b19f commit cb7726d

File tree

5 files changed

+22
-8
lines changed

5 files changed

+22
-8
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"type": "bugfix",
3+
"category": "Amazon Simple Queue Service",
4+
"contributor": "thornhillcody",
5+
"description": "Fix SqsAsyncBatchManager excessive batch flushing under heavy load. Fixes [#6374](https://github.com/aws/aws-sdk-java-v2/issues/6374)."
6+
}

services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/BatchingMap.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,8 @@ public boolean contains(String batchKey) {
5959
return batchContextMap.containsKey(batchKey);
6060
}
6161

62-
public void putScheduledFlush(String batchKey, ScheduledFuture<?> scheduledFlush) {
63-
batchContextMap.get(batchKey).putScheduledFlush(scheduledFlush);
62+
public void cancelAndReplaceScheduledFlush(String batchKey, ScheduledFuture<?> scheduledFlush) {
63+
batchContextMap.get(batchKey).cancelAndReplaceScheduledFlush(scheduledFlush);
6464
}
6565

6666
public void forEach(BiConsumer<String, RequestBatchBuffer<RequestT, ResponseT>> action) {

services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchBuffer.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
@SdkInternalApi
3030
public final class RequestBatchBuffer<RequestT, ResponseT> {
3131
private final Object flushLock = new Object();
32+
private final Object scheduledFlushLock = new Object();
3233

3334
private final Map<String, BatchingExecutionContext<RequestT, ResponseT>> idToBatchContext;
3435
private final int maxBatchItems;
@@ -144,8 +145,14 @@ private String nextBatchEntry() {
144145
return Integer.toString(nextBatchEntry++);
145146
}
146147

147-
public void putScheduledFlush(ScheduledFuture<?> scheduledFlush) {
148-
this.scheduledFlush = scheduledFlush;
148+
public void cancelAndReplaceScheduledFlush(ScheduledFuture<?> scheduledFlush) {
149+
// Locking the cancellation and replacement of the scheduledFlush ensures that there is only one active.
150+
synchronized (scheduledFlushLock) {
151+
if (this.scheduledFlush != null) {
152+
cancelScheduledFlush();
153+
}
154+
this.scheduledFlush = scheduledFlush;
155+
}
149156
}
150157

151158
public void cancelScheduledFlush() {

services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchManager.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -109,8 +109,8 @@ private void manualFlushBuffer(String batchKey,
109109
Map<String, BatchingExecutionContext<RequestT, ResponseT>> flushableRequests) {
110110
requestsAndResponsesMaps.cancelScheduledFlush(batchKey);
111111
flushBuffer(batchKey, flushableRequests);
112-
requestsAndResponsesMaps.putScheduledFlush(batchKey,
113-
scheduleBufferFlush(batchKey,
112+
requestsAndResponsesMaps.cancelAndReplaceScheduledFlush(batchKey,
113+
scheduleBufferFlush(batchKey,
114114
sendRequestFrequency.toMillis(),
115115
scheduledExecutor));
116116
}

services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/RequestBatchBufferTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,11 +81,12 @@ void whenMaxBufferSizeReachedThenThrowException() {
8181
}
8282

8383
@Test
84-
void whenPutScheduledFlushThenFlushIsSet() {
84+
void whenCancelAndReplaceScheduledFlushThenFlushIsSetAndOldFlushIsCanceled() {
8585
batchBuffer = new RequestBatchBuffer<>(scheduledFlush, 10, MAX_SEND_MESSAGE_PAYLOAD_SIZE_BYTES, maxBufferSize);
8686
ScheduledFuture<?> newScheduledFlush = mock(ScheduledFuture.class);
87-
batchBuffer.putScheduledFlush(newScheduledFlush);
87+
batchBuffer.cancelAndReplaceScheduledFlush(newScheduledFlush);
8888
assertNotNull(newScheduledFlush);
89+
verify(scheduledFlush).cancel(false);
8990
}
9091

9192
@Test

0 commit comments

Comments
 (0)