Commit 6a75d9f: improve endLSN calculation

Author: annie-mac
Parent: 77f5e05

21 files changed, 484 insertions(+), 96 deletions(-)

sdk/cosmos/azure-cosmos-spark_3-3_2-12/src/main/scala/com/azure/cosmos/spark/ChangeFeedMicroBatchStream.scala
Lines changed: 26 additions & 7 deletions

@@ -2,7 +2,9 @@
 // Licensed under the MIT License.
 package com.azure.cosmos.spark
 
+import com.azure.cosmos.implementation.guava25.collect.{HashBiMap, Maps}
 import com.azure.cosmos.implementation.{SparkBridgeImplementationInternal, UUIDs}
+import com.azure.cosmos.changeFeedMetrics.{ChangeFeedMetricsListener, ChangeFeedMetricsTracker}
 import com.azure.cosmos.spark.CosmosPredicates.{assertNotNull, assertNotNullOrEmpty, assertOnSparkDriver}
 import com.azure.cosmos.spark.diagnostics.{DiagnosticsContext, LoggerHelper}
 import org.apache.spark.broadcast.Broadcast
@@ -12,7 +14,12 @@ import org.apache.spark.sql.connector.read.{InputPartition, PartitionReaderFacto
 import org.apache.spark.sql.types.StructType
 
 import java.time.Duration
-import java.util.UUID
+import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.atomic.AtomicLong
+
+// scalastyle:off underscore.import
+import scala.collection.JavaConverters._
+// scalastyle:on underscore.import
 
 // scala style rule flaky - even complaining on partial log messages
 // scalastyle:off multiple.string.literals
@@ -57,6 +64,13 @@ private class ChangeFeedMicroBatchStream
 
   private var latestOffsetSnapshot: Option[ChangeFeedOffset] = None
 
+  private val partitionIndex = new AtomicLong(0)
+  private val partitionIndexMap = Maps.synchronizedBiMap(HashBiMap.create[NormalizedRange, Long]())
+  private val partitionMetricsMap = new ConcurrentHashMap[NormalizedRange, ChangeFeedMetricsTracker]()
+
+  // Register metrics listener
+  session.sparkContext.addSparkListener(new ChangeFeedMetricsListener(partitionIndexMap, partitionMetricsMap))
+
   override def latestOffset(): Offset = {
     // For Spark data streams implementing SupportsAdmissionControl trait
     // latestOffset(Offset, ReadLimit) is called instead
@@ -99,11 +113,15 @@ private class ChangeFeedMicroBatchStream
     end
       .inputPartitions
       .get
-      .map(partition => partition
-        .withContinuationState(
-          SparkBridgeImplementationInternal
-            .extractChangeFeedStateForRange(start.changeFeedState, partition.feedRange),
-          clearEndLsn = false))
+      .map(partition => {
+        val index = partitionIndexMap.asScala.getOrElseUpdate(partition.feedRange, partitionIndex.incrementAndGet())
+        partition
+          .withContinuationState(
+            SparkBridgeImplementationInternal
+              .extractChangeFeedStateForRange(start.changeFeedState, partition.feedRange),
+            clearEndLsn = false)
+          .withIndex(index)
+      })
   }
 
   /**
@@ -150,7 +168,8 @@ private class ChangeFeedMicroBatchStream
       this.containerConfig,
       this.partitioningConfig,
       this.defaultParallelism,
-      this.container
+      this.container,
+      Some(this.partitionMetricsMap)
     )
 
     if (offset.changeFeedState != startChangeFeedOffset.changeFeedState) {
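
Worth calling out in the hunks above: SQLMetric values can only travel from executors back to the driver as plain longs, so the stream assigns each feed range a stable numeric index and keeps a bidirectional map to translate that number back into a NormalizedRange when the listener fires. A minimal, self-contained sketch of the pattern, in which plain Guava and a String key stand in for the shaded guava25 classes and NormalizedRange:

import com.google.common.collect.{BiMap, HashBiMap, Maps}
import java.util.concurrent.atomic.AtomicLong
import scala.collection.JavaConverters._

object PartitionIndexAssignerSketch {
  private val nextIndex = new AtomicLong(0)
  private val indexMap: BiMap[String, Long] =
    Maps.synchronizedBiMap(HashBiMap.create[String, Long]())

  // Driver side: hand out one stable index per feed range (idempotent per key).
  def indexFor(feedRange: String): Long =
    indexMap.asScala.getOrElseUpdate(feedRange, nextIndex.incrementAndGet())

  // Listener side: translate the index reported via task metrics back to the range.
  def rangeFor(index: Long): Option[String] =
    Option(indexMap.inverse().get(index))
}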

sdk/cosmos/azure-cosmos-spark_3-3_2-12/src/main/scala/com/azure/cosmos/spark/SparkInternalsBridge.scala
Lines changed: 12 additions & 8 deletions

@@ -6,6 +6,7 @@ import com.azure.cosmos.implementation.guava25.base.MoreObjects.firstNonNull
 import com.azure.cosmos.implementation.guava25.base.Strings.emptyToNull
 import com.azure.cosmos.spark.diagnostics.BasicLoggingTrait
 import org.apache.spark.TaskContext
+import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.util.AccumulatorV2
 
@@ -41,20 +42,23 @@ object SparkInternalsBridge extends BasicLoggingTrait {
   private final lazy val reflectionAccessAllowed = new AtomicBoolean(getSparkReflectionAccessAllowed)
 
   def getInternalCustomTaskMetricsAsSQLMetric(knownCosmosMetricNames: Set[String]): Map[String, SQLMetric] = {
+    Option.apply(TaskContext.get()) match {
+      case Some(taskCtx) => getInternalCustomTaskMetricsAsSQLMetricInternal(knownCosmosMetricNames, taskCtx.taskMetrics())
+      case None => Map.empty[String, SQLMetric]
+    }
+  }
+
+  def getInternalCustomTaskMetricsAsSQLMetric(knownCosmosMetricNames: Set[String], taskMetrics: TaskMetrics): Map[String, SQLMetric] = {
 
     if (!reflectionAccessAllowed.get) {
      Map.empty[String, SQLMetric]
    } else {
-      Option.apply(TaskContext.get()) match {
-        case Some(taskCtx) => getInternalCustomTaskMetricsAsSQLMetricInternal(knownCosmosMetricNames, taskCtx)
-        case None => Map.empty[String, SQLMetric]
-      }
+      getInternalCustomTaskMetricsAsSQLMetricInternal(knownCosmosMetricNames, taskMetrics)
    }
  }
 
-  private def getAccumulators(taskCtx: TaskContext): Option[ArrayBuffer[AccumulatorV2[_, _]]] = {
+  private def getAccumulators(taskMetrics: TaskMetrics): Option[ArrayBuffer[AccumulatorV2[_, _]]] = {
     try {
-      val taskMetrics: Object = taskCtx.taskMetrics()
       val method = Option(accumulatorsMethod.get) match {
         case Some(existing) => existing
         case None =>
@@ -79,8 +83,8 @@ object SparkInternalsBridge extends BasicLoggingTrait {
 
   private def getInternalCustomTaskMetricsAsSQLMetricInternal(
     knownCosmosMetricNames: Set[String],
-    taskCtx: TaskContext): Map[String, SQLMetric] = {
-    getAccumulators(taskCtx) match {
+    taskMetrics: TaskMetrics): Map[String, SQLMetric] = {
+    getAccumulators(taskMetrics) match {
       case Some(accumulators) => accumulators
         .filter(accumulable => accumulable.isInstanceOf[SQLMetric]
           && accumulable.name.isDefined
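
The refactoring here decouples the metric lookup from TaskContext.get(): SparkListener.onTaskEnd runs on the driver, where no task context is active, so a new overload accepts the TaskMetrics carried by the event instead. Underneath, the bridge still reads Spark's task accumulators reflectively. A hedged sketch of that idea, assuming a Spark 3.x TaskMetrics whose private[spark] externalAccums member is reachable through reflection (the exact member can shift between Spark versions):

import org.apache.spark.executor.TaskMetrics
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.util.AccumulatorV2

object TaskMetricsReflectionSketch {
  def sqlMetrics(taskMetrics: TaskMetrics, knownNames: Set[String]): Map[String, SQLMetric] =
    try {
      // private[spark] compiles to a public method, so plain reflection suffices.
      val method = taskMetrics.getClass.getMethod("externalAccums")
      method
        .invoke(taskMetrics)
        .asInstanceOf[Seq[AccumulatorV2[_, _]]]
        .collect { case m: SQLMetric if m.name.exists(knownNames.contains) => m.name.get -> m }
        .toMap
    } catch {
      // Metrics are best-effort; degrade to "no metrics" instead of failing the caller.
      case _: Throwable => Map.empty[String, SQLMetric]
    }
}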

sdk/cosmos/azure-cosmos-spark_3-4_2-12/src/main/scala/com/azure/cosmos/spark/ChangeFeedMicroBatchStream.scala
Lines changed: 22 additions & 5 deletions

@@ -2,7 +2,9 @@
 // Licensed under the MIT License.
 package com.azure.cosmos.spark
 
+import com.azure.cosmos.implementation.guava25.collect.{HashBiMap, Maps}
 import com.azure.cosmos.implementation.{SparkBridgeImplementationInternal, UUIDs}
+import com.azure.cosmos.changeFeedMetrics.{ChangeFeedMetricsListener, ChangeFeedMetricsTracker}
 import com.azure.cosmos.spark.CosmosPredicates.{assertNotNull, assertNotNullOrEmpty, assertOnSparkDriver}
 import com.azure.cosmos.spark.diagnostics.{DiagnosticsContext, LoggerHelper}
 import org.apache.spark.broadcast.Broadcast
@@ -12,7 +14,12 @@ import org.apache.spark.sql.connector.read.{InputPartition, PartitionReaderFacto
 import org.apache.spark.sql.types.StructType
 
 import java.time.Duration
-import java.util.UUID
+import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.atomic.AtomicLong
+
+// scalastyle:off underscore.import
+import scala.collection.JavaConverters._
+// scalastyle:on underscore.import
 
 // scala style rule flaky - even complaining on partial log messages
 // scalastyle:off multiple.string.literals
@@ -59,6 +66,12 @@ private class ChangeFeedMicroBatchStream
 
   private var latestOffsetSnapshot: Option[ChangeFeedOffset] = None
 
+  private val partitionIndex = new AtomicLong(0)
+  private val partitionIndexMap = Maps.synchronizedBiMap(HashBiMap.create[NormalizedRange, Long]())
+  private val partitionMetricsMap = new ConcurrentHashMap[NormalizedRange, ChangeFeedMetricsTracker]()
+
+  session.sparkContext.addSparkListener(new ChangeFeedMetricsListener(partitionIndexMap, partitionMetricsMap))
+
   override def latestOffset(): Offset = {
     // For Spark data streams implementing SupportsAdmissionControl trait
     // latestOffset(Offset, ReadLimit) is called instead
@@ -101,11 +114,15 @@ private class ChangeFeedMicroBatchStream
     end
       .inputPartitions
       .get
-      .map(partition => partition
-        .withContinuationState(
-          SparkBridgeImplementationInternal
+      .map(partition => {
+        val index = partitionIndexMap.asScala.getOrElseUpdate(partition.feedRange, partitionIndex.incrementAndGet())
+        partition
+          .withContinuationState(
+            SparkBridgeImplementationInternal
              .extractChangeFeedStateForRange(start.changeFeedState, partition.feedRange),
-          clearEndLsn = false))
+            clearEndLsn = false)
+          .withIndex(index)
+      })
   }
 
   /**

sdk/cosmos/azure-cosmos-spark_3-4_2-12/src/main/scala/com/azure/cosmos/spark/SparkInternalsBridge.scala
Lines changed: 13 additions & 9 deletions

@@ -6,11 +6,12 @@ import com.azure.cosmos.implementation.guava25.base.MoreObjects.firstNonNull
 import com.azure.cosmos.implementation.guava25.base.Strings.emptyToNull
 import com.azure.cosmos.spark.diagnostics.BasicLoggingTrait
 import org.apache.spark.TaskContext
+import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.util.AccumulatorV2
 
 import java.lang.reflect.Method
-import java.util.Locale
+import java.util.{Locale, Optional}
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
 
 object SparkInternalsBridge extends BasicLoggingTrait {
@@ -40,20 +41,23 @@ object SparkInternalsBridge extends BasicLoggingTrait {
   private final lazy val reflectionAccessAllowed = new AtomicBoolean(getSparkReflectionAccessAllowed)
 
   def getInternalCustomTaskMetricsAsSQLMetric(knownCosmosMetricNames: Set[String]) : Map[String, SQLMetric] = {
+    Option.apply(TaskContext.get()) match {
+      case Some(taskCtx) => getInternalCustomTaskMetricsAsSQLMetric(knownCosmosMetricNames, taskCtx.taskMetrics())
+      case None => Map.empty[String, SQLMetric]
+    }
+  }
+
+  def getInternalCustomTaskMetricsAsSQLMetric(knownCosmosMetricNames: Set[String], taskMetrics: TaskMetrics) : Map[String, SQLMetric] = {
 
     if (!reflectionAccessAllowed.get) {
       Map.empty[String, SQLMetric]
     } else {
-      Option.apply(TaskContext.get()) match {
-        case Some(taskCtx) => getInternalCustomTaskMetricsAsSQLMetricInternal(knownCosmosMetricNames, taskCtx)
-        case None => Map.empty[String, SQLMetric]
-      }
+      getInternalCustomTaskMetricsAsSQLMetricInternal(knownCosmosMetricNames, taskMetrics)
     }
   }
 
-  private def getAccumulators(taskCtx: TaskContext): Option[Seq[AccumulatorV2[_, _]]] = {
+  private def getAccumulators(taskMetrics: TaskMetrics): Option[Seq[AccumulatorV2[_, _]]] = {
     try {
-      val taskMetrics: Object = taskCtx.taskMetrics()
       val method = Option(accumulatorsMethod.get) match {
         case Some(existing) => existing
         case None =>
@@ -78,8 +82,8 @@ object SparkInternalsBridge extends BasicLoggingTrait {
 
   private def getInternalCustomTaskMetricsAsSQLMetricInternal(
     knownCosmosMetricNames: Set[String],
-    taskCtx: TaskContext): Map[String, SQLMetric] = {
-    getAccumulators(taskCtx) match {
+    taskMetrics: TaskMetrics): Map[String, SQLMetric] = {
+    getAccumulators(taskMetrics) match {
       case Some(accumulators) => accumulators
         .filter(accumulable => accumulable.isInstanceOf[SQLMetric]
          && accumulable.name.isDefined
New file

@@ -0,0 +1,18 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.azure.cosmos.changeFeedMetrics
+
+import com.azure.cosmos.spark.CosmosConstants.MetricNames
+import org.apache.spark.sql.connector.metric.CustomSumMetric
+
+/***
+ * This metric is used to track the total changes fetched for a partition within a change feed micro-batch.
+ * Note: not all fetched changes will be returned to Spark.
+ */
+private[cosmos] class ChangeFeedFetchedChangesCntMetric extends CustomSumMetric {
+
+  override def name(): String = MetricNames.ChangeFeedFetchedChangesCnt
+
+  override def description(): String = MetricNames.ChangeFeedFetchedChangesCnt
+}
New file

@@ -0,0 +1,17 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.azure.cosmos.changeFeedMetrics
+
+import com.azure.cosmos.spark.CosmosConstants.MetricNames
+import org.apache.spark.sql.connector.metric.CustomSumMetric
+
+/***
+ * This metric is used to track the LSN gap/range for a partition within a change feed micro-batch.
+ */
+private[cosmos] class ChangeFeedLsnGapMetric extends CustomSumMetric {
+
+  override def name(): String = MetricNames.ChangeFeedLsnGap
+
+  override def description(): String = MetricNames.ChangeFeedLsnGap
+}
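
These two CustomSumMetric classes are the driver-side declaration half of Spark's DataSource V2 metrics plumbing: executors report CustomTaskMetric values per task, and Spark aggregates them into the declared metric by matching name(). A hedged sketch of how the halves pair up; the task-metric wrapper and object below are illustrative stand-ins, not types from this commit:

import com.azure.cosmos.spark.CosmosConstants.MetricNames
import org.apache.spark.sql.connector.metric.{CustomMetric, CustomTaskMetric}

// Executor side: a PartitionReader returns values like this from currentMetricsValues();
// Spark sums them into the matching CustomSumMetric by name.
case class FetchedChangesCntTaskMetric(value: Long) extends CustomTaskMetric {
  override def name(): String = MetricNames.ChangeFeedFetchedChangesCnt
}

// Driver side: the Scan advertises the metric classes introduced in this commit
// through supportedCustomMetrics().
object ChangeFeedScanMetricsSketch {
  def supportedCustomMetrics(): Array[CustomMetric] =
    Array(new ChangeFeedFetchedChangesCntMetric, new ChangeFeedLsnGapMetric)
}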
New file

@@ -0,0 +1,48 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.azure.cosmos.changeFeedMetrics
+
+import com.azure.cosmos.implementation.guava25.collect.BiMap
+import com.azure.cosmos.spark.diagnostics.BasicLoggingTrait
+import com.azure.cosmos.spark.{CosmosConstants, NormalizedRange, SparkInternalsBridge}
+import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
+
+import java.util.concurrent.ConcurrentHashMap
+
+private[cosmos] class ChangeFeedMetricsListener(
+    partitionIndexMap: BiMap[NormalizedRange, Long],
+    partitionMetricsMap: ConcurrentHashMap[NormalizedRange, ChangeFeedMetricsTracker]) extends SparkListener with BasicLoggingTrait {
+
+  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
+    try {
+      val metrics = SparkInternalsBridge.getInternalCustomTaskMetricsAsSQLMetric(
+        Set(
+          CosmosConstants.MetricNames.ChangeFeedLsnGap,
+          CosmosConstants.MetricNames.ChangeFeedFetchedChangesCnt,
+          CosmosConstants.MetricNames.ChangeFeedPartitionIndex
+        ),
+        taskEnd.taskMetrics
+      )
+
+      if (metrics.contains(CosmosConstants.MetricNames.ChangeFeedPartitionIndex)) {
+        val index = metrics(CosmosConstants.MetricNames.ChangeFeedPartitionIndex).value
+
+        val normalizedRange = partitionIndexMap.inverse().get(index)
+        if (normalizedRange != null) {
+          partitionMetricsMap.putIfAbsent(normalizedRange, new ChangeFeedMetricsTracker(index, normalizedRange))
+          val metricsTracker = partitionMetricsMap.get(normalizedRange)
+          metricsTracker.track(
+            metrics(CosmosConstants.MetricNames.ChangeFeedFetchedChangesCnt).value,
+            metrics(CosmosConstants.MetricNames.ChangeFeedLsnGap).value
+          )
+        }
+      }
+    } catch {
+      // using metrics to tune the change feed micro-batch is an optimization;
+      // suppress any exceptions captured
+      case e: Throwable =>
+        logWarning("Tracking changeFeed metrics failed", e)
+    }
+  }
+}
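
ChangeFeedMetricsTracker's implementation is outside this commit excerpt, but the two values the listener feeds it (fetched-changes count and LSN gap) give exactly the changes-per-LSN density that an improved endLSN calculation needs per feed range. A hypothetical sketch of the kind of estimate such a tracker enables; the class and weighting below are assumptions, not the connector's code:

import java.util.concurrent.atomic.AtomicReference

class ChangesPerLsnEstimatorSketch {
  // Running totals of (fetched changes, LSN gap) across completed micro-batches.
  private val totals = new AtomicReference[(Long, Long)]((0L, 0L))

  def track(fetchedChangesCnt: Long, lsnGap: Long): Unit = {
    totals.updateAndGet(t => (t._1 + fetchedChangesCnt, t._2 + lsnGap))
    ()
  }

  // Estimate the LSN range that should yield roughly maxItems changes, i.e. the
  // distance from the start LSN at which to cap endLSN for the next micro-batch.
  def estimateLsnGapFor(maxItems: Long): Option[Long] = totals.get() match {
    case (changes, gap) if changes > 0 => Some(math.max(1L, maxItems * gap / changes))
    case _ => None
  }
}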
