
Commit 0ebfa98

Revert operation type changes

1 parent 954309b
File tree

5 files changed: 1 addition & 57 deletions


sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/AsyncItemWriter.scala

Lines changed: 0 additions & 8 deletions
```diff
@@ -14,14 +14,6 @@ private trait AsyncItemWriter {
    */
   def scheduleWrite(partitionKeyValue: PartitionKey, objectNode: ObjectNode): Unit
 
-  /**
-   * Schedule a write to happen in async and return immediately with per-row operation type
-   * @param partitionKeyValue the partition key value
-   * @param objectNode the json object node
-   * @param operationType optional operation type (create, upsert, replace, delete) for this specific row
-   */
-  def scheduleWrite(partitionKeyValue: PartitionKey, objectNode: ObjectNode, operationType: Option[String]): Unit
-
   /**
    * Wait for all remaining work
    * Throws if any of the work resulted in failure
```
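
After the revert, the trait is back to a single scheduleWrite signature. A minimal sketch of the resulting shape, assuming the imports shown; only scheduleWrite and the trailing doc comment are confirmed by this diff, and the flushAndClose name is an assumption:

```scala
import com.azure.cosmos.models.PartitionKey
import com.fasterxml.jackson.databind.node.ObjectNode

private trait AsyncItemWriter {
  /** Schedule a write to happen asynchronously and return immediately. */
  def scheduleWrite(partitionKeyValue: PartitionKey, objectNode: ObjectNode): Unit

  /**
   * Wait for all remaining work.
   * Throws if any of the work resulted in failure.
   * (Method name is an assumption; only the doc comment survives in the diff.)
   */
  def flushAndClose(): Unit
}
```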

sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/BulkWriter.scala

Lines changed: 0 additions & 6 deletions
```diff
@@ -646,12 +646,6 @@ private class BulkWriter
   }
 
   override def scheduleWrite(partitionKeyValue: PartitionKey, objectNode: ObjectNode): Unit = {
-    scheduleWrite(partitionKeyValue, objectNode, None)
-  }
-
-  override def scheduleWrite(partitionKeyValue: PartitionKey, objectNode: ObjectNode, operationType: Option[String]): Unit = {
-    // BulkWriter doesn't support per-row operation types - it uses global ItemWriteStrategy
-    // The operationType parameter is ignored here for interface compatibility
     Preconditions.checkState(!closed.get())
     throwIfCapturedExceptionExists()
```
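
With the three-parameter overload gone, BulkWriter again derives the operation for every row from the connector-wide ItemWriteStrategy rather than a per-row column. A hedged sketch of selecting that global strategy from the Spark side; the endpoint, key, and database/container values are placeholders, while `cosmos.oltp` and `spark.cosmos.write.strategy` are the connector's documented format and setting:

```scala
import org.apache.spark.sql.DataFrame

// Upsert every row in df using one global strategy for the whole write;
// per-row operation types are not part of this API after the revert.
def upsertAll(df: DataFrame): Unit = {
  df.write
    .format("cosmos.oltp")
    .option("spark.cosmos.accountEndpoint", "https://<account>.documents.azure.com:443/")
    .option("spark.cosmos.accountKey", "<account-key>")
    .option("spark.cosmos.database", "<database>")
    .option("spark.cosmos.container", "<container>")
    .option("spark.cosmos.write.strategy", "ItemOverwrite") // global, not per row
    .mode("append")
    .save()
}
```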

sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/CosmosWriterBase.scala

Lines changed: 1 addition & 27 deletions
```diff
@@ -97,23 +97,6 @@ private abstract class CosmosWriterBase(
   override def write(internalRow: InternalRow): Unit = {
     val objectNode = cosmosRowConverter.fromInternalRowToObjectNode(internalRow, inputSchema)
 
-    // Extract operationType if column exists (for per-row operation support)
-    val operationType: Option[String] = if (inputSchema.fieldNames.contains("operationType")) {
-      val opTypeIndex = inputSchema.fieldIndex("operationType")
-      if (!internalRow.isNullAt(opTypeIndex)) {
-        Some(internalRow.getString(opTypeIndex))
-      } else {
-        None
-      }
-    } else {
-      None
-    }
-
-    // Remove operationType from objectNode if present (don't persist to Cosmos)
-    if (objectNode.has("operationType")) {
-      objectNode.remove("operationType")
-    }
-
     require(objectNode.has(CosmosConstants.Properties.Id) &&
       objectNode.get(CosmosConstants.Properties.Id).isTextual,
       s"${CosmosConstants.Properties.Id} is a mandatory field. " +
@@ -127,16 +110,7 @@ private abstract class CosmosWriterBase(
     }
 
     val partitionKeyValue = PartitionKeyHelper.getPartitionKeyPath(objectNode, partitionKeyDefinition)
-
-    // Call the appropriate scheduleWrite method based on whether operationType is specified
-    operationType match {
-      case Some(opType) =>
-        // Per-row operation type specified - use 3-parameter method
-        writer.get.scheduleWrite(partitionKeyValue, objectNode, Some(opType))
-      case None =>
-        // No per-row operation type - use 2-parameter method (global strategy)
-        writer.get.scheduleWrite(partitionKeyValue, objectNode)
-    }
+    writer.get.scheduleWrite(partitionKeyValue, objectNode)
   }
 
   override def commit(): WriterCommitMessage = {
```
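
The surviving require(...) check is untouched by the revert: every converted document still needs a textual id before scheduleWrite is called. A minimal standalone illustration of that precondition using Jackson directly; the mapper setup and field values are illustrative, not from the diff:

```scala
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.node.ObjectNode

object IdPreconditionDemo extends App {
  val mapper = new ObjectMapper()
  val doc: ObjectNode = mapper.createObjectNode()
  doc.put("id", "item-001") // mandatory, and must be textual
  doc.put("category", "books")

  // Mirrors the require(...) guard in the write path above.
  require(doc.has("id") && doc.get("id").isTextual,
    "id is a mandatory field and must be a string")
}
```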

sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/PointWriter.scala

Lines changed: 0 additions & 4 deletions
```diff
@@ -76,10 +76,6 @@ private class PointWriter(container: CosmosAsyncContainer,
   }
 
   override def scheduleWrite(partitionKeyValue: PartitionKey, objectNode: ObjectNode): Unit = {
-    scheduleWrite(partitionKeyValue, objectNode, None)
-  }
-
-  override def scheduleWrite(partitionKeyValue: PartitionKey, objectNode: ObjectNode, operationType: Option[String]): Unit = {
     checkState(!closed.get())
 
     val etag = getETag(objectNode)
```
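
The retained body still reads the document's etag via getETag before writing. A sketch of what such a helper plausibly does, reading the Cosmos `_etag` system property when present; the helper body is an assumption, since only the call site appears in this diff:

```scala
import com.fasterxml.jackson.databind.node.ObjectNode

// Returns the document's _etag when present and textual, for use with
// if-match style optimistic concurrency on point operations.
def getETag(objectNode: ObjectNode): Option[String] =
  Option(objectNode.get("_etag")).filter(_.isTextual).map(_.asText())
```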

sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/TransactionalBulkWriter.scala

Lines changed: 0 additions & 12 deletions
```diff
@@ -411,18 +411,6 @@ private class TransactionalBulkWriter
     scheduleWriteInternal(partitionKeyValue, objectNode, operationContext)
   }
 
-  /**
-   * Per-row operation type is not supported for transactional batches.
-   * All operations in a transactional batch must be upserts (ItemOverwrite strategy).
-   * This method is implemented to satisfy the AsyncItemWriter interface but throws an exception.
-   */
-  override def scheduleWrite(partitionKeyValue: PartitionKey, objectNode: ObjectNode, operationType: Option[String]): Unit = {
-    throw new UnsupportedOperationException(
-      "Per-row operation types are not supported for transactional batches. " +
-      "All operations in a transactional batch must use ItemOverwrite (upsert) strategy. " +
-      "Use scheduleWrite(partitionKeyValue, objectNode) instead.")
-  }
-
   private def scheduleWriteInternal(partitionKeyValue: PartitionKey,
                                     objectNode: ObjectNode,
                                     operationContext: OperationContext): Unit = {
```
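
The deleted overload enforced the batch constraint by throwing at the call site; after the revert that entry point no longer exists, but the rule it stated (transactional batches only perform ItemOverwrite upserts) still holds. A hypothetical caller-side guard expressing the same rule; validateWriteStrategy is not part of the connector, and only the constraint itself comes from the removed doc comment:

```scala
// Reject any configuration that pairs a transactional batch write with a
// strategy other than ItemOverwrite, matching the removed runtime check.
def validateWriteStrategy(strategy: String, transactionalBatch: Boolean): Unit = {
  if (transactionalBatch && strategy != "ItemOverwrite") {
    throw new IllegalArgumentException(
      s"Write strategy '$strategy' is not supported for transactional batches; " +
        "all operations in a transactional batch must use ItemOverwrite (upsert).")
  }
}
```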
