
Commit 592661a

Author: annie-mac
Commit message: change
1 parent: 1056d8f

File tree

4 files changed (+26, -7 lines)


sdk/cosmos/azure-cosmos-spark_3-3_2-12/src/main/scala/com/azure/cosmos/spark/SparkInternalsBridge.scala

Lines changed: 9 additions & 0 deletions
@@ -15,6 +15,15 @@ import java.util.Locale
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
 import scala.collection.mutable.ArrayBuffer
 
+class SparkInternalsBridge {
+  // Only used in ChangeFeedMetricsListener, which is easier for test validation
+  def getInternalCustomTaskMetricsAsSQLMetric(
+    knownCosmosMetricNames: Set[String],
+    taskMetrics: TaskMetrics): Map[String, SQLMetric] = {
+    SparkInternalsBridge.getInternalCustomTaskMetricsAsSQLMetricInternal(knownCosmosMetricNames, taskMetrics)
+  }
+}
+
 object SparkInternalsBridge extends BasicLoggingTrait {
   private val SPARK_REFLECTION_ACCESS_ALLOWED_PROPERTY = "COSMOS.SPARK_REFLECTION_ACCESS_ALLOWED"
   private val SPARK_REFLECTION_ACCESS_ALLOWED_VARIABLE = "COSMOS_SPARK_REFLECTION_ACCESS_ALLOWED"
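The new instance-level wrapper delegates to the companion object so that consumers such as ChangeFeedMetricsListener can take the bridge as an injectable dependency and substitute it in tests. A minimal sketch of that pattern follows; the consumer class, stub, and metric name are illustrative assumptions, not code from this commit:

import org.apache.spark.executor.TaskMetrics
import org.apache.spark.sql.execution.metric.SQLMetric

// Hypothetical consumer: depends on a bridge instance rather than the
// SparkInternalsBridge companion object, so tests can swap it out.
class ChangeFeedMetricsCollector(bridge: SparkInternalsBridge) {
  def cosmosMetrics(taskMetrics: TaskMetrics): Map[String, SQLMetric] =
    bridge.getInternalCustomTaskMetricsAsSQLMetric(
      Set("changeFeedItemsCnt"), // illustrative metric name
      taskMetrics)
}

// Hypothetical test stub: returns canned metrics without reflective access
// to Spark internals.
class StubSparkInternalsBridge(canned: Map[String, SQLMetric]) extends SparkInternalsBridge {
  override def getInternalCustomTaskMetricsAsSQLMetric(
    knownCosmosMetricNames: Set[String],
    taskMetrics: TaskMetrics): Map[String, SQLMetric] = canned
}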

sdk/cosmos/azure-cosmos-spark_3-4_2-12/src/main/scala/com/azure/cosmos/spark/SparkInternalsBridge.scala

Lines changed: 9 additions & 0 deletions
@@ -14,6 +14,15 @@ import java.lang.reflect.Method
 import java.util.Locale
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
 
+class SparkInternalsBridge {
+  // Only used in ChangeFeedMetricsListener, which is easier for test validation
+  def getInternalCustomTaskMetricsAsSQLMetric(
+    knownCosmosMetricNames: Set[String],
+    taskMetrics: TaskMetrics): Map[String, SQLMetric] = {
+    SparkInternalsBridge.getInternalCustomTaskMetricsAsSQLMetricInternal(knownCosmosMetricNames, taskMetrics)
+  }
+}
+
 object SparkInternalsBridge extends BasicLoggingTrait {
   private val SPARK_REFLECTION_ACCESS_ALLOWED_PROPERTY = "COSMOS.SPARK_REFLECTION_ACCESS_ALLOWED"
   private val SPARK_REFLECTION_ACCESS_ALLOWED_VARIABLE = "COSMOS_SPARK_REFLECTION_ACCESS_ALLOWED"
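At a call site inside a running Spark task, the bridge would be combined with TaskContext to read the current task's metrics. A rough usage sketch under that assumption (the metric name is again illustrative):

import org.apache.spark.TaskContext
import org.apache.spark.sql.execution.metric.SQLMetric

val bridge = new SparkInternalsBridge()
val cosmosMetrics: Map[String, SQLMetric] =
  Option(TaskContext.get()) match {
    // Inside a task: resolve the known Cosmos custom metrics for this task.
    case Some(ctx) =>
      bridge.getInternalCustomTaskMetricsAsSQLMetric(
        Set("changeFeedItemsCnt"), // illustrative metric name
        ctx.taskMetrics())
    // Outside a task (e.g. on the driver) there are no task metrics.
    case None => Map.empty[String, SQLMetric]
  }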

sdk/cosmos/azure-cosmos-spark_3_2-12/docs/configuration-reference.md

Lines changed: 7 additions & 6 deletions
@@ -112,12 +112,13 @@ Used to influence the json serialization/deserialization behavior
 | `spark.cosmos.serialization.dateTimeConversionMode` | `Default` | The date/time conversion mode (`Default`, `AlwaysEpochMilliseconds`, `AlwaysEpochMillisecondsWithSystemDefaultTimezone`). With `Default` the standard Spark 3.* behavior is used (`java.sql.Date`/`java.time.LocalDate` are converted to EpochDay, `java.sql.Timestamp`/`java.time.Instant` are converted to MicrosecondsFromEpoch). With `AlwaysEpochMilliseconds` the same behavior the Cosmos DB connector for Spark 2.4 used is applied - `java.sql.Date`, `java.time.LocalDate`, `java.sql.Timestamp` and `java.time.Instant` are converted to MillisecondsFromEpoch. The behavior for `AlwaysEpochMillisecondsWithSystemDefaultTimezone` is identical with `AlwaysEpochMilliseconds` except that it will assume System default time zone / Spark session time zone (specified via `spark.sql.session.timezone`) instead of UTC when the date/time to be parsed has no explicit time zone. |
 
 #### Change feed (only for Spark-Streaming using `cosmos.oltp.changeFeed` data source, which is read-only) configuration
 | Config Property Name | Default | Description |
 |:---|:---|:---|
 | `spark.cosmos.changeFeed.startFrom` | `Beginning` | ChangeFeed Start from settings (`Now`, `Beginning` or a certain point in time (UTC) for example `2020-02-10T14:15:03`) - the default value is `Beginning`. If the write config contains a `checkpointLocation` and any checkpoints exist, the stream is always continued independent of the `spark.cosmos.changeFeed.startFrom` settings - you need to change `checkpointLocation` or delete checkpoints to restart the stream if that is the intention. |
 | `spark.cosmos.changeFeed.mode` | `Incremental/LatestVersion` | ChangeFeed mode (`Incremental/LatestVersion` or `FullFidelity/AllVersionsAndDeletes`) - NOTE: `FullFidelity/AllVersionsAndDeletes` is in experimental state right now. It requires that the subscription/account has been enabled for the private preview and there are known breaking changes that will happen for `FullFidelity/AllVersionsAndDeletes` (schema of the returned documents). It is recommended to only use `FullFidelity/AllVersionsAndDeletes` for non-production scenarios at this point. |
 | `spark.cosmos.changeFeed.itemCountPerTriggerHint` | None (process all available data in first micro-batch) | Approximate maximum number of items read from change feed for each micro-batch/trigger. If not set, all available data in the changefeed is going to be processed in the first micro-batch. This could overload the client-resources (especially memory), so choosing a value to cap the resource consumption in the Spark executors is advisable here. Usually a reasonable value would be at least in the 100-thousands or single-digit millions. |
 | `spark.cosmos.changeFeed.batchCheckpointLocation` | None | Can be used to generate checkpoints when using change feed queries in batch mode - and proceeding on the next iteration where the previous left off. |
+| `spark.cosmos.changeFeed.performance.monitoring.enabled` | `true` | A flag indicating whether change feed performance monitoring is enabled. When enabled, custom task metrics are tracked internally and used to dynamically tune the change feed micro-batch size. |
 
 #### Json conversion configuration
 | Config Property Name | Default | Description |
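These change feed settings are passed as ordinary data source options. A minimal sketch of a structured-streaming read that sets the options documented above; the account endpoint, key, database, and container values are placeholders:

// Minimal sketch: streaming read from the Cosmos DB change feed with the
// options from the table above. Account values are placeholders.
val changeFeedDf = spark.readStream
  .format("cosmos.oltp.changeFeed")
  .option("spark.cosmos.accountEndpoint", "https://<account>.documents.azure.com:443/")
  .option("spark.cosmos.accountKey", "<account-key>")
  .option("spark.cosmos.database", "<database>")
  .option("spark.cosmos.container", "<container>")
  .option("spark.cosmos.changeFeed.startFrom", "Beginning")
  .option("spark.cosmos.changeFeed.mode", "Incremental")
  .option("spark.cosmos.changeFeed.itemCountPerTriggerHint", "500000")
  .option("spark.cosmos.changeFeed.performance.monitoring.enabled", "true")
  .load()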

sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/CosmosPartitionPlannerSpec.scala

Lines changed: 1 addition & 1 deletion
@@ -487,7 +487,7 @@ class CosmosPartitionPlannerSpec extends UnitSpec {
     calculate(1).endLsn.get shouldEqual 2150
   }
 
-  it should "calculateEndLsn should distribute rate based on metrics with readLimit and multiple partitions" in {
+  it should "calculateEndLsn should distribute rate based on metrics with readLimit" in {
     val clientConfig = spark.CosmosClientConfiguration(
       UUID.randomUUID().toString,
       UUID.randomUUID().toString,
