1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-spark_3-3_2-12/CHANGELOG.md
@@ -10,6 +10,7 @@
* Added log4j-core to the list of shaded packages to avoid conflicts when customers use log4j in a different version. - See [PR 45924](https://github.com/Azure/azure-sdk-for-java/pull/46451)

#### Other Changes
* Added change feed performance monitoring, which is used to improve the end LSN calculation in `CosmosPartitionPlanner`. - See [PR 46320](https://github.com/Azure/azure-sdk-for-java/pull/46320)

### 4.38.0 (2025-07-31)

ChangeFeedMicroBatchStream.scala
@@ -2,7 +2,9 @@
// Licensed under the MIT License.
package com.azure.cosmos.spark

import com.azure.cosmos.implementation.guava25.collect.{HashBiMap, Maps}
import com.azure.cosmos.implementation.{SparkBridgeImplementationInternal, UUIDs}
import com.azure.cosmos.changeFeedMetrics.{ChangeFeedMetricsListener, ChangeFeedMetricsTracker}
import com.azure.cosmos.spark.CosmosPredicates.{assertNotNull, assertNotNullOrEmpty, assertOnSparkDriver}
import com.azure.cosmos.spark.diagnostics.{DiagnosticsContext, LoggerHelper}
import org.apache.spark.broadcast.Broadcast
@@ -12,7 +14,12 @@ import org.apache.spark.sql.connector.read.{InputPartition, PartitionReaderFactory}
import org.apache.spark.sql.types.StructType

import java.time.Duration
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong

// scalastyle:off underscore.import
import scala.collection.JavaConverters._
// scalastyle:on underscore.import

// scala style rule flaky - even complaining on partial log messages
// scalastyle:off multiple.string.literals
@@ -57,6 +64,18 @@ private class ChangeFeedMicroBatchStream

private var latestOffsetSnapshot: Option[ChangeFeedOffset] = None

private val partitionIndex = new AtomicLong(0)
private val partitionIndexMap = Maps.synchronizedBiMap(HashBiMap.create[NormalizedRange, Long]())
private val partitionMetricsMap = new ConcurrentHashMap[NormalizedRange, ChangeFeedMetricsTracker]()

// Register metrics listener
if (changeFeedConfig.performanceMonitoringEnabled) {
log.logInfo("ChangeFeed performance monitoring is enabled, registering ChangeFeedMetricsListener")
session.sparkContext.addSparkListener(new ChangeFeedMetricsListener(partitionIndexMap, partitionMetricsMap))
} else {
log.logInfo("ChangeFeed performance monitoring is disabled")
}
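The listener implementation itself is not part of this diff. As a rough sketch of how a listener constructed with these two maps might consume task metrics - the metric names, the `ChangeFeedMetricsTracker()` constructor, and the `track(...)` call below are assumptions, not the connector's actual API:

```scala
package com.azure.cosmos.spark // assumed, so NormalizedRange resolves without an import

import java.util.concurrent.ConcurrentHashMap

import com.azure.cosmos.implementation.guava25.collect.BiMap
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Rough sketch only - the real ChangeFeedMetricsListener lives in
// com.azure.cosmos.changeFeedMetrics and is not shown in this diff.
class ChangeFeedMetricsListenerSketch(
  partitionIndexMap: BiMap[NormalizedRange, Long],
  partitionMetricsMap: ConcurrentHashMap[NormalizedRange, ChangeFeedMetricsTracker]
) extends SparkListener {

  private val bridge = new SparkInternalsBridge()
  // Hypothetical metric names - the real names are defined by the connector's custom metrics.
  private val knownMetrics = Set("cosmosChangeFeedPartitionIndex", "cosmosChangeFeedFetchedItemCount")

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    // Pull the connector's custom SQLMetrics off the finished task via the new instance bridge.
    val metrics = bridge.getInternalCustomTaskMetricsAsSQLMetric(knownMetrics, taskEnd.taskMetrics)

    metrics.get("cosmosChangeFeedPartitionIndex").foreach { indexMetric =>
      // Use the inverse view of the BiMap to map the reported index back to its feed range.
      Option(partitionIndexMap.inverse().get(indexMetric.value)).foreach { range =>
        val tracker = partitionMetricsMap
          .computeIfAbsent(range, _ => new ChangeFeedMetricsTracker()) // hypothetical constructor
        metrics.get("cosmosChangeFeedFetchedItemCount")
          .foreach(m => tracker.track(m.value))                        // hypothetical method
      }
    }
  }
}
```

Presumably the BiMap (rather than a plain map) is used for `partitionIndexMap` so that the listener can perform exactly this inverse, index-to-range lookup.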

override def latestOffset(): Offset = {
// For Spark data streams implementing SupportsAdmissionControl trait
// latestOffset(Offset, ReadLimit) is called instead
@@ -99,11 +118,15 @@
end
.inputPartitions
.get
.map(partition => partition
.withContinuationState(
SparkBridgeImplementationInternal
.extractChangeFeedStateForRange(start.changeFeedState, partition.feedRange),
clearEndLsn = false))
.map(partition => {
val index = partitionIndexMap.asScala.getOrElseUpdate(partition.feedRange, partitionIndex.incrementAndGet())
partition
.withContinuationState(
SparkBridgeImplementationInternal
.extractChangeFeedStateForRange(start.changeFeedState, partition.feedRange),
clearEndLsn = false)
.withIndex(index)
})
}
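The `getOrElseUpdate` above hands every feed range a stable numeric index, and the BiMap's inverse view is what allows an index reported by a task to be translated back into its `NormalizedRange`. A minimal standalone illustration of that pattern, using `String` keys in place of `NormalizedRange` for brevity:

```scala
import java.util.concurrent.atomic.AtomicLong

import com.azure.cosmos.implementation.guava25.collect.{HashBiMap, Maps}

// scalastyle:off underscore.import
import scala.collection.JavaConverters._
// scalastyle:on underscore.import

object PartitionIndexSketch {
  private val counter = new AtomicLong(0)
  // Same synchronized BiMap construction as above, with String standing in for NormalizedRange.
  private val indexMap = Maps.synchronizedBiMap(HashBiMap.create[String, Long]())

  def indexFor(range: String): Long =
    // Existing index if the range was seen before, otherwise the next counter value.
    indexMap.asScala.getOrElseUpdate(range, counter.incrementAndGet())

  def rangeFor(index: Long): Option[String] =
    // Inverse lookup: index -> range, as a metrics listener would need it.
    Option(indexMap.inverse().get(index))
}
```

`indexFor("0-FF")` returns 1 on the first call and the same value on every later call; `rangeFor(1)` then returns `Some("0-FF")`.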

/**
@@ -150,7 +173,8 @@
this.containerConfig,
this.partitioningConfig,
this.defaultParallelism,
this.container
this.container,
Some(this.partitionMetricsMap)
)

if (offset.changeFeedState != startChangeFeedOffset.changeFeedState) {
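The module changelogs note that this monitoring feeds the end LSN calculation in `CosmosPartitionPlanner`; the planner change itself is outside this excerpt. Purely as an illustration of the idea - every name and the weighting formula here are assumptions, not the planner's actual logic - an observed items-per-LSN rate could be used to cap how far a micro-batch advances:

```scala
// Illustration only - not the CosmosPartitionPlanner implementation.
object EndLsnEstimationSketch {
  def estimateEndLsn(
    startLsn: Long,
    latestLsn: Long,
    maxItemCountPerPartition: Long,
    observedItemsPerLsn: Option[Double]): Long = {
    observedItemsPerLsn match {
      case Some(rate) if rate > 0 =>
        // Advance only as far as the observed change feed throughput suggests
        // the micro-batch can consume within its item budget.
        val maxLsnAdvance = math.max(1L, (maxItemCountPerPartition / rate).toLong)
        math.min(latestLsn, startLsn + maxLsnAdvance)
      case _ =>
        // Without metrics there is nothing to weight by, so keep the latest known LSN.
        latestLsn
    }
  }
}
```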
SparkInternalsBridge.scala
@@ -6,6 +6,7 @@ import com.azure.cosmos.implementation.guava25.base.MoreObjects.firstNonNull
import com.azure.cosmos.implementation.guava25.base.Strings.emptyToNull
import com.azure.cosmos.spark.diagnostics.BasicLoggingTrait
import org.apache.spark.TaskContext
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.util.AccumulatorV2

@@ -14,6 +15,15 @@ import java.util.Locale
import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
import scala.collection.mutable.ArrayBuffer

class SparkInternalsBridge {
// Only used in ChangeFeedMetricsListener; exposing an instance method makes test validation easier
def getInternalCustomTaskMetricsAsSQLMetric(
knownCosmosMetricNames: Set[String],
taskMetrics: TaskMetrics) : Map[String, SQLMetric] = {
SparkInternalsBridge.getInternalCustomTaskMetricsAsSQLMetricInternal(knownCosmosMetricNames, taskMetrics)
}
}
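Because the metrics are now passed in as an explicit `TaskMetrics` argument through an instance method, a test can swap in a stub without touching Spark internals or a live `TaskContext`. A hedged sketch of that idea (the stub and its canned metrics are illustrative only):

```scala
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.sql.execution.metric.SQLMetric

// Illustrative stub for tests - not part of the connector.
class StubSparkInternalsBridge(canned: Map[String, SQLMetric]) extends SparkInternalsBridge {
  override def getInternalCustomTaskMetricsAsSQLMetric(
    knownCosmosMetricNames: Set[String],
    taskMetrics: TaskMetrics): Map[String, SQLMetric] =
    // Return only the canned metrics the caller asked for, ignoring reflection entirely.
    canned.filter { case (name, _) => knownCosmosMetricNames.contains(name) }
}
```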

object SparkInternalsBridge extends BasicLoggingTrait {
private val SPARK_REFLECTION_ACCESS_ALLOWED_PROPERTY = "COSMOS.SPARK_REFLECTION_ACCESS_ALLOWED"
private val SPARK_REFLECTION_ACCESS_ALLOWED_VARIABLE = "COSMOS_SPARK_REFLECTION_ACCESS_ALLOWED"
@@ -41,20 +51,23 @@ object SparkInternalsBridge extends BasicLoggingTrait {
private final lazy val reflectionAccessAllowed = new AtomicBoolean(getSparkReflectionAccessAllowed)

def getInternalCustomTaskMetricsAsSQLMetric(knownCosmosMetricNames: Set[String]): Map[String, SQLMetric] = {
Option.apply(TaskContext.get()) match {
case Some(taskCtx) => getInternalCustomTaskMetricsAsSQLMetricInternal(knownCosmosMetricNames, taskCtx.taskMetrics())
case None => Map.empty[String, SQLMetric]
}
}

def getInternalCustomTaskMetricsAsSQLMetric(knownCosmosMetricNames: Set[String], taskMetrics: TaskMetrics): Map[String, SQLMetric] = {

if (!reflectionAccessAllowed.get) {
Map.empty[String, SQLMetric]
} else {
Option.apply(TaskContext.get()) match {
case Some(taskCtx) => getInternalCustomTaskMetricsAsSQLMetricInternal(knownCosmosMetricNames, taskCtx)
case None => Map.empty[String, SQLMetric]
}
getInternalCustomTaskMetricsAsSQLMetricInternal(knownCosmosMetricNames, taskMetrics)
}
}
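For comparison, task-side callers keep using the single-argument overload, which resolves `TaskMetrics` from the current `TaskContext`; a minimal usage sketch (the metric name is a placeholder):

```scala
import org.apache.spark.sql.execution.metric.SQLMetric

object TaskSideUsageSketch {
  // Inside a running task (e.g. a partition reader) TaskContext.get() is non-null,
  // so this overload reads the current task's metrics.
  def readCurrentTaskMetrics(): Map[String, SQLMetric] =
    SparkInternalsBridge.getInternalCustomTaskMetricsAsSQLMetric(Set("placeholderCosmosMetric"))
}
```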

private def getAccumulators(taskCtx: TaskContext): Option[ArrayBuffer[AccumulatorV2[_, _]]] = {
private def getAccumulators(taskMetrics: TaskMetrics): Option[ArrayBuffer[AccumulatorV2[_, _]]] = {
try {
val taskMetrics: Object = taskCtx.taskMetrics()
val method = Option(accumulatorsMethod.get) match {
case Some(existing) => existing
case None =>
@@ -79,8 +92,8 @@

private def getInternalCustomTaskMetricsAsSQLMetricInternal(
knownCosmosMetricNames: Set[String],
taskCtx: TaskContext): Map[String, SQLMetric] = {
getAccumulators(taskCtx) match {
taskMetrics: TaskMetrics): Map[String, SQLMetric] = {
getAccumulators(taskMetrics) match {
case Some(accumulators) => accumulators
.filter(accumulable => accumulable.isInstanceOf[SQLMetric]
&& accumulable.name.isDefined
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-spark_3-4_2-12/CHANGELOG.md
@@ -10,6 +10,7 @@
* Added log4j-core to the list of shaded packages to avoid conflicts when customers use log4j in a different version. - See [PR 45924](https://github.com/Azure/azure-sdk-for-java/pull/46451)

#### Other Changes
* Added change feed performance monitoring, which is used to improve the end LSN calculation in `CosmosPartitionPlanner`. - See [PR 46320](https://github.com/Azure/azure-sdk-for-java/pull/46320)

### 4.38.0 (2025-07-31)

ChangeFeedMicroBatchStream.scala
@@ -2,7 +2,9 @@
// Licensed under the MIT License.
package com.azure.cosmos.spark

import com.azure.cosmos.implementation.guava25.collect.{HashBiMap, Maps}
import com.azure.cosmos.implementation.{SparkBridgeImplementationInternal, UUIDs}
import com.azure.cosmos.changeFeedMetrics.{ChangeFeedMetricsListener, ChangeFeedMetricsTracker}
import com.azure.cosmos.spark.CosmosPredicates.{assertNotNull, assertNotNullOrEmpty, assertOnSparkDriver}
import com.azure.cosmos.spark.diagnostics.{DiagnosticsContext, LoggerHelper}
import org.apache.spark.broadcast.Broadcast
@@ -12,7 +14,12 @@ import org.apache.spark.sql.connector.read.{InputPartition, PartitionReaderFactory}
import org.apache.spark.sql.types.StructType

import java.time.Duration
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong

// scalastyle:off underscore.import
import scala.collection.JavaConverters._
// scalastyle:on underscore.import

// scala style rule flaky - even complaining on partial log messages
// scalastyle:off multiple.string.literals
@@ -59,6 +66,17 @@ private class ChangeFeedMicroBatchStream

private var latestOffsetSnapshot: Option[ChangeFeedOffset] = None

private val partitionIndex = new AtomicLong(0)
private val partitionIndexMap = Maps.synchronizedBiMap(HashBiMap.create[NormalizedRange, Long]())
private val partitionMetricsMap = new ConcurrentHashMap[NormalizedRange, ChangeFeedMetricsTracker]()

if (changeFeedConfig.performanceMonitoringEnabled) {
log.logInfo("ChangeFeed performance monitoring is enabled, registering ChangeFeedMetricsListener")
session.sparkContext.addSparkListener(new ChangeFeedMetricsListener(partitionIndexMap, partitionMetricsMap))
} else {
log.logInfo("ChangeFeed performance monitoring is disabled")
}

override def latestOffset(): Offset = {
// For Spark data streams implementing SupportsAdmissionControl trait
// latestOffset(Offset, ReadLimit) is called instead
@@ -101,11 +119,15 @@
end
.inputPartitions
.get
.map(partition => partition
.withContinuationState(
SparkBridgeImplementationInternal
.map(partition => {
val index = partitionIndexMap.asScala.getOrElseUpdate(partition.feedRange, partitionIndex.incrementAndGet())
partition
.withContinuationState(
SparkBridgeImplementationInternal
.extractChangeFeedStateForRange(start.changeFeedState, partition.feedRange),
clearEndLsn = false))
clearEndLsn = false)
.withIndex(index)
})
}

/**
@@ -152,7 +174,8 @@
this.containerConfig,
this.partitioningConfig,
this.defaultParallelism,
this.container
this.container,
Some(this.partitionMetricsMap)
)

if (offset.changeFeedState != startChangeFeedOffset.changeFeedState) {
SparkInternalsBridge.scala
@@ -6,13 +6,23 @@ import com.azure.cosmos.implementation.guava25.base.MoreObjects.firstNonNull
import com.azure.cosmos.implementation.guava25.base.Strings.emptyToNull
import com.azure.cosmos.spark.diagnostics.BasicLoggingTrait
import org.apache.spark.TaskContext
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.util.AccumulatorV2

import java.lang.reflect.Method
import java.util.Locale
import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}

class SparkInternalsBridge {
// Only used in ChangeFeedMetricsListener; exposing an instance method makes test validation easier
def getInternalCustomTaskMetricsAsSQLMetric(
knownCosmosMetricNames: Set[String],
taskMetrics: TaskMetrics) : Map[String, SQLMetric] = {
SparkInternalsBridge.getInternalCustomTaskMetricsAsSQLMetricInternal(knownCosmosMetricNames, taskMetrics)
}
}

object SparkInternalsBridge extends BasicLoggingTrait {
private val SPARK_REFLECTION_ACCESS_ALLOWED_PROPERTY = "COSMOS.SPARK_REFLECTION_ACCESS_ALLOWED"
private val SPARK_REFLECTION_ACCESS_ALLOWED_VARIABLE = "COSMOS_SPARK_REFLECTION_ACCESS_ALLOWED"
@@ -40,20 +50,23 @@ object SparkInternalsBridge extends BasicLoggingTrait {
private final lazy val reflectionAccessAllowed = new AtomicBoolean(getSparkReflectionAccessAllowed)

def getInternalCustomTaskMetricsAsSQLMetric(knownCosmosMetricNames: Set[String]) : Map[String, SQLMetric] = {
Option.apply(TaskContext.get()) match {
case Some(taskCtx) => getInternalCustomTaskMetricsAsSQLMetric(knownCosmosMetricNames, taskCtx.taskMetrics())
case None => Map.empty[String, SQLMetric]
}
}

def getInternalCustomTaskMetricsAsSQLMetric(knownCosmosMetricNames: Set[String], taskMetrics: TaskMetrics) : Map[String, SQLMetric] = {

if (!reflectionAccessAllowed.get) {
Map.empty[String, SQLMetric]
} else {
Option.apply(TaskContext.get()) match {
case Some(taskCtx) => getInternalCustomTaskMetricsAsSQLMetricInternal(knownCosmosMetricNames, taskCtx)
case None => Map.empty[String, SQLMetric]
}
getInternalCustomTaskMetricsAsSQLMetricInternal(knownCosmosMetricNames, taskMetrics)
}
}

private def getAccumulators(taskCtx: TaskContext): Option[Seq[AccumulatorV2[_, _]]] = {
private def getAccumulators(taskMetrics: TaskMetrics): Option[Seq[AccumulatorV2[_, _]]] = {
try {
val taskMetrics: Object = taskCtx.taskMetrics()
val method = Option(accumulatorsMethod.get) match {
case Some(existing) => existing
case None =>
@@ -78,8 +91,8 @@

private def getInternalCustomTaskMetricsAsSQLMetricInternal(
knownCosmosMetricNames: Set[String],
taskCtx: TaskContext): Map[String, SQLMetric] = {
getAccumulators(taskCtx) match {
taskMetrics: TaskMetrics): Map[String, SQLMetric] = {
getAccumulators(taskMetrics) match {
case Some(accumulators) => accumulators
.filter(accumulable => accumulable.isInstanceOf[SQLMetric]
&& accumulable.name.isDefined
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-spark_3-5_2-12/CHANGELOG.md
@@ -10,6 +10,7 @@
* Added log4j-core to the list of shaded packages to avoid conflicts when customers use log4j in a different version. - See [PR 45924](https://github.com/Azure/azure-sdk-for-java/pull/46451)

#### Other Changes
* Added change feed performance monitoring, which is used to improve the end LSN calculation in `CosmosPartitionPlanner`. - See [PR 46320](https://github.com/Azure/azure-sdk-for-java/pull/46320)

### 4.38.0 (2025-07-31)

ChangeFeedMicroBatchStream.scala
@@ -2,7 +2,9 @@
// Licensed under the MIT License.
package com.azure.cosmos.spark

import com.azure.cosmos.changeFeedMetrics.{ChangeFeedMetricsListener, ChangeFeedMetricsTracker}
import com.azure.cosmos.implementation.SparkBridgeImplementationInternal
import com.azure.cosmos.implementation.guava25.collect.{HashBiMap, Maps}
import com.azure.cosmos.spark.CosmosPredicates.{assertNotNull, assertNotNullOrEmpty, assertOnSparkDriver}
import com.azure.cosmos.spark.diagnostics.{DiagnosticsContext, LoggerHelper}
import org.apache.spark.broadcast.Broadcast
@@ -13,6 +15,12 @@ import org.apache.spark.sql.types.StructType

import java.time.Duration
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong

// scalastyle:off underscore.import
import scala.collection.JavaConverters._
// scalastyle:on underscore.import

// scala style rule flaky - even complaining on partial log messages
// scalastyle:off multiple.string.literals
@@ -59,6 +67,17 @@ private class ChangeFeedMicroBatchStream

private var latestOffsetSnapshot: Option[ChangeFeedOffset] = None

private val partitionIndex = new AtomicLong(0)
private val partitionIndexMap = Maps.synchronizedBiMap(HashBiMap.create[NormalizedRange, Long]())
private val partitionMetricsMap = new ConcurrentHashMap[NormalizedRange, ChangeFeedMetricsTracker]()

if (changeFeedConfig.performanceMonitoringEnabled) {
log.logInfo("ChangeFeed performance monitoring is enabled, registering ChangeFeedMetricsListener")
session.sparkContext.addSparkListener(new ChangeFeedMetricsListener(partitionIndexMap, partitionMetricsMap))
} else {
log.logInfo("ChangeFeed performance monitoring is disabled")
}

override def latestOffset(): Offset = {
// For Spark data streams implementing SupportsAdmissionControl trait
// latestOffset(Offset, ReadLimit) is called instead
@@ -101,11 +120,15 @@
end
.inputPartitions
.get
.map(partition => partition
.withContinuationState(
SparkBridgeImplementationInternal
.map(partition => {
val index = partitionIndexMap.asScala.getOrElseUpdate(partition.feedRange, partitionIndex.incrementAndGet())
partition
.withContinuationState(
SparkBridgeImplementationInternal
.extractChangeFeedStateForRange(start.changeFeedState, partition.feedRange),
clearEndLsn = false))
clearEndLsn = false)
.withIndex(index)
})
}

/**
@@ -152,7 +175,8 @@
this.containerConfig,
this.partitioningConfig,
this.defaultParallelism,
this.container
this.container,
Some(this.partitionMetricsMap)
)

if (offset.changeFeedState != startChangeFeedOffset.changeFeedState) {