Skip to content

Commit 89b41a3

Browse files
committed
Add batch size validation for transactional batches. Refactored calculateTotalSerializedLength to validate 2MB payload limit and return HTTP 413 errors with clear messages when exceeded.
1 parent 39241c4 commit 89b41a3

File tree

1 file changed

+32
-7
lines changed

1 file changed

+32
-7
lines changed

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/batch/TransactionalBulkExecutor.java

Lines changed: 32 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -286,6 +286,31 @@ private Flux<CosmosBulkOperationResponse<TContext>> executeTransactionalBatch(Co
286286
String batchTrackingId = UUIDs.nonBlockingRandomUUID().toString();
287287
PartitionKey partitionKey = cosmosBatch.getPartitionKeyValue();
288288

289+
// Validate batch payload size before execution
290+
int totalSerializedLength = this.calculateTotalSerializedLength(operations);
291+
if (totalSerializedLength > this.maxMicroBatchPayloadSizeInBytes) {
292+
String errorMessage = String.format(
293+
"Transactional batch exceeds maximum payload size: %d bytes (limit: %d bytes), PK: %s",
294+
totalSerializedLength,
295+
this.maxMicroBatchPayloadSizeInBytes,
296+
partitionKey);
297+
logger.error("{}, Context: {}", errorMessage, this.operationContextText);
298+
299+
// Return error responses for all operations in the batch
300+
CosmosException payloadSizeException = BridgeInternal.createCosmosException(
301+
HttpConstants.StatusCodes.REQUEST_ENTITY_TOO_LARGE,
302+
errorMessage);
303+
304+
return Flux.fromIterable(operations)
305+
.map(operation -> {
306+
TContext actualContext = this.getActualContext(operation);
307+
return ModelBridgeInternal.createCosmosBulkOperationResponse(
308+
operation,
309+
payloadSizeException,
310+
actualContext);
311+
});
312+
}
313+
289314
logDebugOrWarning(
290315
"Executing transactional batch - {} operations, PK: {}, TrackingId: {}, Context: {}",
291316
operations.size(),
@@ -402,14 +427,14 @@ private Flux<CosmosBulkOperationResponse<TContext>> executeTransactionalBatch(Co
402427
});
403428
}
404429

405-
private int calculateTotalSerializedLength(AtomicInteger currentTotalSerializedLength, CosmosItemOperation item) {
406-
if (item instanceof CosmosItemOperationBase) {
407-
return currentTotalSerializedLength.accumulateAndGet(
408-
((CosmosItemOperationBase) item).getSerializedLength(this.effectiveItemSerializer),
409-
Integer::sum);
430+
private int calculateTotalSerializedLength(List<CosmosItemOperation> operations) {
431+
int totalLength = 0;
432+
for (CosmosItemOperation operation : operations) {
433+
if (operation instanceof CosmosItemOperationBase) {
434+
totalLength += ((CosmosItemOperationBase) operation).getSerializedLength(this.effectiveItemSerializer);
435+
}
410436
}
411-
412-
return currentTotalSerializedLength.get();
437+
return totalLength;
413438
}
414439

415440
private TContext getActualContext(CosmosItemOperation itemOperation) {

0 commit comments

Comments
 (0)