diff --git a/sdk/cosmos/azure-cosmos-spark_3-3_2-12/CHANGELOG.md b/sdk/cosmos/azure-cosmos-spark_3-3_2-12/CHANGELOG.md index fe5015ab9232..82322d973899 100644 --- a/sdk/cosmos/azure-cosmos-spark_3-3_2-12/CHANGELOG.md +++ b/sdk/cosmos/azure-cosmos-spark_3-3_2-12/CHANGELOG.md @@ -3,6 +3,7 @@ ### 4.41.0-beta.1 (Unreleased) #### Features Added +* Added support for the `spark.cosmos.write.strategy` value `ItemPatchIfExists`, which allows gracefully ignoring patch instructions when the target document does not exist (anymore). - See [47034](https://github.com/Azure/azure-sdk-for-java/pull/47034) #### Breaking Changes diff --git a/sdk/cosmos/azure-cosmos-spark_3-4_2-12/CHANGELOG.md b/sdk/cosmos/azure-cosmos-spark_3-4_2-12/CHANGELOG.md index aa49bc357979..01219ca548ad 100644 --- a/sdk/cosmos/azure-cosmos-spark_3-4_2-12/CHANGELOG.md +++ b/sdk/cosmos/azure-cosmos-spark_3-4_2-12/CHANGELOG.md @@ -3,6 +3,7 @@ ### 4.41.0-beta.1 (Unreleased) #### Features Added +* Added support for the `spark.cosmos.write.strategy` value `ItemPatchIfExists`, which allows gracefully ignoring patch instructions when the target document does not exist (anymore). - See [47034](https://github.com/Azure/azure-sdk-for-java/pull/47034) #### Breaking Changes diff --git a/sdk/cosmos/azure-cosmos-spark_3-5_2-12/CHANGELOG.md b/sdk/cosmos/azure-cosmos-spark_3-5_2-12/CHANGELOG.md index 48735b5a0722..7eb2e1801770 100644 --- a/sdk/cosmos/azure-cosmos-spark_3-5_2-12/CHANGELOG.md +++ b/sdk/cosmos/azure-cosmos-spark_3-5_2-12/CHANGELOG.md @@ -3,6 +3,7 @@ ### 4.41.0-beta.1 (Unreleased) #### Features Added +* Added support for the `spark.cosmos.write.strategy` value `ItemPatchIfExists`, which allows gracefully ignoring patch instructions when the target document does not exist (anymore). - See [47034](https://github.com/Azure/azure-sdk-for-java/pull/47034) #### Breaking Changes diff --git a/sdk/cosmos/azure-cosmos-spark_3_2-12/docs/configuration-reference.md b/sdk/cosmos/azure-cosmos-spark_3_2-12/docs/configuration-reference.md index 30738831644f..02abc36e7911 100644 --- a/sdk/cosmos/azure-cosmos-spark_3_2-12/docs/configuration-reference.md +++ b/sdk/cosmos/azure-cosmos-spark_3_2-12/docs/configuration-reference.md @@ -59,18 +59,18 @@ | `spark.cosmos.metadata.feedRange.refreshIntervalInSeconds` | `120` | The time interval in seconds to refresh the internal partition key range cache, valid between `[60, 1800]`. By default it is 120 seconds. 
| ### Write Config -| Config Property Name | Default | Description | -|:----------------------------------------------------------------|:----------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| `spark.cosmos.write.strategy` | `ItemOverwrite` | Cosmos DB Item write Strategy:
- `ItemOverwrite` (using upsert),
- `ItemOverwriteIfNotModified` (if etag property of the row is empty/null it will just do an insert and ignore if the document already exists - same as `ItemAppend`, if an etag value exists it will attempt to replace the document with etag pre-condition. If the document changed - identified by precondition failure - the update is skipped and the document is not updated with the content of the data frame row),
- `ItemAppend` (using create, ignore pre-existing items i.e., Conflicts),
- `ItemDelete` (delete all documents),
- `ItemDeleteIfNotModified` (delete all documents for which the etag hasn't changed),
- `ItemPatch` (Partial update all documents based on the patch config),
- `ItemBulkUpdate` (read item, then patch the item locally, then using create if etag is empty, update/replace with etag pre-condition. In cases of any conflict or precondition failure, SDK will retry the above steps to update the documents properly.) | -| `spark.cosmos.write.maxRetryCount` | `10` | Cosmos DB Write Max Retry Attempts on retriable failures (e.g., connection error, moderakh add more details) | -| `spark.cosmos.write.point.maxConcurrency` | None | Cosmos DB Item Write Max concurrency. If not specified it will be determined based on the Spark executor VM Size | -| `spark.cosmos.write.bulk.maxPendingOperations` | None | Cosmos DB Item Write bulk mode maximum pending operations. Defines a limit of bulk operations being processed concurrently. If not specified it will be determined based on the Spark executor VM Size. If the volume of data is large for the provisioned throughput on the destination container, this setting can be adjusted by following the estimation of `1000 x Cores` | -| `spark.cosmos.write.bulk.enabled` | `true` | Cosmos DB Item Write bulk enabled | -| `spark.cosmos.write.bulk.targetedPayloadSizeInBytes` | `220201` | When the targeted payload size is reached for buffered documents, the request is sent to the backend. The default value is optimized for small documents <= 10 KB - when documents often exceed 110 KB, it can help to increase this value to up to about `1500000` (should still be smaller than 2 MB). | -| `spark.cosmos.write.bulk.initialBatchSize` | `100` | Cosmos DB initial bulk micro batch size - a micro batch will be flushed to the backend when the number of documents enqueued exceeds this size - or the target payload size is met. The micro batch size is getting automatically tuned based on the throttling rate. By default the initial micro batch size is 100. Reduce this when you want to avoid that the first few requests consume too many RUs. | -| `spark.cosmos.write.bulk.maxBatchSize` | `100` | Cosmos DB max. bulk micro batch size - a micro batch will be flushed to the backend when the number of documents enqueued exceeds this size - or the target payload size is met. The micro batch size is getting automatically tuned based on the throttling rate. By default the max. micro batch size is 100. Use this setting only when migrating Spark 2.4 workloads - for other scenarios relying on the auto-tuning combined with throughput control will result in better experience. | -| `spark.cosmos.write.flush.noProgress.maxIntervalInSeconds` | `180` | The time interval in seconds that write operations will wait when no progress can be made for bulk writes before forcing a retry. The retry will reinitialize the bulk write process - so, any delays on the retry can be sure to be actual service issues. The default value of 3 min should be sufficient to prevent flas negatives when there is a short service-side write unavailability - like for partition splits or merges. Increase it only if you regularly see these transient errors to exceed a time period of 180 seconds. | -| `spark.cosmos.write.flush.noProgress.maxRetryIntervalInSeconds` | `2700` | The time interval in seconds that write operations will wait when no progress can be made for bulk writes after the initial attempt (and restarting the bulk writer client-side). This time interval is supposed to be large enough to not fail Spark jobs even when there are transient write availability outages in the service. 
The default value of 45 minutes can be modified when you rather prefer Spark jobs to fail or extended when needed. | +| Config Property Name | Default | Description | +|:----------------------------------------------------------------|:----------------|:--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `spark.cosmos.write.strategy` | `ItemOverwrite` | Cosmos DB Item write Strategy:
- `ItemOverwrite` (using upsert),
- `ItemOverwriteIfNotModified` (if the etag property of the row is empty/null it will just do an insert and ignore if the document already exists - same as `ItemAppend`; if an etag value exists it will attempt to replace the document with an etag pre-condition. If the document has changed - identified by a precondition failure - the update is skipped and the document is not updated with the content of the data frame row),
- `ItemAppend` (using create, ignore pre-existing items i.e., Conflicts),
- `ItemDelete` (delete all documents),
- `ItemDeleteIfNotModified` (delete all documents for which the etag hasn't changed),
- `ItemPatch` and `ItemPatchIfExists` (partially update all documents based on the patch config; `ItemPatch` will fail the Spark job when hitting a 404/Not Found, while `ItemPatchIfExists` will gracefully skip documents that no longer exist),
- `ItemBulkUpdate` (read the item, then patch the item locally, then use create if the etag is empty or update/replace with an etag pre-condition. In case of any conflict or precondition failure, the SDK will retry the above steps to update the documents properly.) | +| `spark.cosmos.write.maxRetryCount` | `10` | Cosmos DB Write Max Retry Attempts on retriable failures (e.g., connection errors) | +| `spark.cosmos.write.point.maxConcurrency` | None | Cosmos DB Item Write Max concurrency. If not specified it will be determined based on the Spark executor VM Size | +| `spark.cosmos.write.bulk.maxPendingOperations` | None | Cosmos DB Item Write bulk mode maximum pending operations. Defines a limit of bulk operations being processed concurrently. If not specified it will be determined based on the Spark executor VM Size. If the volume of data is large for the provisioned throughput on the destination container, this setting can be adjusted by following the estimation of `1000 x Cores` | +| `spark.cosmos.write.bulk.enabled` | `true` | Cosmos DB Item Write bulk enabled | +| `spark.cosmos.write.bulk.targetedPayloadSizeInBytes` | `220201` | When the targeted payload size is reached for buffered documents, the request is sent to the backend. The default value is optimized for small documents <= 10 KB - when documents often exceed 110 KB, it can help to increase this value to up to about `1500000` (should still be smaller than 2 MB). | +| `spark.cosmos.write.bulk.initialBatchSize` | `100` | Cosmos DB initial bulk micro batch size - a micro batch will be flushed to the backend when the number of documents enqueued exceeds this size - or the target payload size is met. The micro batch size is automatically tuned based on the throttling rate. By default the initial micro batch size is 100. Reduce this if you want to avoid the first few requests consuming too many RUs. | +| `spark.cosmos.write.bulk.maxBatchSize` | `100` | Cosmos DB max. bulk micro batch size - a micro batch will be flushed to the backend when the number of documents enqueued exceeds this size - or the target payload size is met. The micro batch size is automatically tuned based on the throttling rate. By default the max. micro batch size is 100. Use this setting only when migrating Spark 2.4 workloads - for other scenarios, relying on the auto-tuning combined with throughput control will result in a better experience. | +| `spark.cosmos.write.flush.noProgress.maxIntervalInSeconds` | `180` | The time interval in seconds that write operations will wait when no progress can be made for bulk writes before forcing a retry. The retry will reinitialize the bulk write process - so any delays that persist after the retry are most likely actual service issues. The default value of 3 min should be sufficient to prevent false negatives when there is a short service-side write unavailability - like for partition splits or merges. Increase it only if you regularly see these transient errors exceed 180 seconds. | +| `spark.cosmos.write.flush.noProgress.maxRetryIntervalInSeconds` | `2700` | The time interval in seconds that write operations will wait when no progress can be made for bulk writes after the initial attempt (and restarting the bulk writer client-side). This time interval is supposed to be large enough to not fail Spark jobs even when there are transient write availability outages in the service. 
The default value of 45 minutes can be reduced if you would rather have Spark jobs fail faster, or extended when needed. | #### Patch Config | Config Property Name | Default | Description | diff --git a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/BulkWriter.scala b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/BulkWriter.scala index fd923684b059..aec77f8762e8 100644 --- a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/BulkWriter.scala +++ b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/BulkWriter.scala @@ -172,7 +172,7 @@ private class BulkWriter private val operationContext = initializeOperationContext() private val cosmosPatchHelperOpt = writeConfig.itemWriteStrategy match { - case ItemWriteStrategy.ItemPatch | ItemWriteStrategy.ItemBulkUpdate => + case ItemWriteStrategy.ItemPatch | ItemWriteStrategy.ItemPatchIfExists | ItemWriteStrategy.ItemBulkUpdate => Some(new CosmosPatchHelper(diagnosticsConfig, writeConfig.patchConfigs.get)) case _ => None } @@ -757,7 +757,12 @@ private class BulkWriter case _ => new CosmosBulkItemRequestOptions() }, operationContext) - case ItemWriteStrategy.ItemPatch => getPatchItemOperation(operationContext.itemId, partitionKeyValue, partitionKeyDefinition, objectNode, operationContext) + case ItemWriteStrategy.ItemPatch | ItemWriteStrategy.ItemPatchIfExists => getPatchItemOperation( + operationContext.itemId, + partitionKeyValue, + partitionKeyDefinition, + objectNode, + operationContext) case _ => throw new RuntimeException(s"${writeConfig.itemWriteStrategy} not supported") } @@ -1272,6 +1277,7 @@ private class BulkWriter private def shouldIgnore(statusCode: Int, subStatusCode: Int): Boolean = { val returnValue = writeConfig.itemWriteStrategy match { case ItemWriteStrategy.ItemAppend => Exceptions.isResourceExistsException(statusCode) + case ItemWriteStrategy.ItemPatchIfExists => Exceptions.isNotFoundExceptionCore(statusCode, subStatusCode) case ItemWriteStrategy.ItemDelete => Exceptions.isNotFoundExceptionCore(statusCode, subStatusCode) case ItemWriteStrategy.ItemDeleteIfNotModified => Exceptions.isNotFoundExceptionCore(statusCode, subStatusCode) || Exceptions.isPreconditionFailedException(statusCode) diff --git a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/CosmosConfig.scala b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/CosmosConfig.scala index d7fec9269904..15b0b0e8d439 100644 --- a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/CosmosConfig.scala +++ b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/CosmosConfig.scala @@ -1416,7 +1416,7 @@ private[spark] object DiagnosticsConfig { private object ItemWriteStrategy extends Enumeration { type ItemWriteStrategy = Value - val ItemOverwrite, ItemAppend, ItemDelete, ItemDeleteIfNotModified, ItemOverwriteIfNotModified, ItemPatch, ItemBulkUpdate = Value + val ItemOverwrite, ItemAppend, ItemDelete, ItemDeleteIfNotModified, ItemOverwriteIfNotModified, ItemPatch, ItemPatchIfExists, ItemBulkUpdate = Value } private object CosmosPatchOperationTypes extends Enumeration { @@ -1753,7 +1753,7 @@ private object CosmosWriteConfig { assert(maxRetryCountOpt.isDefined, s"Parameter '${CosmosConfigNames.WriteMaxRetryCount}' is missing.") itemWriteStrategyOpt.get match { - case ItemWriteStrategy.ItemPatch => + case ItemWriteStrategy.ItemPatch | ItemWriteStrategy.ItemPatchIfExists => val patchColumnConfigMap
= parsePatchColumnConfigs(cfg, inputSchema) val patchFilter = CosmosConfigEntry.parse(cfg, patchFilterPredicate) patchConfigsOpt = Some(CosmosPatchConfigs(patchColumnConfigMap, patchFilter)) diff --git a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/PointWriter.scala b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/PointWriter.scala index c2840c25fdac..45d45e033e53 100644 --- a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/PointWriter.scala +++ b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/PointWriter.scala @@ -70,7 +70,7 @@ private class PointWriter(container: CosmosAsyncContainer, "PointWriter") private val cosmosPatchHelpOpt = cosmosWriteConfig.itemWriteStrategy match { - case ItemWriteStrategy.ItemPatch | ItemWriteStrategy.ItemBulkUpdate => + case ItemWriteStrategy.ItemPatch | ItemWriteStrategy.ItemPatchIfExists | ItemWriteStrategy.ItemBulkUpdate => Some(new CosmosPatchHelper(diagnosticsConfig, cosmosWriteConfig.patchConfigs.get)) case _ => None } @@ -93,7 +93,9 @@ private class PointWriter(container: CosmosAsyncContainer, case ItemWriteStrategy.ItemDeleteIfNotModified => deleteWithRetryAsync(partitionKeyValue, objectNode, onlyIfNotModified=true) case ItemWriteStrategy.ItemPatch => - patchWithRetryAsync(partitionKeyValue, objectNode) + patchWithRetryAsync(partitionKeyValue, objectNode, false) + case ItemWriteStrategy.ItemPatchIfExists => + patchWithRetryAsync(partitionKeyValue, objectNode, true) case ItemWriteStrategy.ItemBulkUpdate => patchBulkUpdateWithRetry(partitionKeyValue, objectNode) } @@ -201,14 +203,15 @@ private class PointWriter(container: CosmosAsyncContainer, } private def patchWithRetryAsync(partitionKeyValue: PartitionKey, - objectNode: ObjectNode): Unit = { + objectNode: ObjectNode, + ignoreNotFound: Boolean): Unit = { val promise = Promise[Unit]() pendingPointWrites.put(promise.future, true) val patchOperation = PatchOperation(taskDiagnosticsContext, CosmosItemIdentifier(objectNode.get(CosmosConstants.Properties.Id).asText(), partitionKeyValue)) - executeAsync(() => patchWithRetry(partitionKeyValue, objectNode, patchOperation)) + executeAsync(() => patchWithRetry(partitionKeyValue, objectNode, patchOperation, ignoreNotFound)) .onComplete { case Success(_) => promise.success(Unit) @@ -344,7 +347,8 @@ private class PointWriter(container: CosmosAsyncContainer, // scalastyle:on multiple.string.literals private def patchWithRetry(partitionKeyValue: PartitionKey, objectNode: ObjectNode, - patchOperation: PatchOperation): Unit = { + patchOperation: PatchOperation, + ignoreNotFound: Boolean): Unit = { var exceptionOpt = Option.empty[Exception] @@ -362,6 +366,15 @@ private class PointWriter(container: CosmosAsyncContainer, return } catch { + case e: CosmosException if ignoreNotFound && Exceptions.isNotFoundExceptionCore(e.getStatusCode, e.getSubStatusCode) => + log.logItemWriteSkipped(patchOperation, "notFound") + outputMetricsPublisher.trackWriteOperation( + 0, + Option.apply(e.getDiagnostics) match { + case Some(diagnostics) => Option.apply(diagnostics.getDiagnosticsContext) + case None => None + }) + return case e: CosmosException if Exceptions.canBeTransientFailure(e.getStatusCode, e.getSubStatusCode) => log.logWarning( s"patch item $patchOperation attempt #$attempt max remaining retries " diff --git a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/SparkE2EWriteITest.scala 
b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/SparkE2EWriteITest.scala index 0cac7578e549..8de99536c613 100644 --- a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/SparkE2EWriteITest.scala +++ b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/SparkE2EWriteITest.scala @@ -593,6 +593,93 @@ class SparkE2EWriteITest quark.get("id").asText() shouldEqual "Quark" quark.get("car").get("carType").asText() shouldEqual "X5" } + + it should s"support patch and skip non-existing records with bulkEnabled = $bulkEnabled defaultOperationType = $patchDefaultOperationType columnConfigString = $patchColumnConfigString patchConditionFilter = $patchConditionFilter " in { + val cosmosEndpoint = TestConfigurations.HOST + val cosmosMasterKey = TestConfigurations.MASTER_KEY + + val cfg = Map("spark.cosmos.accountEndpoint" -> cosmosEndpoint, + "spark.cosmos.accountKey" -> cosmosMasterKey, + "spark.cosmos.database" -> cosmosDatabase, + "spark.cosmos.container" -> cosmosContainer, + "spark.cosmos.serialization.inclusionMode" -> "NonDefault" + ) + + val cfgPatch = Map("spark.cosmos.accountEndpoint" -> cosmosEndpoint, + "spark.cosmos.accountKey" -> cosmosMasterKey, + "spark.cosmos.database" -> cosmosDatabase, + "spark.cosmos.container" -> cosmosContainer, + "spark.cosmos.write.strategy" -> ItemWriteStrategy.ItemPatchIfExists.toString, + "spark.cosmos.write.bulk.enabled" -> bulkEnabled.toString, + "spark.cosmos.write.patch.defaultOperationType" -> patchDefaultOperationType.toString, + "spark.cosmos.write.patch.columnConfigs" -> patchColumnConfigString + ) + + val newSpark = getSpark + + // scalastyle:off underscore.import + // scalastyle:off import.grouping + import spark.implicits._ + val spark = newSpark + // scalastyle:on underscore.import + // scalastyle:on import.grouping + + val dfWithJson = Seq( + ("Quark", "Quark", "Red", 1.0 / 2, "", "{ \"manufacturer\": \"BMW\", \"carType\": \"X3\" }") + ).toDF("particle name", "id", "color", "spin", "empty", "childNodeJson") + + val df = dfWithJson + .withColumn("car", from_json(col("childNodeJson"), StructType(Array(StructField("manufacturer", StringType, nullable = true), StructField("carType", StringType, nullable = true))))) + .drop("childNodeJson") + df.show(false) + df.write.format("cosmos.oltp").mode("Append").options(cfg).save() + + // verify data is written + // wait for a second to allow replication is completed. 
+ Thread.sleep(1000) + + // the item with the same id/pk will be persisted based on the upsert config + var quarks = queryItems("SELECT * FROM r where r.id = 'Quark'").toArray + quarks should have size 1 + + var quark = quarks(0) + quark.get("particle name").asText() shouldEqual "Quark" + quark.get("id").asText() shouldEqual "Quark" + quark.get("car").get("carType").asText() shouldEqual "X3" + + val patchDf = if (patchColumnConfigString.endsWith(".rawJson]")) { + Seq(("Quark", "{ \"manufacturer\": \"BMW\", \"carType\": \"X5\" }"), ("NonExistingId", "{ \"manufacturer\": \"BMW\", \"carType\": \"X5\" }")) + .toDF("id", "car") + } else { + Seq(("Quark", "{ \"manufacturer\": \"BMW\", \"carType\": \"X5\" }"), ("NonExistingId", "{ \"manufacturer\": \"BMW\", \"carType\": \"X5\" }")) + .toDF("id", "childNodeJson") + .withColumn("car", from_json(col("childNodeJson"), StructType(Array(StructField("manufacturer", StringType, nullable = true), StructField("carType", StringType, nullable = true))))) + .drop("childNodeJson") + } + + logInfo(s"Schema of patchDf: ${patchDf.schema}") + + patchDf.write.format("cosmos.oltp").mode("Append").options(cfgPatch).save() + + // verify data is written + // wait for a second to allow replication is completed. + Thread.sleep(1000) + + // the patch targeting 'NonExistingId' should have been skipped - no new document should have been created + quarks = queryItems("SELECT * FROM r where r.id = 'NonExistingId'").toArray + quarks should have size 0 + + + quarks = queryItems("SELECT * FROM r where r.id = 'Quark'").toArray + quarks should have size 1 + + logInfo(s"JSON returned from query: ${quarks(0)}") + + quark = quarks(0) + quark.get("particle name").asText() shouldEqual "Quark" + quark.get("id").asText() shouldEqual "Quark" + quark.get("car").get("carType").asText() shouldEqual "X5" + } } private case class PatchBulkUpdateParameterTest(bulkEnabled: Boolean, patchColumnConfigString: String)
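Reviewer note: the new `ItemPatchIfExists` strategy is configured exactly like `ItemPatch`; the only behavioral difference is that a 404/Not Found on the target document is skipped instead of failing the Spark job. Below is a minimal usage sketch (not part of this PR). The endpoint, key, database and container values are placeholders, the container is assumed to be partitioned by `id`, and `Set` is only an illustrative patch operation type.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("patch-if-exists-sample").getOrCreate()
import spark.implicits._

// One row targets an existing document, one row targets a document that no longer exists.
val patchDf = Seq(
  ("Quark", "X5"),         // existing document - gets patched
  ("NonExistingId", "X5")  // missing document - silently skipped with ItemPatchIfExists
).toDF("id", "carType")

val writeCfg = Map(
  "spark.cosmos.accountEndpoint" -> "<account-endpoint>",
  "spark.cosmos.accountKey" -> "<account-key>",
  "spark.cosmos.database" -> "<database>",
  "spark.cosmos.container" -> "<container>",
  "spark.cosmos.write.strategy" -> "ItemPatchIfExists",
  "spark.cosmos.write.patch.defaultOperationType" -> "Set"
)

patchDf.write.format("cosmos.oltp").mode("Append").options(writeCfg).save()
```

With `ItemPatch` the same job would fail on the second row; with `ItemPatchIfExists` it completes and only the existing document is updated.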
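Reviewer note on the `BulkWriter.shouldIgnore` change: on the bulk path, `ItemPatchIfExists` reuses the not-found handling that `ItemDelete` already had. The snippet below is a condensed, hypothetical rendering of those rules, not the connector's actual API - the real method delegates to the `Exceptions` helpers and also inspects sub-status codes.

```scala
// Simplified sketch of the bulk-write "ignore this failure" rules after this change.
// Strategy names are plain strings here only to keep the sketch self-contained.
def shouldIgnoreBulkFailure(writeStrategy: String, statusCode: Int): Boolean =
  writeStrategy match {
    case "ItemAppend"              => statusCode == 409                      // conflict: item already exists
    case "ItemPatchIfExists"       => statusCode == 404                      // not found: item no longer exists
    case "ItemDelete"              => statusCode == 404                      // already deleted
    case "ItemDeleteIfNotModified" => statusCode == 404 || statusCode == 412 // deleted, or etag changed
    case _                         => false
  }
```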
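Reviewer note on the `PointWriter` change: the non-bulk path threads a new `ignoreNotFound` flag into the patch retry loop, so a 404 is short-circuited as a skipped (and logged) write instead of an error. The following is a simplified model of that control flow under the assumption that only throttling (429) is retried - the real `patchWithRetry` also retries other transient failures and publishes diagnostics and metrics.

```scala
import com.azure.cosmos.CosmosException

// Simplified model of the point-write patch retry loop with the new ignoreNotFound flag.
def patchWithRetrySketch(attemptPatch: () => Unit, ignoreNotFound: Boolean, maxRetries: Int): Unit = {
  var attempt = 0
  while (attempt <= maxRetries) {
    try {
      attemptPatch()
      return
    } catch {
      case e: CosmosException if ignoreNotFound && e.getStatusCode == 404 =>
        // ItemPatchIfExists: the target document is gone - skip this row instead of failing the task.
        return
      case e: CosmosException if e.getStatusCode == 429 =>
        attempt += 1 // throttled - retry
    }
  }
  throw new IllegalStateException(s"patch still failing after $maxRetries retries")
}
```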