
Commit 65810f5

refactored to use CosmosBatch, removed operationType per row, and added docs to ingestion.md
1 parent 2ff00a9 commit 65810f5

File tree: 8 files changed, +531 -1264 lines changed


sdk/cosmos/azure-cosmos-spark_3-5/src/main/scala/com/azure/cosmos/spark/ItemsWriterBuilder.scala

Lines changed: 2 additions & 2 deletions
@@ -87,7 +87,7 @@ private class ItemsWriterBuilder
   override def supportedCustomMetrics(): Array[CustomMetric] = supportedCosmosMetrics

   override def requiredDistribution(): Distribution = {
-    if (writeConfig.bulkEnabled && writeConfig.itemWriteStrategy == ItemWriteStrategy.ItemTransactionalBatch) {
+    if (writeConfig.bulkEnabled && writeConfig.bulkTransactional) {
       // For transactional writes, partition by all partition key columns
       val partitionKeyPaths = getPartitionKeyColumnNames()
       if (partitionKeyPaths.nonEmpty) {
@@ -103,7 +103,7 @@ private class ItemsWriterBuilder
   }

   override def requiredOrdering(): Array[SortOrder] = {
-    if (writeConfig.bulkEnabled && writeConfig.itemWriteStrategy == ItemWriteStrategy.ItemTransactionalBatch) {
+    if (writeConfig.bulkEnabled && writeConfig.bulkTransactional) {
       // For transactional writes, order by all partition key columns (ascending)
       val partitionKeyPaths = getPartitionKeyColumnNames()
       if (partitionKeyPaths.nonEmpty) {
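
For context on the two overrides touched above: in the Spark DataSource V2 API, `requiredDistribution()` and `requiredOrdering()` come from the `RequiresDistributionAndOrdering` interface and tell Spark to cluster and sort rows before they reach the data writer. The following is a minimal sketch of how such requirements can be expressed with Spark's connector expression API; it is illustrative only, and `partitionKeyPaths` is a placeholder rather than the connector's actual `getPartitionKeyColumnNames()` helper.

```scala
import org.apache.spark.sql.connector.distributions.{Distribution, Distributions}
import org.apache.spark.sql.connector.expressions.{Expression, Expressions, SortDirection, SortOrder}

// Placeholder for the container's partition key columns; the real builder derives
// these from the Cosmos container's partition key definition.
val partitionKeyPaths: Seq[String] = Seq("tenantId")

// Cluster rows so that every row sharing a partition key value lands in the same Spark partition.
def requiredDistribution(): Distribution =
  if (partitionKeyPaths.nonEmpty) {
    Distributions.clustered(partitionKeyPaths.map(p => Expressions.column(p): Expression).toArray)
  } else {
    Distributions.unspecified()
  }

// Sort by the same columns so consecutive rows for one partition key value can be batched together.
def requiredOrdering(): Array[SortOrder] =
  partitionKeyPaths
    .map(p => Expressions.sort(Expressions.column(p), SortDirection.ASCENDING))
    .toArray
```

Clustering plus ascending ordering means all rows for one partition key value arrive contiguously within a single task, which is what allows the writer to emit one transactional batch per partition key value.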

sdk/cosmos/azure-cosmos-spark_3/docs/configuration-reference.md

Lines changed: 1 addition & 1 deletion
@@ -67,7 +67,7 @@
 | `spark.cosmos.write.point.maxConcurrency` | None | Cosmos DB Item Write Max concurrency. If not specified it will be determined based on the Spark executor VM Size |
 | `spark.cosmos.write.bulk.maxPendingOperations` | None | Cosmos DB Item Write bulk mode maximum pending operations. Defines a limit of bulk operations being processed concurrently. If not specified it will be determined based on the Spark executor VM Size. If the volume of data is large for the provisioned throughput on the destination container, this setting can be adjusted by following the estimation of `1000 x Cores` |
 | `spark.cosmos.write.bulk.enabled` | `true` | Cosmos DB Item Write bulk enabled |
-| `spark.cosmos.write.bulk.targetedPayloadSizeInBytes` | `220201` | When the targeted payload size is reached for buffered documents, the request is sent to the backend. The default value is optimized for small documents <= 10 KB - when documents often exceed 110 KB, it can help to increase this value to up to about `1500000` (should still be smaller than 2 MB). |
+| `spark.cosmos.write.bulk.transactional` | `false` | Enable transactional batch mode for bulk writes. When enabled, all operations for the same partition key are executed atomically (all succeed or all fail). Requires ordering and clustering by partition key columns. Only supports upsert operations. Cannot exceed 100 operations or 2 MB per partition key. See [Transactional Batch documentation](https://learn.microsoft.com/azure/cosmos-db/nosql/transactional-batch) for details. |
+| `spark.cosmos.write.bulk.targetedPayloadSizeInBytes` | `220201` | When the targeted payload size is reached for buffered documents, the request is sent to the backend. The default value is optimized for small documents <= 10 KB - when documents often exceed 110 KB, it can help to increase this value to up to about `1500000` (should still be smaller than 2 MB). |
 | `spark.cosmos.write.bulk.initialBatchSize` | `100` | Cosmos DB initial bulk micro batch size - a micro batch will be flushed to the backend when the number of documents enqueued exceeds this size - or the target payload size is met. The micro batch size is getting automatically tuned based on the throttling rate. By default the initial micro batch size is 100. Reduce this when you want to avoid that the first few requests consume too many RUs. |
 | `spark.cosmos.write.bulk.maxBatchSize` | `100` | Cosmos DB max. bulk micro batch size - a micro batch will be flushed to the backend when the number of documents enqueued exceeds this size - or the target payload size is met. The micro batch size is getting automatically tuned based on the throttling rate. By default the max. micro batch size is 100. Use this setting only when migrating Spark 2.4 workloads - for other scenarios relying on the auto-tuning combined with throughput control will result in better experience. |
 | `spark.cosmos.write.flush.noProgress.maxIntervalInSeconds` | `180` | The time interval in seconds that write operations will wait when no progress can be made for bulk writes before forcing a retry. The retry will reinitialize the bulk write process - so, any delays on the retry can be sure to be actual service issues. The default value of 3 min should be sufficient to prevent false negatives when there is a short service-side write unavailability - like for partition splits or merges. Increase it only if you regularly see these transient errors to exceed a time period of 180 seconds. |

sdk/cosmos/azure-cosmos-spark_3/docs/scenarios/Ingestion.md

Lines changed: 55 additions & 0 deletions
@@ -79,6 +79,61 @@ When your container has a "unique key constraint policy" any 409 "Conflict" (ind
 - For `ItemOverwrite` a 409 - Conflict due to unique key violation will result in an error - and the Spark job will fail. *NOTE: Conflicts due to pk+id being identical to another document won't even result in a 409 - because with Upsert the existing document would simply be updated.*
 - For `ItemAppend` like conflicts on pk+id any unique key policy constraint violation will be ignored.

+### Transactional batch writes
+
+For scenarios requiring atomic all-or-nothing semantics within a partition, you can enable transactional batch writes using the `spark.cosmos.write.bulk.transactional` configuration. When enabled, all operations for a single partition key value either succeed or fail together.
+
+#### Configuration
+
+**Python example:**
+```python
+df.write \
+    .format("cosmos.oltp") \
+    .option("spark.cosmos.accountEndpoint", cosmosEndpoint) \
+    .option("spark.cosmos.accountKey", cosmosKey) \
+    .option("spark.cosmos.database", cosmosDatabase) \
+    .option("spark.cosmos.container", cosmosContainer) \
+    .option("spark.cosmos.write.bulk.enabled", "true") \
+    .option("spark.cosmos.write.bulk.transactional", "true") \
+    .mode("Append") \
+    .save()
+```
+
+**Scala example:**
+```scala
+df.write
+  .format("cosmos.oltp")
+  .option("spark.cosmos.accountEndpoint", cosmosEndpoint)
+  .option("spark.cosmos.accountKey", cosmosKey)
+  .option("spark.cosmos.database", cosmosDatabase)
+  .option("spark.cosmos.container", cosmosContainer)
+  .option("spark.cosmos.write.bulk.enabled", "true")
+  .option("spark.cosmos.write.bulk.transactional", "true")
+  .mode(SaveMode.Append)
+  .save()
+```
+
+#### Characteristics and limitations
+
+- **Atomic semantics**: All operations for the same partition key succeed or all fail (rollback)
+- **Operation type**: Only upsert operations are supported (equivalent to `ItemOverwrite` write strategy)
+- **Partition grouping**: Spark automatically partitions and orders data by partition key columns
+- **Size limits**: Maximum 100 operations per transaction; maximum 2 MB total payload per transaction
+- **Partition key requirement**: All operations in a transaction must share the same partition key value
+- **Bulk mode required**: Must have `spark.cosmos.write.bulk.enabled=true` (enabled by default)
+
+#### Use cases
+
+Transactional batch writes are ideal for:
+- Financial transactions requiring consistency across multiple documents
+- Order processing where order header and line items must be committed together
+- Multi-document updates that must be atomic (e.g., inventory adjustments)
+- Any scenario where partial success would leave data in an inconsistent state
+
+#### Error handling
+
+If any operation in a transaction fails (e.g., insufficient RUs, document too large, transaction exceeds 100 operations), the entire transaction is rolled back and no documents are modified. The Spark task will fail and retry according to Spark's retry policy.
+
 ## Preparation
 Below are a couple of tips/best-practices that can help you to prepare for a data migration into a Cosmos DB container.
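
For context, the limits documented above (single partition key value per transaction, at most 100 operations, 2 MB payload) mirror the Cosmos DB transactional batch API that the commit title refers to. The sketch below shows that underlying Java SDK primitive directly, purely to illustrate the semantics; the endpoint, key, database, container, and `orderDocuments` values are placeholders, and this is not the connector's internal implementation.

```scala
import com.azure.cosmos.CosmosClientBuilder
import com.azure.cosmos.models.{CosmosBatch, PartitionKey}

// Placeholder connection settings and documents, for illustration only.
val cosmosEndpoint = "https://<account>.documents.azure.com:443/"
val cosmosKey = "<key>"
val cosmosDatabase = "<database>"
val cosmosContainer = "<container>"
val orderDocuments: Seq[AnyRef] = Seq.empty // documents that all share the partition key value "order-123"

val client = new CosmosClientBuilder()
  .endpoint(cosmosEndpoint)
  .key(cosmosKey)
  .buildClient()
val container = client.getDatabase(cosmosDatabase).getContainer(cosmosContainer)

// Every operation in one batch must target the same logical partition key value;
// the service enforces the 100-operation / 2 MB limits per batch.
val batch = CosmosBatch.createCosmosBatch(new PartitionKey("order-123"))
orderDocuments.foreach(doc => batch.upsertItemOperation(doc))

val response = container.executeCosmosBatch(batch)
if (!response.isSuccessStatusCode) {
  // The batch is all-or-nothing: on failure none of the upserts were applied.
  throw new IllegalStateException(s"Transactional batch failed with status ${response.getStatusCode}")
}
```

When `spark.cosmos.write.bulk.transactional` is enabled, the connector takes care of grouping rows and issuing such batches; the snippet only shows where the per-partition-key and size limits come from.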

sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/CosmosConfig.scala

Lines changed: 13 additions & 1 deletion
@@ -113,6 +113,7 @@ private[spark] object CosmosConfigNames {
   val ClientTelemetryEnabled = "spark.cosmos.clientTelemetry.enabled" // keep this to avoid breaking changes
   val ClientTelemetryEndpoint = "spark.cosmos.clientTelemetry.endpoint" // keep this to avoid breaking changes
   val WriteBulkEnabled = "spark.cosmos.write.bulk.enabled"
+  val WriteBulkTransactional = "spark.cosmos.write.bulk.transactional"
   val WriteBulkMaxPendingOperations = "spark.cosmos.write.bulk.maxPendingOperations"
   val WriteBulkMaxBatchSize = "spark.cosmos.write.bulk.maxBatchSize"
   val WriteBulkMinTargetBatchSize = "spark.cosmos.write.bulk.minTargetBatchSize"
@@ -242,6 +243,7 @@ private[spark] object CosmosConfigNames {
     ClientTelemetryEnabled,
     ClientTelemetryEndpoint,
     WriteBulkEnabled,
+    WriteBulkTransactional,
     WriteBulkMaxPendingOperations,
     WriteBulkMaxConcurrentPartitions,
     WriteBulkPayloadSizeInBytes,
@@ -1437,7 +1439,7 @@ private[spark] object DiagnosticsConfig {

 private object ItemWriteStrategy extends Enumeration {
   type ItemWriteStrategy = Value
-  val ItemOverwrite, ItemAppend, ItemDelete, ItemDeleteIfNotModified, ItemOverwriteIfNotModified, ItemPatch, ItemPatchIfExists, ItemBulkUpdate, ItemTransactionalBatch = Value
+  val ItemOverwrite, ItemAppend, ItemDelete, ItemDeleteIfNotModified, ItemOverwriteIfNotModified, ItemPatch, ItemPatchIfExists, ItemBulkUpdate = Value
 }

 private object CosmosPatchOperationTypes extends Enumeration {
@@ -1462,6 +1464,7 @@ private case class CosmosPatchConfigs(columnConfigsMap: TrieMap[String, CosmosPa
 private case class CosmosWriteConfig(itemWriteStrategy: ItemWriteStrategy,
                                      maxRetryCount: Int,
                                      bulkEnabled: Boolean,
+                                     bulkTransactional: Boolean = false,
                                      bulkMaxPendingOperations: Option[Int] = None,
                                      pointMaxConcurrency: Option[Int] = None,
                                      maxConcurrentCosmosPartitions: Option[Int] = None,
@@ -1486,6 +1489,12 @@ private object CosmosWriteConfig {
     parseFromStringFunction = bulkEnabledAsString => bulkEnabledAsString.toBoolean,
     helpMessage = "Cosmos DB Item Write bulk enabled")

+  private val bulkTransactional = CosmosConfigEntry[Boolean](key = CosmosConfigNames.WriteBulkTransactional,
+    defaultValue = Option.apply(false),
+    mandatory = false,
+    parseFromStringFunction = bulkTransactionalAsString => bulkTransactionalAsString.toBoolean,
+    helpMessage = "Cosmos DB Item Write bulk transactional batch mode enabled")
+
   private val microBatchPayloadSizeInBytes = CosmosConfigEntry[Int](key = CosmosConfigNames.WriteBulkPayloadSizeInBytes,
     defaultValue = Option.apply(BatchRequestResponseConstants.DEFAULT_MAX_DIRECT_MODE_BATCH_REQUEST_BODY_SIZE_IN_BYTES),
     mandatory = false,
@@ -1758,6 +1767,7 @@ private object CosmosWriteConfig {
     val itemWriteStrategyOpt = CosmosConfigEntry.parse(cfg, itemWriteStrategy)
     val maxRetryCountOpt = CosmosConfigEntry.parse(cfg, maxRetryCount)
     val bulkEnabledOpt = CosmosConfigEntry.parse(cfg, bulkEnabled)
+    val bulkTransactionalOpt = CosmosConfigEntry.parse(cfg, bulkTransactional)
     var patchConfigsOpt = Option.empty[CosmosPatchConfigs]
     val throughputControlConfigOpt = CosmosThroughputControlConfig.parseThroughputControlConfig(cfg)
     val microBatchPayloadSizeInBytesOpt = CosmosConfigEntry.parse(cfg, microBatchPayloadSizeInBytes)
@@ -1768,6 +1778,7 @@ private object CosmosWriteConfig {
       .parse(cfg, writeOnRetryCommitInterceptor).flatten

     assert(bulkEnabledOpt.isDefined, s"Parameter '${CosmosConfigNames.WriteBulkEnabled}' is missing.")
+    assert(bulkTransactionalOpt.isDefined, s"Parameter '${CosmosConfigNames.WriteBulkTransactional}' is missing.")

     // parsing above already validated this
     assert(itemWriteStrategyOpt.isDefined, s"Parameter '${CosmosConfigNames.WriteStrategy}' is missing.")
@@ -1788,6 +1799,7 @@ private object CosmosWriteConfig {
       itemWriteStrategyOpt.get,
       maxRetryCountOpt.get,
       bulkEnabled = bulkEnabledOpt.get,
+      bulkTransactional = bulkTransactionalOpt.get,
       bulkMaxPendingOperations = CosmosConfigEntry.parse(cfg, bulkMaxPendingOperations),
       pointMaxConcurrency = CosmosConfigEntry.parse(cfg, pointWriteConcurrency),
       maxConcurrentCosmosPartitions = CosmosConfigEntry.parse(cfg, bulkMaxConcurrentPartitions),
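
The configuration plumbing above follows the connector's existing pattern of typed entries with a default value and a string parser. The snippet below is a simplified, standalone illustration of that pattern (not the actual `CosmosConfigEntry` class), showing how `spark.cosmos.write.bulk.transactional` resolves to `false` unless the user supplies a value.

```scala
// Simplified stand-in for the typed config-entry pattern used in the diff above.
final case class ConfigEntry[T](key: String, defaultValue: Option[T], parse: String => T)

// Resolve an entry from user-supplied options, falling back to its default.
def resolve[T](userOptions: Map[String, String], entry: ConfigEntry[T]): Option[T] =
  userOptions.get(entry.key).map(entry.parse).orElse(entry.defaultValue)

val writeBulkTransactional =
  ConfigEntry[Boolean]("spark.cosmos.write.bulk.transactional", Some(false), _.toBoolean)

// Not supplied -> falls back to the default of false.
assert(resolve(Map.empty, writeBulkTransactional).contains(false))
// Supplied -> parsed from the string value.
assert(resolve(Map("spark.cosmos.write.bulk.transactional" -> "true"), writeBulkTransactional).contains(true))
```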

0 commit comments

Comments (0)