### Describe the bug

We observed SqsAsyncBatchManager producing much smaller batches than expected in an experiment with high concurrency and a small batch window.
### Regression Issue

- [ ] Select this option if this issue appears to be a regression.
### Expected Behavior

With 5000 requests per second and a 10ms batching window (and requests sufficiently small), roughly 50 requests arrive within each window, so we would expect most batches to be full (10 requests per batch).
### Current Behavior

The batching client averages ~2.5 requests per batch, and the average drifts slightly lower the longer the benchmark runs.
### Reproduction Steps

To reproduce this, I created a simple app that creates the SqsAsyncBatchManager with a 10ms batch window and calls batchManager.sendMessage() 5000 times per second. An ExecutionInterceptor monitors the batch sizes and reports them every couple of seconds. You can use environment credentials and fill in your own queue URL.
```java
// Dependencies: AWS SDK for Java v2 (sqs, aws-crt-client), Guava (RateLimiter),
// Commons Math3 (SynchronizedDescriptiveStatistics), and Log4j2.
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import com.google.common.util.concurrent.RateLimiter;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.http.crt.AwsCrtAsyncHttpClient;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
import software.amazon.awssdk.services.sqs.batchmanager.SqsAsyncBatchManager;
import software.amazon.awssdk.services.sqs.model.PurgeQueueRequest;
import software.amazon.awssdk.services.sqs.model.SendMessageRequest;

public class SqsBenchmark {
    private static final Logger LOG = LogManager.getLogger();
    private static final int MESSAGE_PROGRESS_INTERVAL = 10000;
    private static final int BATCH_STAT_INTERVAL_SEC = 2;

    private final SqsAsyncClient sqsClient;
    private final SqsAsyncBatchManager batchManager;
    private final AtomicLong messagesSent = new AtomicLong(0);
    private final RateLimiter rateLimiter;
    private final ScheduledExecutorService metricReportScheduler;

    public SqsBenchmark(Region region, double messagesPerSecond, int batchWindowMs) {
        this.rateLimiter = RateLimiter.create(messagesPerSecond);
        LOG.info(String.format("Rate limiter configured for %.2f messages per second", messagesPerSecond));
        LOG.info(String.format("BatchWindow configured for %dms", batchWindowMs));

        MetricReporter metricReporter = new MetricReporter();
        metricReportScheduler = Executors.newSingleThreadScheduledExecutor();
        metricReportScheduler.scheduleAtFixedRate(metricReporter, BATCH_STAT_INTERVAL_SEC, BATCH_STAT_INTERVAL_SEC,
                                                  TimeUnit.SECONDS);

        // Interceptor that records the size of every SendMessageBatch call the batch manager sends.
        BatchExecutionInterceptor batchExecutionInterceptor = new BatchExecutionInterceptor(metricReporter);
        ClientOverrideConfiguration overrideConfiguration = ClientOverrideConfiguration.builder()
                .addExecutionInterceptor(batchExecutionInterceptor)
                .build();

        sqsClient = SqsAsyncClient.builder()
                .credentialsProvider(DefaultCredentialsProvider.builder().build())
                .region(region)
                .httpClientBuilder(AwsCrtAsyncHttpClient.builder()
                        .maxConcurrency(400)
                        .connectionTimeout(Duration.ofMillis(5000))
                        .connectionAcquisitionTimeout(Duration.ofMillis(5000)))
                // Configure client behavior
                .overrideConfiguration(overrideConfiguration)
                .build();

        this.batchManager = SqsAsyncBatchManager.builder()
                .client(sqsClient)
                .overrideConfiguration(builder -> builder
                        .maxBatchSize(10)
                        .sendRequestFrequency(Duration.ofMillis(batchWindowMs)))
                .scheduledExecutor(Executors.newScheduledThreadPool(50))
                .build();
    }

    public void sendMessages(String queueUrl, int messageCount, int concurrency) {
        LOG.info(String.format("Starting to send %d messages with concurrency %d, rate limit %.2f",
                               messageCount, concurrency, rateLimiter.getRate()));
        ExecutorService executor = Executors.newFixedThreadPool(concurrency);
        ConcurrentHashMap<Integer, CompletableFuture<Void>> incompleteFutures = new ConcurrentHashMap<>();

        for (int i = 0; i < messageCount; i++) {
            final int messageId = i;
            // Let the rate limiter decide how many we can add at a time.
            // Not having enough threads will cause surge queue backlog though.
            rateLimiter.acquire();
            CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
                SendMessageRequest request = SendMessageRequest.builder()
                        .queueUrl(queueUrl)
                        .messageBody(String.format("Benchmark message %d", messageId))
                        .build();
                try {
                    batchManager.sendMessage(request)
                                .whenComplete((result, throwable) -> {
                                    if (throwable == null) {
                                        long sent = messagesSent.incrementAndGet();
                                        if (sent % MESSAGE_PROGRESS_INTERVAL == 0) {
                                            LOG.info(String.format("Sent %d messages", sent));
                                        }
                                    } else {
                                        LOG.error("Failed to send message: " + throwable);
                                    }
                                });
                } catch (Exception e) {
                    LOG.error("Failed to send message: " + e.getMessage(), e);
                    throw e;
                }
            }, executor);
            incompleteFutures.put(messageId, future);
            future.whenComplete((result, throwable) -> incompleteFutures.remove(messageId));
        }

        try {
            while (messagesSent.get() != messageCount) {
                Thread.sleep(1000);
            }
            CompletableFuture.allOf(incompleteFutures.values().toArray(new CompletableFuture[0])).get();
        } catch (InterruptedException e) {
            LOG.error("Interrupted while waiting for message sends to complete", e);
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            LOG.error("Error in message send futures", e);
        }

        executor.shutdown();
        try {
            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    public void printStats() {
        LOG.info(String.format("Messages sent: %d", messagesSent.get()));
    }

    public static void main(String[] args) {
        String queueUrl = "<queue-url>";
        int concurrency = 300;
        int messageCount = 50000;
        double rateLimit = 5000;
        int batchWindowMs = 10;
        Region region = Region.of("us-east-1");

        SqsBenchmark benchmark = new SqsBenchmark(region, rateLimit, batchWindowMs);
        try {
            LOG.info(String.format("Starting SQS Benchmark with %s queueUrl", queueUrl));
            long startTime = System.currentTimeMillis();

            // Send messages
            benchmark.sendMessages(queueUrl, messageCount, concurrency);

            long endTime = System.currentTimeMillis();
            long duration = endTime - startTime;

            // Stop reporting send-batch metrics
            benchmark.metricReportScheduler.shutdown();

            LOG.info(String.format("Benchmark completed in %d ms", duration));
            LOG.info(String.format("Throughput: %.2f messages/second", (double) messageCount * 1000 / duration));

            LOG.info("Purging the queue");
            benchmark.sqsClient.purgeQueue(PurgeQueueRequest.builder().queueUrl(queueUrl).build()).join();
            LOG.info("Purged the queue successfully.");

            benchmark.printStats();
        } catch (Exception e) {
            LOG.error("Benchmark failed: " + e.getMessage(), e);
        }
        LOG.info("Done");
        System.exit(0);
    }
}
```
```java
import org.apache.commons.math3.stat.descriptive.SynchronizedDescriptiveStatistics;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class MetricReporter implements Runnable {
    private static final Logger LOG = LogManager.getLogger();
    private final SynchronizedDescriptiveStatistics batchSizeStats;

    public MetricReporter() {
        batchSizeStats = new SynchronizedDescriptiveStatistics(SynchronizedDescriptiveStatistics.INFINITE_WINDOW);
    }

    public void addBatchSizeStat(Integer value) {
        batchSizeStats.addValue(value);
    }

    /**
     * Report stats and clear the counters.
     */
    @Override
    public void run() {
        LOG.info("------BEGIN_BATCH_STAT------");
        LOG.info(String.format("BatchSize.avg=%.1f", batchSizeStats.getMean()));
        LOG.info(String.format("BatchSize.p50=%.1f", batchSizeStats.getPercentile(50)));
        LOG.info(String.format("BatchSize.p90=%.1f", batchSizeStats.getPercentile(90)));
        LOG.info("------END_BATCH_STAT------");
        batchSizeStats.clear();
    }
}
```
```java
import software.amazon.awssdk.core.interceptor.Context;
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
import software.amazon.awssdk.core.interceptor.ExecutionInterceptor;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchResponse;

public class BatchExecutionInterceptor implements ExecutionInterceptor {
    private final MetricReporter metricReporter;

    public BatchExecutionInterceptor(MetricReporter metricReporter) {
        this.metricReporter = metricReporter;
    }

    @Override
    public void afterExecution(Context.AfterExecution context, ExecutionAttributes executionAttributes) {
        // Record the number of entries in each SendMessageBatch call actually sent.
        if (context.request() instanceof SendMessageBatchRequest
            && context.response() instanceof SendMessageBatchResponse) {
            SendMessageBatchResponse response = (SendMessageBatchResponse) context.response();
            metricReporter.addBatchSizeStat(response.successful().size() + response.failed().size());
        }
    }
}
```
### Possible Solution

This occurs because of a race condition in RequestBatchManager's manualFlushBuffer method. Under high concurrency, multiple manualFlushBuffer calls may be triggered at once.

Here is the sequence with Thread A and Thread B both running manualFlushBuffer at around the same time, with a pre-existing scheduled flush X:
| Thread A | Thread B | Comments |
|---|---|---|
| cancel X | | |
| flush | | |
| ..flushing | cancel X | both threads cancel the same flush task |
| putFlush Y | flush | |
| | putFlush Z | two flushes have been scheduled; Y is now orphaned |
Because the flushes are scheduled with scheduleAtFixedRate, each orphaned scheduled flush keeps firing perpetually and can no longer be cancelled.
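For illustration only (this is plain JDK code, not SDK internals; the class and variable names are made up): a task scheduled with scheduleAtFixedRate keeps firing until cancel() is called on the exact ScheduledFuture it returned, so once that reference is overwritten, as happens to the orphaned flush Y above, the task can never be stopped.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class OrphanedFlushDemo {
    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

        // "Y": the first scheduled flush. This future is the only handle that can stop it.
        ScheduledFuture<?> flush = scheduler.scheduleAtFixedRate(
            () -> System.out.println("Y fired"), 0, 10, TimeUnit.MILLISECONDS);

        // A racing caller installs "Z" and overwrites the reference without cancelling "Y"...
        flush = scheduler.scheduleAtFixedRate(
            () -> System.out.println("Z fired"), 0, 10, TimeUnit.MILLISECONDS);

        // ...so cancelling what we still hold only stops "Z"; "Y" keeps firing until shutdown.
        flush.cancel(false);

        Thread.sleep(100); // "Y fired" keeps printing during this window
        scheduler.shutdownNow();
    }
}
```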
We can fix this by synchronizing the cancellation of the previous flush with the registration of the new one in RequestBatchBuffer.putScheduledFlush(), so that only one scheduled flush is ever active.
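A minimal sketch of that idea, assuming a scheduledFlush field guarded by a lock (the class, field, and method shapes below are illustrative, not the SDK's actual RequestBatchBuffer internals): cancel-and-replace happens atomically, so even if two racing threads each register a new flush, the later registration cancels the earlier one and no periodic task is left orphaned.

```java
import java.util.concurrent.ScheduledFuture;

// Illustrative only: shows the proposed synchronization, not the real RequestBatchBuffer.
final class ScheduledFlushHolder {
    private final Object flushLock = new Object();
    private ScheduledFuture<?> scheduledFlush; // guarded by flushLock

    // Atomically cancel the currently registered flush (if any) and install the new one,
    // so a racing caller can never leave a previously scheduled flush running with no owner.
    void putScheduledFlush(ScheduledFuture<?> newFlush) {
        synchronized (flushLock) {
            if (scheduledFlush != null) {
                scheduledFlush.cancel(false);
            }
            scheduledFlush = newFlush;
        }
    }

    // Cancellation takes the same lock, so it cannot interleave with a concurrent put.
    void cancelScheduledFlush() {
        synchronized (flushLock) {
            if (scheduledFlush != null) {
                scheduledFlush.cancel(false);
                scheduledFlush = null;
            }
        }
    }
}
```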
### Additional Information/Context

No response

### AWS Java SDK version used

2.32.20

### JDK version used

openjdk 17.0.16 2025-07-15 LTS

### Operating System and version

Amazon Linux 2