diff --git a/sdk/cosmos/azure-cosmos-spark_3-3_2-12/CHANGELOG.md b/sdk/cosmos/azure-cosmos-spark_3-3_2-12/CHANGELOG.md index aaf60b9d783c..dbd5ddc24cf3 100644 --- a/sdk/cosmos/azure-cosmos-spark_3-3_2-12/CHANGELOG.md +++ b/sdk/cosmos/azure-cosmos-spark_3-3_2-12/CHANGELOG.md @@ -10,6 +10,7 @@ * Added log4j-core to the list of shaded packages to avoid conflicts when customers use log4j in a different version. - See [PR 45924](https://github.com/Azure/azure-sdk-for-java/pull/46451) #### Other Changes +* Added change feed performance monitoring which is used to improve end lsn calculation in `CosmosPartitionPlanner`. - See [PR 46320](https://github.com/Azure/azure-sdk-for-java/pull/46320) ### 4.38.0 (2025-07-31) diff --git a/sdk/cosmos/azure-cosmos-spark_3-3_2-12/src/main/scala/com/azure/cosmos/spark/ChangeFeedMicroBatchStream.scala b/sdk/cosmos/azure-cosmos-spark_3-3_2-12/src/main/scala/com/azure/cosmos/spark/ChangeFeedMicroBatchStream.scala index 4ce06382608d..74efe9ffb8a2 100644 --- a/sdk/cosmos/azure-cosmos-spark_3-3_2-12/src/main/scala/com/azure/cosmos/spark/ChangeFeedMicroBatchStream.scala +++ b/sdk/cosmos/azure-cosmos-spark_3-3_2-12/src/main/scala/com/azure/cosmos/spark/ChangeFeedMicroBatchStream.scala @@ -2,7 +2,9 @@ // Licensed under the MIT License. package com.azure.cosmos.spark +import com.azure.cosmos.implementation.guava25.collect.{HashBiMap, Maps} import com.azure.cosmos.implementation.{SparkBridgeImplementationInternal, UUIDs} +import com.azure.cosmos.changeFeedMetrics.{ChangeFeedMetricsListener, ChangeFeedMetricsTracker} import com.azure.cosmos.spark.CosmosPredicates.{assertNotNull, assertNotNullOrEmpty, assertOnSparkDriver} import com.azure.cosmos.spark.diagnostics.{DiagnosticsContext, LoggerHelper} import org.apache.spark.broadcast.Broadcast @@ -12,7 +14,12 @@ import org.apache.spark.sql.connector.read.{InputPartition, PartitionReaderFacto import org.apache.spark.sql.types.StructType import java.time.Duration -import java.util.UUID +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.atomic.AtomicLong + +// scalastyle:off underscore.import +import scala.collection.JavaConverters._ +// scalastyle:on underscore.import // scala style rule flaky - even complaining on partial log messages // scalastyle:off multiple.string.literals @@ -57,6 +64,18 @@ private class ChangeFeedMicroBatchStream private var latestOffsetSnapshot: Option[ChangeFeedOffset] = None + private val partitionIndex = new AtomicLong(0) + private val partitionIndexMap = Maps.synchronizedBiMap(HashBiMap.create[NormalizedRange, Long]()) + private val partitionMetricsMap = new ConcurrentHashMap[NormalizedRange, ChangeFeedMetricsTracker]() + + // Register metrics listener + if (changeFeedConfig.performanceMonitoringEnabled) { + log.logInfo("ChangeFeed performance monitoring is enabled, registering ChangeFeedMetricsListener") + session.sparkContext.addSparkListener(new ChangeFeedMetricsListener(partitionIndexMap, partitionMetricsMap)) + } else { + log.logInfo("ChangeFeed performance monitoring is disabled") + } + override def latestOffset(): Offset = { // For Spark data streams implementing SupportsAdmissionControl trait // latestOffset(Offset, ReadLimit) is called instead @@ -99,11 +118,15 @@ private class ChangeFeedMicroBatchStream end .inputPartitions .get - .map(partition => partition - .withContinuationState( - SparkBridgeImplementationInternal - .extractChangeFeedStateForRange(start.changeFeedState, partition.feedRange), - clearEndLsn = false)) + .map(partition => { + val index = 
partitionIndexMap.asScala.getOrElseUpdate(partition.feedRange, partitionIndex.incrementAndGet()) + partition + .withContinuationState( + SparkBridgeImplementationInternal + .extractChangeFeedStateForRange(start.changeFeedState, partition.feedRange), + clearEndLsn = false) + .withIndex(index) + }) } /** @@ -150,7 +173,8 @@ private class ChangeFeedMicroBatchStream this.containerConfig, this.partitioningConfig, this.defaultParallelism, - this.container + this.container, + Some(this.partitionMetricsMap) ) if (offset.changeFeedState != startChangeFeedOffset.changeFeedState) { diff --git a/sdk/cosmos/azure-cosmos-spark_3-3_2-12/src/main/scala/com/azure/cosmos/spark/SparkInternalsBridge.scala b/sdk/cosmos/azure-cosmos-spark_3-3_2-12/src/main/scala/com/azure/cosmos/spark/SparkInternalsBridge.scala index 8ee9ecb84edd..0eff623f4ce2 100644 --- a/sdk/cosmos/azure-cosmos-spark_3-3_2-12/src/main/scala/com/azure/cosmos/spark/SparkInternalsBridge.scala +++ b/sdk/cosmos/azure-cosmos-spark_3-3_2-12/src/main/scala/com/azure/cosmos/spark/SparkInternalsBridge.scala @@ -6,6 +6,7 @@ import com.azure.cosmos.implementation.guava25.base.MoreObjects.firstNonNull import com.azure.cosmos.implementation.guava25.base.Strings.emptyToNull import com.azure.cosmos.spark.diagnostics.BasicLoggingTrait import org.apache.spark.TaskContext +import org.apache.spark.executor.TaskMetrics import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.util.AccumulatorV2 @@ -14,6 +15,15 @@ import java.util.Locale import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference} import scala.collection.mutable.ArrayBuffer +class SparkInternalsBridge { + // Only used in ChangeFeedMetricsListener, which is easier for test validation + def getInternalCustomTaskMetricsAsSQLMetric( + knownCosmosMetricNames: Set[String], + taskMetrics: TaskMetrics) : Map[String, SQLMetric] = { + SparkInternalsBridge.getInternalCustomTaskMetricsAsSQLMetricInternal(knownCosmosMetricNames, taskMetrics) + } +} + object SparkInternalsBridge extends BasicLoggingTrait { private val SPARK_REFLECTION_ACCESS_ALLOWED_PROPERTY = "COSMOS.SPARK_REFLECTION_ACCESS_ALLOWED" private val SPARK_REFLECTION_ACCESS_ALLOWED_VARIABLE = "COSMOS_SPARK_REFLECTION_ACCESS_ALLOWED" @@ -41,20 +51,23 @@ object SparkInternalsBridge extends BasicLoggingTrait { private final lazy val reflectionAccessAllowed = new AtomicBoolean(getSparkReflectionAccessAllowed) def getInternalCustomTaskMetricsAsSQLMetric(knownCosmosMetricNames: Set[String]): Map[String, SQLMetric] = { + Option.apply(TaskContext.get()) match { + case Some(taskCtx) => getInternalCustomTaskMetricsAsSQLMetric(knownCosmosMetricNames, taskCtx.taskMetrics()) + case None => Map.empty[String, SQLMetric] + } + } + + def getInternalCustomTaskMetricsAsSQLMetric(knownCosmosMetricNames: Set[String], taskMetrics: TaskMetrics): Map[String, SQLMetric] = { if (!reflectionAccessAllowed.get) { Map.empty[String, SQLMetric] } else { - Option.apply(TaskContext.get()) match { - case Some(taskCtx) => getInternalCustomTaskMetricsAsSQLMetricInternal(knownCosmosMetricNames, taskCtx) - case None => Map.empty[String, SQLMetric] - } + getInternalCustomTaskMetricsAsSQLMetricInternal(knownCosmosMetricNames, taskMetrics) } } - private def getAccumulators(taskCtx: TaskContext): Option[ArrayBuffer[AccumulatorV2[_, _]]] = { + private def getAccumulators(taskMetrics: TaskMetrics): Option[ArrayBuffer[AccumulatorV2[_, _]]] = { try { - val taskMetrics: Object = taskCtx.taskMetrics() val method = Option(accumulatorsMethod.get) match { case
Some(existing) => existing case None => @@ -79,8 +92,8 @@ object SparkInternalsBridge extends BasicLoggingTrait { private def getInternalCustomTaskMetricsAsSQLMetricInternal( knownCosmosMetricNames: Set[String], - taskCtx: TaskContext): Map[String, SQLMetric] = { - getAccumulators(taskCtx) match { + taskMetrics: TaskMetrics): Map[String, SQLMetric] = { + getAccumulators(taskMetrics) match { case Some(accumulators) => accumulators .filter(accumulable => accumulable.isInstanceOf[SQLMetric] && accumulable.name.isDefined diff --git a/sdk/cosmos/azure-cosmos-spark_3-4_2-12/CHANGELOG.md b/sdk/cosmos/azure-cosmos-spark_3-4_2-12/CHANGELOG.md index 7157641e8b70..4920a3e39739 100644 --- a/sdk/cosmos/azure-cosmos-spark_3-4_2-12/CHANGELOG.md +++ b/sdk/cosmos/azure-cosmos-spark_3-4_2-12/CHANGELOG.md @@ -10,6 +10,7 @@ * Added log4j-core to the list of shaded packages to avoid conflicts when customers use log4j in a different version. - See [PR 45924](https://github.com/Azure/azure-sdk-for-java/pull/46451) #### Other Changes +* Added change feed performance monitoring which is used to improve end lsn calculation in `CosmosPartitionPlanner`. - See [PR 46320](https://github.com/Azure/azure-sdk-for-java/pull/46320) ### 4.38.0 (2025-07-31) diff --git a/sdk/cosmos/azure-cosmos-spark_3-4_2-12/src/main/scala/com/azure/cosmos/spark/ChangeFeedMicroBatchStream.scala b/sdk/cosmos/azure-cosmos-spark_3-4_2-12/src/main/scala/com/azure/cosmos/spark/ChangeFeedMicroBatchStream.scala index f0297719d8f7..4f1ccd0e2fa0 100644 --- a/sdk/cosmos/azure-cosmos-spark_3-4_2-12/src/main/scala/com/azure/cosmos/spark/ChangeFeedMicroBatchStream.scala +++ b/sdk/cosmos/azure-cosmos-spark_3-4_2-12/src/main/scala/com/azure/cosmos/spark/ChangeFeedMicroBatchStream.scala @@ -2,7 +2,9 @@ // Licensed under the MIT License. 
package com.azure.cosmos.spark +import com.azure.cosmos.implementation.guava25.collect.{HashBiMap, Maps} import com.azure.cosmos.implementation.{SparkBridgeImplementationInternal, UUIDs} +import com.azure.cosmos.changeFeedMetrics.{ChangeFeedMetricsListener, ChangeFeedMetricsTracker} import com.azure.cosmos.spark.CosmosPredicates.{assertNotNull, assertNotNullOrEmpty, assertOnSparkDriver} import com.azure.cosmos.spark.diagnostics.{DiagnosticsContext, LoggerHelper} import org.apache.spark.broadcast.Broadcast @@ -12,7 +14,12 @@ import org.apache.spark.sql.connector.read.{InputPartition, PartitionReaderFacto import org.apache.spark.sql.types.StructType import java.time.Duration -import java.util.UUID +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.atomic.AtomicLong + +// scalastyle:off underscore.import +import scala.collection.JavaConverters._ +// scalastyle:on underscore.import // scala style rule flaky - even complaining on partial log messages // scalastyle:off multiple.string.literals @@ -59,6 +66,17 @@ private class ChangeFeedMicroBatchStream private var latestOffsetSnapshot: Option[ChangeFeedOffset] = None + private val partitionIndex = new AtomicLong(0) + private val partitionIndexMap = Maps.synchronizedBiMap(HashBiMap.create[NormalizedRange, Long]()) + private val partitionMetricsMap = new ConcurrentHashMap[NormalizedRange, ChangeFeedMetricsTracker]() + + if (changeFeedConfig.performanceMonitoringEnabled) { + log.logInfo("ChangeFeed performance monitoring is enabled, registering ChangeFeedMetricsListener") + session.sparkContext.addSparkListener(new ChangeFeedMetricsListener(partitionIndexMap, partitionMetricsMap)) + } else { + log.logInfo("ChangeFeed performance monitoring is disabled") + } + override def latestOffset(): Offset = { // For Spark data streams implementing SupportsAdmissionControl trait // latestOffset(Offset, ReadLimit) is called instead @@ -101,11 +119,15 @@ private class ChangeFeedMicroBatchStream end .inputPartitions .get - .map(partition => partition - .withContinuationState( - SparkBridgeImplementationInternal + .map(partition => { + val index = partitionIndexMap.asScala.getOrElseUpdate(partition.feedRange, partitionIndex.incrementAndGet()) + partition + .withContinuationState( + SparkBridgeImplementationInternal .extractChangeFeedStateForRange(start.changeFeedState, partition.feedRange), - clearEndLsn = false)) + clearEndLsn = false) + .withIndex(index) + }) } /** @@ -152,7 +174,8 @@ private class ChangeFeedMicroBatchStream this.containerConfig, this.partitioningConfig, this.defaultParallelism, - this.container + this.container, + Some(this.partitionMetricsMap) ) if (offset.changeFeedState != startChangeFeedOffset.changeFeedState) { diff --git a/sdk/cosmos/azure-cosmos-spark_3-4_2-12/src/main/scala/com/azure/cosmos/spark/SparkInternalsBridge.scala b/sdk/cosmos/azure-cosmos-spark_3-4_2-12/src/main/scala/com/azure/cosmos/spark/SparkInternalsBridge.scala index 3d4050c3ee59..075741a756e3 100644 --- a/sdk/cosmos/azure-cosmos-spark_3-4_2-12/src/main/scala/com/azure/cosmos/spark/SparkInternalsBridge.scala +++ b/sdk/cosmos/azure-cosmos-spark_3-4_2-12/src/main/scala/com/azure/cosmos/spark/SparkInternalsBridge.scala @@ -6,6 +6,7 @@ import com.azure.cosmos.implementation.guava25.base.MoreObjects.firstNonNull import com.azure.cosmos.implementation.guava25.base.Strings.emptyToNull import com.azure.cosmos.spark.diagnostics.BasicLoggingTrait import org.apache.spark.TaskContext +import org.apache.spark.executor.TaskMetrics import 
org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.util.AccumulatorV2 @@ -13,6 +14,15 @@ import java.lang.reflect.Method import java.util.Locale import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference} +class SparkInternalsBridge { + // Only used in ChangeFeedMetricsListener, which is easier for test validation + def getInternalCustomTaskMetricsAsSQLMetric( + knownCosmosMetricNames: Set[String], + taskMetrics: TaskMetrics) : Map[String, SQLMetric] = { + SparkInternalsBridge.getInternalCustomTaskMetricsAsSQLMetricInternal(knownCosmosMetricNames, taskMetrics) + } +} + object SparkInternalsBridge extends BasicLoggingTrait { private val SPARK_REFLECTION_ACCESS_ALLOWED_PROPERTY = "COSMOS.SPARK_REFLECTION_ACCESS_ALLOWED" private val SPARK_REFLECTION_ACCESS_ALLOWED_VARIABLE = "COSMOS_SPARK_REFLECTION_ACCESS_ALLOWED" @@ -40,20 +50,23 @@ object SparkInternalsBridge extends BasicLoggingTrait { private final lazy val reflectionAccessAllowed = new AtomicBoolean(getSparkReflectionAccessAllowed) def getInternalCustomTaskMetricsAsSQLMetric(knownCosmosMetricNames: Set[String]) : Map[String, SQLMetric] = { + Option.apply(TaskContext.get()) match { + case Some(taskCtx) => getInternalCustomTaskMetricsAsSQLMetric(knownCosmosMetricNames, taskCtx.taskMetrics()) + case None => Map.empty[String, SQLMetric] + } + } + + def getInternalCustomTaskMetricsAsSQLMetric(knownCosmosMetricNames: Set[String], taskMetrics: TaskMetrics) : Map[String, SQLMetric] = { if (!reflectionAccessAllowed.get) { Map.empty[String, SQLMetric] } else { - Option.apply(TaskContext.get()) match { - case Some(taskCtx) => getInternalCustomTaskMetricsAsSQLMetricInternal(knownCosmosMetricNames, taskCtx) - case None => Map.empty[String, SQLMetric] - } + getInternalCustomTaskMetricsAsSQLMetricInternal(knownCosmosMetricNames, taskMetrics) } } - private def getAccumulators(taskCtx: TaskContext): Option[Seq[AccumulatorV2[_, _]]] = { + private def getAccumulators(taskMetrics: TaskMetrics): Option[Seq[AccumulatorV2[_, _]]] = { try { - val taskMetrics: Object = taskCtx.taskMetrics() val method = Option(accumulatorsMethod.get) match { case Some(existing) => existing case None => @@ -78,8 +91,8 @@ object SparkInternalsBridge extends BasicLoggingTrait { private def getInternalCustomTaskMetricsAsSQLMetricInternal( knownCosmosMetricNames: Set[String], - taskCtx: TaskContext): Map[String, SQLMetric] = { - getAccumulators(taskCtx) match { + taskMetrics: TaskMetrics): Map[String, SQLMetric] = { + getAccumulators(taskMetrics) match { case Some(accumulators) => accumulators .filter(accumulable => accumulable.isInstanceOf[SQLMetric] && accumulable.name.isDefined diff --git a/sdk/cosmos/azure-cosmos-spark_3-5_2-12/CHANGELOG.md b/sdk/cosmos/azure-cosmos-spark_3-5_2-12/CHANGELOG.md index 50f5c804e184..7c5f94086e1a 100644 --- a/sdk/cosmos/azure-cosmos-spark_3-5_2-12/CHANGELOG.md +++ b/sdk/cosmos/azure-cosmos-spark_3-5_2-12/CHANGELOG.md @@ -10,6 +10,7 @@ * Added log4j-core to the list of shaded packages to avoid conflicts when customers use log4j in a different version. - See [PR 45924](https://github.com/Azure/azure-sdk-for-java/pull/46451) #### Other Changes +* Added change feed performance monitoring which is used to improve end lsn calculation in `CosmosPartitionPlanner`. 
- See [PR 46320](https://github.com/Azure/azure-sdk-for-java/pull/46320) ### 4.38.0 (2025-07-31) diff --git a/sdk/cosmos/azure-cosmos-spark_3-5_2-12/src/main/scala/com/azure/cosmos/spark/ChangeFeedMicroBatchStream.scala b/sdk/cosmos/azure-cosmos-spark_3-5_2-12/src/main/scala/com/azure/cosmos/spark/ChangeFeedMicroBatchStream.scala index cfe5916bc770..7cb0b92d19f9 100644 --- a/sdk/cosmos/azure-cosmos-spark_3-5_2-12/src/main/scala/com/azure/cosmos/spark/ChangeFeedMicroBatchStream.scala +++ b/sdk/cosmos/azure-cosmos-spark_3-5_2-12/src/main/scala/com/azure/cosmos/spark/ChangeFeedMicroBatchStream.scala @@ -2,7 +2,9 @@ // Licensed under the MIT License. package com.azure.cosmos.spark +import com.azure.cosmos.changeFeedMetrics.{ChangeFeedMetricsListener, ChangeFeedMetricsTracker} import com.azure.cosmos.implementation.SparkBridgeImplementationInternal +import com.azure.cosmos.implementation.guava25.collect.{HashBiMap, Maps} import com.azure.cosmos.spark.CosmosPredicates.{assertNotNull, assertNotNullOrEmpty, assertOnSparkDriver} import com.azure.cosmos.spark.diagnostics.{DiagnosticsContext, LoggerHelper} import org.apache.spark.broadcast.Broadcast @@ -13,6 +15,12 @@ import org.apache.spark.sql.types.StructType import java.time.Duration import java.util.UUID +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.atomic.AtomicLong + +// scalastyle:off underscore.import +import scala.collection.JavaConverters._ +// scalastyle:on underscore.import // scala style rule flaky - even complaining on partial log messages // scalastyle:off multiple.string.literals @@ -59,6 +67,17 @@ private class ChangeFeedMicroBatchStream private var latestOffsetSnapshot: Option[ChangeFeedOffset] = None + private val partitionIndex = new AtomicLong(0) + private val partitionIndexMap = Maps.synchronizedBiMap(HashBiMap.create[NormalizedRange, Long]()) + private val partitionMetricsMap = new ConcurrentHashMap[NormalizedRange, ChangeFeedMetricsTracker]() + + if (changeFeedConfig.performanceMonitoringEnabled) { + log.logInfo("ChangeFeed performance monitoring is enabled, registering ChangeFeedMetricsListener") + session.sparkContext.addSparkListener(new ChangeFeedMetricsListener(partitionIndexMap, partitionMetricsMap)) + } else { + log.logInfo("ChangeFeed performance monitoring is disabled") + } + override def latestOffset(): Offset = { // For Spark data streams implementing SupportsAdmissionControl trait // latestOffset(Offset, ReadLimit) is called instead @@ -101,11 +120,15 @@ private class ChangeFeedMicroBatchStream end .inputPartitions .get - .map(partition => partition - .withContinuationState( - SparkBridgeImplementationInternal + .map(partition => { + val index = partitionIndexMap.asScala.getOrElseUpdate(partition.feedRange, partitionIndex.incrementAndGet()) + partition + .withContinuationState( + SparkBridgeImplementationInternal .extractChangeFeedStateForRange(start.changeFeedState, partition.feedRange), - clearEndLsn = false)) + clearEndLsn = false) + .withIndex(index) + }) } /** @@ -152,7 +175,8 @@ private class ChangeFeedMicroBatchStream this.containerConfig, this.partitioningConfig, this.defaultParallelism, - this.container + this.container, + Some(this.partitionMetricsMap) ) if (offset.changeFeedState != startChangeFeedOffset.changeFeedState) { diff --git a/sdk/cosmos/azure-cosmos-spark_3-5_2-12/src/main/scala/com/azure/cosmos/spark/SparkInternalsBridge.scala b/sdk/cosmos/azure-cosmos-spark_3-5_2-12/src/main/scala/com/azure/cosmos/spark/SparkInternalsBridge.scala index 3d4050c3ee59..45d7bacef995 
100644 --- a/sdk/cosmos/azure-cosmos-spark_3-5_2-12/src/main/scala/com/azure/cosmos/spark/SparkInternalsBridge.scala +++ b/sdk/cosmos/azure-cosmos-spark_3-5_2-12/src/main/scala/com/azure/cosmos/spark/SparkInternalsBridge.scala @@ -6,12 +6,21 @@ import com.azure.cosmos.implementation.guava25.base.MoreObjects.firstNonNull import com.azure.cosmos.implementation.guava25.base.Strings.emptyToNull import com.azure.cosmos.spark.diagnostics.BasicLoggingTrait import org.apache.spark.TaskContext +import org.apache.spark.executor.TaskMetrics import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.util.AccumulatorV2 import java.lang.reflect.Method import java.util.Locale import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference} +class SparkInternalsBridge { + // Only used in ChangeFeedMetricsListener, which is easier for test validation + def getInternalCustomTaskMetricsAsSQLMetric( + knownCosmosMetricNames: Set[String], + taskMetrics: TaskMetrics) : Map[String, SQLMetric] = { + SparkInternalsBridge.getInternalCustomTaskMetricsAsSQLMetricInternal(knownCosmosMetricNames, taskMetrics) + } +} object SparkInternalsBridge extends BasicLoggingTrait { private val SPARK_REFLECTION_ACCESS_ALLOWED_PROPERTY = "COSMOS.SPARK_REFLECTION_ACCESS_ALLOWED" @@ -40,20 +49,23 @@ object SparkInternalsBridge extends BasicLoggingTrait { private final lazy val reflectionAccessAllowed = new AtomicBoolean(getSparkReflectionAccessAllowed) def getInternalCustomTaskMetricsAsSQLMetric(knownCosmosMetricNames: Set[String]) : Map[String, SQLMetric] = { + Option.apply(TaskContext.get()) match { + case Some(taskCtx) => getInternalCustomTaskMetricsAsSQLMetric(knownCosmosMetricNames, taskCtx.taskMetrics()) + case None => Map.empty[String, SQLMetric] + } + } + + def getInternalCustomTaskMetricsAsSQLMetric(knownCosmosMetricNames: Set[String], taskMetrics: TaskMetrics) : Map[String, SQLMetric] = { if (!reflectionAccessAllowed.get) { Map.empty[String, SQLMetric] } else { - Option.apply(TaskContext.get()) match { - case Some(taskCtx) => getInternalCustomTaskMetricsAsSQLMetricInternal(knownCosmosMetricNames, taskCtx) - case None => Map.empty[String, SQLMetric] - } + getInternalCustomTaskMetricsAsSQLMetricInternal(knownCosmosMetricNames, taskMetrics) } } - private def getAccumulators(taskCtx: TaskContext): Option[Seq[AccumulatorV2[_, _]]] = { + private def getAccumulators(taskMetrics: TaskMetrics): Option[Seq[AccumulatorV2[_, _]]] = { try { - val taskMetrics: Object = taskCtx.taskMetrics() val method = Option(accumulatorsMethod.get) match { case Some(existing) => existing case None => @@ -78,8 +90,8 @@ object SparkInternalsBridge extends BasicLoggingTrait { private def getInternalCustomTaskMetricsAsSQLMetricInternal( knownCosmosMetricNames: Set[String], - taskCtx: TaskContext): Map[String, SQLMetric] = { - getAccumulators(taskCtx) match { + taskMetrics: TaskMetrics): Map[String, SQLMetric] = { + getAccumulators(taskMetrics) match { case Some(accumulators) => accumulators .filter(accumulable => accumulable.isInstanceOf[SQLMetric] && accumulable.name.isDefined diff --git a/sdk/cosmos/azure-cosmos-spark_3_2-12/docs/configuration-reference.md b/sdk/cosmos/azure-cosmos-spark_3_2-12/docs/configuration-reference.md index 1a2d3cd74eca..1c44f94fe9a5 100644 --- a/sdk/cosmos/azure-cosmos-spark_3_2-12/docs/configuration-reference.md +++ b/sdk/cosmos/azure-cosmos-spark_3_2-12/docs/configuration-reference.md @@ -112,12 +112,13 @@ Used to influence the json serialization/deserialization behavior | 
`spark.cosmos.serialization.dateTimeConversionMode` | `Default` | The date/time conversion mode (`Default`, `AlwaysEpochMilliseconds`, `AlwaysEpochMillisecondsWithSystemDefaultTimezone`). With `Default` the standard Spark 3.* behavior is used (`java.sql.Date`/`java.time.LocalDate` are converted to EpochDay, `java.sql.Timestamp`/`java.time.Instant` are converted to MicrosecondsFromEpoch). With `AlwaysEpochMilliseconds` the same behavior the Cosmos DB connector for Spark 2.4 used is applied - `java.sql.Date`, `java.time.LocalDate`, `java.sql.Timestamp` and `java.time.Instant` are converted to MillisecondsFromEpoch. The behavior for `AlwaysEpochMillisecondsWithSystemDefaultTimezone` is identical with `AlwaysEpochMilliseconds` except that it will assume System default time zone / Spark session time zone (specified via `spark.sql.session.timezone`) instead of UTC when the date/time to be parsed has no explicit time zone. | #### Change feed (only for Spark-Streaming using `cosmos.oltp.changeFeed` data source, which is read-only) configuration -| Config Property Name | Default | Description | -|:--------------------------------------------------|:-------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| `spark.cosmos.changeFeed.startFrom` | `Beginning` | ChangeFeed Start from settings (`Now`, `Beginning` or a certain point in time (UTC) for example `2020-02-10T14:15:03`) - the default value is `Beginning`. If the write config contains a `checkpointLocation` and any checkpoints exist, the stream is always continued independent of the `spark.cosmos.changeFeed.startFrom` settings - you need to change `checkpointLocation` or delete checkpoints to restart the stream if that is the intention. | -| `spark.cosmos.changeFeed.mode` | `Incremental/LatestVersion` | ChangeFeed mode (`Incremental/LatestVersion` or `FullFidelity/AllVersionsAndDeletes`) - NOTE: `FullFidelity/AllVersionsAndDeletes` is in experimental state right now. It requires that the subscription/account has been enabled for the private preview and there are known breaking changes that will happen for `FullFidelity/AllVersionsAndDeletes` (schema of the returned documents). It is recommended to only use `FullFidelity/AllVersionsAndDeletes` for non-production scenarios at this point. | -| `spark.cosmos.changeFeed.itemCountPerTriggerHint` | None (process all available data in first micro-batch) | Approximate maximum number of items read from change feed for each micro-batch/trigger. If not set, all available data in the changefeed is going to be processed in the first micro-batch. This could overload the client-resources (especially memory), so choosing a value to cap the resource consumption in the Spark executors is advisable here. Usually a reasonable value would be at least in the 100-thousands or single-digit millions. | -| `spark.cosmos.changeFeed.batchCheckpointLocation` | None | Can be used to generate checkpoints when using change feed queries in batch mode - and proceeding on the next iteration where the previous left off. 
| +| Config Property Name | Default | Description | +|:------------------------------------------------------------|:-------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `spark.cosmos.changeFeed.startFrom` | `Beginning` | ChangeFeed Start from settings (`Now`, `Beginning` or a certain point in time (UTC) for example `2020-02-10T14:15:03`) - the default value is `Beginning`. If the write config contains a `checkpointLocation` and any checkpoints exist, the stream is always continued independent of the `spark.cosmos.changeFeed.startFrom` settings - you need to change `checkpointLocation` or delete checkpoints to restart the stream if that is the intention. | +| `spark.cosmos.changeFeed.mode` | `Incremental/LatestVersion` | ChangeFeed mode (`Incremental/LatestVersion` or `FullFidelity/AllVersionsAndDeletes`) - NOTE: `FullFidelity/AllVersionsAndDeletes` is in experimental state right now. It requires that the subscription/account has been enabled for the private preview and there are known breaking changes that will happen for `FullFidelity/AllVersionsAndDeletes` (schema of the returned documents). It is recommended to only use `FullFidelity/AllVersionsAndDeletes` for non-production scenarios at this point. | +| `spark.cosmos.changeFeed.itemCountPerTriggerHint` | None (process all available data in first micro-batch) | Approximate maximum number of items read from change feed for each micro-batch/trigger. If not set, all available data in the changefeed is going to be processed in the first micro-batch. This could overload the client-resources (especially memory), so choosing a value to cap the resource consumption in the Spark executors is advisable here. Usually a reasonable value would be at least in the 100-thousands or single-digit millions. | +| `spark.cosmos.changeFeed.batchCheckpointLocation` | None | Can be used to generate checkpoints when using change feed queries in batch mode - and proceeding on the next iteration where the previous left off. | +| `spark.cosmos.changeFeed.performance.monitoring.enabled` | `true` | A flag to indicate whether to enable change feed performance monitoring. When enabled, custom task metrics will be tracked internally and used to dynamically tune the change feed micro-batch size. | #### Json conversion configuration | Config Property Name | Default | Description | diff --git a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/changeFeedMetrics/ChangeFeedItemsCntMetric.scala b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/changeFeedMetrics/ChangeFeedItemsCntMetric.scala new file mode 100644 index 000000000000..372fe59ecbf5 --- /dev/null +++ b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/changeFeedMetrics/ChangeFeedItemsCntMetric.scala @@ -0,0 +1,18 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License.
+ +package com.azure.cosmos.changeFeedMetrics + +import com.azure.cosmos.spark.CosmosConstants.MetricNames +import org.apache.spark.sql.connector.metric.CustomSumMetric + +/*** + * This metric is used to track the partition total items count within a change feed micro-batch + * Note: not all the items will be returned back to spark + */ +private[cosmos] class ChangeFeedItemsCntMetric extends CustomSumMetric { + + override def name(): String = MetricNames.ChangeFeedItemsCnt + + override def description(): String = MetricNames.ChangeFeedItemsCnt +} diff --git a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/changeFeedMetrics/ChangeFeedLsnRangeMetric.scala b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/changeFeedMetrics/ChangeFeedLsnRangeMetric.scala new file mode 100644 index 000000000000..e52f1babe38c --- /dev/null +++ b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/changeFeedMetrics/ChangeFeedLsnRangeMetric.scala @@ -0,0 +1,17 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.cosmos.changeFeedMetrics + +import com.azure.cosmos.spark.CosmosConstants.MetricNames +import org.apache.spark.sql.connector.metric.CustomSumMetric + +/*** + * This metric is used to track the lsn range for partition within a change feed micro-batch + */ +private[cosmos] class ChangeFeedLsnRangeMetric extends CustomSumMetric { + + override def name(): String = MetricNames.ChangeFeedLsnRange + + override def description(): String = MetricNames.ChangeFeedLsnRange +} diff --git a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/changeFeedMetrics/ChangeFeedMetricsListener.scala b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/changeFeedMetrics/ChangeFeedMetricsListener.scala new file mode 100644 index 000000000000..801b91a331ba --- /dev/null +++ b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/changeFeedMetrics/ChangeFeedMetricsListener.scala @@ -0,0 +1,73 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. 
+ +package com.azure.cosmos.changeFeedMetrics + +import com.azure.cosmos.implementation.guava25.collect.BiMap +import com.azure.cosmos.spark.diagnostics.BasicLoggingTrait +import com.azure.cosmos.spark.{CosmosConstants, NormalizedRange, SparkInternalsBridge} +import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd} +import org.apache.spark.sql.execution.metric.SQLMetric + +import java.util.concurrent.ConcurrentHashMap + +private[cosmos] class ChangeFeedMetricsListener( + partitionIndexMap: BiMap[NormalizedRange, Long], + partitionMetricsMap: ConcurrentHashMap[NormalizedRange, ChangeFeedMetricsTracker]) extends SparkListener with BasicLoggingTrait{ + + private val sparkInternalsBridge = new SparkInternalsBridge() + override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = { + try { + val metrics = sparkInternalsBridge.getInternalCustomTaskMetricsAsSQLMetric( + Set( + CosmosConstants.MetricNames.ChangeFeedLsnRange, + CosmosConstants.MetricNames.ChangeFeedItemsCnt, + CosmosConstants.MetricNames.ChangeFeedPartitionIndex + ), + taskEnd.taskMetrics + ) + + if (metrics.contains(CosmosConstants.MetricNames.ChangeFeedPartitionIndex)) { + val index = metrics(CosmosConstants.MetricNames.ChangeFeedPartitionIndex).value + + val normalizedRange = partitionIndexMap.inverse().get(index) + if (normalizedRange != null) { + partitionMetricsMap.putIfAbsent(normalizedRange, ChangeFeedMetricsTracker(index, normalizedRange)) + val metricsTracker = partitionMetricsMap.get(normalizedRange) + val changeFeedItemsCnt = getFetchedItemCnt(metrics) + val lsnRange = getLsnRange(metrics) + + if (changeFeedItemsCnt >= 0 && lsnRange >= 0) { + metricsTracker.track( + metrics(CosmosConstants.MetricNames.ChangeFeedLsnRange).value, + metrics(CosmosConstants.MetricNames.ChangeFeedItemsCnt).value + ) + } + + logInfo(s"onTaskEnd for partition index $index, changeFeedItemsCnt $changeFeedItemsCnt, lsnRange $lsnRange") + } + } + } catch { + // using metrics to tune the change feed micro batch is optimization + // suppress any exceptions captured + case e: Throwable => + logWarning("Tracking changeFeed metrics failed", e) + } + } + + private def getFetchedItemCnt(metrics: Map[String, SQLMetric]): Long = { + if (metrics.contains(CosmosConstants.MetricNames.ChangeFeedItemsCnt)) { + metrics(CosmosConstants.MetricNames.ChangeFeedItemsCnt).value + } else { + -1 + } + } + + private def getLsnRange(metrics: Map[String, SQLMetric]): Long = { + if (metrics.contains(CosmosConstants.MetricNames.ChangeFeedLsnRange)) { + metrics(CosmosConstants.MetricNames.ChangeFeedLsnRange).value + } else { + -1 + } + } +} diff --git a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/changeFeedMetrics/ChangeFeedMetricsTracker.scala b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/changeFeedMetrics/ChangeFeedMetricsTracker.scala new file mode 100644 index 000000000000..81e8a137b321 --- /dev/null +++ b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/changeFeedMetrics/ChangeFeedMetricsTracker.scala @@ -0,0 +1,92 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. 
+ +package com.azure.cosmos.changeFeedMetrics + +import com.azure.cosmos.implementation.guava25.collect.EvictingQueue +import com.azure.cosmos.spark.diagnostics.BasicLoggingTrait +import com.azure.cosmos.spark.{CosmosConstants, NormalizedRange} + +// scalastyle:off underscore.import +import scala.collection.JavaConverters._ +// scalastyle:on underscore.import + +private[cosmos] object ChangeFeedMetricsTracker { + private val MetricsHistory: Int = CosmosConstants.ChangeFeedMetricsConfigs.MetricsHistory + private val MetricsHistoryDecayFactor: Double = CosmosConstants.ChangeFeedMetricsConfigs.MetricsHistoryDecayFactor + + def apply( + partitionIndex: Long, + feedRange: NormalizedRange, + metricsHistory: Int, + metricsHistoryDecayFactor: Double): ChangeFeedMetricsTracker = { + new ChangeFeedMetricsTracker(partitionIndex, feedRange, metricsHistory, metricsHistoryDecayFactor) + } + + def apply(partitionIndex: Long, feedRange: NormalizedRange): ChangeFeedMetricsTracker = { + new ChangeFeedMetricsTracker(partitionIndex, feedRange, MetricsHistory, MetricsHistoryDecayFactor) + } +} + +/** + * Tracks metrics for Change Feed operations including partition indices and LSN range + * with exponential weighting for recent metrics + */ +private[cosmos] class ChangeFeedMetricsTracker( + private val partitionIndex: Long, + private val feedRange: NormalizedRange, + private val maxHistory: Int = ChangeFeedMetricsTracker.MetricsHistory, + private val decayFactor: Double = ChangeFeedMetricsTracker.MetricsHistoryDecayFactor) extends BasicLoggingTrait { + + private val changeFeedItemsPerLsnHistory = EvictingQueue.create[Double](maxHistory) + private var currentChangeFeedItemsPerLsnOpt: Option[Double] = None + + /** + * Track the normalized change feed items per lsn + * + * @param lsnRange the lsn range of the fetched items. + * @param fetchedItemCnt the total fetched item cnt within a micro-batch for the feed range. 
+ */ + def track(lsnRange: Long, fetchedItemCnt: Long): Unit = { + val effectiveChangesFetchedCnt = Math.max(1, fetchedItemCnt) + val changesPerLsn = if (lsnRange == 0) effectiveChangesFetchedCnt.toDouble else effectiveChangesFetchedCnt.toDouble / lsnRange + synchronized { + changeFeedItemsPerLsnHistory.add(changesPerLsn) + calculateWeightedItemsPerLsn() + } + } + + /** + * Calculates weighted normalized items per lsn where recent values have more impact + * Uses exponential weighting: weight = decayFactor^(n-i-1) where: + * n = number of measurements + * i = index of measurement (0 being oldest) + * @return Weighted changes per lsn + */ + private def calculateWeightedItemsPerLsn(): Unit = { + if (!changeFeedItemsPerLsnHistory.isEmpty) { + val histories = changeFeedItemsPerLsnHistory.asScala.toArray + val n = histories.length + var weightedSum = 0.0 + var weightSum = 0.0 + + for (i <- histories.indices) { + val weight = math.pow(decayFactor, n - i - 1) + weightedSum += histories(i) * weight + weightSum += weight + } + + currentChangeFeedItemsPerLsnOpt = Some(weightedSum / weightSum) + } + } + + /** + * Gets current weighted items per lsn + * @return weighted items per lsn + */ + def getWeightedChangeFeedItemsPerLsn: Option[Double] = { + logDebug(s"getWeightedChangeFeedItemsPerLsn for feedRangeIndex [$partitionIndex], " + + s"feedRange [$feedRange] value [$currentChangeFeedItemsPerLsnOpt]") + this.currentChangeFeedItemsPerLsnOpt + } +} \ No newline at end of file diff --git a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/changeFeedMetrics/ChangeFeedPartitionIndexMetric.scala b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/changeFeedMetrics/ChangeFeedPartitionIndexMetric.scala new file mode 100644 index 000000000000..1a52c5c98e4a --- /dev/null +++ b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/changeFeedMetrics/ChangeFeedPartitionIndexMetric.scala @@ -0,0 +1,21 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. 
+ +package com.azure.cosmos.changeFeedMetrics + +import com.azure.cosmos.spark.CosmosConstants.MetricNames +import org.apache.spark.sql.connector.metric.CustomMetric + +/*** + * This metric is used to capture the cosmos partition index in a consistent way + */ +private[cosmos] class ChangeFeedPartitionIndexMetric extends CustomMetric { + + override def name(): String = MetricNames.ChangeFeedPartitionIndex + + override def description(): String = MetricNames.ChangeFeedPartitionIndex + + override def aggregateTaskMetrics(taskMetrics: Array[Long]): String = { + taskMetrics.mkString(",") + } +} diff --git a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/BulkWriter.scala b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/BulkWriter.scala index f398c6b683fe..2f3b01b54965 100644 --- a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/BulkWriter.scala +++ b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/BulkWriter.scala @@ -3,13 +3,13 @@ package com.azure.cosmos.spark // scalastyle:off underscore.import -import com.azure.cosmos.implementation.{CosmosDaemonThreadFactory, UUIDs} -import com.azure.cosmos.{BridgeInternal, CosmosAsyncContainer, CosmosDiagnosticsContext, CosmosEndToEndOperationLatencyPolicyConfigBuilder, CosmosException} import com.azure.cosmos.implementation.apachecommons.lang.StringUtils import com.azure.cosmos.implementation.batch.{BatchRequestResponseConstants, BulkExecutorDiagnosticsTracker, ItemBulkOperation} +import com.azure.cosmos.implementation.{CosmosDaemonThreadFactory, UUIDs} import com.azure.cosmos.models._ import com.azure.cosmos.spark.BulkWriter.{BulkOperationFailedException, bulkWriterInputBoundedElastic, bulkWriterRequestsBoundedElastic, bulkWriterResponsesBoundedElastic, getThreadInfo, readManyBoundedElastic} import com.azure.cosmos.spark.diagnostics.DefaultDiagnostics +import com.azure.cosmos.{BridgeInternal, CosmosAsyncContainer, CosmosDiagnosticsContext, CosmosEndToEndOperationLatencyPolicyConfigBuilder, CosmosException} import reactor.core.Scannable import reactor.core.publisher.Mono import reactor.core.scheduler.Scheduler @@ -37,7 +37,6 @@ import reactor.core.scala.publisher.SMono.PimpJFlux import reactor.core.scala.publisher.{SFlux, SMono} import reactor.core.scheduler.Schedulers -import java.util.UUID import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicLong, AtomicReference} import java.util.concurrent.locks.ReentrantLock import java.util.concurrent.{Semaphore, TimeUnit} diff --git a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/ChangeFeedBatch.scala b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/ChangeFeedBatch.scala index fd54db638632..67b12eca96df 100644 --- a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/ChangeFeedBatch.scala +++ b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/ChangeFeedBatch.scala @@ -13,7 +13,6 @@ import org.apache.spark.sql.types.StructType import java.nio.file.Paths import java.time.Duration -import java.util.UUID private class ChangeFeedBatch ( @@ -194,6 +193,7 @@ private class ChangeFeedBatch SparkBridgeImplementationInternal .extractChangeFeedStateForRange(initialOffsetJson, partition.feedRange), clearEndLsn = !hasBatchCheckpointLocation)) + .map(_.asInstanceOf[InputPartition]) log.logInfo(s"<-- planInputPartitions $batchId (creating ${inputPartitions.length} partitions)") inputPartitions diff 
--git a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/ChangeFeedPartitionReader.scala b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/ChangeFeedPartitionReader.scala index d4c5eb5890f5..5d83a5139ef2 100644 --- a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/ChangeFeedPartitionReader.scala +++ b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/ChangeFeedPartitionReader.scala @@ -3,20 +3,22 @@ package com.azure.cosmos.spark -import com.azure.cosmos.{CosmosItemSerializer, CosmosItemSerializerNoExceptionWrapping, SparkBridgeInternal} import com.azure.cosmos.implementation.spark.OperationContextAndListenerTuple import com.azure.cosmos.implementation.{ChangeFeedSparkRowItem, ImplementationBridgeHelpers, ObjectNodeMap, SparkBridgeImplementationInternal, Strings, Utils} import com.azure.cosmos.models.{CosmosChangeFeedRequestOptions, ModelBridgeInternal, PartitionKeyDefinition} import com.azure.cosmos.spark.ChangeFeedPartitionReader.LsnPropertyName +import com.azure.cosmos.spark.CosmosConstants.MetricNames import com.azure.cosmos.spark.CosmosPredicates.requireNotNull import com.azure.cosmos.spark.CosmosTableSchemaInferrer.LsnAttributeName import com.azure.cosmos.spark.diagnostics.{DetailedFeedDiagnosticsProvider, DiagnosticsContext, DiagnosticsLoader, LoggerHelper, SparkTaskContext} +import com.azure.cosmos.{CosmosItemSerializer, CosmosItemSerializerNoExceptionWrapping, SparkBridgeInternal} import com.fasterxml.jackson.databind.node.ObjectNode import org.apache.spark.TaskContext import org.apache.spark.broadcast.Broadcast import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder +import org.apache.spark.sql.connector.metric.CustomTaskMetric import org.apache.spark.sql.connector.read.PartitionReader import org.apache.spark.sql.types.StructType @@ -46,8 +48,24 @@ private case class ChangeFeedPartitionReader assert(partition.continuationState.isDefined, "Argument 'partition.continuationState' must be defined here.") log.logTrace(s"Instantiated ${this.getClass.getSimpleName}") + private val startLsn = getPartitionStartLsn + + private val changeFeedLSNRangeMetric = new CustomTaskMetric { + override def name(): String = MetricNames.ChangeFeedLsnRange + override def value(): Long = getChangeFeedLSNRange + } + private val changeFeedItemsCntMetric = new CustomTaskMetric { + override def name(): String = MetricNames.ChangeFeedItemsCnt + override def value(): Long = getChangeFeedItemsCnt + } + private val changeFeedPartitionIndexMetric = new CustomTaskMetric { + override def name(): String = MetricNames.ChangeFeedPartitionIndex + override def value(): Long = if (partition.index.isDefined) partition.index.get else -1 + } + private val containerTargetConfig = CosmosContainerConfig.parseCosmosContainerConfig(config) - log.logInfo(s"Reading from feed range ${partition.feedRange} of " + + log.logInfo(s"Reading from feed range ${partition.feedRange}, startLsn $getPartitionStartLsn, " + + s"endLsn ${partition.endLsn} of " + s"container ${containerTargetConfig.database}.${containerTargetConfig.container}") private val readConfig = CosmosReadConfig.parseCosmosReadConfig(config) private val clientCacheItem = CosmosClientCache( @@ -87,6 +105,15 @@ private case class ChangeFeedPartitionReader private val cosmosRowConverter = CosmosRowConverter.get(cosmosSerializationConfig) private val cosmosChangeFeedConfig = 
CosmosChangeFeedConfig.parseCosmosChangeFeedConfig(config) + + override def currentMetricsValues(): Array[CustomTaskMetric] = { + Array( + changeFeedLSNRangeMetric, + changeFeedItemsCntMetric, + changeFeedPartitionIndexMetric + ) + } + private def changeFeedItemFactoryMethod(objectNode: ObjectNode): ChangeFeedSparkRowItem = { val pkValue = partitionKeyDefinition match { case Some(pkDef) => Some(PartitionKeyHelper.getPartitionKeyPath(objectNode, pkDef)) @@ -157,10 +184,17 @@ private case class ChangeFeedPartitionReader } } + private def getPartitionStartLsn: Long = { + if (partition.continuationState.isDefined) { + SparkBridgeImplementationInternal.extractLsnFromChangeFeedContinuation(this.partition.continuationState.get) + } else { + 0 + } + } + private val changeFeedRequestOptions = { - val startLsn = - SparkBridgeImplementationInternal.extractLsnFromChangeFeedContinuation(this.partition.continuationState.get) + val startLsn = getPartitionStartLsn log.logDebug( s"Request options for Range '${partition.feedRange.min}-${partition.feedRange.max}' LSN '$startLsn'") @@ -248,4 +282,28 @@ private case class ChangeFeedPartitionReader throughputControlClientCacheItemOpt.get.close() } } + + private def getChangeFeedItemsCnt: Long = { + this.iterator.getTotalChangeFeedItemsCnt + } + + private def getChangeFeedLSNRange: Long = { + // calculate the changes per lsn + val latestLsnOpt = this.iterator.getLatestContinuationToken match { + case Some(continuationToken) => + // for cases where the feed range spans multiple physical partitions + // pick the smallest lsn + Some(SparkBridgeImplementationInternal + .extractContinuationTokensFromChangeFeedStateJson(continuationToken) + .minBy(_._2)._2) + case None => + // for change feed, we would only reach here before the first page got fetched + // fallback to use the continuation token from the partition instead + Some(SparkBridgeImplementationInternal + .extractContinuationTokensFromChangeFeedStateJson(partition.continuationState.get) + .minBy(_._2)._2) + } + + if (latestLsnOpt.isDefined) latestLsnOpt.get - startLsn else 0 + } } diff --git a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/ChangeFeedScan.scala b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/ChangeFeedScan.scala index e5a2f4bbd923..d448e1e715e2 100644 --- a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/ChangeFeedScan.scala +++ b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/ChangeFeedScan.scala @@ -2,9 +2,11 @@ // Licensed under the MIT License. 
package com.azure.cosmos.spark +import com.azure.cosmos.changeFeedMetrics.{ChangeFeedItemsCntMetric, ChangeFeedLsnRangeMetric, ChangeFeedPartitionIndexMetric} import com.azure.cosmos.spark.diagnostics.LoggerHelper import org.apache.spark.broadcast.Broadcast import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.connector.metric.CustomMetric import org.apache.spark.sql.connector.read.streaming.MicroBatchStream import org.apache.spark.sql.connector.read.{Batch, Scan} import org.apache.spark.sql.types.StructType @@ -64,4 +66,12 @@ private case class ChangeFeedScan override def toMicroBatchStream(checkpointLocation: String): MicroBatchStream = { new ChangeFeedMicroBatchStream(session, schema, config, cosmosClientStateHandles, checkpointLocation: String, diagnosticsConfig) } + + override def supportedCustomMetrics(): Array[CustomMetric] = { + Array( + new ChangeFeedLsnRangeMetric, + new ChangeFeedItemsCntMetric, + new ChangeFeedPartitionIndexMetric + ) + } } diff --git a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/CosmosConfig.scala b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/CosmosConfig.scala index 2d699d23aba3..dc50511483df 100644 --- a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/CosmosConfig.scala +++ b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/CosmosConfig.scala @@ -131,6 +131,7 @@ private[spark] object CosmosConfigNames { val ChangeFeedItemCountPerTriggerHint = "spark.cosmos.changeFeed.itemCountPerTriggerHint" val ChangeFeedBatchCheckpointLocation = "spark.cosmos.changeFeed.batchCheckpointLocation" val ChangeFeedBatchCheckpointLocationIgnoreWhenInvalid = "spark.cosmos.changeFeed.batchCheckpointLocation.ignoreWhenInvalid" + val ChangeFeedPerformanceMonitoringEnabled = "spark.cosmos.changeFeed.performance.monitoring.enabled" val ThroughputControlEnabled = "spark.cosmos.throughputControl.enabled" val ThroughputControlAccountEndpoint = "spark.cosmos.throughputControl.accountEndpoint" val ThroughputControlAccountKey = "spark.cosmos.throughputControl.accountKey" @@ -253,6 +254,7 @@ private[spark] object CosmosConfigNames { ChangeFeedItemCountPerTriggerHint, ChangeFeedBatchCheckpointLocation, ChangeFeedBatchCheckpointLocationIgnoreWhenInvalid, + ChangeFeedPerformanceMonitoringEnabled, ThroughputControlEnabled, ThroughputControlAccountEndpoint, ThroughputControlAccountKey, @@ -2170,7 +2172,8 @@ private case class CosmosChangeFeedConfig startFromPointInTime: Option[Instant], maxItemCountPerTrigger: Option[Long], batchCheckpointLocation: Option[String], - ignoreOffsetWhenInvalid: Boolean + ignoreOffsetWhenInvalid: Boolean, + performanceMonitoringEnabled: Boolean ) { def toRequestOptions(feedRange: FeedRange): CosmosChangeFeedRequestOptions = { @@ -2201,6 +2204,7 @@ private object CosmosChangeFeedConfig { private val DefaultChangeFeedMode: ChangeFeedMode = ChangeFeedModes.Incremental private val DefaultStartFromMode: ChangeFeedStartFromMode = ChangeFeedStartFromModes.Beginning private val DefaultIgnoreOffsetWhenInvalid: Boolean = false + private val DefaultPerformanceMonitoringEnabled: Boolean = true private val startFrom = CosmosConfigEntry[ChangeFeedStartFromMode]( key = CosmosConfigNames.ChangeFeedStartFrom, @@ -2250,6 +2254,14 @@ private object CosmosChangeFeedConfig { "instead. 
If this config is set and a file exists the StartFrom settings are ignored and instead the change " + "feed will be processed from the previous position.") + private val performanceMonitoringEnabled = CosmosConfigEntry[Boolean]( + key = CosmosConfigNames.ChangeFeedPerformanceMonitoringEnabled, + mandatory = false, + defaultValue = Some(DefaultPerformanceMonitoringEnabled), + parseFromStringFunction = enabled => enabled.toBoolean, + helpMessage = "A flag to indicate whether to enable change feed performance monitoring." + + " When enabled, custom task metrics will be tracked internally and used to dynamically tune the change feed micro-batch size.") + private def validateStartFromMode(startFrom: String): ChangeFeedStartFromMode = { Option(startFrom).fold(DefaultStartFromMode)(sf => { val trimmed = sf.trim @@ -2274,6 +2286,7 @@ case _ => None } val batchCheckpointLocationParsed = CosmosConfigEntry.parse(cfg, batchCheckpointLocation) + val performanceMonitoringEnabledParsed = CosmosConfigEntry.parse(cfg, performanceMonitoringEnabled) CosmosChangeFeedConfig( changeFeedModeParsed.getOrElse(DefaultChangeFeedMode), @@ -2281,7 +2294,8 @@ startFromPointInTimeParsed, maxItemCountPerTriggerHintParsed, batchCheckpointLocationParsed, - ignoreOffsetWhenInvalidParsed.getOrElse(DefaultIgnoreOffsetWhenInvalid) + ignoreOffsetWhenInvalidParsed.getOrElse(DefaultIgnoreOffsetWhenInvalid), + performanceMonitoringEnabledParsed.get ) } } diff --git a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/CosmosConstants.scala b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/CosmosConstants.scala index 71ffbbebd62a..78e91fe25fed 100644 --- a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/CosmosConstants.scala +++ b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/CosmosConstants.scala @@ -19,12 +19,9 @@ private[cosmos] object CosmosConstants { val maxRetryIntervalForTransientFailuresInMs = 5000 val maxRetryCountForTransientFailures = 100 val defaultDirectRequestTimeoutInSeconds = 10L - val defaultHttpRequestTimeoutInSeconds = 70L val feedRangesCacheIntervalInMinutes = 1L val defaultIoThreadCountFactorPerCore = 4 val smallestPossibleReactorQueueSizeLargerThanOne: Int = math.min(8, Queues.XS_BUFFER_SIZE) - val defaultMetricsIntervalInSeconds = 60 - val defaultSlf4jMetricReporterEnabled = false val readOperationEndToEndTimeoutInSeconds = 65 val batchOperationEndToEndTimeoutInSeconds = 65 @@ -38,6 +35,10 @@ val RecordsWritten = "recordsWritten" val TotalRequestCharge = "cosmos.totalRequestCharge" + val ChangeFeedLsnRange = "cosmos.changeFeed.partition.lsnRange" + val ChangeFeedItemsCnt = "cosmos.changeFeed.partition.itemsCnt" + val ChangeFeedPartitionIndex = "cosmos.changeFeed.partition.index" + val KnownCustomMetricNames: Set[String] = Set(TotalRequestCharge) } @@ -71,4 +72,21 @@ val DefaultTtlInSeconds = "DefaultTtlInSeconds" val AnalyticalStoreTtlInSeconds = "AnalyticalStoreTtlInSeconds" } + + object ChangeFeedMetricsConfigs { + private val MetricsHistoryPropertyName = "spark.cosmos.changeFeed.performance.metrics.history" + private val MetricsHistoryEnvName = "SPARK.COSMOS.CHANGEFEED.PERFORMANCE.METRICS.HISTORY" + private val DefaultMetricsHistory = "5" + private val MetricsHistoryDecayFactorPropertyName =
"spark.cosmos.changeFeed.performance.metrics.decayFactor" + private val MetricsHistoryDecayFactorEnvName = "SPARK.COSMOS.CHANGEFEED.PERFORMANCE.METRICS.DECAYFACTOR" + private val DefaultMetricsHistoryDecayFactor = "0.85" + val MetricsHistory: Int = + Option(System.getProperty(MetricsHistoryPropertyName)) + .orElse(sys.env.get(MetricsHistoryEnvName)) + .getOrElse(DefaultMetricsHistory).toInt + val MetricsHistoryDecayFactor: Double = + Option(System.getProperty(MetricsHistoryDecayFactorPropertyName)) + .orElse(sys.env.get(MetricsHistoryDecayFactorEnvName)) + .getOrElse(DefaultMetricsHistoryDecayFactor).toDouble + } } diff --git a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/CosmosInputPartition.scala b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/CosmosInputPartition.scala index dc1aaea79d63..a0f2c120266d 100644 --- a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/CosmosInputPartition.scala +++ b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/CosmosInputPartition.scala @@ -44,7 +44,8 @@ private[spark] case class CosmosInputPartition feedRange: NormalizedRange, endLsn: Option[Long], continuationState: Option[String] = None, - readManyFilterOpt: Option[List[String]] = None + readManyFilterOpt: Option[List[String]] = None, + index: Option[Long] = None ) extends InputPartition { // Intentionally leaving out the change feed state when serializing input partition to json @@ -57,16 +58,20 @@ def json(): String = jsonPersisted - private[spark] def withContinuationState(continuationState: String, clearEndLsn: Boolean): InputPartition = { + private[spark] def withContinuationState(continuationState: String, clearEndLsn: Boolean): CosmosInputPartition = { val effectiveEndLsn = if (clearEndLsn) { None } else { this.endLsn } - CosmosInputPartition(this.feedRange, effectiveEndLsn , Some(continuationState)) + CosmosInputPartition(this.feedRange, effectiveEndLsn, Some(continuationState)) } private[spark] def clearEndLsn(): InputPartition = { CosmosInputPartition(this.feedRange, None, this.continuationState) } + + private[spark] def withIndex(index: Long): CosmosInputPartition = { + CosmosInputPartition(this.feedRange, this.endLsn, this.continuationState, index = Some(index)) + } } diff --git a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/CosmosPartitionPlanner.scala b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/CosmosPartitionPlanner.scala index 97623f544ecb..3f381934fb7c 100644 --- a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/CosmosPartitionPlanner.scala +++ b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/CosmosPartitionPlanner.scala @@ -5,6 +5,7 @@ package com.azure.cosmos.spark import com.azure.cosmos.implementation.SparkBridgeImplementationInternal.extractContinuationTokensFromChangeFeedStateJson import com.azure.cosmos.implementation.{SparkBridgeImplementationInternal, Strings} +import com.azure.cosmos.changeFeedMetrics.ChangeFeedMetricsTracker import com.azure.cosmos.models.FeedRange import com.azure.cosmos.spark.CosmosPredicates.{assertNotNull, assertNotNullOrEmpty, assertOnSparkDriver, requireNotNull} import com.azure.cosmos.spark.CosmosTableSchemaInferrer.LsnAttributeName @@ -26,19 +27,19 @@ import scala.collection.mutable.ArrayBuffer // scalastyle:off underscore.import import scala.collection.JavaConverters._ // scalastyle:on underscore.import -
private object CosmosPartitionPlanner extends BasicLoggingTrait { val DefaultPartitionSizeInMB: Int = 5 * 1024 // 10 GB def createInputPartitions ( - cosmosPartitioningConfig: CosmosPartitioningConfig, - container: CosmosAsyncContainer, - partitionMetadata: Array[PartitionMetadata], - defaultMinimalPartitionCount: Int, - defaultMaxPartitionSizeInMB: Int, - readLimit: ReadLimit, - isChangeFeed: Boolean + cosmosPartitioningConfig: CosmosPartitioningConfig, + container: CosmosAsyncContainer, + partitionMetadata: Array[PartitionMetadata], + defaultMinimalPartitionCount: Int, + defaultMaxPartitionSizeInMB: Int, + readLimit: ReadLimit, + isChangeFeed: Boolean, + partitionMetricsMap: Option[ConcurrentHashMap[NormalizedRange, ChangeFeedMetricsTracker]] = None ): Array[CosmosInputPartition] = { TransientErrorsRetryPolicy.executeWithRetry(() => @@ -49,18 +50,20 @@ private object CosmosPartitionPlanner extends BasicLoggingTrait { defaultMinimalPartitionCount, defaultMaxPartitionSizeInMB, readLimit, - isChangeFeed)) + isChangeFeed, + partitionMetricsMap)) } private[this] def createInputPartitionsImpl ( - cosmosPartitioningConfig: CosmosPartitioningConfig, - container: CosmosAsyncContainer, - partitionMetadata: Array[PartitionMetadata], - defaultMinimalPartitionCount: Int, - defaultMaxPartitionSizeInMB: Int, - readLimit: ReadLimit, - isChangeFeed: Boolean + cosmosPartitioningConfig: CosmosPartitioningConfig, + container: CosmosAsyncContainer, + partitionMetadata: Array[PartitionMetadata], + defaultMinimalPartitionCount: Int, + defaultMaxPartitionSizeInMB: Int, + readLimit: ReadLimit, + isChangeFeed: Boolean, + partitionMetricsMap: Option[ConcurrentHashMap[NormalizedRange, ChangeFeedMetricsTracker]] = None ): Array[CosmosInputPartition] = { assertOnSparkDriver() //scalastyle:off multiple.string.literals @@ -72,7 +75,7 @@ private object CosmosPartitionPlanner extends BasicLoggingTrait { val inputPartitions = if (!isChangeFeed && cosmosPartitioningConfig.partitioningStrategy == PartitioningStrategies.Restrictive) { partitionMetadata.map(metadata => CosmosInputPartition(metadata.feedRange, None)) } else { - val planningInfo = this.getPartitionPlanningInfo(partitionMetadata, readLimit) + val planningInfo = this.getPartitionPlanningInfo(partitionMetadata, readLimit, isChangeFeed, partitionMetricsMap) cosmosPartitioningConfig.partitioningStrategy match { case PartitioningStrategies.Restrictive => @@ -257,7 +260,8 @@ private object CosmosPartitionPlanner extends BasicLoggingTrait { containerConfig: CosmosContainerConfig, partitioningConfig: CosmosPartitioningConfig, defaultParallelism: Int, - container: CosmosAsyncContainer + container: CosmosAsyncContainer, + partitionMetricsMap: Option[ConcurrentHashMap[NormalizedRange, ChangeFeedMetricsTracker]] = None ): ChangeFeedOffset = { TransientErrorsRetryPolicy.executeWithRetry(() => @@ -271,7 +275,8 @@ private object CosmosPartitionPlanner extends BasicLoggingTrait { containerConfig, partitioningConfig, defaultParallelism, - container)) + container, + partitionMetricsMap)) } // scalastyle:on parameter.number @@ -289,7 +294,8 @@ private object CosmosPartitionPlanner extends BasicLoggingTrait { containerConfig: CosmosContainerConfig, partitioningConfig: CosmosPartitioningConfig, defaultParallelism: Int, - container: CosmosAsyncContainer + container: CosmosAsyncContainer, + partitionMetricsMap: Option[ConcurrentHashMap[NormalizedRange, ChangeFeedMetricsTracker]] = None ): ChangeFeedOffset = { assertOnSparkDriver() assertNotNull(startOffset, "startOffset") @@ -317,7 
+323,8 @@ private object CosmosPartitionPlanner extends BasicLoggingTrait { defaultMinPartitionCount, defaultMaxPartitionSizeInMB, readLimit, - true + true, + partitionMetricsMap ) if (isDebugLogEnabled) { @@ -488,8 +495,8 @@ private object CosmosPartitionPlanner extends BasicLoggingTrait { targetPartitionCount: Int ): Array[CosmosInputPartition] = { val customPartitioningFactor = planningInfo - .map(pi => pi.scaleFactor) - .sum / targetPartitionCount + .map(pi => pi.scaleFactor) + .sum / targetPartitionCount applyStorageAlignedStrategy( container, planningInfo, @@ -501,7 +508,9 @@ private object CosmosPartitionPlanner extends BasicLoggingTrait { def getPartitionPlanningInfo ( partitionMetadata: Array[PartitionMetadata], - readLimit: ReadLimit + readLimit: ReadLimit, + isChangeFeed: Boolean, + partitionMetricsMap: Option[ConcurrentHashMap[NormalizedRange, ChangeFeedMetricsTracker]] = None ): Array[PartitionPlanningInfo] = { assertOnSparkDriver() @@ -511,7 +520,7 @@ private object CosmosPartitionPlanner extends BasicLoggingTrait { new Array[PartitionPlanningInfo](partitionMetadata.length) var index = 0 - calculateEndLsn(partitionMetadata, readLimit) + calculateEndLsn(partitionMetadata, readLimit, isChangeFeed, partitionMetricsMap) .foreach(m => { val storageSizeInMB: Double = m.totalDocumentSizeInKB / 1024.toDouble val progressWeightFactor: Double = getChangeFeedProgressFactor(storageSizeInMB, m) @@ -540,12 +549,14 @@ private object CosmosPartitionPlanner extends BasicLoggingTrait { private[spark] def calculateEndLsn ( metadata: Array[PartitionMetadata], - readLimit: ReadLimit + readLimit: ReadLimit, + isChangeFeed: Boolean, + partitionMetricsMap: Option[ConcurrentHashMap[NormalizedRange, ChangeFeedMetricsTracker]] = None ): Array[PartitionMetadata] = { val totalWeightedLsnGap = new AtomicLong(0) metadata.foreach(m => { - totalWeightedLsnGap.addAndGet(m.getWeightedLsnGap) + totalWeightedLsnGap.addAndGet(m.getWeightedLsnGap(isChangeFeed, partitionMetricsMap)) }) metadata @@ -557,18 +568,25 @@ private object CosmosPartitionPlanner extends BasicLoggingTrait { if (totalWeightedLsnGap.get <= maxRowsLimit.maxRows) { val effectiveLatestLsn = metadata.getAndValidateLatestLsn if (isDebugLogEnabled) { + val avgItemsPerLsn = metadata.getAvgItemsPerLsn(isChangeFeed, partitionMetricsMap) + val weightedLsnGap = metadata.getWeightedLsnGap(isChangeFeed, partitionMetricsMap) + val calculateDebugLine = s"calculateEndLsn (feedRange: ${metadata.feedRange}) - avg. 
Docs " + - s"per LSN: ${metadata.getAvgItemsPerLsn} documentCount ${metadata.documentCount} firstLsn " + + s"per LSN: $avgItemsPerLsn documentCount ${metadata.documentCount} firstLsn " + s"${metadata.firstLsn} latestLsn ${metadata.latestLsn} startLsn ${metadata.startLsn} weightedGap " + - s"${metadata.getWeightedLsnGap} effectiveEndLsn $effectiveLatestLsn maxRows ${maxRowsLimit.maxRows}" + s"$weightedLsnGap effectiveEndLsn $effectiveLatestLsn maxRows ${maxRowsLimit.maxRows}" logDebug(calculateDebugLine) } effectiveLatestLsn } else { // the weight of this feedRange compared to other feedRanges - val feedRangeWeightFactor = metadata.getWeightedLsnGap.toDouble / totalWeightedLsnGap.get + val avgItemsPerLsn = metadata.getAvgItemsPerLsn(isChangeFeed, partitionMetricsMap) + val weightedLsnGap = metadata.getWeightedLsnGap(isChangeFeed, partitionMetricsMap) + + val feedRangeWeightFactor = weightedLsnGap.toDouble / totalWeightedLsnGap.get - val allowedRate = (feedRangeWeightFactor * maxRowsLimit.maxRows() / metadata.getAvgItemsPerLsn) + val allowedRate = + (feedRangeWeightFactor * maxRowsLimit.maxRows() / avgItemsPerLsn) .toLong .max(1) val effectiveLatestLsn = metadata.getAndValidateLatestLsn @@ -577,11 +595,11 @@ private object CosmosPartitionPlanner extends BasicLoggingTrait { metadata.startLsn + allowedRate) if (isDebugLogEnabled) { val calculateDebugLine = s"calculateEndLsn (feedRange: ${metadata.feedRange}) - avg. Docs/LSN: " + - s"${metadata.getAvgItemsPerLsn} feedRangeWeightFactor $feedRangeWeightFactor documentCount " + - s"${metadata.documentCount} firstLsn ${metadata.firstLsn} latestLsn ${metadata.latestLsn} " + - s"effectiveLatestLsn $effectiveLatestLsn startLsn " + - s"${metadata.startLsn} allowedRate $allowedRate weightedGap ${metadata.getWeightedLsnGap} " + - s"effectiveEndLsn $effectiveEndLsn maxRows $maxRowsLimit.maxRows" + s"$avgItemsPerLsn feedRangeWeightFactor $feedRangeWeightFactor documentCount " + + s"${metadata.documentCount} firstLsn ${metadata.firstLsn} latestLsn ${metadata.latestLsn} " + + s"effectiveLatestLsn $effectiveLatestLsn startLsn " + + s"${metadata.startLsn} allowedRate $allowedRate weightedGap $weightedLsnGap " + + s"effectiveEndLsn $effectiveEndLsn maxRows $maxRowsLimit.maxRows" logDebug(calculateDebugLine) } diff --git a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/ItemsScanBase.scala b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/ItemsScanBase.scala index 2f108d166a37..453f0341818e 100644 --- a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/ItemsScanBase.scala +++ b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/ItemsScanBase.scala @@ -14,8 +14,8 @@ import org.apache.spark.sql.connector.read.{Batch, InputPartition, PartitionRead import org.apache.spark.sql.sources.Filter import org.apache.spark.sql.types.StructType +import java.util.OptionalLong import java.util.concurrent.atomic.{AtomicLong, AtomicReference} -import java.util.{OptionalLong, UUID} import scala.collection.mutable.ListBuffer private[spark] abstract class ItemsScanBase(session: SparkSession, diff --git a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/PartitionMetadata.scala b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/PartitionMetadata.scala index 1452349e7df2..f290f00e129e 100644 --- a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/PartitionMetadata.scala +++ 
b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/PartitionMetadata.scala @@ -3,12 +3,14 @@ package com.azure.cosmos.spark +import com.azure.cosmos.changeFeedMetrics.ChangeFeedMetricsTracker import com.azure.cosmos.implementation.SparkBridgeImplementationInternal import com.azure.cosmos.spark.CosmosPredicates.requireNotNull import com.azure.cosmos.spark.diagnostics.BasicLoggingTrait import org.apache.spark.broadcast.Broadcast import java.time.Instant +import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.atomic.AtomicLong private object PartitionMetadata { @@ -114,20 +116,41 @@ private[cosmos] case class PartitionMetadata ) } - def getWeightedLsnGap: Long = { + def getWeightedLsnGap( + isChangeFeed: Boolean, + partitionMetricsMap: Option[ConcurrentHashMap[NormalizedRange, ChangeFeedMetricsTracker]] = None): Long = { val progressFactor = math.max(this.getAndValidateLatestLsn - this.startLsn, 0) if (progressFactor == 0) { 0 } else { - val averageItemsPerLsn = getAvgItemsPerLsn + val effectiveItemsPerLsn = this.getAvgItemsPerLsn(isChangeFeed, partitionMetricsMap) - val weightedGap: Double = progressFactor * averageItemsPerLsn + val weightedGap: Double = progressFactor * effectiveItemsPerLsn // Any double less than 1 gets rounded to 0 when toLong is invoked weightedGap.toLong.max(1) } } - def getAvgItemsPerLsn: Double = { + def getAvgItemsPerLsn(isChangeFeed: Boolean, + partitionMetricsMap: Option[ConcurrentHashMap[NormalizedRange, ChangeFeedMetricsTracker]] = None): Double = { + var itemsPerLsnFromMetricsOpt: Option[Double] = None + if (isChangeFeed) { + partitionMetricsMap match { + case Some(metricsMap) => + if (metricsMap.containsKey(this.feedRange)) { + itemsPerLsnFromMetricsOpt = partitionMetricsMap.get.get(this.feedRange).getWeightedChangeFeedItemsPerLsn + } + case None => + } + } + + itemsPerLsnFromMetricsOpt match { + case Some(itemsPerLsnFromMetrics) => itemsPerLsnFromMetrics + case None => getDefaultAvgItemsPerLsn + } + } + + def getDefaultAvgItemsPerLsn: Double = { if (this.firstLsn.isEmpty) { math.max(1d, this.documentCount.toDouble / this.getAndValidateLatestLsn) } else if (this.documentCount == 0 || (this.getAndValidateLatestLsn - this.firstLsn.get) <= 0) { diff --git a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/TransientIOErrorsRetryingIterator.scala b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/TransientIOErrorsRetryingIterator.scala index 7fb2b2dad0a5..8b049034602f 100644 --- a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/TransientIOErrorsRetryingIterator.scala +++ b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/TransientIOErrorsRetryingIterator.scala @@ -68,6 +68,21 @@ private class TransientIOErrorsRetryingIterator[TSparkRow] private[spark] var currentFeedResponseIterator: Option[BufferedIterator[FeedResponse[TSparkRow]]] = None private[spark] var currentItemIterator: Option[BufferedIterator[TSparkRow]] = None private val lastPagedFlux = new AtomicReference[Option[CosmosPagedFlux[TSparkRow]]](None) + + private val totalChangesCnt = new AtomicLong(0) + + def getTotalChangeFeedItemsCnt: Long = { + totalChangesCnt.get() + } + + def getLatestContinuationToken: Option[String] = { + if (lastContinuationToken == null) { + None + } else { + Some(lastContinuationToken.get()) + } + } + override def hasNext: Boolean = { executeWithRetry("hasNextInternal", () => hasNextInternal) } @@ -161,6 +176,7 @@ private class 
TransientIOErrorsRetryingIterator[TSparkRow] } val iteratorCandidate = feedResponse.getResults.iterator().asScala.buffered lastContinuationToken.set(feedResponse.getContinuationToken) + totalChangesCnt.addAndGet(feedResponse.getResults.size()) if (iteratorCandidate.hasNext && validateNextLsn(iteratorCandidate)) { currentItemIterator = Some(iteratorCandidate) diff --git a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/changeFeedMetrics/ChangeFeedMetricsListenerSpec.scala b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/changeFeedMetrics/ChangeFeedMetricsListenerSpec.scala new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/ChangeFeedMetricsListenerSpec.scala b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/ChangeFeedMetricsListenerSpec.scala new file mode 100644 index 000000000000..70d0352369a6 --- /dev/null +++ b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/ChangeFeedMetricsListenerSpec.scala @@ -0,0 +1,138 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.cosmos.spark + +import com.azure.cosmos.changeFeedMetrics.{ChangeFeedMetricsListener, ChangeFeedMetricsTracker} +import com.azure.cosmos.implementation.guava25.collect.{HashBiMap, Maps} +import org.apache.spark.Success +import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics} +import org.apache.spark.scheduler.{SparkListenerTaskEnd, TaskInfo} +import org.apache.spark.sql.execution.metric.SQLMetric +import org.mockito.ArgumentMatchers +import org.mockito.Mockito.{mock, when} + +import java.lang.reflect.Field +import java.util.concurrent.ConcurrentHashMap + +class ChangeFeedMetricsListenerSpec extends UnitSpec { + "ChangeFeedMetricsListener" should "be able to capture changeFeed performance metrics" in { + val taskEnd = SparkListenerTaskEnd( + stageId = 1, + stageAttemptId = 0, + taskType = "ResultTask", + reason = Success, + taskInfo = mock(classOf[TaskInfo]), + taskExecutorMetrics = mock(classOf[ExecutorMetrics]), + taskMetrics = mock(classOf[TaskMetrics]) + ) + + val metrics = Map[String, SQLMetric]( + CosmosConstants.MetricNames.ChangeFeedPartitionIndex -> new SQLMetric("index", 1), + CosmosConstants.MetricNames.ChangeFeedLsnRange -> new SQLMetric("lsn", 100), + CosmosConstants.MetricNames.ChangeFeedItemsCnt -> new SQLMetric("items", 100) + ) + + // create sparkInternalsBridge mock + val sparkInternalsBridge = mock(classOf[SparkInternalsBridge]) + when(sparkInternalsBridge.getInternalCustomTaskMetricsAsSQLMetric( + ArgumentMatchers.any[Set[String]], + ArgumentMatchers.any[TaskMetrics] + )).thenReturn(metrics) + + val partitionIndexMap = Maps.synchronizedBiMap(HashBiMap.create[NormalizedRange, Long]()) + partitionIndexMap.put(NormalizedRange("0", "FF"), 1) + + val partitionMetricsMap = new ConcurrentHashMap[NormalizedRange, ChangeFeedMetricsTracker]() + val changeFeedMetricsListener = new ChangeFeedMetricsListener(partitionIndexMap, partitionMetricsMap) + + // set the internal sparkInternalsBridgeField + val sparkInternalsBridgeField: Field = classOf[ChangeFeedMetricsListener].getDeclaredField("sparkInternalsBridge") + sparkInternalsBridgeField.setAccessible(true) + sparkInternalsBridgeField.set(changeFeedMetricsListener, sparkInternalsBridge) + + // verify that metrics will be properly tracked + changeFeedMetricsListener.onTaskEnd(taskEnd) + partitionMetricsMap.size() 
shouldBe 1 + partitionMetricsMap.containsKey(NormalizedRange("0", "FF")) shouldBe true + partitionMetricsMap.get(NormalizedRange("0", "FF")).getWeightedChangeFeedItemsPerLsn.get shouldBe 1 + } + + it should "ignore metrics for unknown partition index" in { + val taskEnd = SparkListenerTaskEnd( + stageId = 1, + stageAttemptId = 0, + taskType = "ResultTask", + reason = Success, + taskInfo = mock(classOf[TaskInfo]), + taskExecutorMetrics = mock(classOf[ExecutorMetrics]), + taskMetrics = mock(classOf[TaskMetrics]) + ) + + val metrics = Map[String, SQLMetric]( + CosmosConstants.MetricNames.ChangeFeedPartitionIndex -> new SQLMetric("index", 10), + CosmosConstants.MetricNames.ChangeFeedLsnRange -> new SQLMetric("lsn", 100), + CosmosConstants.MetricNames.ChangeFeedItemsCnt -> new SQLMetric("items", 100) + ) + + // create sparkInternalsBridge mock + val sparkInternalsBridge = mock(classOf[SparkInternalsBridge]) + when(sparkInternalsBridge.getInternalCustomTaskMetricsAsSQLMetric( + ArgumentMatchers.any[Set[String]], + ArgumentMatchers.any[TaskMetrics] + )).thenReturn(metrics) + + val partitionIndexMap = Maps.synchronizedBiMap(HashBiMap.create[NormalizedRange, Long]()) + partitionIndexMap.put(NormalizedRange("0", "FF"), 1) + + val partitionMetricsMap = new ConcurrentHashMap[NormalizedRange, ChangeFeedMetricsTracker]() + val changeFeedMetricsListener = new ChangeFeedMetricsListener(partitionIndexMap, partitionMetricsMap) + + // set the internal sparkInternalsBridgeField + val sparkInternalsBridgeField: Field = classOf[ChangeFeedMetricsListener].getDeclaredField("sparkInternalsBridge") + sparkInternalsBridgeField.setAccessible(true) + sparkInternalsBridgeField.set(changeFeedMetricsListener, sparkInternalsBridge) + + // because partition index 10 does not exist in the partitionIndexMap, it will be ignored + changeFeedMetricsListener.onTaskEnd(taskEnd) + partitionMetricsMap shouldBe empty + } + + it should "ignore unrelated metrics" in { + val taskEnd = SparkListenerTaskEnd( + stageId = 1, + stageAttemptId = 0, + taskType = "ResultTask", + reason = Success, + taskInfo = mock(classOf[TaskInfo]), + taskExecutorMetrics = mock(classOf[ExecutorMetrics]), + taskMetrics = mock(classOf[TaskMetrics]) + ) + + val metrics = Map[String, SQLMetric]( + "unknownMetrics" -> new SQLMetric("index", 10) + ) + + // create sparkInternalsBridge mock + val sparkInternalsBridge = mock(classOf[SparkInternalsBridge]) + when(sparkInternalsBridge.getInternalCustomTaskMetricsAsSQLMetric( + ArgumentMatchers.any[Set[String]], + ArgumentMatchers.any[TaskMetrics] + )).thenReturn(metrics) + + val partitionIndexMap = Maps.synchronizedBiMap(HashBiMap.create[NormalizedRange, Long]()) + partitionIndexMap.put(NormalizedRange("0", "FF"), 1) + + val partitionMetricsMap = new ConcurrentHashMap[NormalizedRange, ChangeFeedMetricsTracker]() + val changeFeedMetricsListener = new ChangeFeedMetricsListener(partitionIndexMap, partitionMetricsMap) + + // set the internal sparkInternalsBridgeField + val sparkInternalsBridgeField: Field = classOf[ChangeFeedMetricsListener].getDeclaredField("sparkInternalsBridge") + sparkInternalsBridgeField.setAccessible(true) + sparkInternalsBridgeField.set(changeFeedMetricsListener, sparkInternalsBridge) + + // because partition index 10 does not exist in the partitionIndexMap, it will be ignored + changeFeedMetricsListener.onTaskEnd(taskEnd) + partitionMetricsMap shouldBe empty + } +} diff --git a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/ChangeFeedMetricsTrackerSpec.scala 
b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/ChangeFeedMetricsTrackerSpec.scala new file mode 100644 index 000000000000..f851f1efed91 --- /dev/null +++ b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/ChangeFeedMetricsTrackerSpec.scala @@ -0,0 +1,61 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.cosmos.spark + +import com.azure.cosmos.changeFeedMetrics.ChangeFeedMetricsTracker + +class ChangeFeedMetricsTrackerSpec extends UnitSpec { + + it should "track weighted changes per lsn" in { + val metricsTracker = ChangeFeedMetricsTracker(1, NormalizedRange("0", "FF"), 5, 0.5) + // Single tracking + metricsTracker.track(100, 200) // 2 changes per LSN + metricsTracker.getWeightedChangeFeedItemsPerLsn.get shouldBe 2.0 + + // Multiple tracking + metricsTracker.track(50, 150) // 3 changes per LSN + // With default decay factor 0.5, weighted avg should be (2.0 * 0.5 + 3.0 * 1.0)/(0.5 + 1.0) = 2.67 + metricsTracker.getWeightedChangeFeedItemsPerLsn.get shouldBe 2.67 +- 0.01 + } + + it should "return none when no metrics tracked" in { + val testRange = NormalizedRange("0", "FF") + val metricsTracker = ChangeFeedMetricsTracker(1, testRange) + + metricsTracker.getWeightedChangeFeedItemsPerLsn shouldBe None + } + + it should "handle zero LSN gap correctly" in { + val testRange = NormalizedRange("0", "FF") + val metricsTracker = ChangeFeedMetricsTracker(1, testRange, 5, 0.5) + + metricsTracker.track(0, 100) // Should treat as 100 changes + metricsTracker.getWeightedChangeFeedItemsPerLsn.get shouldBe 100.0 + + metricsTracker.track(0, 0) // Should treat as 1 change + metricsTracker.getWeightedChangeFeedItemsPerLsn.get shouldBe (100.0 * 0.5 + 1.0)/(0.5 + 1.0) +- 0.01 + } + + + it should "respect maxHistory limit" in { + val maxHistory = 2 + val tracker = ChangeFeedMetricsTracker(0L, NormalizedRange("", "FF"), maxHistory, 0.5) + + tracker.track(10, 10) // 1.0 changes per LSN + tracker.track(10, 20) // 2.0 changes per LSN + tracker.track(10, 30) // 3.0 changes per LSN - should evict first entry + + // Should only consider last 2 entries: (2.0 * 0.5 + 3.0 * 1.0)/(0.5 + 1.0) + tracker.getWeightedChangeFeedItemsPerLsn.get shouldBe 2.67 +- 0.01 + } + + it should "handle minimum change count of 1" in { + val testRange = NormalizedRange("0", "FF") + val metricsTracker = ChangeFeedMetricsTracker(1, testRange, 5, 0.5) + + // Small values should be normalized to minimum of 1 change + metricsTracker.track(1000, 0) + metricsTracker.getWeightedChangeFeedItemsPerLsn.get shouldBe 0.001 // 1/1000 + } +} diff --git a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/ChangeFeedPartitionReaderITest.scala b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/ChangeFeedPartitionReaderITest.scala index fbd70df087cd..3e0a0833c69c 100644 --- a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/ChangeFeedPartitionReaderITest.scala +++ b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/ChangeFeedPartitionReaderITest.scala @@ -227,6 +227,105 @@ class ChangeFeedPartitionReaderITest } + "change feed partition reader" should "report custom metrics" in { + val cosmosEndpoint = TestConfigurations.HOST + val cosmosMasterKey = TestConfigurations.MASTER_KEY + val testId = UUID.randomUUID().toString + val sourceContainerResponse = cosmosClient.getDatabase(cosmosDatabase).createContainer( + "source_" + testId, + 
"/sequenceNumber", + ThroughputProperties.createManualThroughput(400)).block() + val sourceContainer = cosmosClient.getDatabase(cosmosDatabase).getContainer(sourceContainerResponse.getProperties.getId) + val rid = sourceContainerResponse.getProperties.getResourceId + val continuationState = s"""{ + "V": 1, + "Rid": "$rid", + "Mode": "INCREMENTAL", + "StartFrom": { + "Type": "BEGINNING" + }, + "Continuation": { + "V": 1, + "Rid": "$rid", + "Continuation": [ + { + "token": "1", + "range": { + "min": "", + "max": "FF" + } + } + ], + "Range": { + "min": "", + "max": "FF" + } + } +}""" + val encoder = Base64.getEncoder + val encodedBytes = encoder.encode(continuationState.getBytes("UTF-8")) + val continuationStateEncoded = new String(encodedBytes, "UTF-8") + + val changeFeedCfg = Map( + "spark.cosmos.accountEndpoint" -> cosmosEndpoint, + "spark.cosmos.accountKey" -> cosmosMasterKey, + "spark.cosmos.database" -> cosmosDatabase, + "spark.cosmos.container" -> sourceContainer.getId(), + "spark.cosmos.read.inferSchema.enabled" -> "false", + ) + var inputtedDocuments = 10 + var lsn1 = 0L + for (_ <- 0 until inputtedDocuments) { + lsn1 = ingestTestDocuments(sourceContainer, 1) + } + + val structs = Array( + StructField("_rawBody", StringType, false), + StructField("_etag", StringType, false), + StructField("_ts", StringType, false), + StructField("id", StringType, false), + StructField("_lsn", StringType, false) + ) + val schema = new StructType(structs) + + val diagnosticContext = DiagnosticsContext(UUID.randomUUID(), "") + val cosmosClientStateHandles = initializeAndBroadcastCosmosClientStatesForContainer(changeFeedCfg) + val diagnosticsConfig = DiagnosticsConfig() + val cosmosInputPartition = + new CosmosInputPartition( + NormalizedRange("", "FF"), + Some(lsn1 + 1), + Some(continuationStateEncoded), + index = Some(2)) + val changeFeedPartitionReader = new ChangeFeedPartitionReader( + cosmosInputPartition, + changeFeedCfg, + schema, + diagnosticContext, + cosmosClientStateHandles, + diagnosticsConfig, + "" + ) + var count = 0 + implicit val ec: ExecutionContext = ExecutionContext.global + Future { + while (changeFeedPartitionReader.next()) { + changeFeedPartitionReader.get() + count += 1 + } + } + sleep(2000) + + val currentMetrics = changeFeedPartitionReader.currentMetricsValues() + currentMetrics.length shouldBe 3 + currentMetrics(0).name() shouldBe CosmosConstants.MetricNames.ChangeFeedLsnRange + currentMetrics(0).value() shouldBe(lsn1 - 1) + currentMetrics(1).name() shouldBe CosmosConstants.MetricNames.ChangeFeedItemsCnt + currentMetrics(1).value() shouldBe 10 + currentMetrics(2).name() shouldBe CosmosConstants.MetricNames.ChangeFeedPartitionIndex + currentMetrics(2).value() shouldBe 2 + } + private[this] def initializeAndBroadcastCosmosClientStatesForContainer(config: Map[String, String]) : Broadcast[CosmosClientMetadataCachesSnapshots] = { val userConfig = CosmosConfig.getEffectiveConfig(None, None, config) diff --git a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/CosmosConfigSpec.scala b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/CosmosConfigSpec.scala index c02c0d75b97d..f7c84941a532 100644 --- a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/CosmosConfigSpec.scala +++ b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/CosmosConfigSpec.scala @@ -886,6 +886,15 @@ class CosmosConfigSpec extends UnitSpec with BasicLoggingTrait { config.maxItemCountPerTrigger.get shouldEqual 54 } + 
it should "parse change feed config with performance monitoring config" in { + val changeFeedConfig = Map( + "spark.cosmos.changeFeed.performance.monitoring.enabled" -> "false" + ) + + val config = CosmosChangeFeedConfig.parseCosmosChangeFeedConfig(changeFeedConfig) + config.performanceMonitoringEnabled shouldBe false + } + it should "complain when parsing invalid change feed mode" in { val changeFeedConfig = Map( "spark.cosmos.changeFeed.mode" -> "Whatever", diff --git a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/CosmosPartitionPlannerSpec.scala b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/CosmosPartitionPlannerSpec.scala index b8e7a564ad1b..7974058a969d 100644 --- a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/CosmosPartitionPlannerSpec.scala +++ b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/CosmosPartitionPlannerSpec.scala @@ -3,11 +3,13 @@ package com.azure.cosmos.spark import com.azure.core.management.AzureEnvironment +import com.azure.cosmos.changeFeedMetrics.ChangeFeedMetricsTracker import com.azure.cosmos.{ReadConsistencyStrategy, spark} import org.apache.spark.sql.connector.read.streaming.ReadLimit import java.time.Instant import java.util.UUID +import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.atomic.AtomicLong class CosmosPartitionPlannerSpec extends UnitSpec { @@ -83,7 +85,8 @@ class CosmosPartitionPlannerSpec extends UnitSpec { val calculate = CosmosPartitionPlanner.calculateEndLsn( Array[PartitionMetadata](metadata1, metadata2), - ReadLimit.allAvailable() + ReadLimit.allAvailable(), + isChangeFeed = true ) calculate(0).endLsn.get shouldBe latestLsn @@ -159,7 +162,8 @@ class CosmosPartitionPlannerSpec extends UnitSpec { val calculate = CosmosPartitionPlanner.calculateEndLsn( Array[PartitionMetadata](metadata1, metadata2), - ReadLimit.allAvailable() + ReadLimit.allAvailable(), + isChangeFeed = true ) calculate(0).endLsn.get shouldBe startLsn @@ -235,7 +239,8 @@ class CosmosPartitionPlannerSpec extends UnitSpec { val calculate = CosmosPartitionPlanner.calculateEndLsn( Array[PartitionMetadata](metadata1, metadata2), - ReadLimit.allAvailable() + ReadLimit.allAvailable(), + isChangeFeed = true ) calculate(0).endLsn.get shouldBe startLsn @@ -308,7 +313,8 @@ class CosmosPartitionPlannerSpec extends UnitSpec { val calculate = CosmosPartitionPlanner.calculateEndLsn( Array[PartitionMetadata](metadata1, metadata2), - ReadLimit.maxRows(maxRows) + ReadLimit.maxRows(maxRows), + isChangeFeed = true ) calculate(0).endLsn.get shouldEqual 2052 // proceeds 2 LSNs @@ -397,7 +403,8 @@ class CosmosPartitionPlannerSpec extends UnitSpec { val calculate = CosmosPartitionPlanner.calculateEndLsn( Array[PartitionMetadata](metadata1, metadata2, metadata3), - ReadLimit.maxRows(maxRows) + ReadLimit.maxRows(maxRows), + isChangeFeed = true ) calculate(0).endLsn.get shouldEqual 2051 // proceeds at least 1 LSN @@ -472,10 +479,142 @@ class CosmosPartitionPlannerSpec extends UnitSpec { val calculate = CosmosPartitionPlanner.calculateEndLsn( Array[PartitionMetadata](metadata1, metadata2), - ReadLimit.maxRows(maxRows) + ReadLimit.maxRows(maxRows), + isChangeFeed = true ) calculate(0).endLsn.get shouldEqual 2150 calculate(1).endLsn.get shouldEqual 2150 } + + it should "calculateEndLsn should distribute rate based on metrics with readLimit" in { + val clientConfig = spark.CosmosClientConfiguration( + UUID.randomUUID().toString, + UUID.randomUUID().toString, + 
CosmosMasterKeyAuthConfig(UUID.randomUUID().toString), + None, + UUID.randomUUID().toString, + useGatewayMode = false, + enforceNativeTransport = false, + proactiveConnectionInitialization = None, + proactiveConnectionInitializationDurationInSeconds = 120, + httpConnectionPoolSize = 1000, + readConsistencyStrategy = ReadConsistencyStrategy.EVENTUAL, + disableTcpConnectionEndpointRediscovery = false, + preferredRegionsList = Option.empty, + subscriptionId = None, + tenantId = None, + resourceGroupName = None, + azureEnvironmentEndpoints = AzureEnvironment.AZURE.getEndpoints, + sparkEnvironmentInfo = "", + clientBuilderInterceptors = None, + clientInterceptors = None, + sampledDiagnosticsLoggerConfig = None, + azureMonitorConfig = None + ) + + val containerConfig = CosmosContainerConfig(UUID.randomUUID().toString, UUID.randomUUID().toString) + val normalizedRange = NormalizedRange(UUID.randomUUID().toString, UUID.randomUUID().toString) + val docSizeInKB = rnd.nextInt() + val maxRows = 10 + val nowEpochMs = Instant.now.toEpochMilli + val createdAt = new AtomicLong(nowEpochMs) + val lastRetrievedAt = new AtomicLong(nowEpochMs) + + val metadata = PartitionMetadata( + Map[String, String](), + clientConfig, + None, + containerConfig, + normalizedRange, + documentCount = 2150, + docSizeInKB, + firstLsn = Some(0), + latestLsn = 2150, + startLsn = 2050, + None, + createdAt, + lastRetrievedAt) + + val metricsMap = new ConcurrentHashMap[NormalizedRange, ChangeFeedMetricsTracker]() + val metricsTracker = ChangeFeedMetricsTracker(0L, normalizedRange) + // Simulate metrics showing 2 changes per LSN on average + metricsTracker.track(10, 20) + metricsMap.put(normalizedRange, metricsTracker) + + val calculate = CosmosPartitionPlanner.calculateEndLsn( + Array[PartitionMetadata](metadata), + ReadLimit.maxRows(maxRows), + isChangeFeed = true, + Some(metricsMap) + ) + + // With 2 changes per LSN average from metrics and maxRows=10, should allow 5 LSN progress + calculate(0).endLsn.get shouldEqual 2055 + } + + it should "calculateEndLsn should handle when no progress is made even with metrics" in { + val clientConfig = spark.CosmosClientConfiguration( + UUID.randomUUID().toString, + UUID.randomUUID().toString, + CosmosMasterKeyAuthConfig(UUID.randomUUID().toString), + None, + UUID.randomUUID().toString, + useGatewayMode = false, + enforceNativeTransport = false, + proactiveConnectionInitialization = None, + proactiveConnectionInitializationDurationInSeconds = 120, + httpConnectionPoolSize = 1000, + readConsistencyStrategy = ReadConsistencyStrategy.EVENTUAL, + disableTcpConnectionEndpointRediscovery = false, + preferredRegionsList = Option.empty, + subscriptionId = None, + tenantId = None, + resourceGroupName = None, + azureEnvironmentEndpoints = AzureEnvironment.AZURE.getEndpoints, + sparkEnvironmentInfo = "", + clientBuilderInterceptors = None, + clientInterceptors = None, + sampledDiagnosticsLoggerConfig = None, + azureMonitorConfig = None + ) + + val containerConfig = CosmosContainerConfig(UUID.randomUUID().toString, UUID.randomUUID().toString) + val normalizedRange = NormalizedRange(UUID.randomUUID().toString, UUID.randomUUID().toString) + val docSizeInKB = rnd.nextInt() + val maxRows = 10 + val nowEpochMs = Instant.now.toEpochMilli + val createdAt = new AtomicLong(nowEpochMs) + val lastRetrievedAt = new AtomicLong(nowEpochMs) + + val metadata = PartitionMetadata( + Map[String, String](), + clientConfig, + None, + containerConfig, + normalizedRange, + documentCount = 2150, + docSizeInKB, + firstLsn = 
Some(0), + latestLsn = 2050, // Latest LSN same as start LSN + startLsn = 2050, + None, + createdAt, + lastRetrievedAt) + + val metricsMap = new ConcurrentHashMap[NormalizedRange, ChangeFeedMetricsTracker]() + val metricsTracker = ChangeFeedMetricsTracker(0L, normalizedRange) + metricsTracker.track(2050, 100) + metricsMap.put(normalizedRange, metricsTracker) + + val calculate = CosmosPartitionPlanner.calculateEndLsn( + Array[PartitionMetadata](metadata), + ReadLimit.maxRows(maxRows), + isChangeFeed = true, + Some(metricsMap) + ) + + // Should stay at start LSN since no progress can be made + calculate(0).endLsn.get shouldEqual 2050 + } } diff --git a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/PartitionMetadataSpec.scala b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/PartitionMetadataSpec.scala index 97e873383a38..daddac4ee0a9 100644 --- a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/PartitionMetadataSpec.scala +++ b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/PartitionMetadataSpec.scala @@ -3,10 +3,12 @@ package com.azure.cosmos.spark import com.azure.core.management.AzureEnvironment +import com.azure.cosmos.changeFeedMetrics.ChangeFeedMetricsTracker import com.azure.cosmos.{ReadConsistencyStrategy, spark} import java.nio.charset.StandardCharsets import java.time.Instant +import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.atomic.AtomicLong import java.util.{Base64, UUID} @@ -410,7 +412,7 @@ class PartitionMetadataSpec extends UnitSpec { createdAt, lastRetrievedAt) - val gap = metadata.getWeightedLsnGap + val gap = metadata.getWeightedLsnGap(isChangeFeed = false, None) gap shouldBe (docCount.toDouble / (latestLsn - firstLsn.get) * (latestLsn - startLsn)).toLong } @@ -466,7 +468,7 @@ class PartitionMetadataSpec extends UnitSpec { createdAt, lastRetrievedAt) - val gap = metadata.getWeightedLsnGap + val gap = metadata.getWeightedLsnGap(isChangeFeed = false, None) gap shouldBe 1 } @@ -522,7 +524,7 @@ class PartitionMetadataSpec extends UnitSpec { createdAt, lastRetrievedAt) - val gap = metadata.getWeightedLsnGap + val gap = metadata.getWeightedLsnGap(isChangeFeed = false, None) gap shouldBe (docCount.toDouble / (latestLsn - firstLsn.get) * (latestLsn - startLsn)).toLong } @@ -578,10 +580,150 @@ class PartitionMetadataSpec extends UnitSpec { createdAt, lastRetrievedAt) - val gap = metadata.getWeightedLsnGap + val gap = metadata.getWeightedLsnGap(isChangeFeed = false, None) gap shouldBe 0 } + it should "calculate weighted gap using tracked metrics for change feed " in { + val clientConfig = spark.CosmosClientConfiguration( + UUID.randomUUID().toString, + UUID.randomUUID().toString, + CosmosMasterKeyAuthConfig(UUID.randomUUID().toString), + None, + UUID.randomUUID().toString, + useGatewayMode = false, + enforceNativeTransport = false, + proactiveConnectionInitialization = None, + proactiveConnectionInitializationDurationInSeconds = 120, + httpConnectionPoolSize = 1000, + readConsistencyStrategy = ReadConsistencyStrategy.EVENTUAL, + disableTcpConnectionEndpointRediscovery = false, + preferredRegionsList = Option.empty, + subscriptionId = None, + tenantId = None, + resourceGroupName = None, + azureEnvironmentEndpoints = AzureEnvironment.AZURE.getEndpoints, + sparkEnvironmentInfo = "", + clientBuilderInterceptors = None, + clientInterceptors = None, + sampledDiagnosticsLoggerConfig = None, + azureMonitorConfig = None + ) + + val containerConfig = 
CosmosContainerConfig(UUID.randomUUID().toString, UUID.randomUUID().toString) + val normalizedRange = NormalizedRange(UUID.randomUUID().toString, UUID.randomUUID().toString) + val docSizeInKB = rnd.nextInt() + val firstLsn = Some(10L) + val latestLsn = 2160 + val startLsn = 2067 + val docCount = 200174 + val nowEpochMs = Instant.now.toEpochMilli + val createdAt = new AtomicLong(nowEpochMs) + val lastRetrievedAt = new AtomicLong(nowEpochMs) + + var metadata = PartitionMetadata( + Map[String, String](), + clientConfig, + None, + containerConfig, + normalizedRange, + docCount, + docSizeInKB, + firstLsn, + latestLsn, + startLsn, + None, + createdAt, + lastRetrievedAt) + + val partitionMetricsMap = new ConcurrentHashMap[NormalizedRange, ChangeFeedMetricsTracker]() + val changeFeedMetricsTracker = ChangeFeedMetricsTracker(1, normalizedRange) + changeFeedMetricsTracker.track(10, 400) + partitionMetricsMap.put(normalizedRange, changeFeedMetricsTracker) + + var gap = metadata.getWeightedLsnGap(isChangeFeed = true, Some(partitionMetricsMap)) + gap shouldBe ((latestLsn - startLsn) * 40) + + metadata = PartitionMetadata( + Map[String, String](), + clientConfig, + None, + containerConfig, + NormalizedRange(UUID.randomUUID().toString, UUID.randomUUID().toString), + docCount, + docSizeInKB, + firstLsn, + latestLsn, + startLsn, + None, + createdAt, + lastRetrievedAt) + // there is no match metrics, fallback to use default calculation + gap = metadata.getWeightedLsnGap(isChangeFeed = true, Some(partitionMetricsMap)) + gap shouldBe (docCount.toDouble / (latestLsn - firstLsn.get) * (latestLsn - startLsn)).toLong + } + + it should "calculate weighted gap should ignore tracked metrics for non change feed " in { + val clientConfig = spark.CosmosClientConfiguration( + UUID.randomUUID().toString, + UUID.randomUUID().toString, + CosmosMasterKeyAuthConfig(UUID.randomUUID().toString), + None, + UUID.randomUUID().toString, + useGatewayMode = false, + enforceNativeTransport = false, + proactiveConnectionInitialization = None, + proactiveConnectionInitializationDurationInSeconds = 120, + httpConnectionPoolSize = 1000, + readConsistencyStrategy = ReadConsistencyStrategy.EVENTUAL, + disableTcpConnectionEndpointRediscovery = false, + preferredRegionsList = Option.empty, + subscriptionId = None, + tenantId = None, + resourceGroupName = None, + azureEnvironmentEndpoints = AzureEnvironment.AZURE.getEndpoints, + sparkEnvironmentInfo = "", + clientBuilderInterceptors = None, + clientInterceptors = None, + sampledDiagnosticsLoggerConfig = None, + azureMonitorConfig = None + ) + + val containerConfig = CosmosContainerConfig(UUID.randomUUID().toString, UUID.randomUUID().toString) + val normalizedRange = NormalizedRange(UUID.randomUUID().toString, UUID.randomUUID().toString) + val docSizeInKB = rnd.nextInt() + val firstLsn = Some(10L) + val latestLsn = 2160 + val startLsn = 2067 + val docCount = 200174 + val nowEpochMs = Instant.now.toEpochMilli + val createdAt = new AtomicLong(nowEpochMs) + val lastRetrievedAt = new AtomicLong(nowEpochMs) + + val metadata = PartitionMetadata( + Map[String, String](), + clientConfig, + None, + containerConfig, + normalizedRange, + docCount, + docSizeInKB, + firstLsn, + latestLsn, + startLsn, + None, + createdAt, + lastRetrievedAt) + + val partitionMetricsMap = new ConcurrentHashMap[NormalizedRange, ChangeFeedMetricsTracker]() + val changeFeedMetricsTracker = ChangeFeedMetricsTracker(1, normalizedRange) + changeFeedMetricsTracker.track(10, 400) + partitionMetricsMap.put(normalizedRange, 
changeFeedMetricsTracker) + + val gap = metadata.getWeightedLsnGap(isChangeFeed = false, Some(partitionMetricsMap)) + gap shouldBe (docCount.toDouble / (latestLsn - firstLsn.get) * (latestLsn - startLsn)).toLong + } + it should "calculate avg. document count per LSN correctly when there are no documents" in { val clientConfig = spark.CosmosClientConfiguration( UUID.randomUUID().toString, @@ -634,7 +776,7 @@ class PartitionMetadataSpec extends UnitSpec { createdAt, lastRetrievedAt) - val gap = metadata.getAvgItemsPerLsn + val gap = metadata.getAvgItemsPerLsn(isChangeFeed = false, None) gap shouldBe 1d } @@ -690,7 +832,7 @@ class PartitionMetadataSpec extends UnitSpec { createdAt, lastRetrievedAt) - metadata.getAvgItemsPerLsn shouldBe 10d + metadata.getAvgItemsPerLsn(isChangeFeed = false, None) shouldBe 10d docCount = 215 metadata = PartitionMetadata( @@ -708,7 +850,7 @@ class PartitionMetadataSpec extends UnitSpec { createdAt, lastRetrievedAt) - metadata.getAvgItemsPerLsn shouldBe 0.1d + metadata.getAvgItemsPerLsn(isChangeFeed = false, None) shouldBe 0.1d } it should "calculate avg. document count per LSN correctly when firstLsn was empty" in { @@ -763,7 +905,7 @@ class PartitionMetadataSpec extends UnitSpec { createdAt, lastRetrievedAt) - metadata.getAvgItemsPerLsn shouldBe 10d + metadata.getAvgItemsPerLsn(isChangeFeed = false, None) shouldBe 10d docCount = 216 metadata = PartitionMetadata( @@ -781,7 +923,146 @@ class PartitionMetadataSpec extends UnitSpec { createdAt, lastRetrievedAt) - metadata.getAvgItemsPerLsn shouldBe 1d + metadata.getAvgItemsPerLsn(isChangeFeed = false, None) shouldBe 1d + } + + it should "calculate avg. document count per LSN using tracked metrics for change feed" in { + val clientConfig = spark.CosmosClientConfiguration( + UUID.randomUUID().toString, + UUID.randomUUID().toString, + CosmosMasterKeyAuthConfig(UUID.randomUUID().toString), + None, + UUID.randomUUID().toString, + useGatewayMode = false, + enforceNativeTransport = false, + proactiveConnectionInitialization = None, + proactiveConnectionInitializationDurationInSeconds = 120, + httpConnectionPoolSize = 1000, + readConsistencyStrategy = ReadConsistencyStrategy.EVENTUAL, + disableTcpConnectionEndpointRediscovery = false, + preferredRegionsList = Option.empty, + subscriptionId = None, + tenantId = None, + resourceGroupName = None, + azureEnvironmentEndpoints = AzureEnvironment.AZURE.getEndpoints, + sparkEnvironmentInfo = "", + clientBuilderInterceptors = None, + clientInterceptors = None, + sampledDiagnosticsLoggerConfig = None, + azureMonitorConfig = None + ) + + val containerConfig = CosmosContainerConfig(UUID.randomUUID().toString, UUID.randomUUID().toString) + val normalizedRange = NormalizedRange(UUID.randomUUID().toString, UUID.randomUUID().toString) + val docSizeInKB = rnd.nextInt() + val firstLsn = None + val latestLsn = 2160 + val startLsn = 2067 + var docCount = 21600 + val nowEpochMs = Instant.now.toEpochMilli + val createdAt = new AtomicLong(nowEpochMs) + val lastRetrievedAt = new AtomicLong(nowEpochMs) + + var metadata = PartitionMetadata( + Map[String, String](), + clientConfig, + None, + containerConfig, + normalizedRange, + docCount, + docSizeInKB, + firstLsn, + latestLsn, + startLsn, + None, + createdAt, + lastRetrievedAt) + + val partitionMetricsMap = new ConcurrentHashMap[NormalizedRange, ChangeFeedMetricsTracker]() + val changeFeedMetricsTracker = ChangeFeedMetricsTracker(1, normalizedRange) + changeFeedMetricsTracker.track(10, 10) + partitionMetricsMap.put(normalizedRange, 
changeFeedMetricsTracker) + + metadata.getAvgItemsPerLsn(isChangeFeed = true, Some(partitionMetricsMap)) shouldBe 1d + + metadata = PartitionMetadata( + Map[String, String](), + clientConfig, + None, + containerConfig, + NormalizedRange(UUID.randomUUID().toString, UUID.randomUUID().toString), + docCount, + docSizeInKB, + firstLsn, + latestLsn, + startLsn, + None, + createdAt, + lastRetrievedAt) + + // there is no matching metrics, so use the default avg item per lsn + metadata.getAvgItemsPerLsn(isChangeFeed = true, Some(partitionMetricsMap)) shouldBe 10d + } + + it should "calculate avg. document count per LSN by ignoring tracked metrics for non change feed request" in { + val clientConfig = spark.CosmosClientConfiguration( + UUID.randomUUID().toString, + UUID.randomUUID().toString, + CosmosMasterKeyAuthConfig(UUID.randomUUID().toString), + None, + UUID.randomUUID().toString, + useGatewayMode = false, + enforceNativeTransport = false, + proactiveConnectionInitialization = None, + proactiveConnectionInitializationDurationInSeconds = 120, + httpConnectionPoolSize = 1000, + readConsistencyStrategy = ReadConsistencyStrategy.EVENTUAL, + disableTcpConnectionEndpointRediscovery = false, + preferredRegionsList = Option.empty, + subscriptionId = None, + tenantId = None, + resourceGroupName = None, + azureEnvironmentEndpoints = AzureEnvironment.AZURE.getEndpoints, + sparkEnvironmentInfo = "", + clientBuilderInterceptors = None, + clientInterceptors = None, + sampledDiagnosticsLoggerConfig = None, + azureMonitorConfig = None + ) + + val containerConfig = CosmosContainerConfig(UUID.randomUUID().toString, UUID.randomUUID().toString) + val normalizedRange = NormalizedRange(UUID.randomUUID().toString, UUID.randomUUID().toString) + val docSizeInKB = rnd.nextInt() + val firstLsn = None + val latestLsn = 2160 + val startLsn = 2067 + var docCount = 21600 + val nowEpochMs = Instant.now.toEpochMilli + val createdAt = new AtomicLong(nowEpochMs) + val lastRetrievedAt = new AtomicLong(nowEpochMs) + + var metadata = PartitionMetadata( + Map[String, String](), + clientConfig, + None, + containerConfig, + normalizedRange, + docCount, + docSizeInKB, + firstLsn, + latestLsn, + startLsn, + None, + createdAt, + lastRetrievedAt) + + val partitionMetricsMap = new ConcurrentHashMap[NormalizedRange, ChangeFeedMetricsTracker]() + val changeFeedMetricsTracker = ChangeFeedMetricsTracker(1, normalizedRange) + changeFeedMetricsTracker.track(10, 10) + partitionMetricsMap.put(normalizedRange, changeFeedMetricsTracker) + + // partition metrics is only considered for change feed request + metadata.getAvgItemsPerLsn(isChangeFeed = false, Some(partitionMetricsMap)) shouldBe 10d } //scalastyle:off null
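To make the numbers in `ChangeFeedMetricsTrackerSpec` and `CosmosPartitionPlannerSpec` easier to follow, here is a small self-contained sketch of the arithmetic those specs exercise. It is an illustrative model only: the names and the exact weighting scheme are inferred from the expected values asserted in the specs, not taken from the production `ChangeFeedMetricsTracker` or `CosmosPartitionPlanner` code.

object ChangeFeedTuningSketch {
  // Decay-weighted average of items-per-LSN samples: the newest sample has weight 1.0,
  // the previous one decayFactor, the one before decayFactor^2, ... (inferred from the spec numbers).
  def weightedItemsPerLsn(samples: Seq[Double], decayFactor: Double): Double = {
    val weights = samples.indices.map(i => math.pow(decayFactor, samples.size - 1 - i))
    samples.zip(weights).map { case (s, w) => s * w }.sum / weights.sum
  }

  // maxRows-based end LSN: spread the allowed row budget over the LSN gap using the
  // metrics-derived items-per-LSN estimate, always advancing at least one LSN.
  def endLsnForMaxRows(startLsn: Long, latestLsn: Long, itemsPerLsn: Double, maxRows: Long): Long = {
    val allowedLsnProgress = math.max((maxRows / itemsPerLsn).toLong, 1L)
    math.min(latestLsn, startLsn + allowedLsnProgress)
  }

  def main(args: Array[String]): Unit = {
    // Mirrors ChangeFeedMetricsTrackerSpec: samples 2.0 then 3.0 with decay 0.5
    // => (2.0 * 0.5 + 3.0 * 1.0) / (0.5 + 1.0) ~= 2.67
    println(weightedItemsPerLsn(Seq(2.0, 3.0), 0.5))

    // Mirrors CosmosPartitionPlannerSpec: startLsn 2050, latestLsn 2150,
    // ~2 items per LSN from metrics, maxRows 10 => endLsn 2055
    println(endLsnForMaxRows(2050L, 2150L, itemsPerLsn = 2.0, maxRows = 10L))

    // And the "no progress" case: latestLsn == startLsn keeps the end LSN at 2050
    println(endLsnForMaxRows(2050L, 2050L, itemsPerLsn = 2.0, maxRows = 10L))
  }
}

Running the sketch prints roughly 2.67, 2055 and 2050, matching the expectations asserted in the specs above.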