
Commit 1056d8f

commit message: change
author: annie-mac
1 parent 30f5247 · commit 1056d8f

File tree: 10 files changed, +544 −9 lines changed


sdk/cosmos/azure-cosmos-spark_3-5_2-12/src/main/scala/com/azure/cosmos/spark/SparkInternalsBridge.scala

Lines changed: 8 additions & 0 deletions
@@ -13,6 +13,14 @@ import org.apache.spark.util.AccumulatorV2
 import java.lang.reflect.Method
 import java.util.Locale
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
+class SparkInternalsBridge {
+  // Only used in ChangeFeedMetricsListener, which is easier for test validation
+  def getInternalCustomTaskMetricsAsSQLMetric(
+      knownCosmosMetricNames: Set[String],
+      taskMetrics: TaskMetrics): Map[String, SQLMetric] = {
+    SparkInternalsBridge.getInternalCustomTaskMetricsAsSQLMetricInternal(knownCosmosMetricNames, taskMetrics)
+  }
+}

 object SparkInternalsBridge extends BasicLoggingTrait {
   private val SPARK_REFLECTION_ACCESS_ALLOWED_PROPERTY = "COSMOS.SPARK_REFLECTION_ACCESS_ALLOWED"
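The point of the new wrapper class is testability: Scala `object` methods compile to static calls that Mockito cannot stub, while methods on a plain class can be mocked and injected. A minimal sketch of the pattern, using hypothetical names rather than the connector's real ones:

import org.mockito.Mockito.{mock, when}

// Hypothetical stand-in for SparkInternalsBridge: the object holds the real
// (reflection-based) logic, the class is a thin mockable delegate.
class Bridge {
  def compute(x: Int): Int = Bridge.computeInternal(x)
}

object Bridge {
  def computeInternal(x: Int): Int = x * 2
}

object MockabilityDemo extends App {
  val bridgeMock = mock(classOf[Bridge]) // not possible with `object Bridge` alone
  when(bridgeMock.compute(21)).thenReturn(42)
  println(bridgeMock.compute(21)) // prints 42 without ever running computeInternal
}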

sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/changeFeedMetrics/ChangeFeedMetricsListener.scala

Lines changed: 5 additions & 4 deletions
@@ -15,9 +15,10 @@ private[cosmos] class ChangeFeedMetricsListener(
   partitionIndexMap: BiMap[NormalizedRange, Long],
   partitionMetricsMap: ConcurrentHashMap[NormalizedRange, ChangeFeedMetricsTracker]) extends SparkListener with BasicLoggingTrait {

+  private val sparkInternalsBridge = new SparkInternalsBridge()
   override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
     try {
-      val metrics = SparkInternalsBridge.getInternalCustomTaskMetricsAsSQLMetric(
+      val metrics = sparkInternalsBridge.getInternalCustomTaskMetricsAsSQLMetric(
         Set(
           CosmosConstants.MetricNames.ChangeFeedLsnRange,
           CosmosConstants.MetricNames.ChangeFeedItemsCnt,
@@ -33,17 +34,17 @@
       if (normalizedRange != null) {
         partitionMetricsMap.putIfAbsent(normalizedRange, ChangeFeedMetricsTracker(index, normalizedRange))
         val metricsTracker = partitionMetricsMap.get(normalizedRange)
-        val fetchedItemCnt = getFetchedItemCnt(metrics)
+        val changeFeedItemsCnt = getFetchedItemCnt(metrics)
         val lsnRange = getLsnRange(metrics)

-        if (fetchedItemCnt >= 0 && lsnRange >= 0) {
+        if (changeFeedItemsCnt >= 0 && lsnRange >= 0) {
           metricsTracker.track(
             metrics(CosmosConstants.MetricNames.ChangeFeedLsnRange).value,
             metrics(CosmosConstants.MetricNames.ChangeFeedItemsCnt).value
           )
         }

-        logInfo(s"onTaskEnd for partition index $index, fetchedItemCnt $fetchedItemCnt, lsnRange $lsnRange")
+        logInfo(s"onTaskEnd for partition index $index, changeFeedItemsCnt $changeFeedItemsCnt, lsnRange $lsnRange")
       }
     }
   } catch {
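For context, a `SparkListener` like this one receives `onTaskEnd` callbacks once it is registered on the SparkContext. A minimal registration sketch (the session setup here is assumed for illustration, not taken from this commit):

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
import org.apache.spark.sql.SparkSession

object ListenerRegistrationSketch extends App {
  val spark = SparkSession.builder().master("local[1]").appName("listener-sketch").getOrCreate()

  // addSparkListener delivers scheduler events, including one SparkListenerTaskEnd
  // per finished task; the connector registers its ChangeFeedMetricsListener the same way.
  spark.sparkContext.addSparkListener(new SparkListener {
    override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit =
      println(s"task ended in stage ${taskEnd.stageId}")
  })

  spark.range(10).count() // runs tasks, which triggers the callback
  spark.stop()
}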

sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/changeFeedMetrics/ChangeFeedMetricsTracker.scala

Lines changed: 1 addition & 1 deletion
@@ -20,7 +20,7 @@ private[cosmos] object ChangeFeedMetricsTracker {
     feedRange: NormalizedRange,
     metricsHistory: Int,
     metricsHistoryDecayFactor: Double): ChangeFeedMetricsTracker = {
-    new ChangeFeedMetricsTracker(partitionIndex, feedRange, MetricsHistory, MetricsHistoryDecayFactor)
+    new ChangeFeedMetricsTracker(partitionIndex, feedRange, metricsHistory, metricsHistoryDecayFactor)
   }

   def apply(partitionIndex: Long, feedRange: NormalizedRange): ChangeFeedMetricsTracker = {
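This one-line fix matters because the three-argument `apply` previously discarded its `metricsHistory` and `metricsHistoryDecayFactor` parameters in favor of the object-level defaults `MetricsHistory` and `MetricsHistoryDecayFactor`. Judging from the parameter names, the tracker keeps a bounded history in which older samples are down-weighted by the decay factor; the following is a sketch of that idea under stated assumptions, not the tracker's actual implementation:

import scala.collection.mutable

// Assumed model: a fixed-size window whose samples decay geometrically with age.
final class DecayedHistory(maxHistory: Int, decayFactor: Double) {
  private val samples = mutable.Queue.empty[Double]

  def track(value: Double): Unit = {
    samples.enqueue(value)
    if (samples.size > maxHistory) samples.dequeue() // evict the oldest sample
  }

  // Newest sample gets weight 1; each step back in time multiplies by decayFactor.
  def weightedAverage: Option[Double] =
    if (samples.isEmpty) None
    else {
      val weights = samples.indices.map(i => math.pow(decayFactor, samples.size - 1 - i))
      Some(samples.zip(weights).map { case (v, w) => v * w }.sum / weights.sum)
    }
}

object DecayedHistoryDemo extends App {
  val history = new DecayedHistory(maxHistory = 5, decayFactor = 0.5)
  Seq(10.0, 20.0, 40.0).foreach(history.track)
  println(history.weightedAverage) // Some(30.0): recent samples dominate
}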

sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/ChangeFeedPartitionReader.scala

Lines changed: 2 additions & 3 deletions
@@ -271,8 +271,7 @@ private case class ChangeFeedPartitionReader
   }

   override def get(): InternalRow = {
-    val changeFeedSparkRowItem = this.iterator.next()
-    cosmosRowConverter.fromRowToInternalRow(changeFeedSparkRowItem.row, rowSerializer)
+    cosmosRowConverter.fromRowToInternalRow(this.iterator.next().row, rowSerializer)
   }

   override def close(): Unit = {
@@ -297,9 +296,9 @@
       Some(SparkBridgeImplementationInternal
         .extractContinuationTokensFromChangeFeedStateJson(continuationToken)
         .minBy(_._2)._2)
+    case None =>
       // for change feed, we would only reach here before the first page got fetched
       // fallback to use the continuation token from the partition instead
-    case None =>
       Some(SparkBridgeImplementationInternal
         .extractContinuationTokensFromChangeFeedStateJson(partition.continuationState.get)
         .minBy(_._2)._2)
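For orientation, `get()` implements Spark's DataSource V2 `PartitionReader[InternalRow]` contract: the scheduler alternates `next()` and `get()` until `next()` returns false, so inlining the temporary is behavior-preserving, and the relocated `case None =>` only moves the comments into the branch they describe. A toy reader showing the contract's shape (illustrative, not connector code):

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.read.PartitionReader

// In-memory reader mirroring the next()/get()/close() shape of ChangeFeedPartitionReader.
final class ToyPartitionReader(rows: Iterator[InternalRow]) extends PartitionReader[InternalRow] {
  private var current: InternalRow = _

  override def next(): Boolean = {
    val hasMore = rows.hasNext
    if (hasMore) current = rows.next() // advance; get() is only valid afterwards
    hasMore
  }

  override def get(): InternalRow = current

  override def close(): Unit = () // a real reader releases its Cosmos client here
}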

sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/CosmosPartitionPlanner.scala

Lines changed: 1 addition & 1 deletion
@@ -583,7 +583,7 @@ private object CosmosPartitionPlanner extends BasicLoggingTrait {
   val avgItemsPerLsn = metadata.getAvgItemsPerLsn(isChangeFeed, partitionMetricsMap)
   val weightedLsnGap = metadata.getWeightedLsnGap(isChangeFeed, partitionMetricsMap)

-  val feedRangeWeightFactor = weightedLsnGap / totalWeightedLsnGap.get
+  val feedRangeWeightFactor = weightedLsnGap.toDouble / totalWeightedLsnGap.get

   val allowedRate =
     (feedRangeWeightFactor * maxRowsLimit.maxRows() / avgItemsPerLsn)
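The added `.toDouble` fixes an integer-division truncation: when both operands are integral, Scala's `/` drops the fraction, so any partition whose weighted LSN gap is smaller than the total would compute a weight factor of exactly 0 and be starved of rate. A quick illustration of the pitfall (values are made up):

object IntegerDivisionPitfall extends App {
  val weightedLsnGap: Long = 300        // per-partition gap (illustrative)
  val totalWeightedLsnGap: Long = 1000  // sum across partitions

  val truncated = weightedLsnGap / totalWeightedLsnGap           // Long / Long
  println(truncated)                                             // 0

  val fractional = weightedLsnGap.toDouble / totalWeightedLsnGap // Double / Long
  println(fractional)                                            // 0.3
}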

sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/changeFeedMetrics/ChangeFeedMetricsListenerSpec.scala

Lines changed: 138 additions & 0 deletions
@@ -0,0 +1,138 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.azure.cosmos.spark
+
+import com.azure.cosmos.changeFeedMetrics.{ChangeFeedMetricsListener, ChangeFeedMetricsTracker}
+import com.azure.cosmos.implementation.guava25.collect.{HashBiMap, Maps}
+import org.apache.spark.Success
+import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
+import org.apache.spark.scheduler.{SparkListenerTaskEnd, TaskInfo}
+import org.apache.spark.sql.execution.metric.SQLMetric
+import org.mockito.ArgumentMatchers
+import org.mockito.Mockito.{mock, when}
+
+import java.lang.reflect.Field
+import java.util.concurrent.ConcurrentHashMap
+
+class ChangeFeedMetricsListenerSpec extends UnitSpec {
+  "ChangeFeedMetricsListener" should "be able to capture changeFeed performance metrics" in {
+    val taskEnd = SparkListenerTaskEnd(
+      stageId = 1,
+      stageAttemptId = 0,
+      taskType = "ResultTask",
+      reason = Success,
+      taskInfo = mock(classOf[TaskInfo]),
+      taskExecutorMetrics = mock(classOf[ExecutorMetrics]),
+      taskMetrics = mock(classOf[TaskMetrics])
+    )
+
+    val metrics = Map[String, SQLMetric](
+      CosmosConstants.MetricNames.ChangeFeedPartitionIndex -> new SQLMetric("index", 1),
+      CosmosConstants.MetricNames.ChangeFeedLsnRange -> new SQLMetric("lsn", 100),
+      CosmosConstants.MetricNames.ChangeFeedItemsCnt -> new SQLMetric("items", 100)
+    )
+
+    // create sparkInternalsBridge mock
+    val sparkInternalsBridge = mock(classOf[SparkInternalsBridge])
+    when(sparkInternalsBridge.getInternalCustomTaskMetricsAsSQLMetric(
+      ArgumentMatchers.any[Set[String]],
+      ArgumentMatchers.any[TaskMetrics]
+    )).thenReturn(metrics)
+
+    val partitionIndexMap = Maps.synchronizedBiMap(HashBiMap.create[NormalizedRange, Long]())
+    partitionIndexMap.put(NormalizedRange("0", "FF"), 1)
+
+    val partitionMetricsMap = new ConcurrentHashMap[NormalizedRange, ChangeFeedMetricsTracker]()
+    val changeFeedMetricsListener = new ChangeFeedMetricsListener(partitionIndexMap, partitionMetricsMap)
+
+    // set the internal sparkInternalsBridge field
+    val sparkInternalsBridgeField: Field = classOf[ChangeFeedMetricsListener].getDeclaredField("sparkInternalsBridge")
+    sparkInternalsBridgeField.setAccessible(true)
+    sparkInternalsBridgeField.set(changeFeedMetricsListener, sparkInternalsBridge)
+
+    // verify that metrics will be properly tracked
+    changeFeedMetricsListener.onTaskEnd(taskEnd)
+    partitionMetricsMap.size() shouldBe 1
+    partitionMetricsMap.containsKey(NormalizedRange("0", "FF")) shouldBe true
+    partitionMetricsMap.get(NormalizedRange("0", "FF")).getWeightedChangeFeedItemsPerLsn.get shouldBe 1
+  }
+
+  it should "ignore metrics for unknown partition index" in {
+    val taskEnd = SparkListenerTaskEnd(
+      stageId = 1,
+      stageAttemptId = 0,
+      taskType = "ResultTask",
+      reason = Success,
+      taskInfo = mock(classOf[TaskInfo]),
+      taskExecutorMetrics = mock(classOf[ExecutorMetrics]),
+      taskMetrics = mock(classOf[TaskMetrics])
+    )
+
+    val metrics = Map[String, SQLMetric](
+      CosmosConstants.MetricNames.ChangeFeedPartitionIndex -> new SQLMetric("index", 10),
+      CosmosConstants.MetricNames.ChangeFeedLsnRange -> new SQLMetric("lsn", 100),
+      CosmosConstants.MetricNames.ChangeFeedItemsCnt -> new SQLMetric("items", 100)
+    )
+
+    // create sparkInternalsBridge mock
+    val sparkInternalsBridge = mock(classOf[SparkInternalsBridge])
+    when(sparkInternalsBridge.getInternalCustomTaskMetricsAsSQLMetric(
+      ArgumentMatchers.any[Set[String]],
+      ArgumentMatchers.any[TaskMetrics]
+    )).thenReturn(metrics)
+
+    val partitionIndexMap = Maps.synchronizedBiMap(HashBiMap.create[NormalizedRange, Long]())
+    partitionIndexMap.put(NormalizedRange("0", "FF"), 1)
+
+    val partitionMetricsMap = new ConcurrentHashMap[NormalizedRange, ChangeFeedMetricsTracker]()
+    val changeFeedMetricsListener = new ChangeFeedMetricsListener(partitionIndexMap, partitionMetricsMap)
+
+    // set the internal sparkInternalsBridge field
+    val sparkInternalsBridgeField: Field = classOf[ChangeFeedMetricsListener].getDeclaredField("sparkInternalsBridge")
+    sparkInternalsBridgeField.setAccessible(true)
+    sparkInternalsBridgeField.set(changeFeedMetricsListener, sparkInternalsBridge)
+
+    // because partition index 10 does not exist in the partitionIndexMap, it will be ignored
+    changeFeedMetricsListener.onTaskEnd(taskEnd)
+    partitionMetricsMap shouldBe empty
+  }
+
+  it should "ignore unrelated metrics" in {
+    val taskEnd = SparkListenerTaskEnd(
+      stageId = 1,
+      stageAttemptId = 0,
+      taskType = "ResultTask",
+      reason = Success,
+      taskInfo = mock(classOf[TaskInfo]),
+      taskExecutorMetrics = mock(classOf[ExecutorMetrics]),
+      taskMetrics = mock(classOf[TaskMetrics])
+    )
+
+    val metrics = Map[String, SQLMetric](
+      "unknownMetrics" -> new SQLMetric("index", 10)
+    )
+
+    // create sparkInternalsBridge mock
+    val sparkInternalsBridge = mock(classOf[SparkInternalsBridge])
+    when(sparkInternalsBridge.getInternalCustomTaskMetricsAsSQLMetric(
+      ArgumentMatchers.any[Set[String]],
+      ArgumentMatchers.any[TaskMetrics]
+    )).thenReturn(metrics)
+
+    val partitionIndexMap = Maps.synchronizedBiMap(HashBiMap.create[NormalizedRange, Long]())
+    partitionIndexMap.put(NormalizedRange("0", "FF"), 1)
+
+    val partitionMetricsMap = new ConcurrentHashMap[NormalizedRange, ChangeFeedMetricsTracker]()
+    val changeFeedMetricsListener = new ChangeFeedMetricsListener(partitionIndexMap, partitionMetricsMap)
+
+    // set the internal sparkInternalsBridge field
+    val sparkInternalsBridgeField: Field = classOf[ChangeFeedMetricsListener].getDeclaredField("sparkInternalsBridge")
+    sparkInternalsBridgeField.setAccessible(true)
+    sparkInternalsBridgeField.set(changeFeedMetricsListener, sparkInternalsBridge)
+
+    // because none of the returned metrics are known change feed metrics, they will be ignored
+    changeFeedMetricsListener.onTaskEnd(taskEnd)
+    partitionMetricsMap shouldBe empty
+  }
+}
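All three tests repeat the same mock-and-inject boilerplate around the private `sparkInternalsBridge` field; a small helper could centralize the reflection step. A refactoring sketch (hypothetical helper, not part of the commit):

import java.lang.reflect.Field

// Hypothetical test utility mirroring the inline reflection used in the spec above.
object ReflectionTestUtils {
  def setPrivateField(target: AnyRef, fieldName: String, value: AnyRef): Unit = {
    val field: Field = target.getClass.getDeclaredField(fieldName)
    field.setAccessible(true)
    field.set(target, value)
  }
}

// Usage in a test body:
//   ReflectionTestUtils.setPrivateField(changeFeedMetricsListener, "sparkInternalsBridge", bridgeMock)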

sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/ChangeFeedPartitionReaderITest.scala

Lines changed: 99 additions & 0 deletions
@@ -227,6 +227,105 @@ class ChangeFeedPartitionReaderITest

 }

+  "change feed partition reader" should "report custom metrics" in {
+    val cosmosEndpoint = TestConfigurations.HOST
+    val cosmosMasterKey = TestConfigurations.MASTER_KEY
+    val testId = UUID.randomUUID().toString
+    val sourceContainerResponse = cosmosClient.getDatabase(cosmosDatabase).createContainer(
+      "source_" + testId,
+      "/sequenceNumber",
+      ThroughputProperties.createManualThroughput(400)).block()
+    val sourceContainer = cosmosClient.getDatabase(cosmosDatabase).getContainer(sourceContainerResponse.getProperties.getId)
+    val rid = sourceContainerResponse.getProperties.getResourceId
+    val continuationState = s"""{
+      "V": 1,
+      "Rid": "$rid",
+      "Mode": "INCREMENTAL",
+      "StartFrom": {
+        "Type": "BEGINNING"
+      },
+      "Continuation": {
+        "V": 1,
+        "Rid": "$rid",
+        "Continuation": [
+          {
+            "token": "1",
+            "range": {
+              "min": "",
+              "max": "FF"
+            }
+          }
+        ],
+        "Range": {
+          "min": "",
+          "max": "FF"
+        }
+      }
+    }"""
+    val encoder = Base64.getEncoder
+    val encodedBytes = encoder.encode(continuationState.getBytes("UTF-8"))
+    val continuationStateEncoded = new String(encodedBytes, "UTF-8")
+
+    val changeFeedCfg = Map(
+      "spark.cosmos.accountEndpoint" -> cosmosEndpoint,
+      "spark.cosmos.accountKey" -> cosmosMasterKey,
+      "spark.cosmos.database" -> cosmosDatabase,
+      "spark.cosmos.container" -> sourceContainer.getId(),
+      "spark.cosmos.read.inferSchema.enabled" -> "false"
+    )
+    var inputtedDocuments = 10
+    var lsn1 = 0L
+    for (_ <- 0 until inputtedDocuments) {
+      lsn1 = ingestTestDocuments(sourceContainer, 1)
+    }
+
+    val structs = Array(
+      StructField("_rawBody", StringType, false),
+      StructField("_etag", StringType, false),
+      StructField("_ts", StringType, false),
+      StructField("id", StringType, false),
+      StructField("_lsn", StringType, false)
+    )
+    val schema = new StructType(structs)
+
+    val diagnosticContext = DiagnosticsContext(UUID.randomUUID(), "")
+    val cosmosClientStateHandles = initializeAndBroadcastCosmosClientStatesForContainer(changeFeedCfg)
+    val diagnosticsConfig = DiagnosticsConfig()
+    val cosmosInputPartition =
+      new CosmosInputPartition(
+        NormalizedRange("", "FF"),
+        Some(lsn1 + 1),
+        Some(continuationStateEncoded),
+        index = Some(2))
+    val changeFeedPartitionReader = new ChangeFeedPartitionReader(
+      cosmosInputPartition,
+      changeFeedCfg,
+      schema,
+      diagnosticContext,
+      cosmosClientStateHandles,
+      diagnosticsConfig,
+      ""
+    )
+    var count = 0
+    implicit val ec: ExecutionContext = ExecutionContext.global
+    Future {
+      while (changeFeedPartitionReader.next()) {
+        changeFeedPartitionReader.get()
+        count += 1
+      }
+    }
+    sleep(2000)
+
+    val currentMetrics = changeFeedPartitionReader.currentMetricsValues()
+    currentMetrics.length shouldBe 3
+    currentMetrics(0).name() shouldBe CosmosConstants.MetricNames.ChangeFeedLsnRange
+    currentMetrics(0).value() shouldBe (lsn1 - 1)
+    currentMetrics(1).name() shouldBe CosmosConstants.MetricNames.ChangeFeedItemsCnt
+    currentMetrics(1).value() shouldBe 10
+    currentMetrics(2).name() shouldBe CosmosConstants.MetricNames.ChangeFeedPartitionIndex
+    currentMetrics(2).value() shouldBe 2
+  }
+
 private[this] def initializeAndBroadcastCosmosClientStatesForContainer(config: Map[String, String])
 : Broadcast[CosmosClientMetadataCachesSnapshots] = {
   val userConfig = CosmosConfig.getEffectiveConfig(None, None, config)
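The assertions at the end go through `currentMetricsValues()`, Spark's DataSource V2 hook for per-task metrics: a reader returns an array of `CustomTaskMetric`, each exposing `name()` and `value()`, which Spark aggregates and displays when the data source declares matching `CustomMetric`s. A toy metric showing the shape being asserted (names and values here are illustrative):

import org.apache.spark.sql.connector.metric.CustomTaskMetric

// Minimal CustomTaskMetric mirroring what the test reads back from the reader.
final case class ToyTaskMetric(metricName: String, metricValue: Long) extends CustomTaskMetric {
  override def name(): String = metricName
  override def value(): Long = metricValue
}

object MetricsShapeDemo extends App {
  val reported: Array[CustomTaskMetric] = Array(
    ToyTaskMetric("changeFeedLsnRange", 99L),
    ToyTaskMetric("changeFeedItemsCnt", 10L),
    ToyTaskMetric("changeFeedPartitionIndex", 2L)
  )
  reported.foreach(m => println(s"${m.name()} = ${m.value()}"))
}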

sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/CosmosConfigSpec.scala

Lines changed: 9 additions & 0 deletions
@@ -886,6 +886,15 @@ class CosmosConfigSpec extends UnitSpec with BasicLoggingTrait {
     config.maxItemCountPerTrigger.get shouldEqual 54
   }

+  it should "parse change feed config with performance monitoring config" in {
+    val changeFeedConfig = Map(
+      "spark.cosmos.changeFeed.performance.monitoring.enabled" -> "false"
+    )
+
+    val config = CosmosChangeFeedConfig.parseCosmosChangeFeedConfig(changeFeedConfig)
+    config.performanceMonitoringEnabled shouldBe false
+  }
+
   it should "complain when parsing invalid change feed mode" in {
     val changeFeedConfig = Map(
       "spark.cosmos.changeFeed.mode" -> "Whatever",
