Skip to content

Commit 30f5247

Browse files
author
annie-mac
committed
refactor
1 parent 677026b commit 30f5247

File tree

20 files changed

+189
-181
lines changed

20 files changed

+189
-181
lines changed

sdk/cosmos/azure-cosmos-spark_3-3_2-12/CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
* Added log4j-core to the list of shaded packages to avoid conflicts when customers use log4j in a different version. - See [PR 45924](https://github.com/Azure/azure-sdk-for-java/pull/46451)
1111

1212
#### Other Changes
13-
* Added improvement for end lsn calculation in `CosmosPartitionPlanner`. - See [PR 46320](https://github.com/Azure/azure-sdk-for-java/pull/46320)
13+
* Added change feed performance monitoring which is used to improve end lsn calculation in `CosmosPartitionPlanner`. - See [PR 46320](https://github.com/Azure/azure-sdk-for-java/pull/46320)
1414

1515
### 4.38.0 (2025-07-31)
1616

sdk/cosmos/azure-cosmos-spark_3-3_2-12/src/main/scala/com/azure/cosmos/spark/ChangeFeedMicroBatchStream.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -69,11 +69,11 @@ private class ChangeFeedMicroBatchStream
6969
private val partitionMetricsMap = new ConcurrentHashMap[NormalizedRange, ChangeFeedMetricsTracker]()
7070

7171
// Register metrics listener
72-
if (CosmosConstants.ChangeFeedMetricsListenerConfig.metricsListenerEnabled) {
73-
log.logInfo("Register ChangeFeedMetricsListener")
72+
if (changeFeedConfig.performanceMonitoringEnabled) {
73+
log.logInfo("ChangeFeed performance monitoring is enabled, registering ChangeFeedMetricsListener")
7474
session.sparkContext.addSparkListener(new ChangeFeedMetricsListener(partitionIndexMap, partitionMetricsMap))
7575
} else {
76-
log.logInfo("ChangeFeedMetricsListener is disabled")
76+
log.logInfo("ChangeFeed performance monitoring is disabled")
7777
}
7878

7979
override def latestOffset(): Offset = {

sdk/cosmos/azure-cosmos-spark_3-4_2-12/CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
* Added log4j-core to the list of shaded packages to avoid conflicts when customers use log4j in a different version. - See [PR 45924](https://github.com/Azure/azure-sdk-for-java/pull/46451)
1111

1212
#### Other Changes
13-
* Added improvement for end lsn calculation in `CosmosPartitionPlanner`. - See [PR 46320](https://github.com/Azure/azure-sdk-for-java/pull/46320)
13+
* Added change feed performance monitoring which is used to improve end lsn calculation in `CosmosPartitionPlanner`. - See [PR 46320](https://github.com/Azure/azure-sdk-for-java/pull/46320)
1414

1515
### 4.38.0 (2025-07-31)
1616

sdk/cosmos/azure-cosmos-spark_3-4_2-12/src/main/scala/com/azure/cosmos/spark/ChangeFeedMicroBatchStream.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -70,11 +70,11 @@ private class ChangeFeedMicroBatchStream
7070
private val partitionIndexMap = Maps.synchronizedBiMap(HashBiMap.create[NormalizedRange, Long]())
7171
private val partitionMetricsMap = new ConcurrentHashMap[NormalizedRange, ChangeFeedMetricsTracker]()
7272

73-
if (CosmosConstants.ChangeFeedMetricsListenerConfig.metricsListenerEnabled) {
74-
log.logInfo("Register ChangeFeedMetricsListener")
73+
if (changeFeedConfig.performanceMonitoringEnabled) {
74+
log.logInfo("ChangeFeed performance monitoring is enabled, registering ChangeFeedMetricsListener")
7575
session.sparkContext.addSparkListener(new ChangeFeedMetricsListener(partitionIndexMap, partitionMetricsMap))
7676
} else {
77-
log.logInfo("ChangeFeedMetricsListener is disabled")
77+
log.logInfo("ChangeFeed performance monitoring is disabled")
7878
}
7979

8080
override def latestOffset(): Offset = {

sdk/cosmos/azure-cosmos-spark_3-5_2-12/CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
* Added log4j-core to the list of shaded packages to avoid conflicts when customers use log4j in a different version. - See [PR 45924](https://github.com/Azure/azure-sdk-for-java/pull/46451)
1111

1212
#### Other Changes
13-
* Added improvement for end lsn calculation in `CosmosPartitionPlanner`. - See [PR 46320](https://github.com/Azure/azure-sdk-for-java/pull/46320)
13+
* Added change feed performance monitoring which is used to improve end lsn calculation in `CosmosPartitionPlanner`. - See [PR 46320](https://github.com/Azure/azure-sdk-for-java/pull/46320)
1414

1515
### 4.38.0 (2025-07-31)
1616

sdk/cosmos/azure-cosmos-spark_3-5_2-12/src/main/scala/com/azure/cosmos/spark/ChangeFeedMicroBatchStream.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -71,11 +71,11 @@ private class ChangeFeedMicroBatchStream
7171
private val partitionIndexMap = Maps.synchronizedBiMap(HashBiMap.create[NormalizedRange, Long]())
7272
private val partitionMetricsMap = new ConcurrentHashMap[NormalizedRange, ChangeFeedMetricsTracker]()
7373

74-
if (CosmosConstants.ChangeFeedMetricsListenerConfig.metricsListenerEnabled) {
75-
log.logInfo("Register ChangeFeedMetricsListener")
74+
if (changeFeedConfig.performanceMonitoringEnabled) {
75+
log.logInfo("ChangeFeed performance monitoring is enabled, registering ChangeFeedMetricsListener")
7676
session.sparkContext.addSparkListener(new ChangeFeedMetricsListener(partitionIndexMap, partitionMetricsMap))
7777
} else {
78-
log.logInfo("ChangeFeedMetricsListener is disabled")
78+
log.logInfo("ChangeFeed performance monitoring is disabled")
7979
}
8080

8181
override def latestOffset(): Offset = {

sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/changeFeedMetrics/ChangeFeedItemsCntMetric.scala

Lines changed: 18 additions & 0 deletions
This file was added.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
package com.azure.cosmos.changeFeedMetrics
5+
6+
import com.azure.cosmos.spark.CosmosConstants.MetricNames
7+
import org.apache.spark.sql.connector.metric.CustomSumMetric
8+
9+
/***
10+
* This metric is used to track the partition total items count within a change feed micro-batch
11+
* Note: not all the items will be returned back to spark
12+
*/
13+
private[cosmos] class ChangeFeedItemsCntMetric extends CustomSumMetric {
14+
15+
override def name(): String = MetricNames.ChangeFeedItemsCnt
16+
17+
override def description(): String = MetricNames.ChangeFeedItemsCnt
18+
}

sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/changeFeedMetrics/ChangeFeedMetricsListener.scala

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ private[cosmos] class ChangeFeedMetricsListener(
2020
val metrics = SparkInternalsBridge.getInternalCustomTaskMetricsAsSQLMetric(
2121
Set(
2222
CosmosConstants.MetricNames.ChangeFeedLsnRange,
23-
CosmosConstants.MetricNames.ChangeFeedFetchedChangesCnt,
23+
CosmosConstants.MetricNames.ChangeFeedItemsCnt,
2424
CosmosConstants.MetricNames.ChangeFeedPartitionIndex
2525
),
2626
taskEnd.taskMetrics
@@ -31,19 +31,19 @@ private[cosmos] class ChangeFeedMetricsListener(
3131

3232
val normalizedRange = partitionIndexMap.inverse().get(index)
3333
if (normalizedRange != null) {
34-
partitionMetricsMap.putIfAbsent(normalizedRange, new ChangeFeedMetricsTracker(index, normalizedRange))
34+
partitionMetricsMap.putIfAbsent(normalizedRange, ChangeFeedMetricsTracker(index, normalizedRange))
3535
val metricsTracker = partitionMetricsMap.get(normalizedRange)
36-
val changesCnt = getChangesCnt(metrics)
36+
val fetchedItemCnt = getFetchedItemCnt(metrics)
3737
val lsnRange = getLsnRange(metrics)
3838

39-
if (changesCnt >= 0 && lsnRange >= 0) {
39+
if (fetchedItemCnt >= 0 && lsnRange >= 0) {
4040
metricsTracker.track(
4141
metrics(CosmosConstants.MetricNames.ChangeFeedLsnRange).value,
42-
metrics(CosmosConstants.MetricNames.ChangeFeedFetchedChangesCnt).value
42+
metrics(CosmosConstants.MetricNames.ChangeFeedItemsCnt).value
4343
)
4444
}
4545

46-
logInfo(s"onTaskEnd for partition index $index, changesCnt $changesCnt, lsnRange $lsnRange")
46+
logInfo(s"onTaskEnd for partition index $index, fetchedItemCnt $fetchedItemCnt, lsnRange $lsnRange")
4747
}
4848
}
4949
} catch {
@@ -54,15 +54,15 @@ private[cosmos] class ChangeFeedMetricsListener(
5454
}
5555
}
5656

57-
def getChangesCnt(metrics: Map[String, SQLMetric]): Long = {
58-
if (metrics.contains(CosmosConstants.MetricNames.ChangeFeedFetchedChangesCnt)) {
59-
metrics(CosmosConstants.MetricNames.ChangeFeedFetchedChangesCnt).value
57+
private def getFetchedItemCnt(metrics: Map[String, SQLMetric]): Long = {
58+
if (metrics.contains(CosmosConstants.MetricNames.ChangeFeedItemsCnt)) {
59+
metrics(CosmosConstants.MetricNames.ChangeFeedItemsCnt).value
6060
} else {
6161
-1
6262
}
6363
}
6464

65-
def getLsnRange(metrics: Map[String, SQLMetric]): Long = {
65+
private def getLsnRange(metrics: Map[String, SQLMetric]): Long = {
6666
if (metrics.contains(CosmosConstants.MetricNames.ChangeFeedLsnRange)) {
6767
metrics(CosmosConstants.MetricNames.ChangeFeedLsnRange).value
6868
} else {

sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/changeFeedMetrics/ChangeFeedMetricsTracker.scala

Lines changed: 36 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -12,76 +12,81 @@ import scala.collection.JavaConverters._
1212
// scalastyle:on underscore.import
1313

1414
private[cosmos] object ChangeFeedMetricsTracker {
15-
private val DefaultMaxHistory: Int = CosmosConstants.ChangeFeedTrackerConfigs.changeFeedMetricsTrackerMaxHistory
16-
private val DefaultDecayFactor: Double = CosmosConstants.ChangeFeedTrackerConfigs.changeFeedMetricsTrackerDecayFactor
15+
private val MetricsHistory: Int = CosmosConstants.ChangeFeedMetricsConfigs.MetricsHistory
16+
private val MetricsHistoryDecayFactor: Double = CosmosConstants.ChangeFeedMetricsConfigs.MetricsHistoryDecayFactor
1717

18-
def apply(partitionIndex: Long, feedRange: NormalizedRange): ChangeFeedMetricsTracker = {
19-
new ChangeFeedMetricsTracker(partitionIndex, feedRange)
18+
def apply(
19+
partitionIndex: Long,
20+
feedRange: NormalizedRange,
21+
metricsHistory: Int,
22+
metricsHistoryDecayFactor: Double): ChangeFeedMetricsTracker = {
23+
new ChangeFeedMetricsTracker(partitionIndex, feedRange, MetricsHistory, MetricsHistoryDecayFactor)
2024
}
2125

22-
def apply(partitionIndex: Long, feedRange: NormalizedRange, decayFactor: Double): ChangeFeedMetricsTracker = {
23-
new ChangeFeedMetricsTracker(partitionIndex, feedRange, DefaultMaxHistory, decayFactor)
26+
def apply(partitionIndex: Long, feedRange: NormalizedRange): ChangeFeedMetricsTracker = {
27+
new ChangeFeedMetricsTracker(partitionIndex, feedRange, MetricsHistory, MetricsHistoryDecayFactor)
2428
}
2529
}
2630

2731
/**
28-
* Tracks metrics for Change Feed operations including partition indices and LSN gaps
32+
* Tracks metrics for Change Feed operations including partition indices and LSN range
2933
* with exponential weighting for recent metrics
3034
*/
3135
private[cosmos] class ChangeFeedMetricsTracker(
3236
private val partitionIndex: Long,
3337
private val feedRange: NormalizedRange,
34-
private val maxHistory: Int = ChangeFeedMetricsTracker.DefaultMaxHistory,
35-
private val decayFactor: Double = ChangeFeedMetricsTracker.DefaultDecayFactor) extends BasicLoggingTrait {
38+
private val maxHistory: Int = ChangeFeedMetricsTracker.MetricsHistory,
39+
private val decayFactor: Double = ChangeFeedMetricsTracker.MetricsHistoryDecayFactor) extends BasicLoggingTrait {
3640

37-
private val changeFeedChangesPerLsnHistory = EvictingQueue.create[Double](maxHistory)
38-
private var currentChangesPerLsnOpt: Option[Double] = None
41+
private val changeFeedItemsPerLsnHistory = EvictingQueue.create[Double](maxHistory)
42+
private var currentChangeFeedItemsPerLsnOpt: Option[Double] = None
3943

4044
/**
41-
* Track the normalized change feed changes per lsn
45+
* Track the normalized change feed items per lsn
4246
*
43-
* @param lsnRange the lsn range of the fetched changes.
44-
* @param changesFetchedCnt the total fetched changes.
47+
* @param lsnRange the lsn range of the fetched items.
48+
* @param fetchedItemCnt the total fetched item cnt within a micro-batch for the feed range.
4549
*/
46-
def track(lsnRange: Long, changesFetchedCnt: Long): Unit = {
47-
val effectiveChangesFetchedCnt = Math.max(1, changesFetchedCnt)
50+
def track(lsnRange: Long, fetchedItemCnt: Long): Unit = {
51+
val effectiveChangesFetchedCnt = Math.max(1, fetchedItemCnt)
4852
val changesPerLsn = if (lsnRange == 0) effectiveChangesFetchedCnt.toDouble else effectiveChangesFetchedCnt.toDouble / lsnRange
4953
synchronized {
50-
changeFeedChangesPerLsnHistory.add(changesPerLsn)
51-
calculateWeightedChangesPerLsn()
54+
changeFeedItemsPerLsnHistory.add(changesPerLsn)
55+
calculateWeightedItemsPerLsn()
5256
}
5357
}
5458

5559
/**
56-
* Calculates weighted normalized changes per lsn where recent values have more impact
60+
* Calculates weighted normalized items per lsn where recent values have more impact
5761
* Uses exponential weighting: weight = decayFactor^(n-i-1) where:
5862
* n = number of measurements
5963
* i = index of measurement (0 being oldest)
6064
* @return Weighted changes per lsn
6165
*/
62-
private def calculateWeightedChangesPerLsn(): Unit = {
63-
if (!changeFeedChangesPerLsnHistory.isEmpty) {
64-
val gaps = changeFeedChangesPerLsnHistory.asScala.toArray
65-
val n = gaps.length
66+
private def calculateWeightedItemsPerLsn(): Unit = {
67+
if (!changeFeedItemsPerLsnHistory.isEmpty) {
68+
val histories = changeFeedItemsPerLsnHistory.asScala.toArray
69+
val n = histories.length
6670
var weightedSum = 0.0
6771
var weightSum = 0.0
6872

69-
for (i <- gaps.indices) {
73+
for (i <- histories.indices) {
7074
val weight = math.pow(decayFactor, n - i - 1)
71-
weightedSum += gaps(i) * weight
75+
weightedSum += histories(i) * weight
7276
weightSum += weight
7377
}
7478

75-
currentChangesPerLsnOpt = Some(weightedSum / weightSum)
79+
currentChangeFeedItemsPerLsnOpt = Some(weightedSum / weightSum)
7680
}
7781
}
7882

7983
/**
80-
* Gets current weighted changes per lsn
81-
* @return weighted changes per lsn
84+
* Gets current weighted items per lsn
85+
* @return weighted items per lsn
8286
*/
83-
def getWeightedChangesPerLsn: Option[Double] = {
84-
logDebug(s"getWeightedChangesPerLsn for feedRangeIndex [$partitionIndex], feedRange [$feedRange] value [$currentChangesPerLsnOpt]")
85-
this.currentChangesPerLsnOpt
87+
def getWeightedChangeFeedItemsPerLsn: Option[Double] = {
88+
logDebug(s"getWeightedChangeFeedItemsPerLsn for feedRangeIndex [$partitionIndex], " +
89+
s"feedRange [$feedRange] value [$currentChangeFeedItemsPerLsnOpt]")
90+
this.currentChangeFeedItemsPerLsnOpt
8691
}
8792
}

0 commit comments

Comments
 (0)