1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-spark_3-3_2-12/CHANGELOG.md
@@ -3,6 +3,7 @@
### 4.41.0-beta.1 (Unreleased)

#### Features Added
* Added support for the `spark.cosmos.write.strategy` value `ItemPatchIfExists`, which gracefully skips documents/patch instructions when the target document does not exist (anymore). - See [47034](https://github.com/Azure/azure-sdk-for-java/pull/47034)

#### Breaking Changes

1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-spark_3-4_2-12/CHANGELOG.md
@@ -3,6 +3,7 @@
### 4.41.0-beta.1 (Unreleased)

#### Features Added
* Added support for the `spark.cosmos.write.strategy` value `ItemPatchIfExists`, which gracefully skips documents/patch instructions when the target document does not exist (anymore). - See [47034](https://github.com/Azure/azure-sdk-for-java/pull/47034)

#### Breaking Changes

1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-spark_3-5_2-12/CHANGELOG.md
@@ -3,6 +3,7 @@
### 4.41.0-beta.1 (Unreleased)

#### Features Added
* Added support for the `spark.cosmos.write.strategy` value `ItemPatchIfExists`, which gracefully skips documents/patch instructions when the target document does not exist (anymore). - See [47034](https://github.com/Azure/azure-sdk-for-java/pull/47034)

#### Breaking Changes

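For reviewers, a minimal usage sketch of the new strategy. This is a sketch only: account, database, and container values are placeholders, and the `[col(...).op(...)]` column-config shape is assumed from the connector's documented patch options; the remaining option names appear in the tests below.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("patch-if-exists-sketch").getOrCreate()
import spark.implicits._

// Two patch rows: one targets an existing document, one a document that was
// already deleted. With ItemPatch the second row would fail the write; with
// ItemPatchIfExists it is skipped gracefully.
val patchDf = Seq(
  ("ExistingId", "X5"),
  ("AlreadyDeletedId", "X5")
).toDF("id", "carType")

patchDf.write
  .format("cosmos.oltp")
  .mode("Append")
  .options(Map(
    "spark.cosmos.accountEndpoint" -> "<placeholder-endpoint>",
    "spark.cosmos.accountKey" -> "<placeholder-key>",
    "spark.cosmos.database" -> "<placeholder-database>",
    "spark.cosmos.container" -> "<placeholder-container>",
    "spark.cosmos.write.strategy" -> "ItemPatchIfExists",
    "spark.cosmos.write.patch.columnConfigs" -> "[col(carType).op(set)]"
  ))
  .save()
```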

Large diffs are not rendered by default.

@@ -172,7 +172,7 @@ private class BulkWriter

private val operationContext = initializeOperationContext()
private val cosmosPatchHelperOpt = writeConfig.itemWriteStrategy match {
case ItemWriteStrategy.ItemPatch | ItemWriteStrategy.ItemBulkUpdate =>
case ItemWriteStrategy.ItemPatch | ItemWriteStrategy.ItemPatchIfExists | ItemWriteStrategy.ItemBulkUpdate =>
Some(new CosmosPatchHelper(diagnosticsConfig, writeConfig.patchConfigs.get))
case _ => None
}
@@ -757,7 +757,12 @@
case _ => new CosmosBulkItemRequestOptions()
},
operationContext)
case ItemWriteStrategy.ItemPatch => getPatchItemOperation(operationContext.itemId, partitionKeyValue, partitionKeyDefinition, objectNode, operationContext)
case ItemWriteStrategy.ItemPatch | ItemWriteStrategy.ItemPatchIfExists => getPatchItemOperation(
operationContext.itemId,
partitionKeyValue,
partitionKeyDefinition,
objectNode,
operationContext)
case _ =>
throw new RuntimeException(s"${writeConfig.itemWriteStrategy} not supported")
}
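Note for reviewers: in the bulk path, `ItemPatchIfExists` builds exactly the same patch operation as `ItemPatch`; the behavioral difference comes solely from `shouldIgnore` below, where a core NotFound (404) response is swallowed instead of being surfaced as a write failure.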
@@ -1272,6 +1277,7 @@ private class BulkWriter
private def shouldIgnore(statusCode: Int, subStatusCode: Int): Boolean = {
val returnValue = writeConfig.itemWriteStrategy match {
case ItemWriteStrategy.ItemAppend => Exceptions.isResourceExistsException(statusCode)
case ItemWriteStrategy.ItemPatchIfExists => Exceptions.isNotFoundExceptionCore(statusCode, subStatusCode)
case ItemWriteStrategy.ItemDelete => Exceptions.isNotFoundExceptionCore(statusCode, subStatusCode)
case ItemWriteStrategy.ItemDeleteIfNotModified => Exceptions.isNotFoundExceptionCore(statusCode, subStatusCode) ||
Exceptions.isPreconditionFailedException(statusCode)
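A minimal sketch of the status-code classification that `shouldIgnore` relies on, assuming standard Cosmos DB/HTTP conventions (404 NotFound, 409 Conflict, 412 PreconditionFailed; a non-zero sub-status such as 1002 marks a retriable read-session miss rather than a truly missing document). The helper names mirror the `Exceptions` object used above, but the exact sub-status handling here is an assumption, not the connector's verbatim code:

```scala
// Sketch only: assumed semantics of the classification helpers referenced in
// shouldIgnore. The connector's real Exceptions object may differ in detail.
object ExceptionsSketch {
  // 404 with sub-status 0: the document genuinely does not exist, as opposed
  // to e.g. 404/1002 (read session not available), which is transient.
  def isNotFoundExceptionCore(statusCode: Int, subStatusCode: Int): Boolean =
    statusCode == 404 && subStatusCode == 0

  // 409: an item with the same id/partition key already exists.
  def isResourceExistsException(statusCode: Int): Boolean =
    statusCode == 409

  // 412: the etag precondition on a conditional replace/delete failed.
  def isPreconditionFailedException(statusCode: Int): Boolean =
    statusCode == 412
}
```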
@@ -1416,7 +1416,7 @@ private[spark] object DiagnosticsConfig

private object ItemWriteStrategy extends Enumeration {
type ItemWriteStrategy = Value
val ItemOverwrite, ItemAppend, ItemDelete, ItemDeleteIfNotModified, ItemOverwriteIfNotModified, ItemPatch, ItemBulkUpdate = Value
val ItemOverwrite, ItemAppend, ItemDelete, ItemDeleteIfNotModified, ItemOverwriteIfNotModified, ItemPatch, ItemPatchIfExists, ItemBulkUpdate = Value
}

private object CosmosPatchOperationTypes extends Enumeration {
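Side note on the new enum member: a hypothetical helper showing how a raw `spark.cosmos.write.strategy` string could resolve to it (illustration only; the connector's actual parsing goes through `CosmosConfigEntry` and may differ):

```scala
// Hypothetical helper, for illustration: case-insensitive lookup of the
// configured write strategy. Not the connector's actual parsing code.
def parseItemWriteStrategySketch(raw: String): ItemWriteStrategy.Value =
  ItemWriteStrategy.values
    .find(_.toString.equalsIgnoreCase(raw.trim))
    .getOrElse(throw new IllegalArgumentException(
      s"Unknown 'spark.cosmos.write.strategy' value '$raw'"))
```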
@@ -1753,7 +1753,7 @@ private object CosmosWriteConfig
assert(maxRetryCountOpt.isDefined, s"Parameter '${CosmosConfigNames.WriteMaxRetryCount}' is missing.")

itemWriteStrategyOpt.get match {
case ItemWriteStrategy.ItemPatch =>
case ItemWriteStrategy.ItemPatch | ItemWriteStrategy.ItemPatchIfExists =>
val patchColumnConfigMap = parsePatchColumnConfigs(cfg, inputSchema)
val patchFilter = CosmosConfigEntry.parse(cfg, patchFilterPredicate)
patchConfigsOpt = Some(CosmosPatchConfigs(patchColumnConfigMap, patchFilter))
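Since `ItemPatchIfExists` now shares the `ItemPatch` branch above, it accepts the same patch configuration. A hedged example of the relevant options (shapes taken from the connector's documented patch settings; treat the exact spellings, especially the filter option name, as assumptions):

```scala
// Assumed option shapes for patch writes; both ItemPatch and ItemPatchIfExists
// consume the same column configs and optional filter predicate.
val patchOptions = Map(
  "spark.cosmos.write.strategy" -> "ItemPatchIfExists",
  // Default operation applied to columns without an explicit config.
  "spark.cosmos.write.patch.defaultOperationType" -> "Set",
  // Per-column overrides: set "car", increment "version".
  "spark.cosmos.write.patch.columnConfigs" -> "[col(car).op(set), col(version).op(increment)]",
  // Optional server-side condition evaluated against the existing document.
  "spark.cosmos.write.patch.filter" -> "FROM products p WHERE p.inStock = true"
)
```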
@@ -70,7 +70,7 @@ private class PointWriter(container: CosmosAsyncContainer,
"PointWriter")

private val cosmosPatchHelpOpt = cosmosWriteConfig.itemWriteStrategy match {
case ItemWriteStrategy.ItemPatch | ItemWriteStrategy.ItemBulkUpdate =>
case ItemWriteStrategy.ItemPatch | ItemWriteStrategy.ItemPatchIfExists | ItemWriteStrategy.ItemBulkUpdate =>
Some(new CosmosPatchHelper(diagnosticsConfig, cosmosWriteConfig.patchConfigs.get))
case _ => None
}
@@ -93,7 +93,9 @@
case ItemWriteStrategy.ItemDeleteIfNotModified =>
deleteWithRetryAsync(partitionKeyValue, objectNode, onlyIfNotModified=true)
case ItemWriteStrategy.ItemPatch =>
patchWithRetryAsync(partitionKeyValue, objectNode)
patchWithRetryAsync(partitionKeyValue, objectNode, false)
case ItemWriteStrategy.ItemPatchIfExists =>
patchWithRetryAsync(partitionKeyValue, objectNode, true)
case ItemWriteStrategy.ItemBulkUpdate =>
patchBulkUpdateWithRetry(partitionKeyValue, objectNode)
}
@@ -201,14 +203,15 @@
}

private def patchWithRetryAsync(partitionKeyValue: PartitionKey,
objectNode: ObjectNode): Unit = {
objectNode: ObjectNode,
ignoreNotFound: Boolean): Unit = {
val promise = Promise[Unit]()
pendingPointWrites.put(promise.future, true)

val patchOperation = PatchOperation(taskDiagnosticsContext,
CosmosItemIdentifier(objectNode.get(CosmosConstants.Properties.Id).asText(), partitionKeyValue))

executeAsync(() => patchWithRetry(partitionKeyValue, objectNode, patchOperation))
executeAsync(() => patchWithRetry(partitionKeyValue, objectNode, patchOperation, ignoreNotFound))
.onComplete {
case Success(_) =>
promise.success(Unit)
@@ -344,7 +347,8 @@ private class PointWriter(container: CosmosAsyncContainer,
// scalastyle:on multiple.string.literals
private def patchWithRetry(partitionKeyValue: PartitionKey,
objectNode: ObjectNode,
patchOperation: PatchOperation): Unit = {
patchOperation: PatchOperation,
ignoreNotFound: Boolean): Unit = {

var exceptionOpt = Option.empty[Exception]

@@ -362,6 +366,15 @@

return
} catch {
case e: CosmosException if ignoreNotFound && Exceptions.isNotFoundExceptionCore(e.getStatusCode, e.getSubStatusCode) =>
log.logItemWriteSkipped(patchOperation, "notFound")
outputMetricsPublisher.trackWriteOperation(
0,
Option.apply(e.getDiagnostics) match {
case Some(diagnostics) => Option.apply(diagnostics.getDiagnosticsContext)
case None => None
})
return
case e: CosmosException if Exceptions.canBeTransientFailure(e.getStatusCode, e.getSubStatusCode) =>
log.logWarning(
s"patch item $patchOperation attempt #$attempt max remaining retries "
@@ -593,6 +593,93 @@ class SparkE2EWriteITest
quark.get("id").asText() shouldEqual "Quark"
quark.get("car").get("carType").asText() shouldEqual "X5"
}

it should s"support patch and skip non-existing records with bulkEnabled = $bulkEnabled defaultOperationType = $patchDefaultOperationType columnConfigString = $patchColumnConfigString patchConditionFilter = $patchConditionFilter " in {
val cosmosEndpoint = TestConfigurations.HOST
val cosmosMasterKey = TestConfigurations.MASTER_KEY

val cfg = Map("spark.cosmos.accountEndpoint" -> cosmosEndpoint,
"spark.cosmos.accountKey" -> cosmosMasterKey,
"spark.cosmos.database" -> cosmosDatabase,
"spark.cosmos.container" -> cosmosContainer,
"spark.cosmos.serialization.inclusionMode" -> "NonDefault"
)

val cfgPatch = Map("spark.cosmos.accountEndpoint" -> cosmosEndpoint,
"spark.cosmos.accountKey" -> cosmosMasterKey,
"spark.cosmos.database" -> cosmosDatabase,
"spark.cosmos.container" -> cosmosContainer,
"spark.cosmos.write.strategy" -> ItemWriteStrategy.ItemPatchIfExists.toString,
"spark.cosmos.write.bulk.enabled" -> bulkEnabled.toString,
"spark.cosmos.write.patch.defaultOperationType" -> patchDefaultOperationType.toString,
"spark.cosmos.write.patch.columnConfigs" -> patchColumnConfigString
)

val newSpark = getSpark

// scalastyle:off underscore.import
// scalastyle:off import.grouping
import spark.implicits._
val spark = newSpark
// scalastyle:on underscore.import
// scalastyle:on import.grouping

val dfWithJson = Seq(
("Quark", "Quark", "Red", 1.0 / 2, "", "{ \"manufacturer\": \"BMW\", \"carType\": \"X3\" }")
).toDF("particle name", "id", "color", "spin", "empty", "childNodeJson")

val df = dfWithJson
.withColumn("car", from_json(col("childNodeJson"), StructType(Array(StructField("manufacturer", StringType, nullable = true), StructField("carType", StringType, nullable = true)))))
.drop("childNodeJson")
df.show(false)
df.write.format("cosmos.oltp").mode("Append").options(cfg).save()

// verify data is written
// wait for a second to allow replication to complete.
Thread.sleep(1000)

// the item with the same id/pk will be persisted based on the upsert config
var quarks = queryItems("SELECT * FROM r where r.id = 'Quark'").toArray
quarks should have size 1

var quark = quarks(0)
quark.get("particle name").asText() shouldEqual "Quark"
quark.get("id").asText() shouldEqual "Quark"
quark.get("car").get("carType").asText() shouldEqual "X3"

val patchDf = if (patchColumnConfigString.endsWith(".rawJson]")) {
Seq(("Quark", "{ \"manufacturer\": \"BMW\", \"carType\": \"X5\" }"), ("NonExistingId", "{ \"manufacturer\": \"BMW\", \"carType\": \"X5\" }"))
.toDF("id", "car")
} else {
Seq(("Quark", "{ \"manufacturer\": \"BMW\", \"carType\": \"X5\" }"), ("NonExistingId", "{ \"manufacturer\": \"BMW\", \"carType\": \"X5\" }"))
.toDF("id", "childNodeJson")
.withColumn("car", from_json(col("childNodeJson"), StructType(Array(StructField("manufacturer", StringType, nullable = true), StructField("carType", StringType, nullable = true)))))
.drop("childNodeJson")
}

logInfo(s"Schema of patchDf: ${patchDf.schema}")

patchDf.write.format("cosmos.oltp").mode("Append").options(cfgPatch).save()

// verify data is written
// wait for a second to allow replication to complete.
Thread.sleep(1000)

// the patch targeting the non-existing id should have been skipped by ItemPatchIfExists
quarks = queryItems("SELECT * FROM r where r.id = 'NonExistingId'").toArray
quarks should have size 0


quarks = queryItems("SELECT * FROM r where r.id = 'Quark'").toArray
quarks should have size 1

logInfo(s"JSON returned from query: ${quarks(0)}")

quark = quarks(0)
quark.get("particle name").asText() shouldEqual "Quark"
quark.get("id").asText() shouldEqual "Quark"
quark.get("car").get("carType").asText() shouldEqual "X5"
}
}

private case class PatchBulkUpdateParameterTest(bulkEnabled: Boolean, patchColumnConfigString: String)
Expand Down