Commit 3f5dd25
Author: annie-mac
Commit message: change
1 parent: 50a5907

File tree: 10 files changed, +105 −57 lines

sdk/cosmos/azure-cosmos-spark_3-3_2-12/src/main/scala/com/azure/cosmos/spark/ChangeFeedMicroBatchStream.scala
Lines changed: 6 additions & 1 deletion

@@ -69,7 +69,12 @@ private class ChangeFeedMicroBatchStream
   private val partitionMetricsMap = new ConcurrentHashMap[NormalizedRange, ChangeFeedMetricsTracker]()

   // Register metrics listener
-  session.sparkContext.addSparkListener(new ChangeFeedMetricsListener(partitionIndexMap, partitionMetricsMap))
+  if (CosmosConstants.ChangeFeedMetricsListenerConfig.metricsListenerEnabled) {
+    log.logInfo("Register ChangeFeedMetricsListener")
+    session.sparkContext.addSparkListener(new ChangeFeedMetricsListener(partitionIndexMap, partitionMetricsMap))
+  } else {
+    log.logInfo("ChangeFeedMetricsListener is disabled")
+  }

   override def latestOffset(): Offset = {
     // For Spark data streams implementing SupportsAdmissionControl trait
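Note: listener registration is now gated on a flag resolved from a JVM system property, with an environment-variable fallback and a default of true (see the ChangeFeedMetricsListenerConfig object added to CosmosConstants.scala below). A minimal sketch of opting out, assuming the property is set before the stream is constructed:

    // Sketch (not part of this commit): disable the change feed metrics listener.
    // The property name is taken from CosmosConstants.ChangeFeedMetricsListenerConfig in this diff.
    object DisableChangeFeedMetricsListener {
      def main(args: Array[String]): Unit = {
        // metricsListenerEnabled is resolved once when CosmosConstants is loaded,
        // so this must run before any ChangeFeedMicroBatchStream is created.
        System.setProperty("cosmos.changeFeed.metricsListener.enabled", "false")
        // ... build the SparkSession and start the change feed query as usual
      }
    }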

sdk/cosmos/azure-cosmos-spark_3-4_2-12/src/main/scala/com/azure/cosmos/spark/ChangeFeedMicroBatchStream.scala
Lines changed: 8 additions & 2 deletions

@@ -70,7 +70,12 @@ private class ChangeFeedMicroBatchStream
   private val partitionIndexMap = Maps.synchronizedBiMap(HashBiMap.create[NormalizedRange, Long]())
   private val partitionMetricsMap = new ConcurrentHashMap[NormalizedRange, ChangeFeedMetricsTracker]()

-  session.sparkContext.addSparkListener(new ChangeFeedMetricsListener(partitionIndexMap, partitionMetricsMap))
+  if (CosmosConstants.ChangeFeedMetricsListenerConfig.metricsListenerEnabled) {
+    log.logInfo("Register ChangeFeedMetricsListener")
+    session.sparkContext.addSparkListener(new ChangeFeedMetricsListener(partitionIndexMap, partitionMetricsMap))
+  } else {
+    log.logInfo("ChangeFeedMetricsListener is disabled")
+  }

   override def latestOffset(): Offset = {
     // For Spark data streams implementing SupportsAdmissionControl trait
@@ -169,7 +174,8 @@ private class ChangeFeedMicroBatchStream
       this.containerConfig,
       this.partitioningConfig,
       this.defaultParallelism,
-      this.container
+      this.container,
+      Some(this.partitionMetricsMap)
     )

     if (offset.changeFeedState != startChangeFeedOffset.changeFeedState) {

sdk/cosmos/azure-cosmos-spark_3-4_2-12/src/main/scala/com/azure/cosmos/spark/SparkInternalsBridge.scala
Lines changed: 1 addition & 1 deletion

@@ -11,7 +11,7 @@ import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.util.AccumulatorV2

 import java.lang.reflect.Method
-import java.util.{Locale, Optional}
+import java.util.Locale
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}

 object SparkInternalsBridge extends BasicLoggingTrait {

sdk/cosmos/azure-cosmos-spark_3-5_2-12/src/main/scala/com/azure/cosmos/spark/ChangeFeedMicroBatchStream.scala
Lines changed: 8 additions & 2 deletions

@@ -71,7 +71,12 @@ private class ChangeFeedMicroBatchStream
   private val partitionIndexMap = Maps.synchronizedBiMap(HashBiMap.create[NormalizedRange, Long]())
   private val partitionMetricsMap = new ConcurrentHashMap[NormalizedRange, ChangeFeedMetricsTracker]()

-  session.sparkContext.addSparkListener(new ChangeFeedMetricsListener(partitionIndexMap, partitionMetricsMap))
+  if (CosmosConstants.ChangeFeedMetricsListenerConfig.metricsListenerEnabled) {
+    log.logInfo("Register ChangeFeedMetricsListener")
+    session.sparkContext.addSparkListener(new ChangeFeedMetricsListener(partitionIndexMap, partitionMetricsMap))
+  } else {
+    log.logInfo("ChangeFeedMetricsListener is disabled")
+  }

   override def latestOffset(): Offset = {
     // For Spark data streams implementing SupportsAdmissionControl trait
@@ -170,7 +175,8 @@ private class ChangeFeedMicroBatchStream
       this.containerConfig,
       this.partitioningConfig,
       this.defaultParallelism,
-      this.container
+      this.container,
+      Some(this.partitionMetricsMap)
     )

     if (offset.changeFeedState != startChangeFeedOffset.changeFeedState) {

sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/changeFeedMetrics/ChangeFeedMetricsListener.scala
Lines changed: 28 additions & 4 deletions

@@ -7,6 +7,7 @@ import com.azure.cosmos.implementation.guava25.collect.BiMap
 import com.azure.cosmos.spark.diagnostics.BasicLoggingTrait
 import com.azure.cosmos.spark.{CosmosConstants, NormalizedRange, SparkInternalsBridge}
 import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
+import org.apache.spark.sql.execution.metric.SQLMetric

 import java.util.concurrent.ConcurrentHashMap

@@ -32,10 +33,17 @@ private[cosmos] class ChangeFeedMetricsListener(
       if (normalizedRange != null) {
         partitionMetricsMap.putIfAbsent(normalizedRange, new ChangeFeedMetricsTracker(index, normalizedRange))
         val metricsTracker = partitionMetricsMap.get(normalizedRange)
-        metricsTracker.track(
-          metrics(CosmosConstants.MetricNames.ChangeFeedFetchedChangesCnt).value,
-          metrics(CosmosConstants.MetricNames.ChangeFeedLsnGap).value
-        )
+        val changesCnt = getChangesCnt(metrics)
+        val lsnGap = getLsnGap(metrics)
+
+        if (changesCnt >= 0 && lsnGap >= 0) {
+          metricsTracker.track(
+            metrics(CosmosConstants.MetricNames.ChangeFeedLsnGap).value,
+            metrics(CosmosConstants.MetricNames.ChangeFeedFetchedChangesCnt).value
+          )
+        }
+
+        logTrace(s"onTaskEnd for partition index $index, changesCnt $changesCnt, lsnGap $lsnGap")
       }
     }
   } catch {
@@ -45,4 +53,20 @@
       logWarning("Tracking changeFeed metrics failed", e)
     }
   }
+
+  def getChangesCnt(metrics: Map[String, SQLMetric]): Long = {
+    if (metrics.contains(CosmosConstants.MetricNames.ChangeFeedFetchedChangesCnt)) {
+      metrics(CosmosConstants.MetricNames.ChangeFeedFetchedChangesCnt).value
+    } else {
+      -1
+    }
+  }
+
+  def getLsnGap(metrics: Map[String, SQLMetric]): Long = {
+    if (metrics.contains(CosmosConstants.MetricNames.ChangeFeedLsnGap)) {
+      metrics(CosmosConstants.MetricNames.ChangeFeedLsnGap).value
+    } else {
+      -1
+    }
+  }
 }
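Note: the new helpers avoid the NoSuchElementException the old code could hit when a task finished without reporting one of the custom SQL metrics; -1 serves as a sentinel, and track is only invoked when both values are present. The same pattern in a self-contained form (plain Longs stand in for SQLMetric values):

    // Sketch: sentinel-based defensive lookup, mirroring getChangesCnt/getLsnGap above.
    def metricOrSentinel(metrics: Map[String, Long], name: String): Long =
      if (metrics.contains(name)) metrics(name) else -1L

    val metrics = Map("changeFeedLsnGap" -> 42L) // hypothetical metric map
    val lsnGap = metricOrSentinel(metrics, "changeFeedLsnGap")                // 42
    val changesCnt = metricOrSentinel(metrics, "changeFeedFetchedChangesCnt") // -1
    if (lsnGap >= 0 && changesCnt >= 0) {
      // only track when both metrics were reported for this task
    }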

sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/changeFeedMetrics/ChangeFeedMetricsTracker.scala
Lines changed: 22 additions & 28 deletions

@@ -4,17 +4,16 @@
 package com.azure.cosmos.changeFeedMetrics

 import com.azure.cosmos.implementation.guava25.collect.EvictingQueue
-import com.azure.cosmos.spark.NormalizedRange
-import org.slf4j.{Logger, LoggerFactory}
+import com.azure.cosmos.spark.diagnostics.BasicLoggingTrait
+import com.azure.cosmos.spark.{CosmosConstants, NormalizedRange}

 // scalastyle:off underscore.import
 import scala.collection.JavaConverters._
 // scalastyle:on underscore.import

 private[cosmos] object ChangeFeedMetricsTracker {
-  // Default values for tracker configuration
-  private val DefaultMaxHistory: Int = 5
-  private val DefaultDecayFactor: Double = 0.85
+  private val DefaultMaxHistory: Int = CosmosConstants.ChangeFeedTrackerConfigs.changeFeedMetricsTrackerMaxHistory
+  private val DefaultDecayFactor: Double = CosmosConstants.ChangeFeedTrackerConfigs.changeFeedMetricsTrackerDecayFactor

   def apply(partitionIndex: Long, feedRange: NormalizedRange): ChangeFeedMetricsTracker = {
     new ChangeFeedMetricsTracker(partitionIndex, feedRange)
@@ -33,20 +32,20 @@ private[cosmos] class ChangeFeedMetricsTracker(
   private val partitionIndex: Long,
   private val feedRange: NormalizedRange,
   private val maxHistory: Int = ChangeFeedMetricsTracker.DefaultMaxHistory,
-  private val decayFactor: Double = ChangeFeedMetricsTracker.DefaultDecayFactor
-) {
-  @transient private lazy val log : Logger = LoggerFactory.getLogger(ChangeFeedMetricsTracker.getClass)
+  private val decayFactor: Double = ChangeFeedMetricsTracker.DefaultDecayFactor) extends BasicLoggingTrait {

   private val changeFeedChangesPerLsnHistory = EvictingQueue.create[Double](maxHistory)
   private var currentChangesPerLsnOpt: Option[Double] = None

   /**
    * Track the normalized change feed changes per lsn
-   * @param changesPerLsn The latest changes per lsn metric.
+   *
+   * @param lsnGap the lsn gap has been observed.
+   * @param changesFetchedCnt the total fetched changes.
    */
   def track(lsnGap: Long, changesFetchedCnt: Long): Unit = {
     val effectiveChangesFetchedCnt = Math.max(1, changesFetchedCnt)
-    val changesPerLsn = effectiveChangesFetchedCnt.toDouble / lsnGap
+    val changesPerLsn = if (lsnGap == 0) effectiveChangesFetchedCnt.toDouble else effectiveChangesFetchedCnt.toDouble / lsnGap
     synchronized {
       changeFeedChangesPerLsnHistory.add(changesPerLsn)
       calculateWeightedChangesPerLsn()
@@ -61,33 +60,28 @@ private[cosmos] class ChangeFeedMetricsTracker(
    * @return Weighted average of LSN gaps
    */
   private def calculateWeightedChangesPerLsn(): Unit = {
-    synchronized {
-      if (changeFeedChangesPerLsnHistory.isEmpty) {
-        None
-      } else {
-        val gaps = changeFeedChangesPerLsnHistory.asScala.toArray
-        val n = gaps.length
-        var weightedSum = 0.0
-        var weightSum = 0.0
-
-        for (i <- gaps.indices) {
-          val weight = math.pow(decayFactor, n - i - 1)
-          weightedSum += gaps(i) * weight
-          weightSum += weight
-        }
+    if (!changeFeedChangesPerLsnHistory.isEmpty) {
+      val gaps = changeFeedChangesPerLsnHistory.asScala.toArray
+      val n = gaps.length
+      var weightedSum = 0.0
+      var weightSum = 0.0

-        currentChangesPerLsnOpt = Some(weightedSum / weightSum)
+      for (i <- gaps.indices) {
+        val weight = math.pow(decayFactor, n - i - 1)
+        weightedSum += gaps(i) * weight
+        weightSum += weight
       }
+
+      currentChangesPerLsnOpt = Some(weightedSum / weightSum)
     }
   }

   /**
    * Gets current weighted changes per lsn
-   * @return Current weighted LSN gap
+   * @return weighted changes per lsn
    */
   def getWeightedAvgChangesPerLsn: Option[Double] = {
-    // TODO: remove
-    log.info(s"getWeightedAvgChangesPerLsn for feedRangeIndex $partitionIndex, $feedRange $feedRange value $currentChangesPerLsnOpt")
+    logDebug(s"getWeightedAvgChangesPerLsn for feedRangeIndex [$partitionIndex], feedRange [$feedRange] value [$currentChangesPerLsnOpt]")
     this.currentChangesPerLsnOpt
   }
 }

sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/changeFeedMetrics/ChangeFeedPartitionIndexMetric.scala
Lines changed: 1 addition & 1 deletion

@@ -7,7 +7,7 @@ import com.azure.cosmos.spark.CosmosConstants.MetricNames
 import org.apache.spark.sql.connector.metric.CustomMetric

 /***
- * This metric is used to capture the consistent cosmos partition index
+ * This metric is used to capture the cosmos partition index in a consistent way
  */
 private[cosmos] class ChangeFeedPartitionIndexMetric extends CustomMetric {

sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/ChangeFeedPartitionReader.scala
Lines changed: 3 additions & 3 deletions

@@ -65,7 +65,8 @@ private case class ChangeFeedPartitionReader
   }

   private val containerTargetConfig = CosmosContainerConfig.parseCosmosContainerConfig(config)
-  log.logInfo(s"Reading from feed range ${partition.feedRange}, endLsn ${partition.endLsn} of " +
+  log.logInfo(s"Reading from feed range ${partition.feedRange}, startLsn $getPartitionStartLsn, " +
+    s"endLsn ${partition.endLsn} of " +
     s"container ${containerTargetConfig.database}.${containerTargetConfig.container}")
   private val readConfig = CosmosReadConfig.parseCosmosReadConfig(config)
   private val clientCacheItem = CosmosClientCache(
@@ -107,7 +108,6 @@ private case class ChangeFeedPartitionReader


   override def currentMetricsValues(): Array[CustomTaskMetric] = {
-    log.logDebug(s"calling get current metrics values ${partition.feedRange}")
     Array(
       changeFeedLSNGapMetric,
       changeFeedFetchedChangesCntMetric,
@@ -302,6 +302,6 @@ private case class ChangeFeedPartitionReader
       case None => if (this.partition.endLsn.isDefined) this.partition.endLsn else this.latestLsnReturned
     }

-    latestLsnOpt.get - startLsn
+    if (latestLsnOpt.isDefined) latestLsnOpt.get - startLsn else 0
   }
 }
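Note: the last hunk guards against latestLsnOpt being None, where the old latestLsnOpt.get would throw. An equivalent, slightly more idiomatic form of the same guard:

    // Sketch: Option-safe LSN gap, equivalent to the guarded expression above.
    def lsnGap(latestLsnOpt: Option[Long], startLsn: Long): Long =
      latestLsnOpt.map(_ - startLsn).getOrElse(0L)

    assert(lsnGap(Some(110L), 100L) == 10L)
    assert(lsnGap(None, 100L) == 0L) // previously this case threw NoSuchElementException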

sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/CosmosConstants.scala
Lines changed: 14 additions & 3 deletions

@@ -19,12 +19,9 @@ private[cosmos] object CosmosConstants {
   val maxRetryIntervalForTransientFailuresInMs = 5000
   val maxRetryCountForTransientFailures = 100
   val defaultDirectRequestTimeoutInSeconds = 10L
-  val defaultHttpRequestTimeoutInSeconds = 70L
   val feedRangesCacheIntervalInMinutes = 1L
   val defaultIoThreadCountFactorPerCore = 4
   val smallestPossibleReactorQueueSizeLargerThanOne: Int = math.min(8, Queues.XS_BUFFER_SIZE)
-  val defaultMetricsIntervalInSeconds = 60
-  val defaultSlf4jMetricReporterEnabled = false
   val readOperationEndToEndTimeoutInSeconds = 65
   val batchOperationEndToEndTimeoutInSeconds = 65

@@ -75,4 +72,18 @@ private[cosmos] object CosmosConstants {
     val DefaultTtlInSeconds = "DefaultTtlInSeconds"
     val AnalyticalStoreTtlInSeconds = "AnalyticalStoreTtlInSeconds"
   }
+
+  object ChangeFeedTrackerConfigs {
+    val changeFeedMetricsTrackerMaxHistory = 5
+    val changeFeedMetricsTrackerDecayFactor = 0.85
+  }
+
+  object ChangeFeedMetricsListenerConfig {
+    val metricsListenerEnabledPropertyName = "cosmos.changeFeed.metricsListener.enabled"
+    val metricsListenerEnabledEnvName = "cosmos_changeFeed_metricsListener_enabled"
+    val enableByDefalult = "true"
+    val metricsListenerEnabled = Option(System.getProperty(metricsListenerEnabledPropertyName))
+      .orElse(sys.env.get(metricsListenerEnabledEnvName))
+      .getOrElse(enableByDefalult).toBoolean
+  }
 }
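Note: metricsListenerEnabled is resolved eagerly, checking the JVM system property first, then the environment variable, then the "true" default (the enableByDefalult spelling is as committed). The resolution order in isolation:

    // Sketch of the property -> env -> default resolution used above.
    def resolveFlag(propertyName: String, envName: String, default: String): Boolean =
      Option(System.getProperty(propertyName))
        .orElse(sys.env.get(envName))
        .getOrElse(default)
        .toBoolean

    // -Dcosmos.changeFeed.metricsListener.enabled=false takes precedence over an
    // environment variable cosmos_changeFeed_metricsListener_enabled=true.
    val enabled = resolveFlag(
      "cosmos.changeFeed.metricsListener.enabled",
      "cosmos_changeFeed_metricsListener_enabled",
      "true")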

sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/CosmosPartitionPlanner.scala
Lines changed: 14 additions & 12 deletions

@@ -565,9 +565,10 @@ private object CosmosPartitionPlanner extends BasicLoggingTrait {
   }

   private[spark] def getAvgChangesPerLsn(
-    metadata: PartitionMetadata,
-    isChangeFeed: Boolean,
-    partitionMetricsMap: Option[ConcurrentHashMap[NormalizedRange, ChangeFeedMetricsTracker]] = None): Double = {
+     metadata: PartitionMetadata,
+     isChangeFeed: Boolean,
+     partitionMetricsMap: Option[ConcurrentHashMap[NormalizedRange, ChangeFeedMetricsTracker]] = None): Double = {
+
     var changesPerLsnFromMetricsOpt: Option[Double] = None
     if (isChangeFeed) {
       partitionMetricsMap match {
@@ -617,11 +618,12 @@ private object CosmosPartitionPlanner extends BasicLoggingTrait {
       effectiveLatestLsn
     } else {
       // the weight of this feedRange compared to other feedRanges
+      val weightedLsnGap = this.getWeightedLsnGap(metadata, isChangeFeed, partitionMetricsMap).toDouble
+      val avgChangesPerLsn = this.getAvgChangesPerLsn(metadata, isChangeFeed, partitionMetricsMap)
+      val feedRangeWeightFactor = weightedLsnGap / totalWeightedLsnGap.get

-      val feedRangeWeightFactor = this.getWeightedLsnGap(metadata, isChangeFeed, partitionMetricsMap).toDouble / totalWeightedLsnGap.get
-
-      val allowedRate =
-        (feedRangeWeightFactor * maxRowsLimit.maxRows() / this.getAvgChangesPerLsn(metadata, isChangeFeed, partitionMetricsMap))
+      val allowedRate =
+        (feedRangeWeightFactor * maxRowsLimit.maxRows() / avgChangesPerLsn)
           .toLong
           .max(1)
       val effectiveLatestLsn = metadata.getAndValidateLatestLsn
@@ -630,11 +632,11 @@ private object CosmosPartitionPlanner extends BasicLoggingTrait {
         metadata.startLsn + allowedRate)
       if (isDebugLogEnabled) {
         val calculateDebugLine = s"calculateEndLsn (feedRange: ${metadata.feedRange}) - avg. Docs/LSN: " +
-          s"${this.getAvgChangesPerLsn(metadata, isChangeFeed, partitionMetricsMap)} feedRangeWeightFactor $feedRangeWeightFactor documentCount " +
-          s"${metadata.documentCount} firstLsn ${metadata.firstLsn} latestLsn ${metadata.latestLsn} " +
-          s"effectiveLatestLsn $effectiveLatestLsn startLsn " +
-          s"${metadata.startLsn} allowedRate $allowedRate weightedGap ${this.getWeightedLsnGap(metadata, isChangeFeed, partitionMetricsMap)} " +
-          s"effectiveEndLsn $effectiveEndLsn maxRows $maxRowsLimit.maxRows"
+          s"$avgChangesPerLsn feedRangeWeightFactor $feedRangeWeightFactor documentCount " +
+          s"${metadata.documentCount} firstLsn ${metadata.firstLsn} latestLsn ${metadata.latestLsn} " +
+          s"effectiveLatestLsn $effectiveLatestLsn startLsn " +
+          s"${metadata.startLsn} allowedRate $allowedRate weightedGap $weightedLsnGap " +
+          s"effectiveEndLsn $effectiveEndLsn maxRows $maxRowsLimit.maxRows"
         logDebug(calculateDebugLine)
       }

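Note: the refactor computes weightedLsnGap and avgChangesPerLsn once and reuses them in both the rate calculation and the debug line, instead of re-evaluating the getters. A worked sketch of the admission-control math with assumed inputs:

    // allowedRate = (feedRangeWeightFactor * maxRows / avgChangesPerLsn).toLong.max(1)
    val weightedLsnGap = 500.0        // this feed range's weighted LSN gap (assumed)
    val totalWeightedLsnGap = 2000.0  // sum across all feed ranges (assumed)
    val maxRows = 10000L              // admission-control row limit (assumed)
    val avgChangesPerLsn = 2.5        // weighted avg documents per LSN (assumed)

    val feedRangeWeightFactor = weightedLsnGap / totalWeightedLsnGap          // 0.25
    val allowedRate =
      (feedRangeWeightFactor * maxRows / avgChangesPerLsn).toLong.max(1)      // 1000 LSNs
    // endLsn for this feed range is then capped at startLsn + allowedRate.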