
Commit 4eabe02

xinlian12, annie-mac, and FabianMeiswinkel authored

SparkChangeFeedEndLSNImprovement (#46320)

* improve endLSN calculation
* change
* update
* fix compiling
* fix
* fix
* change
* add tests
* changes
* update changelog
* change
* update config name
* Rename weighted average method to weighted changes
* refactor
* change
* change

---------

Co-authored-by: annie-mac <[email protected]>
Co-authored-by: Fabian Meiswinkel <[email protected]>

1 parent 7d856b2 · commit 4eabe02

33 files changed · +1342 −120 lines changed
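The commit message mentions a "weighted changes" method used to improve the end-LSN calculation in `CosmosPartitionPlanner`. The planner itself is not part of the excerpt below, so the following is only a rough sketch of the idea under assumed names: weight recent per-partition change-feed observations more heavily, derive a changes-per-LSN rate, and cap each micro-batch's end LSN by the expected document budget. None of these identifiers come from the repository.

```scala
// Hypothetical sketch only: names and weighting scheme are assumptions,
// not the actual CosmosPartitionPlanner implementation.
final case class ChangeFeedObservation(changes: Long, lsnRange: Long)

object WeightedChangesSketch {
  // Weight recent observations more heavily so the estimated change rate
  // adapts quickly to shifts in per-partition write throughput.
  def estimateChangesPerLsn(observations: Seq[ChangeFeedObservation]): Double = {
    val weighted = observations.zipWithIndex.map { case (o, i) => (o, (i + 1).toDouble) }
    val num = weighted.map { case (o, w) => o.changes * w }.sum
    val den = weighted.map { case (o, w) => o.lsnRange * w }.sum
    if (den == 0.0) 1.0 else num / den
  }

  // Cap the end LSN for a micro-batch so it only spans roughly
  // `maxChanges` expected documents for this partition.
  def capEndLsn(startLsn: Long, latestLsn: Long, maxChanges: Long, changesPerLsn: Double): Long = {
    val lsnBudget = (maxChanges / math.max(changesPerLsn, 1e-6)).toLong
    math.min(latestLsn, startLsn + math.max(lsnBudget, 1L))
  }
}
```

For example, with observations of 1000 changes over 2000 LSNs followed by 4000 changes over 2000 LSNs, the weighted rate is 1.5 changes/LSN, leaning toward the recent 2.0 rather than the pooled average of 1.25.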

sdk/cosmos/azure-cosmos-spark_3-3_2-12/CHANGELOG.md
Lines changed: 1 addition & 0 deletions

@@ -10,6 +10,7 @@
 * Added log4j-core to the list of shaded packages to avoid conflicts when customers use log4j in a different version. - See [PR 45924](https://github.com/Azure/azure-sdk-for-java/pull/46451)
 
 #### Other Changes
+* Added change feed performance monitoring which is used to improve end lsn calculation in `CosmosPartitionPlanner`. - See [PR 46320](https://github.com/Azure/azure-sdk-for-java/pull/46320)
 
 ### 4.38.0 (2025-07-31)

sdk/cosmos/azure-cosmos-spark_3-3_2-12/src/main/scala/com/azure/cosmos/spark/ChangeFeedMicroBatchStream.scala
Lines changed: 31 additions & 7 deletions

@@ -2,7 +2,9 @@
 // Licensed under the MIT License.
 package com.azure.cosmos.spark
 
+import com.azure.cosmos.implementation.guava25.collect.{HashBiMap, Maps}
 import com.azure.cosmos.implementation.{SparkBridgeImplementationInternal, UUIDs}
+import com.azure.cosmos.changeFeedMetrics.{ChangeFeedMetricsListener, ChangeFeedMetricsTracker}
 import com.azure.cosmos.spark.CosmosPredicates.{assertNotNull, assertNotNullOrEmpty, assertOnSparkDriver}
 import com.azure.cosmos.spark.diagnostics.{DiagnosticsContext, LoggerHelper}
 import org.apache.spark.broadcast.Broadcast
@@ -12,7 +14,12 @@ import org.apache.spark.sql.connector.read.{InputPartition, PartitionReaderFactory}
 import org.apache.spark.sql.types.StructType
 
 import java.time.Duration
-import java.util.UUID
+import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.atomic.AtomicLong
+
+// scalastyle:off underscore.import
+import scala.collection.JavaConverters._
+// scalastyle:on underscore.import
 
 // scala style rule flaky - even complaining on partial log messages
 // scalastyle:off multiple.string.literals
@@ -57,6 +64,18 @@ private class ChangeFeedMicroBatchStream
 
   private var latestOffsetSnapshot: Option[ChangeFeedOffset] = None
 
+  private val partitionIndex = new AtomicLong(0)
+  private val partitionIndexMap = Maps.synchronizedBiMap(HashBiMap.create[NormalizedRange, Long]())
+  private val partitionMetricsMap = new ConcurrentHashMap[NormalizedRange, ChangeFeedMetricsTracker]()
+
+  // Register metrics listener
+  if (changeFeedConfig.performanceMonitoringEnabled) {
+    log.logInfo("ChangeFeed performance monitoring is enabled, registering ChangeFeedMetricsListener")
+    session.sparkContext.addSparkListener(new ChangeFeedMetricsListener(partitionIndexMap, partitionMetricsMap))
+  } else {
+    log.logInfo("ChangeFeed performance monitoring is disabled")
+  }
+
   override def latestOffset(): Offset = {
     // For Spark data streams implementing SupportsAdmissionControl trait
     // latestOffset(Offset, ReadLimit) is called instead
@@ -99,11 +118,15 @@ private class ChangeFeedMicroBatchStream
     end
       .inputPartitions
       .get
-      .map(partition => partition
-        .withContinuationState(
-          SparkBridgeImplementationInternal
-            .extractChangeFeedStateForRange(start.changeFeedState, partition.feedRange),
-          clearEndLsn = false))
+      .map(partition => {
+        val index = partitionIndexMap.asScala.getOrElseUpdate(partition.feedRange, partitionIndex.incrementAndGet())
+        partition
+          .withContinuationState(
+            SparkBridgeImplementationInternal
+              .extractChangeFeedStateForRange(start.changeFeedState, partition.feedRange),
+            clearEndLsn = false)
+          .withIndex(index)
+      })
   }
 
   /**
@@ -150,7 +173,8 @@ private class ChangeFeedMicroBatchStream
       this.containerConfig,
       this.partitioningConfig,
      this.defaultParallelism,
-      this.container
+      this.container,
+      Some(this.partitionMetricsMap)
     )
 
    if (offset.changeFeedState != startChangeFeedOffset.changeFeedState) {
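The `partitionIndex`/`partitionIndexMap` pairing above implements a small but useful pattern: each feed range is assigned exactly one stable numeric index for the lifetime of the stream, so executor-side metrics can later be attributed back to the range on the driver. A minimal standalone sketch of that pattern, using plain Guava and `String` in place of the SDK's shaded `guava25` package and `NormalizedRange`:

```scala
import java.util.concurrent.atomic.AtomicLong
import com.google.common.collect.{HashBiMap, Maps}
import scala.collection.JavaConverters._

object PartitionIndexSketch {
  private val counter = new AtomicLong(0)
  // Synchronized bidirectional map so the index can also be resolved back to the range.
  private val indexMap = Maps.synchronizedBiMap(HashBiMap.create[String, Long]())

  // Each feed range gets one stable index; repeated calls return the same value.
  // In the connector this runs on the driver during partition planning.
  def indexFor(feedRange: String): Long =
    indexMap.asScala.getOrElseUpdate(feedRange, counter.incrementAndGet())
}
```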

sdk/cosmos/azure-cosmos-spark_3-3_2-12/src/main/scala/com/azure/cosmos/spark/SparkInternalsBridge.scala
Lines changed: 21 additions & 8 deletions

@@ -6,6 +6,7 @@ import com.azure.cosmos.implementation.guava25.base.MoreObjects.firstNonNull
 import com.azure.cosmos.implementation.guava25.base.Strings.emptyToNull
 import com.azure.cosmos.spark.diagnostics.BasicLoggingTrait
 import org.apache.spark.TaskContext
+import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.util.AccumulatorV2
 
@@ -14,6 +15,15 @@ import java.util.Locale
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
 import scala.collection.mutable.ArrayBuffer
 
+class SparkInternalsBridge {
+  // Only used in ChangeFeedMetricsListener, which is easier for test validation
+  def getInternalCustomTaskMetricsAsSQLMetric(
+    knownCosmosMetricNames: Set[String],
+    taskMetrics: TaskMetrics) : Map[String, SQLMetric] = {
+    SparkInternalsBridge.getInternalCustomTaskMetricsAsSQLMetricInternal(knownCosmosMetricNames, taskMetrics)
+  }
+}
+
 object SparkInternalsBridge extends BasicLoggingTrait {
   private val SPARK_REFLECTION_ACCESS_ALLOWED_PROPERTY = "COSMOS.SPARK_REFLECTION_ACCESS_ALLOWED"
   private val SPARK_REFLECTION_ACCESS_ALLOWED_VARIABLE = "COSMOS_SPARK_REFLECTION_ACCESS_ALLOWED"
@@ -41,20 +51,23 @@ object SparkInternalsBridge extends BasicLoggingTrait {
   private final lazy val reflectionAccessAllowed = new AtomicBoolean(getSparkReflectionAccessAllowed)
 
   def getInternalCustomTaskMetricsAsSQLMetric(knownCosmosMetricNames: Set[String]): Map[String, SQLMetric] = {
+    Option.apply(TaskContext.get()) match {
+      case Some(taskCtx) => getInternalCustomTaskMetricsAsSQLMetricInternal(knownCosmosMetricNames, taskCtx.taskMetrics())
+      case None => Map.empty[String, SQLMetric]
+    }
+  }
+
+  def getInternalCustomTaskMetricsAsSQLMetric(knownCosmosMetricNames: Set[String], taskMetrics: TaskMetrics): Map[String, SQLMetric] = {
 
     if (!reflectionAccessAllowed.get) {
       Map.empty[String, SQLMetric]
     } else {
-      Option.apply(TaskContext.get()) match {
-        case Some(taskCtx) => getInternalCustomTaskMetricsAsSQLMetricInternal(knownCosmosMetricNames, taskCtx)
-        case None => Map.empty[String, SQLMetric]
-      }
+      getInternalCustomTaskMetricsAsSQLMetricInternal(knownCosmosMetricNames, taskMetrics)
     }
   }
 
-  private def getAccumulators(taskCtx: TaskContext): Option[ArrayBuffer[AccumulatorV2[_, _]]] = {
+  private def getAccumulators(taskMetrics: TaskMetrics): Option[ArrayBuffer[AccumulatorV2[_, _]]] = {
     try {
-      val taskMetrics: Object = taskCtx.taskMetrics()
       val method = Option(accumulatorsMethod.get) match {
         case Some(existing) => existing
         case None =>
@@ -79,8 +92,8 @@ object SparkInternalsBridge extends BasicLoggingTrait {
 
   private def getInternalCustomTaskMetricsAsSQLMetricInternal(
     knownCosmosMetricNames: Set[String],
-    taskCtx: TaskContext): Map[String, SQLMetric] = {
-    getAccumulators(taskCtx) match {
+    taskMetrics: TaskMetrics): Map[String, SQLMetric] = {
+    getAccumulators(taskMetrics) match {
       case Some(accumulators) => accumulators
         .filter(accumulable => accumulable.isInstanceOf[SQLMetric]
          && accumulable.name.isDefined
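This refactoring narrows the bridge's dependency from a full `TaskContext` to just `TaskMetrics`. That matters because a `SparkListener` receives `TaskMetrics` on task completion with no `TaskContext` available, yet can now reuse the same accumulator-reflection path. `ChangeFeedMetricsListener` itself is not shown in this excerpt, so the sketch below is only a guess at the call shape; the metric name is invented for illustration:

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

class MetricsHarvestingListener extends SparkListener {
  private val bridge = new SparkInternalsBridge()
  // Assumed metric name, not taken from the repository.
  private val knownMetrics = Set("cosmos.changeFeed.partition.fetchedChanges")

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    // taskEnd.taskMetrics can be null for failed or ignored tasks.
    Option(taskEnd.taskMetrics).foreach { tm =>
      val metrics = bridge.getInternalCustomTaskMetricsAsSQLMetric(knownMetrics, tm)
      metrics.foreach { case (name, sqlMetric) =>
        // e.g. feed the value into a per-range ChangeFeedMetricsTracker
        println(s"$name -> ${sqlMetric.value}")
      }
    }
  }
}
```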

sdk/cosmos/azure-cosmos-spark_3-4_2-12/CHANGELOG.md
Lines changed: 1 addition & 0 deletions

@@ -10,6 +10,7 @@
 * Added log4j-core to the list of shaded packages to avoid conflicts when customers use log4j in a different version. - See [PR 45924](https://github.com/Azure/azure-sdk-for-java/pull/46451)
 
 #### Other Changes
+* Added change feed performance monitoring which is used to improve end lsn calculation in `CosmosPartitionPlanner`. - See [PR 46320](https://github.com/Azure/azure-sdk-for-java/pull/46320)
 
 ### 4.38.0 (2025-07-31)

sdk/cosmos/azure-cosmos-spark_3-4_2-12/src/main/scala/com/azure/cosmos/spark/ChangeFeedMicroBatchStream.scala
Lines changed: 29 additions & 6 deletions

@@ -2,7 +2,9 @@
 // Licensed under the MIT License.
 package com.azure.cosmos.spark
 
+import com.azure.cosmos.implementation.guava25.collect.{HashBiMap, Maps}
 import com.azure.cosmos.implementation.{SparkBridgeImplementationInternal, UUIDs}
+import com.azure.cosmos.changeFeedMetrics.{ChangeFeedMetricsListener, ChangeFeedMetricsTracker}
 import com.azure.cosmos.spark.CosmosPredicates.{assertNotNull, assertNotNullOrEmpty, assertOnSparkDriver}
 import com.azure.cosmos.spark.diagnostics.{DiagnosticsContext, LoggerHelper}
 import org.apache.spark.broadcast.Broadcast
@@ -12,7 +14,12 @@ import org.apache.spark.sql.connector.read.{InputPartition, PartitionReaderFactory}
 import org.apache.spark.sql.types.StructType
 
 import java.time.Duration
-import java.util.UUID
+import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.atomic.AtomicLong
+
+// scalastyle:off underscore.import
+import scala.collection.JavaConverters._
+// scalastyle:on underscore.import
 
 // scala style rule flaky - even complaining on partial log messages
 // scalastyle:off multiple.string.literals
@@ -59,6 +66,17 @@ private class ChangeFeedMicroBatchStream
 
   private var latestOffsetSnapshot: Option[ChangeFeedOffset] = None
 
+  private val partitionIndex = new AtomicLong(0)
+  private val partitionIndexMap = Maps.synchronizedBiMap(HashBiMap.create[NormalizedRange, Long]())
+  private val partitionMetricsMap = new ConcurrentHashMap[NormalizedRange, ChangeFeedMetricsTracker]()
+
+  if (changeFeedConfig.performanceMonitoringEnabled) {
+    log.logInfo("ChangeFeed performance monitoring is enabled, registering ChangeFeedMetricsListener")
+    session.sparkContext.addSparkListener(new ChangeFeedMetricsListener(partitionIndexMap, partitionMetricsMap))
+  } else {
+    log.logInfo("ChangeFeed performance monitoring is disabled")
+  }
+
   override def latestOffset(): Offset = {
     // For Spark data streams implementing SupportsAdmissionControl trait
     // latestOffset(Offset, ReadLimit) is called instead
@@ -101,11 +119,15 @@ private class ChangeFeedMicroBatchStream
     end
       .inputPartitions
       .get
-      .map(partition => partition
-        .withContinuationState(
-          SparkBridgeImplementationInternal
+      .map(partition => {
+        val index = partitionIndexMap.asScala.getOrElseUpdate(partition.feedRange, partitionIndex.incrementAndGet())
+        partition
+          .withContinuationState(
+            SparkBridgeImplementationInternal
             .extractChangeFeedStateForRange(start.changeFeedState, partition.feedRange),
-          clearEndLsn = false))
+            clearEndLsn = false)
+          .withIndex(index)
+      })
   }
 
   /**
@@ -152,7 +174,8 @@ private class ChangeFeedMicroBatchStream
      this.containerConfig,
      this.partitioningConfig,
      this.defaultParallelism,
-      this.container
+      this.container,
+      Some(this.partitionMetricsMap)
     )
 
    if (offset.changeFeedState != startChangeFeedOffset.changeFeedState) {

sdk/cosmos/azure-cosmos-spark_3-4_2-12/src/main/scala/com/azure/cosmos/spark/SparkInternalsBridge.scala
Lines changed: 21 additions & 8 deletions

@@ -6,13 +6,23 @@ import com.azure.cosmos.implementation.guava25.base.MoreObjects.firstNonNull
 import com.azure.cosmos.implementation.guava25.base.Strings.emptyToNull
 import com.azure.cosmos.spark.diagnostics.BasicLoggingTrait
 import org.apache.spark.TaskContext
+import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.util.AccumulatorV2
 
 import java.lang.reflect.Method
 import java.util.Locale
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
 
+class SparkInternalsBridge {
+  // Only used in ChangeFeedMetricsListener, which is easier for test validation
+  def getInternalCustomTaskMetricsAsSQLMetric(
+    knownCosmosMetricNames: Set[String],
+    taskMetrics: TaskMetrics) : Map[String, SQLMetric] = {
+    SparkInternalsBridge.getInternalCustomTaskMetricsAsSQLMetricInternal(knownCosmosMetricNames, taskMetrics)
+  }
+}
+
 object SparkInternalsBridge extends BasicLoggingTrait {
   private val SPARK_REFLECTION_ACCESS_ALLOWED_PROPERTY = "COSMOS.SPARK_REFLECTION_ACCESS_ALLOWED"
   private val SPARK_REFLECTION_ACCESS_ALLOWED_VARIABLE = "COSMOS_SPARK_REFLECTION_ACCESS_ALLOWED"
@@ -40,20 +50,23 @@ object SparkInternalsBridge extends BasicLoggingTrait {
   private final lazy val reflectionAccessAllowed = new AtomicBoolean(getSparkReflectionAccessAllowed)
 
   def getInternalCustomTaskMetricsAsSQLMetric(knownCosmosMetricNames: Set[String]) : Map[String, SQLMetric] = {
+    Option.apply(TaskContext.get()) match {
+      case Some(taskCtx) => getInternalCustomTaskMetricsAsSQLMetric(knownCosmosMetricNames, taskCtx.taskMetrics())
+      case None => Map.empty[String, SQLMetric]
+    }
+  }
+
+  def getInternalCustomTaskMetricsAsSQLMetric(knownCosmosMetricNames: Set[String], taskMetrics: TaskMetrics) : Map[String, SQLMetric] = {
 
     if (!reflectionAccessAllowed.get) {
       Map.empty[String, SQLMetric]
     } else {
-      Option.apply(TaskContext.get()) match {
-        case Some(taskCtx) => getInternalCustomTaskMetricsAsSQLMetricInternal(knownCosmosMetricNames, taskCtx)
-        case None => Map.empty[String, SQLMetric]
-      }
+      getInternalCustomTaskMetricsAsSQLMetricInternal(knownCosmosMetricNames, taskMetrics)
     }
   }
 
-  private def getAccumulators(taskCtx: TaskContext): Option[Seq[AccumulatorV2[_, _]]] = {
+  private def getAccumulators(taskMetrics: TaskMetrics): Option[Seq[AccumulatorV2[_, _]]] = {
     try {
-      val taskMetrics: Object = taskCtx.taskMetrics()
       val method = Option(accumulatorsMethod.get) match {
         case Some(existing) => existing
         case None =>
@@ -78,8 +91,8 @@ object SparkInternalsBridge extends BasicLoggingTrait {
 
   private def getInternalCustomTaskMetricsAsSQLMetricInternal(
     knownCosmosMetricNames: Set[String],
-    taskCtx: TaskContext): Map[String, SQLMetric] = {
-    getAccumulators(taskCtx) match {
+    taskMetrics: TaskMetrics): Map[String, SQLMetric] = {
+    getAccumulators(taskMetrics) match {
       case Some(accumulators) => accumulators
         .filter(accumulable => accumulable.isInstanceOf[SQLMetric]
          && accumulable.name.isDefined
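Per its own comment, the new instance-level `SparkInternalsBridge` class exists to make test validation easier: an instance method can be overridden or substituted, while the companion `object`'s methods cannot. A hypothetical stub along these lines (illustrative only, not taken from the repository's tests):

```scala
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.sql.execution.metric.SQLMetric

// A test double that returns canned metrics without needing a live
// TaskContext or the reflection-based accumulator access.
class StubSparkInternalsBridge(fixed: Map[String, SQLMetric]) extends SparkInternalsBridge {
  override def getInternalCustomTaskMetricsAsSQLMetric(
    knownCosmosMetricNames: Set[String],
    taskMetrics: TaskMetrics): Map[String, SQLMetric] =
    fixed.filter { case (name, _) => knownCosmosMetricNames.contains(name) }
}
```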

sdk/cosmos/azure-cosmos-spark_3-5_2-12/CHANGELOG.md
Lines changed: 1 addition & 0 deletions

@@ -10,6 +10,7 @@
 * Added log4j-core to the list of shaded packages to avoid conflicts when customers use log4j in a different version. - See [PR 45924](https://github.com/Azure/azure-sdk-for-java/pull/46451)
 
 #### Other Changes
+* Added change feed performance monitoring which is used to improve end lsn calculation in `CosmosPartitionPlanner`. - See [PR 46320](https://github.com/Azure/azure-sdk-for-java/pull/46320)
 
 ### 4.38.0 (2025-07-31)

sdk/cosmos/azure-cosmos-spark_3-5_2-12/src/main/scala/com/azure/cosmos/spark/ChangeFeedMicroBatchStream.scala
Lines changed: 29 additions & 5 deletions

@@ -2,7 +2,9 @@
 // Licensed under the MIT License.
 package com.azure.cosmos.spark
 
+import com.azure.cosmos.changeFeedMetrics.{ChangeFeedMetricsListener, ChangeFeedMetricsTracker}
 import com.azure.cosmos.implementation.SparkBridgeImplementationInternal
+import com.azure.cosmos.implementation.guava25.collect.{HashBiMap, Maps}
 import com.azure.cosmos.spark.CosmosPredicates.{assertNotNull, assertNotNullOrEmpty, assertOnSparkDriver}
 import com.azure.cosmos.spark.diagnostics.{DiagnosticsContext, LoggerHelper}
 import org.apache.spark.broadcast.Broadcast
@@ -13,6 +15,12 @@ import org.apache.spark.sql.types.StructType
 
 import java.time.Duration
 import java.util.UUID
+import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.atomic.AtomicLong
+
+// scalastyle:off underscore.import
+import scala.collection.JavaConverters._
+// scalastyle:on underscore.import
 
 // scala style rule flaky - even complaining on partial log messages
 // scalastyle:off multiple.string.literals
@@ -59,6 +67,17 @@ private class ChangeFeedMicroBatchStream
 
   private var latestOffsetSnapshot: Option[ChangeFeedOffset] = None
 
+  private val partitionIndex = new AtomicLong(0)
+  private val partitionIndexMap = Maps.synchronizedBiMap(HashBiMap.create[NormalizedRange, Long]())
+  private val partitionMetricsMap = new ConcurrentHashMap[NormalizedRange, ChangeFeedMetricsTracker]()
+
+  if (changeFeedConfig.performanceMonitoringEnabled) {
+    log.logInfo("ChangeFeed performance monitoring is enabled, registering ChangeFeedMetricsListener")
+    session.sparkContext.addSparkListener(new ChangeFeedMetricsListener(partitionIndexMap, partitionMetricsMap))
+  } else {
+    log.logInfo("ChangeFeed performance monitoring is disabled")
+  }
+
   override def latestOffset(): Offset = {
     // For Spark data streams implementing SupportsAdmissionControl trait
     // latestOffset(Offset, ReadLimit) is called instead
@@ -101,11 +120,15 @@ private class ChangeFeedMicroBatchStream
     end
       .inputPartitions
      .get
-      .map(partition => partition
-        .withContinuationState(
-          SparkBridgeImplementationInternal
+      .map(partition => {
+        val index = partitionIndexMap.asScala.getOrElseUpdate(partition.feedRange, partitionIndex.incrementAndGet())
+        partition
+          .withContinuationState(
+            SparkBridgeImplementationInternal
             .extractChangeFeedStateForRange(start.changeFeedState, partition.feedRange),
-          clearEndLsn = false))
+            clearEndLsn = false)
+          .withIndex(index)
+      })
   }
 
   /**
@@ -152,7 +175,8 @@ private class ChangeFeedMicroBatchStream
      this.containerConfig,
      this.partitioningConfig,
      this.defaultParallelism,
-      this.container
+      this.container,
+      Some(this.partitionMetricsMap)
     )
 
    if (offset.changeFeedState != startChangeFeedOffset.changeFeedState) {
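One detail worth noting across all three Spark-version copies of this change: `partitionIndexMap` is a Guava `BiMap` rather than a plain map, presumably because the listener must also resolve in the opposite direction, from the numeric index carried in task metrics back to the `NormalizedRange`. A sketch of that inverse lookup, with `String` standing in for `NormalizedRange` and plain Guava in place of the shaded `guava25` package:

```scala
import com.google.common.collect.{BiMap, HashBiMap, Maps}

object InverseLookupSketch {
  // Synchronized bidirectional map: range -> index and index -> range.
  val indexMap: BiMap[String, Long] =
    Maps.synchronizedBiMap(HashBiMap.create[String, Long]())

  // Listener side: recover the feed range from the index reported by a task.
  def rangeForIndex(index: Long): Option[String] =
    Option(indexMap.inverse().get(index))
}
```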
