Skip to content

Commit c976e82

Browse files
xinlian12 and annie-mac authored
improvePerfDuringPartitionPlanning (#46727)
* improve perf for partition planning --------- Co-authored-by: annie-mac <[email protected]>
1 parent b58648d commit c976e82

File tree

9 files changed

+21
-11
lines changed

9 files changed

+21
-11
lines changed

sdk/cosmos/azure-cosmos-spark_3-3_2-12/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
#### Bugs Fixed
1010

1111
#### Other Changes
12+
* Added improvement to reduce partition planning time for large containers. - See [46727](https://github.com/Azure/azure-sdk-for-java/pull/46727)
1213

1314
### 4.39.0 (2025-09-05)
1415

sdk/cosmos/azure-cosmos-spark_3-3_2-12/src/main/scala/com/azure/cosmos/spark/ChangeFeedMicroBatchStream.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@
22
// Licensed under the MIT License.
33
package com.azure.cosmos.spark
44

5+
import com.azure.cosmos.changeFeedMetrics.{ChangeFeedMetricsListener, ChangeFeedMetricsTracker}
56
import com.azure.cosmos.implementation.guava25.collect.{HashBiMap, Maps}
67
import com.azure.cosmos.implementation.{SparkBridgeImplementationInternal, UUIDs}
7-
import com.azure.cosmos.changeFeedMetrics.{ChangeFeedMetricsListener, ChangeFeedMetricsTracker}
88
import com.azure.cosmos.spark.CosmosPredicates.{assertNotNull, assertNotNullOrEmpty, assertOnSparkDriver}
99
import com.azure.cosmos.spark.diagnostics.{DiagnosticsContext, LoggerHelper}
1010
import org.apache.spark.broadcast.Broadcast
@@ -115,6 +115,7 @@ private class ChangeFeedMicroBatchStream
115115

116116
assert(end.inputPartitions.isDefined, "Argument 'endOffset.inputPartitions' must not be null or empty.")
117117

118+
val parsedStartChangeFeedState = SparkBridgeImplementationInternal.parseChangeFeedState(start.changeFeedState)
118119
end
119120
.inputPartitions
120121
.get
@@ -123,7 +124,7 @@ private class ChangeFeedMicroBatchStream
123124
partition
124125
.withContinuationState(
125126
SparkBridgeImplementationInternal
126-
.extractChangeFeedStateForRange(start.changeFeedState, partition.feedRange),
127+
.extractChangeFeedStateForRange(parsedStartChangeFeedState, partition.feedRange),
127128
clearEndLsn = false)
128129
.withIndex(index)
129130
})

sdk/cosmos/azure-cosmos-spark_3-4_2-12/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
#### Bugs Fixed
1010

1111
#### Other Changes
12+
* Added improvement to reduce partition planning time for large containers. - See [46727](https://github.com/Azure/azure-sdk-for-java/pull/46727)
1213

1314
### 4.39.0 (2025-09-05)
1415

sdk/cosmos/azure-cosmos-spark_3-4_2-12/src/main/scala/com/azure/cosmos/spark/ChangeFeedMicroBatchStream.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@ private class ChangeFeedMicroBatchStream
116116

117117
assert(end.inputPartitions.isDefined, "Argument 'endOffset.inputPartitions' must not be null or empty.")
118118

119+
val parsedStartChangeFeedState = SparkBridgeImplementationInternal.parseChangeFeedState(start.changeFeedState)
119120
end
120121
.inputPartitions
121122
.get
@@ -124,7 +125,7 @@ private class ChangeFeedMicroBatchStream
124125
partition
125126
.withContinuationState(
126127
SparkBridgeImplementationInternal
127-
.extractChangeFeedStateForRange(start.changeFeedState, partition.feedRange),
128+
.extractChangeFeedStateForRange(parsedStartChangeFeedState, partition.feedRange),
128129
clearEndLsn = false)
129130
.withIndex(index)
130131
})

sdk/cosmos/azure-cosmos-spark_3-5_2-12/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
#### Bugs Fixed
1010

