Skip to content

Commit 50a5907

Browse files
author
annie-mac
committed
fix
1 parent d212d66 commit 50a5907

File tree

3 files changed

+7
-2
lines changed

3 files changed

+7
-2
lines changed

sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/changeFeedMetrics/ChangeFeedMetricsTracker.scala

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package com.azure.cosmos.changeFeedMetrics
55

66
import com.azure.cosmos.implementation.guava25.collect.EvictingQueue
77
import com.azure.cosmos.spark.NormalizedRange
8+
import org.slf4j.{Logger, LoggerFactory}
89

910
// scalastyle:off underscore.import
1011
import scala.collection.JavaConverters._
@@ -34,6 +35,8 @@ private[cosmos] class ChangeFeedMetricsTracker(
3435
private val maxHistory: Int = ChangeFeedMetricsTracker.DefaultMaxHistory,
3536
private val decayFactor: Double = ChangeFeedMetricsTracker.DefaultDecayFactor
3637
) {
38+
@transient private lazy val log : Logger = LoggerFactory.getLogger(ChangeFeedMetricsTracker.getClass)
39+
3740
private val changeFeedChangesPerLsnHistory = EvictingQueue.create[Double](maxHistory)
3841
private var currentChangesPerLsnOpt: Option[Double] = None
3942

@@ -83,6 +86,8 @@ private[cosmos] class ChangeFeedMetricsTracker(
8386
* @return Current weighted LSN gap
8487
*/
8588
def getWeightedAvgChangesPerLsn: Option[Double] = {
89+
// TODO: remove
90+
log.info(s"getWeightedAvgChangesPerLsn for feedRangeIndex $partitionIndex, $feedRange $feedRange value $currentChangesPerLsnOpt")
8691
this.currentChangesPerLsnOpt
8792
}
8893
}

sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/ChangeFeedPartitionReader.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ private case class ChangeFeedPartitionReader
6565
}
6666

6767
private val containerTargetConfig = CosmosContainerConfig.parseCosmosContainerConfig(config)
68-
log.logInfo(s"Reading from feed range ${partition.feedRange} of " +
68+
log.logInfo(s"Reading from feed range ${partition.feedRange}, endLsn ${partition.endLsn} of " +
6969
s"container ${containerTargetConfig.database}.${containerTargetConfig.container}")
7070
private val readConfig = CosmosReadConfig.parseCosmosReadConfig(config)
7171
private val clientCacheItem = CosmosClientCache(

sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/CosmosPartitionPlanner.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -618,7 +618,7 @@ private object CosmosPartitionPlanner extends BasicLoggingTrait {
618618
} else {
619619
// the weight of this feedRange compared to other feedRanges
620620

621-
val feedRangeWeightFactor = this.getWeightedLsnGap(metadata, isChangeFeed, partitionMetricsMap) / totalWeightedLsnGap.get
621+
val feedRangeWeightFactor = this.getWeightedLsnGap(metadata, isChangeFeed, partitionMetricsMap).toDouble / totalWeightedLsnGap.get
622622

623623
val allowedRate =
624624
(feedRangeWeightFactor * maxRowsLimit.maxRows() / this.getAvgChangesPerLsn(metadata, isChangeFeed, partitionMetricsMap))

0 commit comments

Comments
 (0)