
Commit 30fda3e

Fix transactional batch diagnostics prefix, simplify validation, rename test fields
1 parent 89b41a3 commit 30fda3e

2 files changed: +30 −33 lines


sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/TransactionalBulkWriter.scala

Lines changed: 4 additions & 7 deletions
@@ -84,11 +84,8 @@ private class TransactionalBulkWriter
       case None => 2 * ContainerFeedRangesCache.getFeedRanges(container, containerConfig.feedRangeRefreshIntervalInSecondsOpt).block().size
     }
     // Validate write strategy for transactional batches
-    if (writeConfig.itemWriteStrategy != ItemWriteStrategy.ItemOverwrite) {
-      throw new IllegalArgumentException(
-        s"Transactional batches only support ItemOverwrite (upsert) write strategy. " +
-          s"Requested strategy: ${writeConfig.itemWriteStrategy}")
-    }
+    require(writeConfig.itemWriteStrategy == ItemWriteStrategy.ItemOverwrite,
+      s"Transactional batches only support ItemOverwrite (upsert) write strategy. Requested: ${writeConfig.itemWriteStrategy}")

     log.logInfo(
       s"TransactionalBulkWriter instantiated (Host CPU count: $cpuCount, maxPendingBatches: $maxPendingBatches, " +
@@ -186,7 +183,7 @@ private class TransactionalBulkWriter
   private def initializeOperationContext(): SparkTaskContext = {
     val taskContext = TaskContext.get

-    val diagnosticsContext: DiagnosticsContext = DiagnosticsContext(UUIDs.nonBlockingRandomUUID(), "BulkWriter")
+    val diagnosticsContext: DiagnosticsContext = DiagnosticsContext(UUIDs.nonBlockingRandomUUID(), "transactional-BulkWriter")

     if (taskContext != null) {
       val taskDiagnosticsContext = SparkTaskContext(diagnosticsContext.correlationActivityId,
@@ -406,7 +403,7 @@ private class TransactionalBulkWriter
     operationContext: OperationContext): Unit = {
     activeTasks.incrementAndGet()
     if (operationContext.attemptNumber > 1) {
-      logInfoOrWarning(s"bulk scheduleWrite attemptCnt: ${operationContext.attemptNumber}, " +
+      logInfoOrWarning(s"TransactionalBulkWriter scheduleWrite attemptCnt: ${operationContext.attemptNumber}, " +
         s"Context: ${operationContext.toString} $getThreadInfo")
     }
sdk/cosmos/azure-cosmos-spark_3/src/test/scala/com/azure/cosmos/spark/TransactionalBatchITest.scala

Lines changed: 26 additions & 26 deletions
@@ -435,8 +435,8 @@ class TransactionalBatchITest extends IntegrationSpec
       new com.azure.cosmos.models.PartitionKeyDefinition()
     )
     val paths = new java.util.ArrayList[String]()
-    paths.add("/PermId")
-    paths.add("/SourceId")
+    paths.add("/testPrimaryKey")
+    paths.add("/testSecondaryKey")
     containerProperties.getPartitionKeyDefinition.setPaths(paths)
     containerProperties.getPartitionKeyDefinition.setKind(com.azure.cosmos.models.PartitionKind.MULTI_HASH)
     containerProperties.getPartitionKeyDefinition.setVersion(com.azure.cosmos.models.PartitionKeyDefinitionVersion.V2)
@@ -452,9 +452,9 @@ class TransactionalBatchITest extends IntegrationSpec
     // Create batch operations with hierarchical partition keys
     val schema = StructType(Seq(
       StructField("id", StringType, nullable = false),
-      StructField("PermId", StringType, nullable = false),
-      StructField("SourceId", StringType, nullable = false),
-      StructField("price", org.apache.spark.sql.types.DoubleType, nullable = false)
+      StructField("testPrimaryKey", StringType, nullable = false),
+      StructField("testSecondaryKey", StringType, nullable = false),
+      StructField("testPrice", org.apache.spark.sql.types.DoubleType, nullable = false)
     ))

     val batchOperations = Seq(
@@ -479,11 +479,11 @@ class TransactionalBatchITest extends IntegrationSpec
       val pk = new PartitionKeyBuilder().add(permId).add(sourceId).build()
       val item1 = container.readItem(item1Id, pk, classOf[ObjectNode]).block()
       item1 should not be null
-      item1.getItem.get("price").asDouble() shouldEqual 100.5
+      item1.getItem.get("testPrice").asDouble() shouldEqual 100.5

       val item2 = container.readItem(item2Id, pk, classOf[ObjectNode]).block()
       item2 should not be null
-      item2.getItem.get("price").asDouble() shouldEqual 101.25
+      item2.getItem.get("testPrice").asDouble() shouldEqual 101.25
     } finally {
       // Clean up container
       container.delete().block()
@@ -501,8 +501,8 @@ class TransactionalBatchITest extends IntegrationSpec
       new com.azure.cosmos.models.PartitionKeyDefinition()
     )
     val paths = new java.util.ArrayList[String]()
-    paths.add("/PermId")
-    paths.add("/SourceId")
+    paths.add("/testPrimaryKey")
+    paths.add("/testSecondaryKey")
     containerProperties.getPartitionKeyDefinition.setPaths(paths)
     containerProperties.getPartitionKeyDefinition.setKind(com.azure.cosmos.models.PartitionKind.MULTI_HASH)
     containerProperties.getPartitionKeyDefinition.setVersion(com.azure.cosmos.models.PartitionKeyDefinitionVersion.V2)
@@ -518,18 +518,18 @@ class TransactionalBatchITest extends IntegrationSpec
     // First, create initial record
     val initialDoc = Utils.getSimpleObjectMapper.createObjectNode()
     initialDoc.put("id", oldRecordId)
-    initialDoc.put("PermId", permId)
-    initialDoc.put("SourceId", sourceId)
-    initialDoc.put("price", 100.0)
+    initialDoc.put("testPrimaryKey", permId)
+    initialDoc.put("testSecondaryKey", sourceId)
+    initialDoc.put("testPrice", 100.0)
     initialDoc.putNull("valid_to")
     container.createItem(initialDoc).block()

     // Now perform atomic temporal update: close old record + create new record
     val schema = StructType(Seq(
       StructField("id", StringType, nullable = false),
-      StructField("PermId", StringType, nullable = false),
-      StructField("SourceId", StringType, nullable = false),
-      StructField("price", org.apache.spark.sql.types.DoubleType, nullable = false),
+      StructField("testPrimaryKey", StringType, nullable = false),
+      StructField("testSecondaryKey", StringType, nullable = false),
+      StructField("testPrice", org.apache.spark.sql.types.DoubleType, nullable = false),
       StructField("valid_to", StringType, nullable = true)
     ))

@@ -562,7 +562,7 @@ class TransactionalBatchITest extends IntegrationSpec
       // Verify new record was created
       val newRecord = container.readItem(newRecordId, pk, classOf[ObjectNode]).block()
       newRecord should not be null
-      newRecord.getItem.get("price").asDouble() shouldEqual 150.0
+      newRecord.getItem.get("testPrice").asDouble() shouldEqual 150.0
       newRecord.getItem.get("valid_to").isNull shouldBe true
     } finally {
       // Clean up container
@@ -581,8 +581,8 @@ class TransactionalBatchITest extends IntegrationSpec
       new com.azure.cosmos.models.PartitionKeyDefinition()
     )
     val paths = new java.util.ArrayList[String]()
-    paths.add("/PermId")
-    paths.add("/SourceId")
+    paths.add("/testPrimaryKey")
+    paths.add("/testSecondaryKey")
     containerProperties.getPartitionKeyDefinition.setPaths(paths)
     containerProperties.getPartitionKeyDefinition.setKind(com.azure.cosmos.models.PartitionKind.MULTI_HASH)
     containerProperties.getPartitionKeyDefinition.setVersion(com.azure.cosmos.models.PartitionKeyDefinitionVersion.V2)
@@ -593,9 +593,9 @@ class TransactionalBatchITest extends IntegrationSpec
     // Create operations for different PermId/SourceId combinations
     val schema = StructType(Seq(
       StructField("id", StringType, nullable = false),
-      StructField("PermId", StringType, nullable = false),
-      StructField("SourceId", StringType, nullable = false),
-      StructField("price", org.apache.spark.sql.types.DoubleType, nullable = false)
+      StructField("testPrimaryKey", StringType, nullable = false),
+      StructField("testSecondaryKey", StringType, nullable = false),
+      StructField("testPrice", org.apache.spark.sql.types.DoubleType, nullable = false)
     ))

     val batchOperations = Seq(
@@ -640,8 +640,8 @@ class TransactionalBatchITest extends IntegrationSpec
       new com.azure.cosmos.models.PartitionKeyDefinition()
     )
     val paths = new java.util.ArrayList[String]()
-    paths.add("/PermId")
-    paths.add("/SourceId")
+    paths.add("/testPrimaryKey")
+    paths.add("/testSecondaryKey")
     containerProperties.getPartitionKeyDefinition.setPaths(paths)
     containerProperties.getPartitionKeyDefinition.setKind(com.azure.cosmos.models.PartitionKind.MULTI_HASH)
     containerProperties.getPartitionKeyDefinition.setVersion(com.azure.cosmos.models.PartitionKeyDefinitionVersion.V2)
@@ -654,9 +654,9 @@ class TransactionalBatchITest extends IntegrationSpec
     // Create 101 operations for the same hierarchical partition key
     val schema = StructType(Seq(
       StructField("id", StringType, nullable = false),
-      StructField("PermId", StringType, nullable = false),
-      StructField("SourceId", StringType, nullable = false),
-      StructField("price", org.apache.spark.sql.types.DoubleType, nullable = false)
+      StructField("testPrimaryKey", StringType, nullable = false),
+      StructField("testSecondaryKey", StringType, nullable = false),
+      StructField("testPrice", org.apache.spark.sql.types.DoubleType, nullable = false)
     ))

     val batchOperations = (1 to 101).map { i =>
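
All of the renamed test cases share the hierarchical (MultiHash) partition-key setup visible in the hunks above. A condensed sketch of that pattern with the new field names follows; the container id and the two key values are illustrative placeholders, not values taken from this commit:

// Sketch: hierarchical partition key definition plus a matching PartitionKey,
// using the renamed test fields. "testContainer" and the key values are placeholders.
import com.azure.cosmos.models.{CosmosContainerProperties, PartitionKeyBuilder, PartitionKeyDefinition, PartitionKeyDefinitionVersion, PartitionKind}

val containerProperties = new CosmosContainerProperties("testContainer", new PartitionKeyDefinition())
val paths = new java.util.ArrayList[String]()
paths.add("/testPrimaryKey")
paths.add("/testSecondaryKey")
containerProperties.getPartitionKeyDefinition.setPaths(paths)
containerProperties.getPartitionKeyDefinition.setKind(PartitionKind.MULTI_HASH)
containerProperties.getPartitionKeyDefinition.setVersion(PartitionKeyDefinitionVersion.V2)

// Point reads against such a container need both key components, in the same order as the paths:
val pk = new PartitionKeyBuilder()
  .add("primaryKeyValue")   // stored under /testPrimaryKey
  .add("secondaryKeyValue") // stored under /testSecondaryKey
  .build()

A document written through the batch then carries testPrimaryKey/testSecondaryKey fields matching the components passed to the builder, which is what the renamed testPrice assertions read back via container.readItem.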
