
Commit d790eb1: "fix compiling"

Author: annie-mac
Parent: 860ea64

File tree: 6 files changed (+25, -29 lines)

sdk/cosmos/azure-cosmos-spark_3-4_2-12/src/main/scala/com/azure/cosmos/spark/ChangeFeedMicroBatchStream.scala

Lines changed: 1 addition & 1 deletion

@@ -67,7 +67,7 @@ private class ChangeFeedMicroBatchStream
   private var latestOffsetSnapshot: Option[ChangeFeedOffset] = None

   private val partitionIndex = new AtomicLong(0)
-  private val partitionIndexMap = Maps.synchronizedBiMap(new HashBiMap[NormalizedRange, Long]())
+  private val partitionIndexMap = Maps.synchronizedBiMap(HashBiMap.create[NormalizedRange, Long]())
   private val partitionMetricsMap = new ConcurrentHashMap[NormalizedRange, ChangeFeedMetricsTracker]()

   session.sparkContext.addSparkListener(new ChangeFeedMetricsListener(partitionIndexMap, partitionMetricsMap))
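The compile fix here is the switch from the HashBiMap constructor, which is not public, to the static `HashBiMap.create` factory. Below is a minimal sketch of the resulting pattern, assuming the Guava fork vendored under `com.azure.cosmos.implementation.guava25` and using `String` in place of the connector's internal `NormalizedRange` key type:

```scala
import com.azure.cosmos.implementation.guava25.collect.{HashBiMap, Maps}

object PartitionIndexMapSketch {
  // `new HashBiMap(...)` does not compile because the constructor is not
  // public; `HashBiMap.create()` is the supported factory. Wrapping the
  // result in Maps.synchronizedBiMap makes reads and writes thread-safe.
  private val partitionIndexMap =
    Maps.synchronizedBiMap(HashBiMap.create[String, Long]())

  def main(args: Array[String]): Unit = {
    partitionIndexMap.put("range-0", 0L)
    // A BiMap also supports reverse lookups via inverse().
    println(partitionIndexMap.inverse().get(0L)) // prints "range-0"
  }
}
```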

sdk/cosmos/azure-cosmos-spark_3-4_2-12/src/main/scala/com/azure/cosmos/spark/SparkInternalsBridge.scala

Lines changed: 1 addition & 1 deletion

@@ -80,7 +80,7 @@ object SparkInternalsBridge extends BasicLoggingTrait {
     }
   }

-  private def ConsistencygetInternalCustomTaskMetricsAsSQLMetricInternal(
+  private def getInternalCustomTaskMetricsAsSQLMetricInternal(
     knownCosmosMetricNames: Set[String],
     taskMetrics: TaskMetrics): Map[String, SQLMetric] = {
     getAccumulators(taskMetrics) match {

sdk/cosmos/azure-cosmos-spark_3-5_2-12/src/main/scala/com/azure/cosmos/spark/SparkInternalsBridge.scala

Lines changed: 12 additions & 8 deletions

@@ -6,6 +6,7 @@ import com.azure.cosmos.implementation.guava25.base.MoreObjects.firstNonNull
 import com.azure.cosmos.implementation.guava25.base.Strings.emptyToNull
 import com.azure.cosmos.spark.diagnostics.BasicLoggingTrait
 import org.apache.spark.TaskContext
+import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.util.AccumulatorV2

@@ -40,20 +41,23 @@ object SparkInternalsBridge extends BasicLoggingTrait {
   private final lazy val reflectionAccessAllowed = new AtomicBoolean(getSparkReflectionAccessAllowed)

   def getInternalCustomTaskMetricsAsSQLMetric(knownCosmosMetricNames: Set[String]) : Map[String, SQLMetric] = {
+    Option.apply(TaskContext.get()) match {
+      case Some(taskCtx) => getInternalCustomTaskMetricsAsSQLMetric(knownCosmosMetricNames, taskCtx.taskMetrics())
+      case None => Map.empty[String, SQLMetric]
+    }
+  }
+
+  def getInternalCustomTaskMetricsAsSQLMetric(knownCosmosMetricNames: Set[String], taskMetrics: TaskMetrics) : Map[String, SQLMetric] = {

     if (!reflectionAccessAllowed.get) {
       Map.empty[String, SQLMetric]
     } else {
-      Option.apply(TaskContext.get()) match {
-        case Some(taskCtx) => getInternalCustomTaskMetricsAsSQLMetricInternal(knownCosmosMetricNames, taskCtx)
-        case None => Map.empty[String, SQLMetric]
-      }
+      getInternalCustomTaskMetricsAsSQLMetricInternal(knownCosmosMetricNames, taskMetrics)
     }
   }

-  private def getAccumulators(taskCtx: TaskContext): Option[Seq[AccumulatorV2[_, _]]] = {
+  private def getAccumulators(taskMetrics: TaskMetrics): Option[Seq[AccumulatorV2[_, _]]] = {
     try {
-      val taskMetrics: Object = taskCtx.taskMetrics()
       val method = Option(accumulatorsMethod.get) match {
         case Some(existing) => existing
         case None =>
@@ -78,8 +82,8 @@ object SparkInternalsBridge extends BasicLoggingTrait {

   private def getInternalCustomTaskMetricsAsSQLMetricInternal(
     knownCosmosMetricNames: Set[String],
-    taskCtx: TaskContext): Map[String, SQLMetric] = {
-    getAccumulators(taskCtx) match {
+    taskMetrics: TaskMetrics): Map[String, SQLMetric] = {
+    getAccumulators(taskMetrics) match {
       case Some(accumulators) => accumulators
         .filter(accumulable => accumulable.isInstanceOf[SQLMetric]
           && accumulable.name.isDefined
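The Spark 3.5 bridge is refactored so the reflection-based lookup operates on TaskMetrics rather than a TaskContext: the original single-argument entry point now just resolves the current TaskContext and delegates. A minimal sketch of that overload pattern under simplified, hypothetical names (`lookupMetrics` is not the real method; the real bridge filters SQLMetric accumulators via reflection):

```scala
import org.apache.spark.TaskContext
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.sql.execution.metric.SQLMetric

object BridgeSketch {
  // Convenience overload: resolve the TaskContext bound to the current
  // thread (if any) and delegate to the TaskMetrics-based overload.
  def lookupMetrics(names: Set[String]): Map[String, SQLMetric] =
    Option(TaskContext.get()) match {
      case Some(taskCtx) => lookupMetrics(names, taskCtx.taskMetrics())
      case None => Map.empty[String, SQLMetric]
    }

  // Core overload: works on TaskMetrics directly, so callers (and tests)
  // that already hold a TaskMetrics instance need no live TaskContext.
  def lookupMetrics(names: Set[String], taskMetrics: TaskMetrics): Map[String, SQLMetric] = {
    // Placeholder body: the real bridge reflectively enumerates the task's
    // accumulators and keeps the SQLMetric instances named in `names`.
    Map.empty[String, SQLMetric]
  }
}
```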

sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/changeFeedMetrics/ChangeFeedMetricsTracker.scala

Lines changed: 5 additions & 6 deletions

@@ -35,6 +35,7 @@ private[cosmos] class ChangeFeedMetricsTracker(
   private val decayFactor: Double = ChangeFeedMetricsTracker.DefaultDecayFactor
 ) {
   private val changeFeedChangesPerLsnHistory = EvictingQueue.create[Double](maxHistory)
+  private var currentChangesPerLsnOpt: Option[Double] = None

   /**
    * Track the normalized change feed changes per lsn
@@ -56,7 +57,7 @@ private[cosmos] class ChangeFeedMetricsTracker(
    * i = index of measurement (0 being oldest)
    * @return Weighted average of LSN gaps
    */
-  private def calculateWeightedChangesPerLsn(): Option[Double] = {
+  private def calculateWeightedChangesPerLsn(): Unit = {
     synchronized {
       if (changeFeedChangesPerLsnHistory.isEmpty) {
         None
@@ -71,8 +72,8 @@ private[cosmos] class ChangeFeedMetricsTracker(
         weightedSum += gaps(i) * weight
         weightSum += weight
       }
-
-      Some(weightedSum / weightSum)
+
+      currentChangesPerLsnOpt = Some(weightedSum / weightSum)
     }
   }
 }
@@ -82,8 +83,6 @@ private[cosmos] class ChangeFeedMetricsTracker(
    * @return Current weighted LSN gap
    */
   def getWeightedAvgChangesPerLsn: Option[Double] = {
-    synchronized {
-      calculateWeightedChangesPerLsn()
-    }
+    this.currentChangesPerLsnOpt
   }
 }
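The tracker now recomputes the weighted average once per tracked sample and caches it in `currentChangesPerLsnOpt`, turning the getter into a cheap read. A minimal standalone sketch of that write-time-compute pattern, assuming an exponential-decay weighting like the one the tracker's comments describe, with Guava's EvictingQueue approximated by a bounded mutable.Queue:

```scala
import scala.collection.mutable

// Sketch: cache the weighted average at write time instead of recomputing
// it on every read (mirrors the currentChangesPerLsnOpt change above).
class WeightedAverageSketch(maxHistory: Int, decayFactor: Double = 0.5) {
  private val history = mutable.Queue.empty[Double]
  @volatile private var cached: Option[Double] = None

  def track(sample: Double): Unit = synchronized {
    history.enqueue(sample)
    if (history.size > maxHistory) history.dequeue() // evict the oldest sample
    var weightedSum = 0.0
    var weightSum = 0.0
    for ((s, i) <- history.zipWithIndex) {
      // Newer samples (higher i) receive larger weights when decayFactor < 1.
      val weight = math.pow(decayFactor, history.size - 1 - i)
      weightedSum += s * weight
      weightSum += weight
    }
    cached = Some(weightedSum / weightSum)
  }

  // Read path: returns the cached value without recomputation or locking.
  def weightedAverage: Option[Double] = cached
}
```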

sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/PartitionMetadata.scala

Lines changed: 3 additions & 2 deletions

@@ -132,8 +132,9 @@ private[cosmos] case class PartitionMetadata
   }

   def getAvgChangesPerLsn: Double = {
-    val effectiveFirstLsn = if (this.firstLsn.isEmpty) 0 else this.firstLsn.get
-    if (this.documentCount == 0 || (this.getAndValidateLatestLsn - effectiveFirstLsn) <= 0) {
+    if (this.firstLsn.isEmpty) {
+      math.max(1d, this.documentCount.toDouble / this.getAndValidateLatestLsn)
+    } else if (this.documentCount == 0 || (this.getAndValidateLatestLsn - this.firstLsn.get) <= 0) {
       1d
     } else {
       this.documentCount.toDouble / (this.getAndValidateLatestLsn - this.firstLsn.get)
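The new first branch changes how partitions without a recorded firstLsn are estimated: instead of treating a missing firstLsn as 0, the average is computed over the whole LSN range and floored at 1. A standalone rendering of the new branches, with hypothetical numbers (`latestLsn` stands in for `getAndValidateLatestLsn`):

```scala
// Mirror of the new getAvgChangesPerLsn logic, extracted for illustration.
def avgChangesPerLsn(documentCount: Long, latestLsn: Long, firstLsn: Option[Long]): Double = {
  if (firstLsn.isEmpty) {
    // No first LSN recorded: estimate over the full range, floored at 1.
    math.max(1d, documentCount.toDouble / latestLsn)
  } else if (documentCount == 0 || (latestLsn - firstLsn.get) <= 0) {
    1d
  } else {
    documentCount.toDouble / (latestLsn - firstLsn.get)
  }
}

// Hypothetical values:
// avgChangesPerLsn(50, 100, None)     == 1.0  (50/100 floored to 1)
// avgChangesPerLsn(500, 100, None)    == 5.0
// avgChangesPerLsn(30, 100, Some(90)) == 3.0  (30 docs over 10 LSNs)
```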

sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/SampleStructuredStreamingE2EMain.scala

Lines changed: 3 additions & 11 deletions

@@ -14,8 +14,8 @@ object SampleStructuredStreamingE2EMain {
   def main(args: Array[String]) {
     val cosmosEndpoint = TestConfigurations.HOST
     val cosmosMasterKey = TestConfigurations.MASTER_KEY
-    val cosmosDatabase = "SampleDatabase"
-    val cosmosContainer = "GreenTaxiRecords"
+    val cosmosDatabase = "TestDatabase"
+    val cosmosContainer = "TestContainerMulti"

     // val client = new CosmosClientBuilder()
     //  .endpoint(cosmosEndpoint)
@@ -53,15 +53,7 @@ object SampleStructuredStreamingE2EMain {
       "spark.cosmos.read.inferSchema.enabled" -> "false",
       "spark.cosmos.changeFeed.startFrom" -> "Beginning",
       "spark.cosmos.changeFeed.mode" -> "Incremental",
-      "spark.cosmos.changeFeed.itemCountPerTriggerHint" -> "10000",
-      "spark.cosmos.preferredRegionsList" -> "[West Europe, North Europe]",
-      "spark.cosmos.throughputControl.enabled" -> "true",
-      "spark.cosmos.throughputControl.name" -> "CopyReadFromSource",
-      "spark.cosmos.throughputControl.targetThroughputThreshold" -> "0.95",
-      "spark.cosmos.throughputControl.globalControl.database" -> "SampleDatabase",
-      "spark.cosmos.throughputControl.globalControl.container" -> "ThroughputControl",
-      "spark.cosmos.clientTelemetry.enabled" -> "true",
-      "spark.cosmos.read.partitioning.strategy" -> "Restrictive",
+      "spark.cosmos.changeFeed.itemCountPerTriggerHint" -> "1",
       "spark.cosmos.diagnostics" -> "feed"
     )
     val changeFeed_df = spark.readStream.format("cosmos.oltp.changeFeed").options(changeFeedCfg).load()
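With throughput control and region options removed, the trimmed sample reduces to a plain change feed read stream. A minimal self-contained sketch of that setup, with placeholder endpoint and key instead of TestConfigurations, and the itemCountPerTriggerHint comment being an assumption about why the value was lowered:

```scala
import org.apache.spark.sql.SparkSession

object ChangeFeedReadSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("cosmos-changefeed-sample")
      .master("local[*]")
      .getOrCreate()

    val changeFeedCfg = Map(
      "spark.cosmos.accountEndpoint" -> "https://<account>.documents.azure.com:443/", // placeholder
      "spark.cosmos.accountKey" -> "<key>", // placeholder
      "spark.cosmos.database" -> "TestDatabase",
      "spark.cosmos.container" -> "TestContainerMulti",
      "spark.cosmos.read.inferSchema.enabled" -> "false",
      "spark.cosmos.changeFeed.startFrom" -> "Beginning",
      "spark.cosmos.changeFeed.mode" -> "Incremental",
      // "1" yields many small micro-batches, which presumably exercises the
      // new per-partition change feed metrics added in this commit.
      "spark.cosmos.changeFeed.itemCountPerTriggerHint" -> "1",
      "spark.cosmos.diagnostics" -> "feed"
    )

    val changeFeedDf = spark.readStream
      .format("cosmos.oltp.changeFeed")
      .options(changeFeedCfg)
      .load()
    changeFeedDf.printSchema()
  }
}
```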
