Skip to content

Commit 73320ce

Browse files
Add ReadConsistencyStrategy API (Azure#45161)
* Add ReadConsistencyStrategy API * Adding test coverage * Update ClientConfigDiagnosticsTest.java * Update ConsistencyReaderTest.java * Update SessionTest.java * Update OperationPoliciesTest.java * Update OperationPoliciesTest.java * Update ReadConsistencyStrategy.java * Reacted to code review feedback * Update sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosChangeFeedRequestOptions.java Co-authored-by: Abhijeet Mohanty <[email protected]> * Update ReadConsistencyStrategy.java --------- Co-authored-by: Abhijeet Mohanty <[email protected]>
1 parent dfd1058 commit 73320ce

File tree

84 files changed

+1203
-238
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

84 files changed

+1203
-238
lines changed

sdk/cosmos/azure-cosmos-spark_3-3_2-12/src/main/scala/com/azure/cosmos/spark/ChangeFeedMicroBatchStream.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ private class ChangeFeedMicroBatchStream
3737
private val readConfig = CosmosReadConfig.parseCosmosReadConfig(config)
3838
private val sparkEnvironmentInfo = CosmosClientConfiguration.getSparkEnvironmentInfo(Some(session))
3939
private val clientConfiguration = CosmosClientConfiguration.apply(
40-
config, readConfig.forceEventualConsistency, sparkEnvironmentInfo)
40+
config, readConfig.readConsistencyStrategy, sparkEnvironmentInfo)
4141
private val containerConfig = CosmosContainerConfig.parseCosmosContainerConfig(config)
4242
private val partitioningConfig = CosmosPartitioningConfig.parseCosmosPartitioningConfig(config)
4343
private val changeFeedConfig = CosmosChangeFeedConfig.parseCosmosChangeFeedConfig(config)

sdk/cosmos/azure-cosmos-spark_3-3_2-12/src/main/scala/com/azure/cosmos/spark/ItemsScanBuilder.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ private case class ItemsScanBuilder(session: SparkSession,
3535
private var processedPredicates : Option[AnalyzedAggregatedFilters] = Option.empty
3636
private val clientConfiguration = CosmosClientConfiguration.apply(
3737
configMap,
38-
readConfig.forceEventualConsistency,
38+
readConfig.readConsistencyStrategy,
3939
CosmosClientConfiguration.getSparkEnvironmentInfo(Some(session))
4040
)
4141
private val containerConfig = CosmosContainerConfig.parseCosmosContainerConfig(configMap)

sdk/cosmos/azure-cosmos-spark_3-4_2-12/src/main/scala/com/azure/cosmos/spark/ChangeFeedMicroBatchStream.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ private class ChangeFeedMicroBatchStream
3838
private val sparkEnvironmentInfo = CosmosClientConfiguration.getSparkEnvironmentInfo(Some(session))
3939
private val clientConfiguration = CosmosClientConfiguration.apply(
4040
config,
41-
readConfig.forceEventualConsistency,
41+
readConfig.readConsistencyStrategy,
4242
sparkEnvironmentInfo)
4343
private val containerConfig = CosmosContainerConfig.parseCosmosContainerConfig(config)
4444
private val partitioningConfig = CosmosPartitioningConfig.parseCosmosPartitioningConfig(config)

sdk/cosmos/azure-cosmos-spark_3-4_2-12/src/main/scala/com/azure/cosmos/spark/ItemsScanBuilder.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ private case class ItemsScanBuilder(session: SparkSession,
3636

3737
private val clientConfiguration = CosmosClientConfiguration.apply(
3838
configMap,
39-
readConfig.forceEventualConsistency,
39+
readConfig.readConsistencyStrategy,
4040
CosmosClientConfiguration.getSparkEnvironmentInfo(Some(session))
4141
)
4242
private val containerConfig = CosmosContainerConfig.parseCosmosContainerConfig(configMap)

sdk/cosmos/azure-cosmos-spark_3-5_2-12/src/main/scala/com/azure/cosmos/spark/ChangeFeedMicroBatchStream.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ private class ChangeFeedMicroBatchStream
3838
private val sparkEnvironmentInfo = CosmosClientConfiguration.getSparkEnvironmentInfo(Some(session))
3939
private val clientConfiguration = CosmosClientConfiguration.apply(
4040
config,
41-
readConfig.forceEventualConsistency,
41+
readConfig.readConsistencyStrategy,
4242
sparkEnvironmentInfo)
4343
private val containerConfig = CosmosContainerConfig.parseCosmosContainerConfig(config)
4444
private val partitioningConfig = CosmosPartitioningConfig.parseCosmosPartitioningConfig(config)

sdk/cosmos/azure-cosmos-spark_3-5_2-12/src/main/scala/com/azure/cosmos/spark/ItemsScanBuilder.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ private case class ItemsScanBuilder(session: SparkSession,
3636

3737
private val clientConfiguration = CosmosClientConfiguration.apply(
3838
configMap,
39-
readConfig.forceEventualConsistency,
39+
readConfig.readConsistencyStrategy,
4040
CosmosClientConfiguration.getSparkEnvironmentInfo(Some(session))
4141
)
4242
private val containerConfig = CosmosContainerConfig.parseCosmosContainerConfig(configMap)

sdk/cosmos/azure-cosmos-spark_3_2-12/docs/configuration-reference.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
| `spark.cosmos.enforceNativeTransport` | `false` | Indicates whether native netty transport should be enforced. If true, on client creation an exception will be thrown if netty is not able to use native transport. The native transport is more efficient especially when there is a high number of connections. |
2929
| `spark.cosmos.http.connectionPoolSize` | `1000` | Gateway mode connection pool size |
3030
| `spark.cosmos.read.forceEventualConsistency` | `true` | Makes the client use Eventual consistency for read operations instead of using the default account level consistency |
31+
| `spark.cosmos.read.consistencyStrategy` | None | Overrides the read consistency strategy - will override `spark.cosmos.read.forceEventualConsistency` if both are specified. `EVENTUAL` means the same as setting `spark.cosmos.read.forceEventualConsistency` to `true`. `LOCAL_COMMITTED` will ensure quorum reads, `SESSION` will apply session consistency and `DEFAULT` applies the default account-level consistency. (Same as `spark.cosmos.read.forceEventualConsistency` beign `false`). This API is still in preview - currently only `DEFAULT` and `EVENTUAL` can be used in Gateway mode. |
3132
| `spark.cosmos.applicationName` | None | Application name |
3233
| `spark.cosmos.preferredRegionsList` | None | Preferred regions list to be used for a multi region Cosmos DB account. This is a comma separated value (e.g., `[East US, West US]` or `East US, West US`) provided preferred regions will be used as hint. You should use a collocated spark cluster with your Cosmos DB account and pass the spark cluster region as preferred region. See list of azure regions [here](https://docs.microsoft.com/dotnet/api/microsoft.azure.documents.locationnames?view=azure-dotnet&preserve-view=true). Please note that you can also use `spark.cosmos.preferredRegions` as alias |
3334
| `spark.cosmos.diagnostics` | None | Can be used to enable more verbose diagnostics. Currently the only supported options are `simple`, `feed` and `feed_details`. The mode `simple` will result in additional logs being emitted as `INFO` logs in the Driver and Executor logs. The mode `feed` will provide per `FeedResponse` information and `feed_details` will also add pk and id values of all items in a `FeedResponse` to the logs. Please note that any of the diagnostic modes causes the logs being emitted to increase significantly - so, there will be perf impact when enabling it. |

0 commit comments

Comments
 (0)