Skip to content

Commit 75aea12

Browse files
committed
Address PR feedback: fix resource leak, improve logging, clarify docs
1 parent 65810f5 commit 75aea12

File tree

5 files changed

+82
-44
lines changed

5 files changed

+82
-44
lines changed

sdk/cosmos/azure-cosmos-spark_3-5/src/main/scala/com/azure/cosmos/spark/ItemsWriterBuilder.scala

Lines changed: 59 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -59,26 +59,29 @@ private class ItemsWriterBuilder
5959
)
6060
}
6161

62+
// Extract userConfig conversion to avoid repeated calls
63+
private[this] val userConfigMap = userConfig.asCaseSensitiveMap().asScala.toMap
64+
6265
private[this] val writeConfig = CosmosWriteConfig.parseWriteConfig(
63-
userConfig.asCaseSensitiveMap().asScala.toMap,
66+
userConfigMap,
6467
inputSchema
6568
)
6669

6770
private[this] val containerConfig = CosmosContainerConfig.parseCosmosContainerConfig(
68-
userConfig.asCaseSensitiveMap().asScala.toMap
71+
userConfigMap
6972
)
7073

7174
override def toBatch(): BatchWrite =
7275
new ItemsBatchWriter(
73-
userConfig.asCaseSensitiveMap().asScala.toMap,
76+
userConfigMap,
7477
inputSchema,
7578
cosmosClientStateHandles,
7679
diagnosticsConfig,
7780
sparkEnvironmentInfo)
7881

