Skip to content

Commit 66ecb91

Browse files
author
annie-mac
committed
change
1 parent 6a75d9f commit 66ecb91

File tree

3 files changed

+33
-15
lines changed

3 files changed

+33
-15
lines changed

sdk/cosmos/azure-cosmos-spark_3-5_2-12/src/main/scala/com/azure/cosmos/spark/ChangeFeedMicroBatchStream.scala

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@
22
// Licensed under the MIT License.
33
package com.azure.cosmos.spark
44

5+
import com.azure.cosmos.changeFeedMetrics.{ChangeFeedMetricsListener, ChangeFeedMetricsTracker}
56
import com.azure.cosmos.implementation.SparkBridgeImplementationInternal
7+
import com.azure.cosmos.implementation.guava25.collect.{HashBiMap, Maps}
68
import com.azure.cosmos.spark.CosmosPredicates.{assertNotNull, assertNotNullOrEmpty, assertOnSparkDriver}
79
import com.azure.cosmos.spark.diagnostics.{DiagnosticsContext, LoggerHelper}
810
import org.apache.spark.broadcast.Broadcast
@@ -13,6 +15,12 @@ import org.apache.spark.sql.types.StructType
1315

1416
import java.time.Duration
1517
import java.util.UUID
18+
import java.util.concurrent.ConcurrentHashMap
19+
import java.util.concurrent.atomic.AtomicLong
20+
21+
// scalastyle:off underscore.import
22+
import scala.collection.JavaConverters._
23+
// scalastyle:on underscore.import
1624

1725
// scala style rule flaky - even complaining on partial log messages
1826
// scalastyle:off multiple.string.literals
@@ -59,6 +67,12 @@ private class ChangeFeedMicroBatchStream
5967

6068
private var latestOffsetSnapshot: Option[ChangeFeedOffset] = None
6169

70+
private val partitionIndex = new AtomicLong(0)
71+
private val partitionIndexMap = Maps.synchronizedBiMap(HashBiMap.create[NormalizedRange, Long]())
72+
private val partitionMetricsMap = new ConcurrentHashMap[NormalizedRange, ChangeFeedMetricsTracker]()
73+
74+
session.sparkContext.addSparkListener(new ChangeFeedMetricsListener(partitionIndexMap, partitionMetricsMap))
75+
6276
override def latestOffset(): Offset = {
6377
// For Spark data streams implementing SupportsAdmissionControl trait
6478
// latestOffset(Offset, ReadLimit) is called instead
@@ -101,11 +115,15 @@ private class ChangeFeedMicroBatchStream
101115
end
102116
.inputPartitions
103117
.get
104-
.map(partition => partition
105-
.withContinuationState(
106-
SparkBridgeImplementationInternal
118+
.map(partition => {
119+
val index = partitionIndexMap.asScala.getOrElseUpdate(partition.feedRange, partitionIndex.incrementAndGet())
120+
partition
121+
.withContinuationState(
122+
SparkBridgeImplementationInternal
107123
.extractChangeFeedStateForRange(start.changeFeedState, partition.feedRange),
108-
clearEndLsn = false))
124+
clearEndLsn = false)
125+
.withIndex(index)
126+
})
109127
}
110128

111129
/**

sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/CosmosPartitionPlanner.scala

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -550,37 +550,37 @@ private object CosmosPartitionPlanner extends BasicLoggingTrait {
550550
metadata: PartitionMetadata,
551551
isChangeFeed: Boolean,
552552
partitionMetricsMap: Option[ConcurrentHashMap[NormalizedRange, ChangeFeedMetricsTracker]] = None): Long = {
553-
var normalizedChangesPerLsnOpt: Option[Double] = None
553+
var changesPerLsnFromMetricsOpt: Option[Double] = None
554554
if (isChangeFeed) {
555555
partitionMetricsMap match {
556556
case Some(metricsMap) =>
557557
if (metricsMap.containsKey(metadata.feedRange)) {
558-
normalizedChangesPerLsnOpt = metricsMap.get(metadata.feedRange).getWeightedAvgChangesPerLsn
558+
changesPerLsnFromMetricsOpt = metricsMap.get(metadata.feedRange).getWeightedAvgChangesPerLsn
559559
}
560560
case None =>
561561
}
562562
}
563563

564-
metadata.getWeightedLsnGap(normalizedChangesPerLsnOpt)
564+
metadata.getWeightedLsnGap(changesPerLsnFromMetricsOpt)
565565
}
566566

567567
private[spark] def getAvgChangesPerLsn(
568568
metadata: PartitionMetadata,
569569
isChangeFeed: Boolean,
570570
partitionMetricsMap: Option[ConcurrentHashMap[NormalizedRange, ChangeFeedMetricsTracker]] = None): Double = {
571-
var normalizedChangesPerLsnOpt: Option[Double] = None
571+
var changesPerLsnFromMetricsOpt: Option[Double] = None
572572
if (isChangeFeed) {
573573
partitionMetricsMap match {
574574
case Some(metricsMap) =>
575575
if (metricsMap.containsKey(metadata.feedRange)) {
576-
normalizedChangesPerLsnOpt = partitionMetricsMap.get.get(metadata.feedRange).getWeightedAvgChangesPerLsn
576+
changesPerLsnFromMetricsOpt = partitionMetricsMap.get.get(metadata.feedRange).getWeightedAvgChangesPerLsn
577577
}
578578
case None =>
579579
}
580580
}
581581

582-
if (normalizedChangesPerLsnOpt.isDefined) {
583-
normalizedChangesPerLsnOpt.get
582+
if (changesPerLsnFromMetricsOpt.isDefined) {
583+
changesPerLsnFromMetricsOpt.get
584584
} else {
585585
metadata.getAvgChangesPerLsn
586586
}

sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/PartitionMetadata.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -114,18 +114,18 @@ private[cosmos] case class PartitionMetadata
114114
)
115115
}
116116

117-
def getWeightedLsnGap(normalizedChangesPerLsn: Option[Double] = None): Long = {
117+
def getWeightedLsnGap(changesPerLsnFromMetrics: Option[Double] = None): Long = {
118118
val progressFactor = math.max(this.getAndValidateLatestLsn - this.startLsn, 0)
119119
if (progressFactor == 0) {
120120
0
121121
} else {
122-
val effectiveNormalizedChangesPerLsn =
123-
normalizedChangesPerLsn match {
122+
val effectiveChangesPerLsn =
123+
changesPerLsnFromMetrics match {
124124
case Some(changesPerLsn) => changesPerLsn
125125
case None => this.getAvgChangesPerLsn
126126
}
127127

128-
val weightedGap: Double = progressFactor * effectiveNormalizedChangesPerLsn
128+
val weightedGap: Double = progressFactor * effectiveChangesPerLsn
129129
// Any double less than 1 gets rounded to 0 when toLong is invoked
130130
weightedGap.toLong.max(1)
131131
}

0 commit comments

Comments
 (0)