Skip to content

Commit afd96dd

Browse files
author
annie-mac
committed
update config name
1 parent aaeac30 commit afd96dd

File tree

9 files changed

+48
-48
lines changed

9 files changed

+48
-48
lines changed

sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/changeFeedMetrics/ChangeFeedLsnGapMetric.scala

Lines changed: 0 additions & 17 deletions
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
package com.azure.cosmos.changeFeedMetrics
5+
6+
import com.azure.cosmos.spark.CosmosConstants.MetricNames
7+
import org.apache.spark.sql.connector.metric.CustomSumMetric
8+
9+
/***
10+
* This metric is used to track the lsn range for partition within a change feed micro-batch
11+
*/
12+
private[cosmos] class ChangeFeedLsnRangeMetric extends CustomSumMetric {
13+
14+
override def name(): String = MetricNames.ChangeFeedLsnRange
15+
16+
override def description(): String = MetricNames.ChangeFeedLsnRange
17+
}

sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/changeFeedMetrics/ChangeFeedMetricsListener.scala

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ private[cosmos] class ChangeFeedMetricsListener(
1919
try {
2020
val metrics = SparkInternalsBridge.getInternalCustomTaskMetricsAsSQLMetric(
2121
Set(
22-
CosmosConstants.MetricNames.ChangeFeedLsnGap,
22+
CosmosConstants.MetricNames.ChangeFeedLsnRange,
2323
CosmosConstants.MetricNames.ChangeFeedFetchedChangesCnt,
2424
CosmosConstants.MetricNames.ChangeFeedPartitionIndex
2525
),
@@ -34,16 +34,16 @@ private[cosmos] class ChangeFeedMetricsListener(
3434
partitionMetricsMap.putIfAbsent(normalizedRange, new ChangeFeedMetricsTracker(index, normalizedRange))
3535
val metricsTracker = partitionMetricsMap.get(normalizedRange)
3636
val changesCnt = getChangesCnt(metrics)
37-
val lsnGap = getLsnGap(metrics)
37+
val lsnRange = getLsnRange(metrics)
3838

39-
if (changesCnt >= 0 && lsnGap >= 0) {
39+
if (changesCnt >= 0 && lsnRange >= 0) {
4040
metricsTracker.track(
41-
metrics(CosmosConstants.MetricNames.ChangeFeedLsnGap).value,
41+
metrics(CosmosConstants.MetricNames.ChangeFeedLsnRange).value,
4242
metrics(CosmosConstants.MetricNames.ChangeFeedFetchedChangesCnt).value
4343
)
4444
}
4545

46-
logInfo(s"onTaskEnd for partition index $index, changesCnt $changesCnt, lsnGap $lsnGap")
46+
logInfo(s"onTaskEnd for partition index $index, changesCnt $changesCnt, lsnRange $lsnRange")
4747
}
4848
}
4949
} catch {
@@ -62,9 +62,9 @@ private[cosmos] class ChangeFeedMetricsListener(
6262
}
6363
}
6464

65-
def getLsnGap(metrics: Map[String, SQLMetric]): Long = {
66-
if (metrics.contains(CosmosConstants.MetricNames.ChangeFeedLsnGap)) {
67-
metrics(CosmosConstants.MetricNames.ChangeFeedLsnGap).value
65+
def getLsnRange(metrics: Map[String, SQLMetric]): Long = {
66+
if (metrics.contains(CosmosConstants.MetricNames.ChangeFeedLsnRange)) {
67+
metrics(CosmosConstants.MetricNames.ChangeFeedLsnRange).value
6868
} else {
6969
-1
7070
}

sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/changeFeedMetrics/ChangeFeedMetricsTracker.scala

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -40,12 +40,12 @@ private[cosmos] class ChangeFeedMetricsTracker(
4040
/**
4141
* Track the normalized change feed changes per lsn
4242
*
43-
* @param lsnGap the lsn gap has been observed.
43+
* @param lsnRange the lsn range of the fetched changes.
4444
* @param changesFetchedCnt the total fetched changes.
4545
*/
46-
def track(lsnGap: Long, changesFetchedCnt: Long): Unit = {
46+
def track(lsnRange: Long, changesFetchedCnt: Long): Unit = {
4747
val effectiveChangesFetchedCnt = Math.max(1, changesFetchedCnt)
48-
val changesPerLsn = if (lsnGap == 0) effectiveChangesFetchedCnt.toDouble else effectiveChangesFetchedCnt.toDouble / lsnGap
48+
val changesPerLsn = if (lsnRange == 0) effectiveChangesFetchedCnt.toDouble else effectiveChangesFetchedCnt.toDouble / lsnRange
4949
synchronized {
5050
changeFeedChangesPerLsnHistory.add(changesPerLsn)
5151
calculateWeightedChangesPerLsn()
@@ -57,7 +57,7 @@ private[cosmos] class ChangeFeedMetricsTracker(
5757
* Uses exponential weighting: weight = decayFactor^(n-i-1) where:
5858
* n = number of measurements
5959
* i = index of measurement (0 being oldest)
60-
* @return Weighted average of LSN gaps
60+
* @return Weighted changes per lsn
6161
*/
6262
private def calculateWeightedChangesPerLsn(): Unit = {
6363
if (!changeFeedChangesPerLsnHistory.isEmpty) {
@@ -80,8 +80,8 @@ private[cosmos] class ChangeFeedMetricsTracker(
8080
* Gets current weighted changes per lsn
8181
* @return weighted changes per lsn
8282
*/
83-
def getWeightedAvgChangesPerLsn: Option[Double] = {
84-
logDebug(s"getWeightedAvgChangesPerLsn for feedRangeIndex [$partitionIndex], feedRange [$feedRange] value [$currentChangesPerLsnOpt]")
83+
def getWeightedChangesPerLsn: Option[Double] = {
84+
logDebug(s"getWeightedChangesPerLsn for feedRangeIndex [$partitionIndex], feedRange [$feedRange] value [$currentChangesPerLsnOpt]")
8585
this.currentChangesPerLsnOpt
8686
}
8787
}

sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/ChangeFeedBatch.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,7 @@ private class ChangeFeedBatch
193193
SparkBridgeImplementationInternal
194194
.extractChangeFeedStateForRange(initialOffsetJson, partition.feedRange),
195195
clearEndLsn = !hasBatchCheckpointLocation))
196-
.map(_.asInstanceOf[InputPartition])
196+
.map(_.asInstanceOf[InputPartition])
197197

198198
log.logInfo(s"<-- planInputPartitions $batchId (creating ${inputPartitions.length} partitions)")
199199
inputPartitions

sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/ChangeFeedPartitionReader.scala

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -51,9 +51,9 @@ private case class ChangeFeedPartitionReader
5151
private val startLsn = getPartitionStartLsn
5252
private var latestLsnReturned: Option[Long] = None
5353

54-
private val changeFeedLSNGapMetric = new CustomTaskMetric {
55-
override def name(): String = MetricNames.ChangeFeedLsnGap
56-
override def value(): Long = getChangeFeedLSNGap
54+
private val changeFeedLSNRangeMetric = new CustomTaskMetric {
55+
override def name(): String = MetricNames.ChangeFeedLsnRange
56+
override def value(): Long = getChangeFeedLSNRange
5757
}
5858
private val changeFeedFetchedChangesCntMetric = new CustomTaskMetric {
5959
override def name(): String = MetricNames.ChangeFeedFetchedChangesCnt
@@ -109,7 +109,7 @@ private case class ChangeFeedPartitionReader
109109

110110
override def currentMetricsValues(): Array[CustomTaskMetric] = {
111111
Array(
112-
changeFeedLSNGapMetric,
112+
changeFeedLSNRangeMetric,
113113
changeFeedFetchedChangesCntMetric,
114114
changeFeedPartitionIndexMetric
115115
)
@@ -290,7 +290,7 @@ private case class ChangeFeedPartitionReader
290290
this.iterator.getTotalChangesFetched
291291
}
292292

293-
private def getChangeFeedLSNGap: Long = {
293+
private def getChangeFeedLSNRange: Long = {
294294
// calculate the changes per lsn
295295
val latestLsnOpt = this.iterator.getLatestContinuationToken match {
296296
case Some(continuationToken) =>

sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/ChangeFeedScan.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
// Licensed under the MIT License.
33
package com.azure.cosmos.spark
44

5-
import com.azure.cosmos.changeFeedMetrics.{ChangeFeedFetchedChangesCntMetric, ChangeFeedLsnGapMetric, ChangeFeedPartitionIndexMetric}
5+
import com.azure.cosmos.changeFeedMetrics.{ChangeFeedFetchedChangesCntMetric, ChangeFeedLsnRangeMetric, ChangeFeedPartitionIndexMetric}
66
import com.azure.cosmos.spark.diagnostics.LoggerHelper
77
import org.apache.spark.broadcast.Broadcast
88
import org.apache.spark.sql.SparkSession
@@ -69,7 +69,7 @@ private case class ChangeFeedScan
6969

7070
override def supportedCustomMetrics(): Array[CustomMetric] = {
7171
Array(
72-
new ChangeFeedLsnGapMetric,
72+
new ChangeFeedLsnRangeMetric,
7373
new ChangeFeedFetchedChangesCntMetric,
7474
new ChangeFeedPartitionIndexMetric
7575
)

sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/CosmosConstants.scala

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ private[cosmos] object CosmosConstants {
3535
val RecordsWritten = "recordsWritten"
3636
val TotalRequestCharge = "cosmos.totalRequestCharge"
3737

38-
val ChangeFeedLsnGap = "cosmos.changeFeed.partition.lsnGap"
38+
val ChangeFeedLsnRange = "cosmos.changeFeed.partition.lsnRange"
3939
val ChangeFeedFetchedChangesCnt = "cosmos.changeFeed.partition.fetchedChangesCnt"
4040
val ChangeFeedPartitionIndex = "cosmos.changeFeed.partition.index"
4141

@@ -79,11 +79,11 @@ private[cosmos] object CosmosConstants {
7979
}
8080

8181
object ChangeFeedMetricsListenerConfig {
82-
val metricsListenerEnabledPropertyName = "cosmos.changeFeed.metricsListener.enabled"
83-
val metricsListenerEnabledEnvName = "cosmos_changeFeed_metricsListener_enabled"
84-
val enableByDefalult = "true"
82+
val metricsListenerEnabledPropertyName = "cosmos.changeFeed.performance.monitoring.enabled"
83+
val metricsListenerEnabledEnvName = "COSMOS_CHANGEFEED_PERFORMANCE_MONITORING_ENABLED"
84+
val enableByDefault = "true"
8585
val metricsListenerEnabled = Option(System.getProperty(metricsListenerEnabledPropertyName))
8686
.orElse(sys.env.get(metricsListenerEnabledEnvName))
87-
.getOrElse(enableByDefalult).toBoolean
87+
.getOrElse(enableByDefault).toBoolean
8888
}
8989
}

sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/CosmosPartitionPlanner.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -555,7 +555,7 @@ private object CosmosPartitionPlanner extends BasicLoggingTrait {
555555
partitionMetricsMap match {
556556
case Some(metricsMap) =>
557557
if (metricsMap.containsKey(metadata.feedRange)) {
558-
changesPerLsnFromMetricsOpt = metricsMap.get(metadata.feedRange).getWeightedAvgChangesPerLsn
558+
changesPerLsnFromMetricsOpt = metricsMap.get(metadata.feedRange).getWeightedChangesPerLsn
559559
}
560560
case None =>
561561
}
@@ -572,9 +572,9 @@ private object CosmosPartitionPlanner extends BasicLoggingTrait {
572572
var changesPerLsnFromMetricsOpt: Option[Double] = None
573573
if (isChangeFeed) {
574574
partitionMetricsMap match {
575-
case Some(matricsMap) =>
576-
if (matricsMap.containsKey(metadata.feedRange)) {
577-
changesPerLsnFromMetricsOpt = partitionMetricsMap.get.get(metadata.feedRange).getWeightedAvgChangesPerLsn
575+
case Some(metricsMap) =>
576+
if (metricsMap.containsKey(metadata.feedRange)) {
577+
changesPerLsnFromMetricsOpt = partitionMetricsMap.get.get(metadata.feedRange).getWeightedChangesPerLsn
578578
}
579579
case None =>
580580
}

0 commit comments

Comments
 (0)