7982
override def toStreaming: StreamingWrite =
8083
new ItemsBatchWriter(
81-
userConfig.asCaseSensitiveMap().asScala.toMap,
84+
userConfigMap,
8285
inputSchema,
8386
cosmosClientStateHandles,
8487
diagnosticsConfig,
@@ -88,6 +91,7 @@ private class ItemsWriterBuilder
8891

8992
override def requiredDistribution(): Distribution = {
9093
if (writeConfig.bulkEnabled && writeConfig.bulkTransactional) {
94+
log.logInfo("Transactional batch mode enabled - configuring data distribution by partition key columns")
9195
// For transactional writes, partition by all partition key columns
9296
val partitionKeyPaths = getPartitionKeyColumnNames()
9397
if (partitionKeyPaths.nonEmpty) {
@@ -125,44 +129,63 @@ private class ItemsWriterBuilder
125129

126130
private def getPartitionKeyColumnNames(): Seq[String] = {
127131
try {
128-
// Need to create a temporary container client to get partition key definition
129-
val clientCacheItem = CosmosClientCache(
130-
CosmosClientConfiguration(
131-
userConfig.asCaseSensitiveMap().asScala.toMap,
132-
ReadConsistencyStrategy.EVENTUAL,
133-
sparkEnvironmentInfo
134-
),
135-
Some(cosmosClientStateHandles.value.cosmosClientMetadataCaches),
136-
"ItemsWriterBuilder-PKLookup"
137-
)
138-
139-
val container = ThroughputControlHelper.getContainer(
140-
userConfig.asCaseSensitiveMap().asScala.toMap,
141-
containerConfig,
142-
clientCacheItem,
143-
None
144-
)
145-
146-
val containerProperties = SparkBridgeInternal.getContainerPropertiesFromCollectionCache(container)
147-
val partitionKeyDefinition = containerProperties.getPartitionKeyDefinition
148-
149-
// Release the client
150-
clientCacheItem.close()
151-
152-
if (partitionKeyDefinition != null && partitionKeyDefinition.getPaths != null) {
153-
val paths = partitionKeyDefinition.getPaths.asScala
154-
paths.map(path => {
155-
// Remove leading '/' from partition key path (e.g., "/pk" -> "pk")
156-
if (path.startsWith("/")) path.substring(1) else path
157-
}).toSeq
158-
} else {
159-
Seq.empty[String]
132+
// Use loan pattern to ensure client is properly closed
133+
using(createClientForPartitionKeyLookup()) { clientCacheItem =>
134+
val container = ThroughputControlHelper.getContainer(
135+
userConfigMap,
136+
containerConfig,
137+
clientCacheItem,
138+
None
139+
)
140+
141+
// Simplified retrieval using SparkBridgeInternal directly
142+
val containerProperties = SparkBridgeInternal.getContainerPropertiesFromCollectionCache(container)
143+
val partitionKeyDefinition = containerProperties.getPartitionKeyDefinition
144+
145+
extractPartitionKeyPaths(partitionKeyDefinition)
160146
}
161147
} catch {
162148
case ex: Exception =>
163149
log.logWarning(s"Failed to get partition key definition for transactional writes: ${ex.getMessage}")
164150
Seq.empty[String]
165151
}
166152
}
153+
154+
private def createClientForPartitionKeyLookup(): CosmosClientCacheItem = {
155+
CosmosClientCache(
156+
CosmosClientConfiguration(
157+
userConfigMap,
158+
ReadConsistencyStrategy.EVENTUAL,
159+
sparkEnvironmentInfo
160+
),
161+
Some(cosmosClientStateHandles.value.cosmosClientMetadataCaches),
162+
"ItemsWriterBuilder-PKLookup"
163+
)
164+
}
165+
166+
private def extractPartitionKeyPaths(partitionKeyDefinition: com.azure.cosmos.models.PartitionKeyDefinition): Seq[String] = {
167+
if (partitionKeyDefinition != null && partitionKeyDefinition.getPaths != null) {
168+
val paths = partitionKeyDefinition.getPaths.asScala
169+
if (paths.isEmpty) {
170+
log.logError("Partition key definition has 0 columns - this should not happen for modern containers")
171+
}
172+
paths.map(path => {
173+
// Remove leading '/' from partition key path (e.g., "/pk" -> "pk")
174+
if (path.startsWith("/")) path.substring(1) else path
175+
}).toSeq
176+
} else {
177+
log.logError("Partition key definition is null - this should not happen for modern containers")
178+
Seq.empty[String]
179+
}
180+
}
181+
182+
// Scala loan pattern to ensure resources are properly cleaned up
183+
private def using[A <: { def close(): Unit }, B](resource: A)(f: A => B): B = {
184+
try {
185+
f(resource)
186+
} finally {
187+
if (resource != null) resource.close()
188+
}
189+
}
167190
}
168191
}

sdk/cosmos/azure-cosmos-spark_3/docs/configuration-reference.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@
6767
| `spark.cosmos.write.point.maxConcurrency` | None | Cosmos DB Item Write Max concurrency. If not specified it will be determined based on the Spark executor VM Size |
6868
| `spark.cosmos.write.bulk.maxPendingOperations` | None | Cosmos DB Item Write bulk mode maximum pending operations. Defines a limit of bulk operations being processed concurrently. If not specified it will be determined based on the Spark executor VM Size. If the volume of data is large for the provisioned throughput on the destination container, this setting can be adjusted by following the estimation of `1000 x Cores` |
6969
| `spark.cosmos.write.bulk.enabled` | `true` | Cosmos DB Item Write bulk enabled |
70-
| `spark.cosmos.write.bulk.transactional` | `false` | Enable transactional batch mode for bulk writes. When enabled, all operations for the same partition key are executed atomically (all succeed or all fail). Requires ordering and clustering by partition key columns. Only supports upsert operations. Cannot exceed 100 operations or 2MB per partition key. See [Transactional Batch documentation](https://learn.microsoft.com/azure/cosmos-db/nosql/transactional-batch) for details. |
| `spark.cosmos.write.bulk.targetedPayloadSizeInBytes` | `220201` | When the targeted payload size is reached for buffered documents, the request is sent to the backend. The default value is optimized for small documents <= 10 KB - when documents often exceed 110 KB, it can help to increase this value to up to about `1500000` (should still be smaller than 2 MB). |
70+
| `spark.cosmos.write.bulk.transactional` | `false` | Enable transactional batch mode for bulk writes. When enabled, all operations for the same partition key are executed atomically (all succeed or all fail). Requires ordering and clustering by partition key columns. Only supports upsert operations. Cannot exceed 100 operations or 2MB per partition key. **Note**: For containers using hierarchical partition keys (HPK), transactional scope applies only to **logical partitions** (complete partition key paths), not partial top-level keys. See [Transactional Batch documentation](https://learn.microsoft.com/azure/cosmos-db/nosql/transactional-batch) for details. |
| `spark.cosmos.write.bulk.targetedPayloadSizeInBytes` | `220201` | When the targeted payload size is reached for buffered documents, the request is sent to the backend. The default value is optimized for small documents <= 10 KB - when documents often exceed 110 KB, it can help to increase this value to up to about `1500000` (should still be smaller than 2 MB). |
7171
| `spark.cosmos.write.bulk.initialBatchSize` | `100` | Cosmos DB initial bulk micro batch size - a micro batch will be flushed to the backend when the number of documents enqueued exceeds this size - or the target payload size is met. The micro batch size is getting automatically tuned based on the throttling rate. By default the initial micro batch size is 100. Reduce this when you want to avoid that the first few requests consume too many RUs. |
7272
| `spark.cosmos.write.bulk.maxBatchSize` | `100` | Cosmos DB max. bulk micro batch size - a micro batch will be flushed to the backend when the number of documents enqueued exceeds this size - or the target payload size is met. The micro batch size is getting automatically tuned based on the throttling rate. By default the max. micro batch size is 100. Use this setting only when migrating Spark 2.4 workloads - for other scenarios relying on the auto-tuning combined with throughput control will result in better experience. |
7373
| `spark.cosmos.write.flush.noProgress.maxIntervalInSeconds` | `180` | The time interval in seconds that write operations will wait when no progress can be made for bulk writes before forcing a retry. The retry will reinitialize the bulk write process - so, any delays on the retry can be sure to be actual service issues. The default value of 3 min should be sufficient to prevent false negatives when there is a short service-side write unavailability - like for partition splits or merges. Increase it only if you regularly see these transient errors to exceed a time period of 180 seconds. |

0 commit comments

Comments
 (0)