1111
#### Other Changes
12+
* Added improvement to reduce partition planning time for large containers. - See [46727](https://github.com/Azure/azure-sdk-for-java/pull/46727)
1213

1314
### 4.39.0 (2025-09-05)
1415

sdk/cosmos/azure-cosmos-spark_3-5_2-12/src/main/scala/com/azure/cosmos/spark/ChangeFeedMicroBatchStream.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ private class ChangeFeedMicroBatchStream
117117

118118
assert(end.inputPartitions.isDefined, "Argument 'endOffset.inputPartitions' must not be null or empty.")
119119

120+
val parsedStartChangeFeedState = SparkBridgeImplementationInternal.parseChangeFeedState(start.changeFeedState)
120121
end
121122
.inputPartitions
122123
.get
@@ -125,7 +126,7 @@ private class ChangeFeedMicroBatchStream
125126
partition
126127
.withContinuationState(
127128
SparkBridgeImplementationInternal
128-
.extractChangeFeedStateForRange(start.changeFeedState, partition.feedRange),
129+
.extractChangeFeedStateForRange(parsedStartChangeFeedState, partition.feedRange),
129130
clearEndLsn = false)
130131
.withIndex(index)
131132
})

sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/implementation/SparkBridgeImplementationInternal.scala

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -172,16 +172,19 @@ private[cosmos] object SparkBridgeImplementationInternal extends BasicLoggingTra
172172

173173
def extractChangeFeedStateForRange
174174
(
175-
stateJsonBase64: String,
175+
changeFeedState: ChangeFeedState,
176176
feedRange: NormalizedRange
177177
): String = {
178-
assert(!Strings.isNullOrWhiteSpace(stateJsonBase64), s"Argument 'stateJsonBase64' must not be null or empty.")
179-
ChangeFeedState
180-
.fromString(stateJsonBase64)
178+
changeFeedState
181179
.extractForEffectiveRange(toCosmosRange(feedRange))
182180
.toString
183181
}
184182

183+
def parseChangeFeedState(changeFeedStateJsonString: String): ChangeFeedState = {
184+
assert(!Strings.isNullOrWhiteSpace(changeFeedStateJsonString), s"Argument 'changeFeedStateJsonString' must not be null or empty.")
185+
ChangeFeedState.fromString(changeFeedStateJsonString)
186+
}
187+
185188
def toFeedRange(range: NormalizedRange): FeedRange = {
186189
new FeedRangeEpkImpl(toCosmosRange(range))
187190
}

sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/ChangeFeedBatch.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -185,13 +185,14 @@ private class ChangeFeedBatch
185185

186186
// Latest offset above has the EndLsn specified based on the point-in-time latest offset
187187
// For batch mode instead we need to reset it so that the change feed will get fully drained
188+
val parsedInitialOffset = SparkBridgeImplementationInternal.parseChangeFeedState(initialOffsetJson)
188189
val inputPartitions = latestOffset
189190
.inputPartitions
190191
.get
191192
.map(partition => partition
192193
.withContinuationState(
193194
SparkBridgeImplementationInternal
194-
.extractChangeFeedStateForRange(initialOffsetJson, partition.feedRange),
195+
.extractChangeFeedStateForRange(parsedInitialOffset, partition.feedRange),
195196
clearEndLsn = !hasBatchCheckpointLocation))
196197
.map(_.asInstanceOf[InputPartition])
197198

sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/udf/CreateSpark2ContinuationsFromChangeFeedOffset.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,11 +80,11 @@ class CreateSpark2ContinuationsFromChangeFeedOffset extends UDF2[Map[String, Str
8080
pkRanges
8181
.foreach(pkRange => {
8282
val normalizedRange = rangeToNormalizedRange(pkRange.toRange)
83-
83+
val parsedChangeFeedState = SparkBridgeImplementationInternal.parseChangeFeedState(offset.changeFeedState)
8484
val effectiveChangeFeedState = ChangeFeedState
8585
.fromString(
8686
SparkBridgeImplementationInternal
87-
.extractChangeFeedStateForRange(offset.changeFeedState, normalizedRange)
87+
.extractChangeFeedStateForRange(parsedChangeFeedState, normalizedRange)
8888
)
8989

9090
val containerResourceId = effectiveChangeFeedState.getContainerRid

0 commit comments

Comments (0)