Skip to content

Commit 6392493

Browse files
committed
batch-level semantics, remove client validation, update test titles
1 parent 30fda3e commit 6392493

File tree

3 files changed

+12
-47
lines changed

3 files changed

+12
-47
lines changed

sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/TransactionalBulkWriter.scala

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1037,26 +1037,28 @@ private object TransactionalBulkWriter {
10371037

10381038
private val bulkProcessingThresholds = new CosmosBulkExecutionThresholdsState()
10391039

1040-
private val maxPendingOperationsPerJVM: Int = DefaultMaxPendingOperationPerCore * SparkUtils.getNumberOfHostCPUCores
1040+
// For batch-level backpressure: use a conservative estimate of concurrent batches across the JVM
1041+
// Each batch can have up to 100 operations, so this sizing provides headroom for the scheduler queues
1042+
private val maxPendingBatchesPerJVM: Int = DefaultMaxPendingOperationPerCore * SparkUtils.getNumberOfHostCPUCores / 50
10411043

10421044
// Custom bounded elastic scheduler to consume input flux
10431045
val bulkWriterRequestsBoundedElastic: Scheduler = Schedulers.newBoundedElastic(
10441046
Schedulers.DEFAULT_BOUNDED_ELASTIC_SIZE,
1045-
Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE + 2 * maxPendingOperationsPerJVM,
1047+
Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE + 2 * maxPendingBatchesPerJVM,
10461048
BULK_WRITER_REQUESTS_BOUNDED_ELASTIC_THREAD_NAME,
10471049
TTL_FOR_SCHEDULER_WORKER_IN_SECONDS, true)
10481050

10491051
// Custom bounded elastic scheduler to consume input flux
10501052
val bulkWriterInputBoundedElastic: Scheduler = Schedulers.newBoundedElastic(
10511053
Schedulers.DEFAULT_BOUNDED_ELASTIC_SIZE,
1052-
Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE + 2 * maxPendingOperationsPerJVM,
1054+
Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE + 2 * maxPendingBatchesPerJVM,
10531055
BULK_WRITER_INPUT_BOUNDED_ELASTIC_THREAD_NAME,
10541056
TTL_FOR_SCHEDULER_WORKER_IN_SECONDS, true)
10551057

10561058
// Custom bounded elastic scheduler to switch off IO thread to process response.
10571059
val bulkWriterResponsesBoundedElastic: Scheduler = Schedulers.newBoundedElastic(
10581060
Schedulers.DEFAULT_BOUNDED_ELASTIC_SIZE,
1059-
Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE + maxPendingOperationsPerJVM,
1061+
Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE + maxPendingBatchesPerJVM,
10601062
BULK_WRITER_RESPONSES_BOUNDED_ELASTIC_THREAD_NAME,
10611063
TTL_FOR_SCHEDULER_WORKER_IN_SECONDS, true)
10621064

sdk/cosmos/azure-cosmos-spark_3/src/test/scala/com/azure/cosmos/spark/TransactionalBatchITest.scala

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -424,11 +424,11 @@ class TransactionalBatchITest extends IntegrationSpec
424424
rootCause.getMessage should include("Batch request has more operations than what is supported")
425425
}
426426

