Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"type": "bugfix",
"category": "Amazon Simple Queue Service",
"contributor": "thornhillcody",
"description": "Fix SqsAsyncBatchManager excessive batch flushing under heavy load. Fixes [#6374](https://github.com/aws/aws-sdk-java-v2/issues/6374)."
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ public boolean contains(String batchKey) {
return batchContextMap.containsKey(batchKey);
}

public void putScheduledFlush(String batchKey, ScheduledFuture<?> scheduledFlush) {
batchContextMap.get(batchKey).putScheduledFlush(scheduledFlush);
public void cancelAndReplaceScheduledFlush(String batchKey, ScheduledFuture<?> scheduledFlush) {
batchContextMap.get(batchKey).cancelAndReplaceScheduledFlush(scheduledFlush);
}

public void forEach(BiConsumer<String, RequestBatchBuffer<RequestT, ResponseT>> action) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
@SdkInternalApi
public final class RequestBatchBuffer<RequestT, ResponseT> {
private final Object flushLock = new Object();
private final Object scheduledFlushLock = new Object();

private final Map<String, BatchingExecutionContext<RequestT, ResponseT>> idToBatchContext;
private final int maxBatchItems;
Expand Down Expand Up @@ -144,8 +145,14 @@ private String nextBatchEntry() {
return Integer.toString(nextBatchEntry++);
}

public void putScheduledFlush(ScheduledFuture<?> scheduledFlush) {
this.scheduledFlush = scheduledFlush;
public void cancelAndReplaceScheduledFlush(ScheduledFuture<?> scheduledFlush) {
// Locking the cancellation and replacement of the scheduledFlush ensures that there is only one active.
synchronized (scheduledFlushLock) {
if (this.scheduledFlush != null) {
cancelScheduledFlush();
}
this.scheduledFlush = scheduledFlush;
}
}

public void cancelScheduledFlush() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,8 @@ private void manualFlushBuffer(String batchKey,
Map<String, BatchingExecutionContext<RequestT, ResponseT>> flushableRequests) {
requestsAndResponsesMaps.cancelScheduledFlush(batchKey);
flushBuffer(batchKey, flushableRequests);
requestsAndResponsesMaps.putScheduledFlush(batchKey,
scheduleBufferFlush(batchKey,
requestsAndResponsesMaps.cancelAndReplaceScheduledFlush(batchKey,
scheduleBufferFlush(batchKey,
sendRequestFrequency.toMillis(),
scheduledExecutor));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,12 @@ void whenMaxBufferSizeReachedThenThrowException() {
}

@Test
void whenPutScheduledFlushThenFlushIsSet() {
void whenCancelAndReplaceScheduledFlushThenFlushIsSetAndOldFlushIsCanceled() {
batchBuffer = new RequestBatchBuffer<>(scheduledFlush, 10, MAX_SEND_MESSAGE_PAYLOAD_SIZE_BYTES, maxBufferSize);
ScheduledFuture<?> newScheduledFlush = mock(ScheduledFuture.class);
batchBuffer.putScheduledFlush(newScheduledFlush);
batchBuffer.cancelAndReplaceScheduledFlush(newScheduledFlush);
assertNotNull(newScheduledFlush);
verify(scheduledFlush).cancel(false);
}

@Test
Expand Down