Commit 5087bf2

Fixes perf issue when Split results in 410 when trying to get latest LSN in Spark partitioner (Azure#29152)

* Fixes perf issue when Split results in 410 when trying to get latest LSN in Spark partitioner
* Fixing unit test issues (regressions due to bad test data)
* Making sure consistency level is passed to service
* Change logs
* Reacted to code review feedback
1 parent 240243e commit 5087bf2

8 files changed: +174 -15 lines

sdk/cosmos/azure-cosmos-spark_3-1_2-12/CHANGELOG.md

Lines changed: 2 additions & 0 deletions

```diff
@@ -9,6 +9,8 @@
 #### Breaking Changes
 
 #### Bugs Fixed
+* Fixed a possible perf issue when a Split results in a 410 while trying to get the latest LSN in the Spark partitioner, which could result in change feed events being reprocessed (causing a "hot partition") - See [PR 29152](https://github.com/Azure/azure-sdk-for-java/pull/29152)
+* Fixed a bug resulting in ChangeFeed requests using the account's default consistency model instead of falling back to eventual consistency when `spark.cosmos.read.forceEventualConsistency` is `true` (the default config). - See [PR 29152](https://github.com/Azure/azure-sdk-for-java/pull/29152)
 
 #### Other Changes
 
```

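For context, the consistency fix above only takes effect through the connector's read configuration. Below is a minimal sketch of a change feed read with `spark.cosmos.read.forceEventualConsistency` set explicitly (assuming an existing `SparkSession` named `spark`; endpoint, key, database and container values are placeholders):

```scala
// Minimal sketch - account endpoint, key, database and container names are placeholders.
val changeFeedDf = spark.read
  .format("cosmos.oltp.changeFeed")
  .option("spark.cosmos.accountEndpoint", "https://<account>.documents.azure.com:443/")
  .option("spark.cosmos.accountKey", "<account-key>")
  .option("spark.cosmos.database", "<database>")
  .option("spark.cosmos.container", "<container>")
  // true is already the default; with this fix the change feed requests actually fall back
  // to eventual consistency instead of inheriting the account's default consistency level
  .option("spark.cosmos.read.forceEventualConsistency", "true")
  .load()
```
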
sdk/cosmos/azure-cosmos-spark_3-2_2-12/CHANGELOG.md

Lines changed: 2 additions & 0 deletions

```diff
@@ -9,6 +9,8 @@
 #### Breaking Changes
 
 #### Bugs Fixed
+* Fixed a possible perf issue when a Split results in a 410 while trying to get the latest LSN in the Spark partitioner, which could result in change feed events being reprocessed (causing a "hot partition") - See [PR 29152](https://github.com/Azure/azure-sdk-for-java/pull/29152)
+* Fixed a bug resulting in ChangeFeed requests using the account's default consistency model instead of falling back to eventual consistency when `spark.cosmos.read.forceEventualConsistency` is `true` (the default config). - See [PR 29152](https://github.com/Azure/azure-sdk-for-java/pull/29152)
 
 #### Other Changes
 
```

sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/CosmosPartitionPlanner.scala

Lines changed: 13 additions & 7 deletions

```diff
@@ -543,29 +543,34 @@ private object CosmosPartitionPlanner extends BasicLoggingTrait {
       // Update endLsn - which depends on read limit
       .map(metadata => {
         val endLsn = readLimit match {
-          case _: ReadAllAvailable => metadata.latestLsn
+          case _: ReadAllAvailable => metadata.getAndValidateLatestLsn
           case maxRowsLimit: ReadMaxRows =>
             if (totalWeightedLsnGap.get <= maxRowsLimit.maxRows) {
+              val effectiveLatestLsn = metadata.getAndValidateLatestLsn
               if (isDebugLogEnabled) {
                 val calculateDebugLine = s"calculateEndLsn (feedRange: ${metadata.feedRange}) - avg. Docs " +
                   s"per LSN: ${metadata.getAvgItemsPerLsn} documentCount ${metadata.documentCount} firstLsn " +
                   s"${metadata.firstLsn} latestLsn ${metadata.latestLsn} startLsn ${metadata.startLsn} weightedGap " +
-                  s"${metadata.getWeightedLsnGap} effectiveEndLsn ${metadata.latestLsn} maxRows ${maxRowsLimit.maxRows}"
+                  s"${metadata.getWeightedLsnGap} effectiveEndLsn $effectiveLatestLsn maxRows ${maxRowsLimit.maxRows}"
                 logDebug(calculateDebugLine)
               }
-              metadata.latestLsn
+              effectiveLatestLsn
             } else {
               // the weight of this feedRange compared to other feedRanges
               val feedRangeWeightFactor = metadata.getWeightedLsnGap.toDouble / totalWeightedLsnGap.get
 
               val allowedRate = (feedRangeWeightFactor * maxRowsLimit.maxRows() / metadata.getAvgItemsPerLsn)
                 .toLong
                 .max(1)
-              val effectiveEndLsn = math.min(metadata.latestLsn, metadata.startLsn + allowedRate)
+              val effectiveLatestLsn = metadata.getAndValidateLatestLsn
+              val effectiveEndLsn = math.min(
+                effectiveLatestLsn,
+                metadata.startLsn + allowedRate)
               if (isDebugLogEnabled) {
                 val calculateDebugLine = s"calculateEndLsn (feedRange: ${metadata.feedRange}) - avg. Docs/LSN: " +
                   s"${metadata.getAvgItemsPerLsn} feedRangeWeightFactor $feedRangeWeightFactor documentCount " +
-                  s"${metadata.documentCount} firstLsn ${metadata.firstLsn} latestLsn ${metadata.latestLsn} startLsn " +
+                  s"${metadata.documentCount} firstLsn ${metadata.firstLsn} latestLsn ${metadata.latestLsn} " +
+                  s"effectiveLatestLsn $effectiveLatestLsn startLsn " +
                   s"${metadata.startLsn} allowedRate $allowedRate weightedGap ${metadata.getWeightedLsnGap} " +
                   s"effectiveEndLsn $effectiveEndLsn maxRows $maxRowsLimit.maxRows"
                 logDebug(calculateDebugLine)
@@ -584,7 +589,8 @@ private object CosmosPartitionPlanner extends BasicLoggingTrait {
     storageSizeInMB: Double,
     metadata: PartitionMetadata): Double = {
 
-    val effectiveEndLsn = metadata.endLsn.getOrElse(metadata.latestLsn)
+    val effectiveLatestLsn = metadata.getAndValidateLatestLsn
+    val effectiveEndLsn = metadata.endLsn.getOrElse(effectiveLatestLsn)
     if (metadata.startLsn <= 0 || storageSizeInMB == 0) {
       // No progress has been made so far - use one Spark partition per GB
       1
@@ -595,7 +601,7 @@ private object CosmosPartitionPlanner extends BasicLoggingTrait {
     } else {
       // Use weight factor based on progress. This estimate assumes equal distribution of storage
       // size per LSN - which is a "good enough" simplification
-      (effectiveEndLsn - metadata.startLsn) / metadata.latestLsn.toDouble
+      (effectiveEndLsn - metadata.startLsn) / effectiveLatestLsn.toDouble
     }
   }
```

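To make the intent of the partitioner change easier to follow, here is a simplified, self-contained sketch (hypothetical helper names, not the connector's actual API) of how the end LSN is derived under a `ReadMaxRows` limit once the latest LSN has been validated:

```scala
// Simplified sketch of the end-LSN capping shown in the diff above (hypothetical helper).
object EndLsnSketch {
  def endLsnForMaxRows(
      startLsn: Long,
      validatedLatestLsn: Long, // what getAndValidateLatestLsn returns
      weightedLsnGap: Long,     // weighted LSN gap of this feed range
      totalWeightedLsnGap: Long,
      avgItemsPerLsn: Double,
      maxRows: Long): Long = {
    if (totalWeightedLsnGap <= maxRows) {
      // the remaining backlog fits into this micro batch - drain up to the validated latest LSN
      validatedLatestLsn
    } else {
      // give this feed range a share of maxRows proportional to its weight,
      // translate that share from documents back into LSNs, and cap at the latest LSN
      val feedRangeWeightFactor = weightedLsnGap.toDouble / totalWeightedLsnGap
      val allowedRate = (feedRangeWeightFactor * maxRows / avgItemsPerLsn).toLong.max(1)
      math.min(validatedLatestLsn, startLsn + allowedRate)
    }
  }
}
```

Because the validated latest LSN can never fall below `startLsn`, the computed end LSN can no longer move behind the start of the micro batch - which is what previously caused change feed events to be reprocessed.
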
sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/PartitionMetadata.scala

Lines changed: 19 additions & 4 deletions

```diff
@@ -114,7 +114,7 @@ private[cosmos] case class PartitionMetadata
   }
 
   def getWeightedLsnGap: Long = {
-    val progressFactor = math.max(this.latestLsn - this.startLsn, 0)
+    val progressFactor = math.max(this.getAndValidateLatestLsn - this.startLsn, 0)
     if (progressFactor == 0) {
       0
     } else {
@@ -128,11 +128,26 @@
 
   def getAvgItemsPerLsn: Double = {
     if (this.firstLsn.isEmpty) {
-      math.max(1d, this.documentCount.toDouble / this.latestLsn)
-    } else if (this.documentCount == 0 || (this.latestLsn - this.firstLsn.get) <= 0) {
+      math.max(1d, this.documentCount.toDouble / this.getAndValidateLatestLsn)
+    } else if (this.documentCount == 0 || (this.getAndValidateLatestLsn - this.firstLsn.get) <= 0) {
       1d
     } else {
-      this.documentCount.toDouble / (this.latestLsn - this.firstLsn.get)
+      this.documentCount.toDouble / (this.getAndValidateLatestLsn - this.firstLsn.get)
+    }
+  }
+
+  def getAndValidateLatestLsn(): Long = {
+    if (this.latestLsn == 0) {
+      // latestLsn == 0 but startLsn > 0 means there was an issue where change feed continuation
+      // was null - endLsn created here will be used as the startLsn for the next micro batch iteration
+      // so it should never be smaller than startLsn
+      this.startLsn
+    } else {
+      if (this.latestLsn < this.startLsn) {
+        throw new IllegalStateException(
+          s"Latest LSN ${this.latestLsn} must not be smaller than start LSN ${this.startLsn}")
+      }
+      this.latestLsn
     }
   }
 }
```

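The new `getAndValidateLatestLsn` member encodes three outcomes; restated as a standalone sketch (a hypothetical free-standing function, for illustration only, mirroring the rule in the diff above):

```scala
// Standalone restatement of the validation added above (hypothetical helper, not the actual member).
def validateLatestLsn(latestLsn: Long, startLsn: Long): Long =
  if (latestLsn == 0) {
    // no change feed continuation was returned - fall back to startLsn so the derived endLsn
    // is never smaller than the startLsn used by the next micro batch
    startLsn
  } else if (latestLsn < startLsn) {
    // a non-zero latestLsn behind startLsn indicates inconsistent metadata - fail fast
    throw new IllegalStateException(
      s"Latest LSN $latestLsn must not be smaller than start LSN $startLsn")
  } else {
    latestLsn
  }

// validateLatestLsn(latestLsn = 0, startLsn = 2057)    -> 2057  (first new unit test in this commit)
// validateLatestLsn(latestLsn = 2056, startLsn = 2057) -> throws (second new unit test)
// validateLatestLsn(latestLsn = 3000, startLsn = 2057) -> 3000  (normal case)
```
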
sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/CosmosPartitionPlannerITest.scala

Lines changed: 4 additions & 4 deletions

```diff
@@ -138,7 +138,7 @@ class CosmosPartitionPlannerITest
     evaluateStorageBasedStrategy(
       128 * 10 * 1024,
       1 * cosmosBEPartitionCount,
-      Some(200))
+      Some(100))
 
     evaluateStorageBasedStrategy(
       128 * 10 * 1024,
@@ -170,7 +170,7 @@ class CosmosPartitionPlannerITest
     evaluateStorageBasedStrategy(
       128 * 10 * 1024,
       4 * cosmosBEPartitionCount, // would usually be just 1 because progress > 100%
-      Some(200),
+      Some(100),
       defaultMinimalPartitionCount = 4 * cosmosBEPartitionCount)
   }
 
@@ -189,15 +189,15 @@
       "Custom",
       128 * 10 * 1024,
       23 * cosmosBEPartitionCount, // would usually be just 1 because progress > 100%
-      Some(200),
+      Some(100),
       customPartitionCount = Some(23 * cosmosBEPartitionCount))
 
     // targetPartitionCount is ignored when Strategy is != Custom
     evaluateStrategy(
       "Default",
       128 * 10 * 1024,
       1 * cosmosBEPartitionCount, // would usually be just 1 because progress > 100%
-      Some(200),
+      Some(100),
       customPartitionCount = Some(23 * cosmosBEPartitionCount))
   }
 
```

sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/CosmosPartitionPlannerSpec.scala

Lines changed: 129 additions & 0 deletions

```diff
@@ -73,6 +73,135 @@ class CosmosPartitionPlannerSpec extends UnitSpec {
     calculate(0).endLsn.get shouldBe latestLsn
   }
 
+  it should "calculateEndLsn should have latestLsn >= startLsn when latestLsn==0 (no continuation)" in {
+
+    val clientConfig = CosmosClientConfiguration(
+      UUID.randomUUID().toString,
+      UUID.randomUUID().toString,
+      UUID.randomUUID().toString,
+      useGatewayMode = false,
+      useEventualConsistency = true,
+      enableClientTelemetry = false,
+      disableTcpConnectionEndpointRediscovery = false,
+      clientTelemetryEndpoint = None,
+      preferredRegionsList = Option.empty)
+
+    val containerConfig = CosmosContainerConfig(UUID.randomUUID().toString, UUID.randomUUID().toString)
+    val normalizedRange = NormalizedRange(UUID.randomUUID().toString, UUID.randomUUID().toString)
+    val docSizeInKB = rnd.nextInt()
+    val firstLsn = None
+    val latestLsn = 0
+    val startLsn = 2057
+    val docCount = 200174
+    val nowEpochMs = Instant.now.toEpochMilli
+    val createdAt = new AtomicLong(nowEpochMs)
+    val lastRetrievedAt = new AtomicLong(nowEpochMs)
+
+    val metadata1 = PartitionMetadata(
+      Map[String, String](),
+      clientConfig,
+      None,
+      containerConfig,
+      normalizedRange,
+      docCount,
+      docSizeInKB,
+      firstLsn,
+      latestLsn,
+      startLsn,
+      None,
+      createdAt,
+      lastRetrievedAt)
+
+    val metadata2 = PartitionMetadata(
+      Map[String, String](),
+      clientConfig,
+      None,
+      containerConfig,
+      normalizedRange,
+      docCount,
+      docSizeInKB,
+      firstLsn,
+      latestLsn,
+      startLsn,
+      None,
+      createdAt,
+      lastRetrievedAt)
+
+    val calculate = CosmosPartitionPlanner.calculateEndLsn(
+      Array[PartitionMetadata](metadata1, metadata2),
+      ReadLimit.allAvailable()
+    )
+
+    calculate(0).endLsn.get shouldBe startLsn
+  }
+
+  it should "calculateEndLsn should throw when latestLsn > 0 but < startLsn" in {
+
+    val clientConfig = CosmosClientConfiguration(
+      UUID.randomUUID().toString,
+      UUID.randomUUID().toString,
+      UUID.randomUUID().toString,
+      useGatewayMode = false,
+      useEventualConsistency = true,
+      enableClientTelemetry = false,
+      disableTcpConnectionEndpointRediscovery = false,
+      clientTelemetryEndpoint = None,
+      preferredRegionsList = Option.empty)
+
+    val containerConfig = CosmosContainerConfig(UUID.randomUUID().toString, UUID.randomUUID().toString)
+    val normalizedRange = NormalizedRange(UUID.randomUUID().toString, UUID.randomUUID().toString)
+    val docSizeInKB = rnd.nextInt()
+    val firstLsn = None
+    val latestLsn = 2056
+    val startLsn = 2057
+    val docCount = 200174
+    val nowEpochMs = Instant.now.toEpochMilli
+    val createdAt = new AtomicLong(nowEpochMs)
+    val lastRetrievedAt = new AtomicLong(nowEpochMs)
+
+    val metadata1 = PartitionMetadata(
+      Map[String, String](),
+      clientConfig,
+      None,
+      containerConfig,
+      normalizedRange,
+      docCount,
+      docSizeInKB,
+      firstLsn,
+      latestLsn,
+      startLsn,
+      None,
+      createdAt,
+      lastRetrievedAt)
+
+    val metadata2 = PartitionMetadata(
+      Map[String, String](),
+      clientConfig,
+      None,
+      containerConfig,
+      normalizedRange,
+      docCount,
+      docSizeInKB,
+      firstLsn,
+      latestLsn,
+      startLsn,
+      None,
+      createdAt,
+      lastRetrievedAt)
+
+    try {
+      CosmosPartitionPlanner.calculateEndLsn(
+        Array[PartitionMetadata](metadata1, metadata2),
+        ReadLimit.allAvailable()
+      )
+
+      fail("Should have thrown on invalid data (latestLsn > 0 but < startLsn)")
+    }
+    catch {
+      case _: Exception => succeed
+    }
+  }
+
   it should "calculateEndLsn with readLimit should honor estimated lag" in {
 
     val clientConfig = CosmosClientConfiguration(
```

sdk/cosmos/azure-cosmos/CHANGELOG.md

Lines changed: 1 addition & 0 deletions

```diff
@@ -9,6 +9,7 @@
 #### Bugs Fixed
 
 #### Other Changes
+* Making CosmosPatchOperations thread-safe. Usually there is no reason to modify a CosmosPatchOperations instance concurrently from multiple threads - but making it thread-safe acts as protection in case this is done anyway - See [PR 29143](https://github.com/Azure/azure-sdk-for-java/pull/29143)
 
 ### 4.30.0 (2022-05-20)
 #### Bugs Fixed
```

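`CosmosPatchOperations` is the partial-document-update builder in `azure-cosmos`. Below is a short sketch of the usual single-threaded usage (Scala calling the Java API; paths, values and identifiers are placeholders) - the thread-safety change above is only a safeguard for the unusual case of mutating one instance from several threads:

```scala
import com.azure.cosmos.models.CosmosPatchOperations

// Typical usage: one thread builds the patch and submits it; placeholders throughout.
val patchOps = CosmosPatchOperations.create()
  .set("/color", "silver")      // placeholder path/value
  .increment("/mileage", 1000L) // placeholder path/value
// container.patchItem("<item-id>", new PartitionKey("<partition-key>"), patchOps, classOf[ObjectNode])
```
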
sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ChangeFeedQueryImpl.java

Lines changed: 4 additions & 0 deletions

```diff
@@ -126,6 +126,10 @@ private RxDocumentServiceRequest createDocumentServiceRequest() {
             headers.put(HttpConstants.HttpHeaders.POPULATE_QUOTA_INFO, String.valueOf(true));
         }
 
+        if (this.client.getConsistencyLevel() != null) {
+            headers.put(HttpConstants.HttpHeaders.CONSISTENCY_LEVEL, this.client.getConsistencyLevel().toString());
+        }
+
         return RxDocumentServiceRequest.create(clientContext,
             OperationType.ReadFeed,
             resourceType,
```
