-
Notifications
You must be signed in to change notification settings - Fork 2.1k
SparkChangeFeedEndLSNImprovement #46320
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
SparkChangeFeedEndLSNImprovement #46320
Conversation
1623250
to
d790eb1
Compare
...cosmos-spark_3-3_2-12/src/main/scala/com/azure/cosmos/spark/ChangeFeedMicroBatchStream.scala
Outdated
Show resolved
Hide resolved
...azure-cosmos-spark_3-3_2-12/src/main/scala/com/azure/cosmos/spark/SparkInternalsBridge.scala
Show resolved
Hide resolved
...cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/CosmosConstants.scala
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR adds an improvement for end LSN calculation in change feed processing by introducing a metrics-based approach to optimize micro-batch sizing. The changes implement a system that tracks per-partition metrics to better estimate how many changes per LSN to expect, allowing for more accurate end LSN calculations when LSN can increase without producing actual change feed changes.
Key changes:
- Added
ChangeFeedMetricsTracker
to calculate weighted average changes per LSN using exponential decay - Introduced Spark metrics listener infrastructure to capture and track change feed metrics across micro-batches
- Enhanced
CosmosPartitionPlanner
to utilize metrics data when available for improved end LSN calculations
Reviewed Changes
Copilot reviewed 28 out of 28 changed files in this pull request and generated 5 comments.
Show a summary per file
File | Description |
---|---|
ChangeFeedMetricsTracker.scala | New metrics tracker with exponential weighting for changes per LSN calculation |
ChangeFeedMetricsListener.scala | Spark listener to capture and process change feed metrics |
CosmosPartitionPlanner.scala | Enhanced end LSN calculation to use metrics data when available |
ChangeFeedPartitionReader.scala | Added custom metrics collection for LSN gaps and fetched changes |
CosmosInputPartition.scala | Added optional partition index field for metrics tracking |
Multiple test files | Updated method signatures and added comprehensive test coverage |
...azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/CosmosPartitionPlanner.scala
Outdated
Show resolved
Hide resolved
...cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/CosmosConstants.scala
Outdated
Show resolved
Hide resolved
...cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/ChangeFeedBatch.scala
Outdated
Show resolved
Hide resolved
...zure-cosmos-spark_3-5_2-12/src/test/scala/com/azure/cosmos/spark/ChangeFeedMetricsTest.scala
Outdated
Show resolved
Hide resolved
...zure-cosmos-spark_3-5_2-12/src/test/scala/com/azure/cosmos/spark/ChangeFeedMetricsTest.scala
Outdated
Show resolved
Hide resolved
...park_3_2-12/src/main/scala/com/azure/cosmos/changeFeedMetrics/ChangeFeedMetricsTracker.scala
Outdated
Show resolved
Hide resolved
183bcb6
to
afd96dd
Compare
...cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/CosmosConstants.scala
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM - thanks @xinlian12 - this looks really great - the design explanation in the PR description helped a lot reviwing it and the implementation looks very clean! Only small ask woudl be to move the configurability to Spark config. Thanks again!
ed77d5d
to
592661a
Compare
/azp run java - cosmos - spark |
Azure Pipelines successfully started running 1 pipeline(s). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM - Thanks
Issue
The current change feed endLSN calculation is not optimized for cases where LSN can increase without producing any change feed changes, for example updates changed purely due to binary encoding.
In this PR, we are trying to improve the endLSN calculation by utilizing per task metrics.
High level Design
Use three custom metrics and two coordinator components working together to track and analyze change feed performance which is used to tune the batch size during partition planning stage.
Core Metrics
ChangeFeedItemsCntMetric
- A CustomSumMetric that tracks the total number of items fetched within a change feed micro-batch for each partition. Important to note that not all fetched changes are necessarily returned to Spark.ChangeFeedLsnRangeMetric
- A CustomSumMetric tracking the LSN (Log Sequence Number) range for partitions within each micro-batch. LSN represents the sequence of changes in the partition.ChangeFeedPartitionIndexMetric
- A CustomMetric that captures and maintains consistent partition indices, essential for correlating metrics across the system.Coordinator Components
ChangeFeedMetricsTracker
Manages metrics calculation using an exponential weighting approach
ChangeFeedMetricsListener
A Spark listener that integrates with Spark's metrics system
partitionIndexMap
: BiMap for mapping between partition ranges and numeric indicespartitionMetricsMap
: ConcurrentHashMap storing metrics trackers per partitionConfigs
By default, the dynamic batch tuning will be enabled, but customer can choose to opt out by setting
spark.cosmos.changeFeed.performance.monitoring.enabled
to befalse
Test
GreenTaxiRecords
by using 01_Batch with a mix of bulk and point writes.Dev branch
Master branch
Notes: