sdk/cosmos/azure-cosmos-spark_3-3_2-12/CHANGELOG.md (1 addition, 0 deletions)

@@ -3,6 +3,7 @@
 ### 4.41.0-beta.1 (Unreleased)
 
 #### Features Added
+* Added support for the `spark.cosmos.write.strategy` value `ItemPatchIfExists`, which allows gracefully ignoring documents/patch instructions when the target document does not exist (anymore). - See [46759](https://github.com/Azure/azure-sdk-for-java/pull/46759)
 
 #### Breaking Changes
 
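For context, a minimal usage sketch of the new strategy (hedged: `df`, the account endpoint/key, and the database/container names below are placeholders, not taken from this PR; the patch column config mirrors the ones used in the tests):

```scala
// Sketch: patch existing documents and silently skip ids that no longer exist.
// With plain "ItemPatch" a missing document would surface as an error instead.
df.write
  .format("cosmos.oltp")
  .mode("Append")
  .option("spark.cosmos.accountEndpoint", "https://<account>.documents.azure.com:443/")
  .option("spark.cosmos.accountKey", "<account-key>")
  .option("spark.cosmos.database", "<database>")
  .option("spark.cosmos.container", "<container>")
  .option("spark.cosmos.write.strategy", "ItemPatchIfExists")
  .option("spark.cosmos.write.patch.columnConfigs", "[col(car).op(set)]")
  .save()
```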

(Large diff for one file not rendered by default.)

@@ -172,7 +172,7 @@ private class BulkWriter
 
   private val operationContext = initializeOperationContext()
   private val cosmosPatchHelperOpt = writeConfig.itemWriteStrategy match {
-    case ItemWriteStrategy.ItemPatch | ItemWriteStrategy.ItemBulkUpdate =>
+    case ItemWriteStrategy.ItemPatch | ItemWriteStrategy.ItemPatchIfExists | ItemWriteStrategy.ItemBulkUpdate =>
       Some(new CosmosPatchHelper(diagnosticsConfig, writeConfig.patchConfigs.get))
     case _ => None
   }
@@ -757,7 +757,12 @@
           case _ => new CosmosBulkItemRequestOptions()
         },
         operationContext)
-      case ItemWriteStrategy.ItemPatch => getPatchItemOperation(operationContext.itemId, partitionKeyValue, partitionKeyDefinition, objectNode, operationContext)
+      case ItemWriteStrategy.ItemPatch | ItemWriteStrategy.ItemPatchIfExists => getPatchItemOperation(
+        operationContext.itemId,
+        partitionKeyValue,
+        partitionKeyDefinition,
+        objectNode,
+        operationContext)
       case _ =>
         throw new RuntimeException(s"${writeConfig.itemWriteStrategy} not supported")
     }
@@ -1272,6 +1277,7 @@
   private def shouldIgnore(statusCode: Int, subStatusCode: Int): Boolean = {
     val returnValue = writeConfig.itemWriteStrategy match {
       case ItemWriteStrategy.ItemAppend => Exceptions.isResourceExistsException(statusCode)
+      case ItemWriteStrategy.ItemPatchIfExists => Exceptions.isNotFoundExceptionCore(statusCode, subStatusCode)
       case ItemWriteStrategy.ItemDelete => Exceptions.isNotFoundExceptionCore(statusCode, subStatusCode)
       case ItemWriteStrategy.ItemDeleteIfNotModified => Exceptions.isNotFoundExceptionCore(statusCode, subStatusCode) ||
         Exceptions.isPreconditionFailedException(statusCode)
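Taken on its own, the decision table that `shouldIgnore` now encodes can be summarized as below; a simplified sketch, assuming `isNotFoundExceptionCore` means a plain 404 with sub-status 0 and using the standard Cosmos DB status codes (the real checks live in the connector's `Exceptions` helper):

```scala
// Simplified sketch of the per-strategy ignore rules; not the connector's actual helper.
def shouldIgnoreSketch(strategy: String, statusCode: Int, subStatusCode: Int): Boolean =
  strategy match {
    case "ItemAppend"          => statusCode == 409                        // document already exists
    case "ItemPatchIfExists"   => statusCode == 404 && subStatusCode == 0  // document gone: skip the patch
    case "ItemDelete"          => statusCode == 404 && subStatusCode == 0  // already deleted
    case "ItemDeleteIfNotModified" =>
      (statusCode == 404 && subStatusCode == 0) || statusCode == 412       // gone, or etag precondition failed
    case _ => false                                                        // all other strategies surface the error
  }
```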
@@ -1355,7 +1355,10 @@ private[spark] object DiagnosticsConfig
 
     val azureMonitorAuthEnabledOpt = CosmosConfigEntry.parse(cfg, diagnosticsAzureMonitorAuthEnabled)
     val azureMonitorAuthTypeOpt: Option[CosmosAuthType] =
-      if (withAzMon && azureMonitorAuthEnabledOpt.getOrElse(false)) {
+      if (withAzMon
+        && azureMonitorAuthEnabledOpt.getOrElse(false)
+        && (!effectiveAzMonConnectionStringOpt.getOrElse("").isBlank || azureMonitorAuthEnabledOpt.getOrElse(false))) {
+
         CosmosConfigEntry
           .parse(cfg, diagnosticsAzureMonitorAuthType)
       } else {
@@ -1384,7 +1387,10 @@ private[spark] object DiagnosticsConfig
       None
     }
 
-    val azMonConfig = if (withAzMon) {
+    val azMonConfig = if (withAzMon
+      && azureMonitorAuthEnabledOpt.getOrElse(false)
+      && (!effectiveAzMonConnectionStringOpt.getOrElse("").isBlank || azureMonitorAuthEnabledOpt.getOrElse(false))) {
+
       val azMonConfigCandidate =
         AzureMonitorConfig(
           azureMonitorEnabledOpt.get,
@@ -1416,7 +1422,7 @@ private[spark] object DiagnosticsConfig
 
 private object ItemWriteStrategy extends Enumeration {
   type ItemWriteStrategy = Value
-  val ItemOverwrite, ItemAppend, ItemDelete, ItemDeleteIfNotModified, ItemOverwriteIfNotModified, ItemPatch, ItemBulkUpdate = Value
+  val ItemOverwrite, ItemAppend, ItemDelete, ItemDeleteIfNotModified, ItemOverwriteIfNotModified, ItemPatch, ItemPatchIfExists, ItemBulkUpdate = Value
 }
 
 private object CosmosPatchOperationTypes extends Enumeration {
@@ -1753,7 +1759,7 @@ private object CosmosWriteConfig
     assert(maxRetryCountOpt.isDefined, s"Parameter '${CosmosConfigNames.WriteMaxRetryCount}' is missing.")
 
     itemWriteStrategyOpt.get match {
-      case ItemWriteStrategy.ItemPatch =>
+      case ItemWriteStrategy.ItemPatch | ItemWriteStrategy.ItemPatchIfExists =>
        val patchColumnConfigMap = parsePatchColumnConfigs(cfg, inputSchema)
        val patchFilter = CosmosConfigEntry.parse(cfg, patchFilterPredicate)
        patchConfigsOpt = Some(CosmosPatchConfigs(patchColumnConfigMap, patchFilter))
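Both patch strategies therefore require the same patch configuration. A hedged example of the write options this validation consumes (the column-config string follows the connector's `[col(...).op(...)]` convention; the column names and the filter predicate are illustrative, not from this PR):

```scala
// Illustrative write options parsed by the validation above; values are examples only.
val patchOptions = Map(
  "spark.cosmos.write.strategy" -> "ItemPatchIfExists",
  // one entry per column that should be patched
  "spark.cosmos.write.patch.columnConfigs" -> "[col(color).op(set), col(spin).op(increment)]",
  // optional server-side predicate; the patch is only applied where it matches
  "spark.cosmos.write.patch.filter" -> "FROM products p WHERE p.color = 'Red'"
)
```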
@@ -70,7 +70,7 @@ private class PointWriter(container: CosmosAsyncContainer,
     "PointWriter")
 
   private val cosmosPatchHelpOpt = cosmosWriteConfig.itemWriteStrategy match {
-    case ItemWriteStrategy.ItemPatch | ItemWriteStrategy.ItemBulkUpdate =>
+    case ItemWriteStrategy.ItemPatch | ItemWriteStrategy.ItemPatchIfExists | ItemWriteStrategy.ItemBulkUpdate =>
       Some(new CosmosPatchHelper(diagnosticsConfig, cosmosWriteConfig.patchConfigs.get))
     case _ => None
   }
@@ -93,7 +93,9 @@
       case ItemWriteStrategy.ItemDeleteIfNotModified =>
         deleteWithRetryAsync(partitionKeyValue, objectNode, onlyIfNotModified=true)
       case ItemWriteStrategy.ItemPatch =>
-        patchWithRetryAsync(partitionKeyValue, objectNode)
+        patchWithRetryAsync(partitionKeyValue, objectNode, false)
+      case ItemWriteStrategy.ItemPatchIfExists =>
+        patchWithRetryAsync(partitionKeyValue, objectNode, true)
       case ItemWriteStrategy.ItemBulkUpdate =>
         patchBulkUpdateWithRetry(partitionKeyValue, objectNode)
     }
@@ -201,14 +203,15 @@ private class PointWriter(container: CosmosAsyncContainer,
   }
 
   private def patchWithRetryAsync(partitionKeyValue: PartitionKey,
-                                  objectNode: ObjectNode): Unit = {
+                                  objectNode: ObjectNode,
+                                  ignoreNotFound: Boolean): Unit = {
     val promise = Promise[Unit]()
     pendingPointWrites.put(promise.future, true)
 
     val patchOperation = PatchOperation(taskDiagnosticsContext,
       CosmosItemIdentifier(objectNode.get(CosmosConstants.Properties.Id).asText(), partitionKeyValue))
 
-    executeAsync(() => patchWithRetry(partitionKeyValue, objectNode, patchOperation))
+    executeAsync(() => patchWithRetry(partitionKeyValue, objectNode, patchOperation, ignoreNotFound))
       .onComplete {
         case Success(_) =>
           promise.success(Unit)
@@ -344,7 +347,8 @@ private class PointWriter(container: CosmosAsyncContainer,
   // scalastyle:on multiple.string.literals
   private def patchWithRetry(partitionKeyValue: PartitionKey,
                              objectNode: ObjectNode,
-                             patchOperation: PatchOperation): Unit = {
+                             patchOperation: PatchOperation,
+                             ignoreNotFound: Boolean): Unit = {
 
     var exceptionOpt = Option.empty[Exception]
 
@@ -362,6 +366,15 @@
 
         return
       } catch {
+        case e: CosmosException if ignoreNotFound && Exceptions.isNotFoundExceptionCore(e.getStatusCode, e.getSubStatusCode) =>
+          log.logItemWriteSkipped(patchOperation, "notFound")
+          outputMetricsPublisher.trackWriteOperation(
+            0,
+            Option.apply(e.getDiagnostics) match {
+              case Some(diagnostics) => Option.apply(diagnostics.getDiagnosticsContext)
+              case None => None
+            })
+          return
         case e: CosmosException if Exceptions.canBeTransientFailure(e.getStatusCode, e.getSubStatusCode) =>
           log.logWarning(
             s"patch item $patchOperation attempt #$attempt max remaining retries "
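Stripped of the retry and metrics plumbing, the new skip path boils down to the pattern below; a minimal sketch, assuming a `patchItem` closure stands in for the container call and that a plain 404 (sub-status 0) identifies a missing document:

```scala
// Minimal sketch of the catch-and-skip pattern; not the connector code.
import com.azure.cosmos.CosmosException

// Returns true if the patch was applied, false if it was skipped as a graceful no-op.
def tryPatch(patchItem: () => Unit, ignoreNotFound: Boolean): Boolean =
  try {
    patchItem()
    true
  } catch {
    case e: CosmosException
        if ignoreNotFound && e.getStatusCode == 404 && e.getSubStatusCode == 0 =>
      false // the document vanished before the patch arrived
  }
```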
@@ -593,6 +593,93 @@ class SparkE2EWriteITest
     quark.get("id").asText() shouldEqual "Quark"
     quark.get("car").get("carType").asText() shouldEqual "X5"
   }
+
+  it should s"support patch and skip non-existing records with bulkEnabled = $bulkEnabled defaultOperationType = $patchDefaultOperationType columnConfigString = $patchColumnConfigString patchConditionFilter = $patchConditionFilter" in {
+    val cosmosEndpoint = TestConfigurations.HOST
+    val cosmosMasterKey = TestConfigurations.MASTER_KEY
+
+    val cfg = Map("spark.cosmos.accountEndpoint" -> cosmosEndpoint,
+      "spark.cosmos.accountKey" -> cosmosMasterKey,
+      "spark.cosmos.database" -> cosmosDatabase,
+      "spark.cosmos.container" -> cosmosContainer,
+      "spark.cosmos.serialization.inclusionMode" -> "NonDefault"
+    )
+
+    val cfgPatch = Map("spark.cosmos.accountEndpoint" -> cosmosEndpoint,
+      "spark.cosmos.accountKey" -> cosmosMasterKey,
+      "spark.cosmos.database" -> cosmosDatabase,
+      "spark.cosmos.container" -> cosmosContainer,
+      "spark.cosmos.write.strategy" -> ItemWriteStrategy.ItemPatchIfExists.toString,
+      "spark.cosmos.write.bulk.enabled" -> bulkEnabled.toString,
+      "spark.cosmos.write.patch.defaultOperationType" -> patchDefaultOperationType.toString,
+      "spark.cosmos.write.patch.columnConfigs" -> patchColumnConfigString
+    )
+
+    val newSpark = getSpark
+
+    // scalastyle:off underscore.import
+    // scalastyle:off import.grouping
+    import spark.implicits._
+    val spark = newSpark
+    // scalastyle:on underscore.import
+    // scalastyle:on import.grouping
+
+    val dfWithJson = Seq(
+      ("Quark", "Quark", "Red", 1.0 / 2, "", "{ \"manufacturer\": \"BMW\", \"carType\": \"X3\" }")
+    ).toDF("particle name", "id", "color", "spin", "empty", "childNodeJson")
+
+    val df = dfWithJson
+      .withColumn("car", from_json(col("childNodeJson"), StructType(Array(StructField("manufacturer", StringType, nullable = true), StructField("carType", StringType, nullable = true)))))
+      .drop("childNodeJson")
+    df.show(false)
+    df.write.format("cosmos.oltp").mode("Append").options(cfg).save()
+
+    // verify data is written
+    // wait for a second to allow replication to complete
+    Thread.sleep(1000)
+
+    // the item with the same id/pk will be persisted based on the upsert config
+    var quarks = queryItems("SELECT * FROM r where r.id = 'Quark'").toArray
+    quarks should have size 1
+
+    var quark = quarks(0)
+    quark.get("particle name").asText() shouldEqual "Quark"
+    quark.get("id").asText() shouldEqual "Quark"
+    quark.get("car").get("carType").asText() shouldEqual "X3"
+
+    val patchDf = if (patchColumnConfigString.endsWith(".rawJson]")) {
+      Seq(("Quark", "{ \"manufacturer\": \"BMW\", \"carType\": \"X5\" }"), ("NonExistingId", "{ \"manufacturer\": \"BMW\", \"carType\": \"X5\" }"))
+        .toDF("id", "car")
+    } else {
+      Seq(("Quark", "{ \"manufacturer\": \"BMW\", \"carType\": \"X5\" }"), ("NonExistingId", "{ \"manufacturer\": \"BMW\", \"carType\": \"X5\" }"))
+        .toDF("id", "childNodeJson")
+        .withColumn("car", from_json(col("childNodeJson"), StructType(Array(StructField("manufacturer", StringType, nullable = true), StructField("carType", StringType, nullable = true)))))
+        .drop("childNodeJson")
+    }
+
+    logInfo(s"Schema of patchDf: ${patchDf.schema}")
+
+    patchDf.write.format("cosmos.oltp").mode("Append").options(cfgPatch).save()
+
+    // verify data is written
+    // wait for a second to allow replication to complete
+    Thread.sleep(1000)
+
+    // the patch for the non-existing id must have been skipped - no document should have been created for it
+    quarks = queryItems("SELECT * FROM r where r.id = 'NonExistingId'").toArray
+    quarks should have size 0
+
+
+    quarks = queryItems("SELECT * FROM r where r.id = 'Quark'").toArray
+    quarks should have size 1
+
+    logInfo(s"JSON returned from query: ${quarks(0)}")
+
+    quark = quarks(0)
+    quark.get("particle name").asText() shouldEqual "Quark"
+    quark.get("id").asText() shouldEqual "Quark"
+    quark.get("car").get("carType").asText() shouldEqual "X5"
+  }
 }
 
 private case class PatchBulkUpdateParameterTest(bulkEnabled: Boolean, patchColumnConfigString: String)