Conversation

TheovanKraay (Member) commented Nov 28, 2025

Description

This PR adds transactional batch operation support to the Azure Cosmos DB Spark 3 connector, enabling atomic multi-document operations within a single partition. The implementation follows the same pattern as standard Cosmos writes, converting flat DataFrame rows to JSON documents while supporting batch-specific metadata.

Key Features

  • Atomic Operations: All operations in a batch succeed or fail together (ACID guarantees)
  • Multiple Operation Types: Supports create, replace, upsert, delete, and read operations
  • Hierarchical Partition Keys: Full support for 1-3 level hierarchical partition keys with JSON serialization for Spark broadcast compatibility
  • 100-Operation Limit Enforcement: Validates that batches do not exceed 100 operations per partition key (Cosmos DB limit)
  • Flexible Schema: Optional operationType column (defaults to "upsert" if not provided)
  • Partition Key Validation: Ensures all operations within a batch target the same partition
  • Detailed Results: Returns per-operation status codes, success flags, and error messages
  • Resource Management: Proper cleanup of Cosmos clients and throughput control resources

Usage Examples
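
The examples below assume a cosmosConfig dictionary containing the standard connector connection options; the endpoint, key, database, and container values shown here are placeholders you would replace with your own:

# Placeholder connection settings assumed by the examples below
cosmosConfig = {
    "spark.cosmos.accountEndpoint": "https://<your-account>.documents.azure.com:443/",
    "spark.cosmos.accountKey": "<your-account-key>",
    "spark.cosmos.database": "<your-database>",
    "spark.cosmos.container": "<your-container>"
}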

Example 1: Basic Upsert Operations (No Operation Type Specified)

When you don't specify an operationType column, all operations default to "upsert":

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Define schema WITHOUT operationType column
schema = StructType([
    StructField("id", StringType(), False),
    StructField("pk", StringType(), False),
    StructField("name", StringType(), False),
    StructField("age", IntegerType(), False)
])

# Create DataFrame with flat fields - no operationType column
items = [
    ("item1", "partition1", "Alice", 30),
    ("item2", "partition1", "Bob", 25),
    ("item3", "partition1", "Charlie", 35)
]
df = spark.createDataFrame(items, schema)

# All operations will be upserts (default behavior)
results = df.write \
    .format("cosmos.oltp.transactionalBatch") \
    .options(**cosmosConfig) \
    .save()

results.show()

Output Schema:

| Column | Type | Description |
| --- | --- | --- |
| id | String | Document ID |
| partitionKey | String | Partition key value |
| operationType | String | Operation performed ("upsert") |
| statusCode | Int | HTTP status code (e.g., 201 for create) |
| isSuccessStatusCode | Boolean | Whether the operation succeeded |
| resultDocument | String | Returned document (for read operations) |
| errorMessage | String | Error details (if the operation failed) |

Example 2: Financial Instrument Temporal Versioning (Hierarchical Partition Keys)

This example demonstrates hierarchical partition keys for temporal versioning of financial instruments. The partition key consists of two levels: PermId (instrument identifier) and SourceId (data source):

from datetime import datetime
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

# Define schema with hierarchical partition key: PermId (level 1) + SourceId (level 2)
schema = StructType([
    StructField("id", StringType(), False),
    StructField("PermId", StringType(), False),      # Partition key level 1
    StructField("SourceId", StringType(), False),    # Partition key level 2
    StructField("ValidFrom", TimestampType(), False),
    StructField("ValidTo", TimestampType(), True),
    StructField("Price", DoubleType(), False),
    StructField("Currency", StringType(), False)
])

# Temporal update: close old record and create new version atomically
# Both operations use the same hierarchical partition key [PermId="MSFT", SourceId="Bloomberg"]
operations = [
    # Close the current active record by setting ValidTo
    ("inst-msft-v1", "MSFT", "Bloomberg", "2024-01-01", "2024-12-01", 100.50, "USD"),
    # Create new version with updated price
    ("inst-msft-v2", "MSFT", "Bloomberg", "2024-12-01", None, 105.75, "USD")
]
df = spark.createDataFrame(operations, schema)

# Execute atomic temporal update - both succeed or both fail
results = df.write \
    .format("cosmos.oltp.transactionalBatch") \
    .options(**cosmosConfig) \
    .save()

# Check for any failures
results.filter(~results.isSuccessStatusCode).show()

Note: In this example, PermId and SourceId together form the hierarchical partition key (2 levels). All operations in the batch must use the same partition key values to maintain atomicity.
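
For reference, a container with a two-level hierarchical partition key like this would be created with subpartitioning enabled. A minimal sketch using the azure-cosmos Python SDK (a recent version with hierarchical partition key support is assumed; account details, database, and container names are placeholders):

from azure.cosmos import CosmosClient, PartitionKey

# Placeholder account details - replace with your own
client = CosmosClient("https://<your-account>.documents.azure.com:443/", credential="<your-account-key>")
database = client.get_database_client("<your-database>")

# Hierarchical (multi-hash) partition key: level 1 = /PermId, level 2 = /SourceId
database.create_container_if_not_exists(
    id="<your-container>",
    partition_key=PartitionKey(path=["/PermId", "/SourceId"], kind="MultiHash")
)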

Example 3: Mixed Operations with Operation Type Column

Specify different operations per row using the operationType column:

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Define schema WITH operationType column
schema = StructType([
    StructField("id", StringType(), False),
    StructField("pk", StringType(), False),
    StructField("operationType", StringType(), False),
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)
])

# Create DataFrame with operationType column specifying different operations
mixedOperations = [
    ("new1", "pk1", "create", "Alice", 30),
    ("existing1", "pk1", "replace", "Bob", 26),
    ("any1", "pk1", "upsert", "Charlie", 35),
    ("old1", "pk1", "delete", None, None),
    ("check1", "pk1", "read", None, None)
]
df = spark.createDataFrame(mixedOperations, schema)

results = df.write \
    .format("cosmos.oltp.transactionalBatch") \
    .options(**cosmosConfig) \
    .save()

# Check results
results.filter(~results.isSuccessStatusCode).show()

Input DataFrame Schema

Your DataFrame should have flat columns representing document properties:

| Column | Type | Required | Description |
| --- | --- | --- | --- |
| id | String | Yes | Document identifier |
| pk (or partition key path) | String / multiple columns | Yes | Partition key value(s) - supports hierarchical keys |
| operationType | String | No | Operation to perform: "create", "replace", "upsert", "delete", "read" (default: "upsert") |
| ...additional columns... | Any | No | Document properties (converted to JSON) |

Note: For hierarchical partition keys, include all partition key path columns (e.g., PermId, SourceId). The operationType column is metadata and not included in the stored document.

Output DataFrame Schema

| Column | Type | Description |
| --- | --- | --- |
| id | String | Document ID |
| partitionKey | String | Partition key value used (comma-separated for hierarchical keys) |
| operationType | String | Operation that was performed |
| statusCode | Int | HTTP status code (200-299 for success) |
| isSuccessStatusCode | Boolean | Whether the operation succeeded |
| resultDocument | String | Document content (populated for "read" operations) |
| errorMessage | String | Error details (populated on failure) |

Constraints

  • Same Partition Key: All operations in a batch must target the same partition key value(s)
  • 100 Operation Limit: Batches cannot exceed 100 operations per partition key (enforced with validation; see the pre-flight check sketched after this list)
  • Atomicity: All operations succeed or fail together within each batch
  • 2MB Size Limit: Total batch payload cannot exceed 2MB
  • Hierarchical Keys: Supports up to 3-level hierarchical partition keys
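
As referenced above, a simple pre-flight check can surface constraint violations before the write is attempted. A minimal sketch, assuming a single-level partition key column named pk as in Example 1:

from pyspark.sql import functions as F

# Count operations per partition key value (column name "pk" assumed from Example 1)
batch_sizes = df.groupBy("pk").count()

# Flag any partition key whose batch would exceed the 100-operation limit
oversized = batch_sizes.filter(F.col("count") > 100)
if oversized.count() > 0:
    oversized.show()
    raise ValueError("One or more partition keys exceed the 100-operation batch limit")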

Error Handling

The API returns detailed results for each operation:

results = df.write \
    .format("cosmos.oltp.transactionalBatch") \
    .options(**cosmosConfig) \
    .save()

# Check for validation errors (thrown before execution)
# - Missing 'id' column
# - Mismatched partition keys in batch
# - Invalid operation types

# Check for Cosmos errors (returned in results)
results.filter(~results.isSuccessStatusCode).select("id", "statusCode", "errorMessage").show()

# Common status codes:
# - 200/201: Success
# - 400: Bad request (invalid document)
# - 409: Conflict (document already exists for "create")
# - 404: Not found (document doesn't exist for "replace"/"delete")
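
If you want the job to fail hard whenever any operation reports an error, one option is to collect the failed rows and raise. A small sketch, assuming the results DataFrame described above:

# Fail the job explicitly if any operation in the batch results reports an error
failures = results.filter(~results.isSuccessStatusCode) \
    .select("id", "partitionKey", "statusCode", "errorMessage") \
    .collect()
if failures:
    raise RuntimeError(f"{len(failures)} batch operation(s) failed: {failures}")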

All SDK Contribution checklist:

  • The pull request does not introduce breaking changes.
  • CHANGELOG is updated for new features, bug fixes or other significant changes.
  • I have read the contribution guidelines.

General Guidelines and Best Practices

  • Title of the pull request is clear and informative.
  • There are a small number of commits, each of which have an informative message. This means that previously merged commits do not appear in the history of the PR. For more information on cleaning up the commits in your PR, see this page.

Testing Guidelines

  • Pull request includes test coverage for the included changes.

Copilot AI review requested due to automatic review settings November 28, 2025 21:48
@TheovanKraay TheovanKraay requested review from a team and kirankumarkolli as code owners November 28, 2025 21:48
Copilot finished reviewing on behalf of TheovanKraay November 28, 2025 21:53
Copilot AI (Contributor) left a comment

Pull request overview

This PR implements atomic transactional batch operations for the Cosmos DB Spark connector, enabling users to execute multiple operations within the same logical partition atomically with all-or-nothing semantics.

Key Changes:

  • Adds writeTransactionalBatch API to CosmosItemsDataSource that converts flat DataFrame schemas to batch operations using the same pattern as standard Cosmos writes
  • Implements TransactionalBatchWriter and TransactionalBatchPartitionExecutor classes following the CosmosReadManyReader pattern for consistency
  • Provides comprehensive integration tests covering atomic creation, rollback on failure, simplified schemas with default upsert, operation ordering, and update scenarios

Reviewed changes

Copilot reviewed 5 out of 5 changed files in this pull request and generated 19 comments.

| File | Description |
| --- | --- |
| sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/CosmosItemsDataSource.scala | Adds writeTransactionalBatch API methods and createBatchOperationExtraction function to convert DataFrame rows to batch operations |
| sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/TransactionalBatchWriter.scala | Implements core transactional batch execution logic with partition-based grouping and atomic execution |
| sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/CosmosConfig.scala | Adds ItemTransactionalBatch to the ItemWriteStrategy enumeration |
| sdk/cosmos/azure-cosmos-spark_3/docs/configuration-reference.md | Documents the transactional batch API with usage patterns, schema requirements, and constraints |
| sdk/cosmos/azure-cosmos-spark_3/src/test/scala/com/azure/cosmos/spark/TransactionalBatchITest.scala | Provides integration tests for atomic operations, rollback scenarios, simplified schemas, and operation ordering |

@TheovanKraay TheovanKraay force-pushed the spark-transactional-batch-support branch 8 times, most recently from 334b9b8 to a8506f0 on December 4, 2025 at 12:52
github-actions bot (Contributor) commented Dec 4, 2025

API Change Check

APIView identified API level changes in this PR and created the following API reviews

com.azure:azure-cosmos

@TheovanKraay TheovanKraay force-pushed the spark-transactional-batch-support branch from 50ad8fd to 74d6ed2 on December 4, 2025 at 18:02
TheovanKraay and others added 11 commits December 4, 2025 21:16
This commit implements atomic transactional batch operations for the Cosmos DB Spark connector, enabling users to execute multiple operations within the same logical partition atomically. All operations succeed together or fail together, ensuring data consistency.

CosmosItemsDataSource.scala:
Adds the writeTransactionalBatch API with support for flat DataFrame schemas following the same pattern as standard Cosmos writes. The implementation converts DataFrame rows to JSON documents using CosmosRowConverter, automatically handling metadata columns like operationType (excluded from documents) and partitionKey (normalized to "pk" for Cosmos DB). When the operationType column is omitted, operations default to upsert. The extraction logic ensures proper Spark serialization by creating non-serializable objects within lambda closures rather than capturing them.

TransactionalBatchWriter.scala (new file):
Implements the core batch execution logic following the CosmosReadManyReader pattern for consistency. Groups operations by partition key and executes them atomically using the Cosmos DB Batch API. Handles all five operation types (create, upsert, replace, delete, read) and returns detailed results including status codes, success indicators, result documents for read operations, and error messages for failures. Manages Cosmos client lifecycle, metadata cache broadcasting, and proper resource cleanup after execution.

TransactionalBatchITest.scala (new file):
Provides comprehensive integration tests covering atomic creation, rollback on failure, simplified schemas with default upsert operations, operation ordering preservation, and update operations. Tests verify both successful scenarios and failure modes including duplicate key conflicts that trigger atomic rollback. All tests use flat column DataFrames matching the standard Cosmos write pattern rather than JSON strings.

CosmosConfig.scala:
Adds ItemTransactionalBatch to the ItemWriteStrategy enumeration to enable configuration and routing of transactional batch operations within the connector's write strategy framework.

configuration-reference.md:
Documents the new transactional batch API with usage patterns, schema requirements, supported operations, constraints, and example code. Explains both simplified usage (default upsert) and explicit operation types, along with the atomic nature of batch operations and the 100-operation limit per partition key.
…s/spark/TransactionalBatchWriter.scala

Co-authored-by: Copilot <[email protected]>
…s/spark/CosmosItemsDataSource.scala

Co-authored-by: Copilot <[email protected]>
…al partition keys

Added toJson() method to PartitionKeyDefinition to enable JSON serialization for Spark broadcast variables, since the class contains complex nested objects that cannot be directly serialized. The Spark connector broadcasts partition key definitions from driver to executors, requiring this serialization capability. Also updated TransactionalBatchWriter to use PartitionKeyBuilder with pattern matching for correct hierarchical partition key construction, fixing Tuple2 type errors in the previous size-based approach.
@TheovanKraay TheovanKraay force-pushed the spark-transactional-batch-support branch from 0c3801e to 6b29c9e on December 4, 2025 at 21:16
* - The partition key column name must match the container's partition key path (e.g., "pk" if the path is "/pk").
* - The "id" column (String) is required.
* - An optional "operationType" column (String) can be provided to specify the operation ("create", "replace", "upsert", "delete", "read") for each row.
* If not provided, the default operation is "upsert".
Member

etag pre-condition should be taken into account as well if specified in input DF?

*
* Output DataFrame schema:
* - id: String
* - partition key column: String (name matches container's partition key path)
Member

why do we need an output DF? if any transaction fails I think we should fail the entire operation? Otherwise too much complexity leaks into user code?

Member Author

I thought for observability? Users can see which partition keys had successful batch commits, useful for audit, etc. But you are right, the entire Spark job fails if any transaction fails, so I guess there's no partial success handling to be done in user code. We should just return Unit instead?

val defaultOperationExtraction = createBatchOperationExtraction(df, partitionKeyDefinition)

// Execute the batch with the pre-initialized client states
batchWriter.writeTransactionalBatchWithClientStates(df.rdd, defaultOperationExtraction, (clientMetadataBroadcast, partitionKeyDefBroadcast))
Member

I think you have to ensure Spark partitioning by PK - and also order by it to have a predictable order (so that a change in pk means you can trigger flushing to the backend)?

Member Author

I am removing broadcast per Annie, but we can discuss the order by as well.

Member Author

Ordering within each partition is already handled by grouping operations by partition key value in TransactionalBatchPartitionExecutor, but I guess you mean explicit repartitioning like df.repartition($"partitionKeyColumn") is needed to make batching predictable, right?
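
For context, the kind of repartitioning being discussed would look roughly like this on the DataFrame side; a sketch only, assuming a single-level partition key column named pk:

# Co-locate and order rows by partition key so each Spark partition sees
# contiguous runs of the same pk value, and a change in pk can trigger a flush
batched_df = df.repartition("pk").sortWithinPartitions("pk")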


// Group operations by partition key
// Use the full partition key values sequence as the key
private val operationsByPartitionKey: mutable.Map[Seq[Any], mutable.ArrayBuffer[BatchOperation]] = {
Member

Anything you do in a map requires pushing the entire df into memory - this is not going to work at scale. So the whole grouping by PK etc. needs to be pushed into the DF - so Spark can distribute the work and do the partitioning (to ensure all docs with the same pk value are even on the same executor) and then the grouping/ordering. That way you can simply look at it row-by-row - and any change in PK value means you need to send the transactional batch. Also, you will need a construct like in BulkWriter.scala that first separates the incoming data by physical partition - to allow each TransactionalWriter to send transactional batches to all physical partitions.

Member

Let us discuss this in a call - everything else looks like a good start - but getting this right is crucial to make this work at scale and be efficient (which means every Spark partition alone needs to be able to fully saturate the CDB backend - across all partitions).

Member

Also, retries need to be added here - a 408 will be very common, for example.

…d-buffer with streaming iterator to handle large datasets at scale. Add a new test to check that deliberately interleaved operations across 3 partition keys are grouped properly (reducing transitions from 6 to 2)