427-
"Transactional Batch with Hierarchical Partition Keys" should "create items atomically with PermId and SourceId" in {
427+
"Transactional Batch with Hierarchical Partition Keys" should "create items atomically with testPrimaryKey and testSecondaryKey" in {
428428
val cosmosEndpoint = TestConfigurations.HOST
429429
val cosmosMasterKey = TestConfigurations.MASTER_KEY
430430

431-
// Create container with hierarchical partition keys (PermId, SourceId)
431+
// Create container with hierarchical partition keys (testPrimaryKey, testSecondaryKey)
432432
val containerName = s"test-hpk-${UUID.randomUUID()}"
433433
val containerProperties = new com.azure.cosmos.models.CosmosContainerProperties(
434434
containerName,
@@ -494,7 +494,7 @@ class TransactionalBatchITest extends IntegrationSpec
494494
val cosmosEndpoint = TestConfigurations.HOST
495495
val cosmosMasterKey = TestConfigurations.MASTER_KEY
496496

497-
// Create container with hierarchical partition keys (PermId, SourceId)
497+
// Create container with hierarchical partition keys (testPrimaryKey, testSecondaryKey)
498498
val containerName = s"test-hpk-temporal-${UUID.randomUUID()}"
499499
val containerProperties = new com.azure.cosmos.models.CosmosContainerProperties(
500500
containerName,
@@ -570,11 +570,11 @@ class TransactionalBatchITest extends IntegrationSpec
570570
}
571571
}
572572

573-
it should "handle operations across multiple PermId/SourceId combinations" in {
573+
it should "handle operations across multiple testPrimaryKey/testSecondaryKey combinations" in {
574574
val cosmosEndpoint = TestConfigurations.HOST
575575
val cosmosMasterKey = TestConfigurations.MASTER_KEY
576576

577-
// Create container with hierarchical partition keys (PermId, SourceId)
577+
// Create container with hierarchical partition keys (testPrimaryKey, testSecondaryKey)
578578
val containerName = s"test-hpk-multi-${UUID.randomUUID()}"
579579
val containerProperties = new com.azure.cosmos.models.CosmosContainerProperties(
580580
containerName,
@@ -633,7 +633,7 @@ class TransactionalBatchITest extends IntegrationSpec
633633
val cosmosEndpoint = TestConfigurations.HOST
634634
val cosmosMasterKey = TestConfigurations.MASTER_KEY
635635

636-
// Create container with hierarchical partition keys (PermId, SourceId)
636+
// Create container with hierarchical partition keys (testPrimaryKey, testSecondaryKey)
637637
val containerName = s"test-hpk-limit-${UUID.randomUUID()}"
638638
val containerProperties = new com.azure.cosmos.models.CosmosContainerProperties(
639639
containerName,

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/batch/TransactionalBulkExecutor.java

Lines changed: 0 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,6 @@ public final class TransactionalBulkExecutor<TContext> implements Disposable {
9595
ImplementationBridgeHelpers.CosmosBatchResponseHelper.getCosmosBatchResponseAccessor();
9696

9797
private final CosmosAsyncContainer container;
98-
private final int maxMicroBatchPayloadSizeInBytes;
9998
private final AsyncDocumentClient docClientWrapper;
10099
private final String operationContextText;
101100
private final OperationContextAndListenerTuple operationListener;
@@ -126,7 +125,6 @@ public TransactionalBulkExecutor(CosmosAsyncContainer container,
126125
checkNotNull(inputOperations, "expected non-null inputOperations");
127126
checkNotNull(cosmosBulkOptions, "expected non-null bulkOptions");
128127

129-
this.maxMicroBatchPayloadSizeInBytes = cosmosBulkOptions.getMaxMicroBatchPayloadSizeInBytes();
130128
this.cosmosBulkExecutionOptions = cosmosBulkOptions;
131129
this.container = container;
132130
this.bulkSpanName = "transactionalBatch." + this.container.getId();
@@ -286,31 +284,6 @@ private Flux<CosmosBulkOperationResponse<TContext>> executeTransactionalBatch(Co
286284
String batchTrackingId = UUIDs.nonBlockingRandomUUID().toString();
287285
PartitionKey partitionKey = cosmosBatch.getPartitionKeyValue();
288286

289-
// Validate batch payload size before execution
290-
int totalSerializedLength = this.calculateTotalSerializedLength(operations);
291-
if (totalSerializedLength > this.maxMicroBatchPayloadSizeInBytes) {
292-
String errorMessage = String.format(
293-
"Transactional batch exceeds maximum payload size: %d bytes (limit: %d bytes), PK: %s",
294-
totalSerializedLength,
295-
this.maxMicroBatchPayloadSizeInBytes,
296-
partitionKey);
297-
logger.error("{}, Context: {}", errorMessage, this.operationContextText);
298-
299-
// Return error responses for all operations in the batch
300-
CosmosException payloadSizeException = BridgeInternal.createCosmosException(
301-
HttpConstants.StatusCodes.REQUEST_ENTITY_TOO_LARGE,
302-
errorMessage);
303-
304-
return Flux.fromIterable(operations)
305-
.map(operation -> {
306-
TContext actualContext = this.getActualContext(operation);
307-
return ModelBridgeInternal.createCosmosBulkOperationResponse(
308-
operation,
309-
payloadSizeException,
310-
actualContext);
311-
});
312-
}
313-
314287
logDebugOrWarning(
315288
"Executing transactional batch - {} operations, PK: {}, TrackingId: {}, Context: {}",
316289
operations.size(),
@@ -427,16 +400,6 @@ private Flux<CosmosBulkOperationResponse<TContext>> executeTransactionalBatch(Co
427400
});
428401
}
429402

430-
private int calculateTotalSerializedLength(List<CosmosItemOperation> operations) {
431-
int totalLength = 0;
432-
for (CosmosItemOperation operation : operations) {
433-
if (operation instanceof CosmosItemOperationBase) {
434-
totalLength += ((CosmosItemOperationBase) operation).getSerializedLength(this.effectiveItemSerializer);
435-
}
436-
}
437-
return totalLength;
438-
}
439-
440403
private TContext getActualContext(CosmosItemOperation itemOperation) {
441404
ItemBulkOperation<?, ?> itemBulkOperation = null;
442405

0 commit comments

Comments (0)