@@ -104,12 +104,6 @@ private class TransactionalBulkWriter
104104 private val currentBatchOperations = new mutable.ListBuffer [(ObjectNode , OperationContext , String )]()
105105 private val batchConstructionLock = new Object ()
106106
107- private val cosmosPatchHelperOpt = writeConfig.itemWriteStrategy match {
108- case ItemWriteStrategy .ItemPatch | ItemWriteStrategy .ItemPatchIfExists =>
109- Some (new CosmosPatchHelper (diagnosticsConfig, writeConfig.patchConfigs.get))
110- case _ => None
111- }
112-
113107 private val activeBulkWriteOperations = java.util.concurrent.ConcurrentHashMap .newKeySet[CosmosItemOperation ]().asScala
114108 private val operationContextMap = new java.util.concurrent.ConcurrentHashMap [CosmosItemOperation , OperationContext ]().asScala
115109 private val semaphore = new Semaphore (maxPendingOperations)
@@ -514,40 +508,12 @@ private class TransactionalBulkWriter
514508 }
515509 }
516510
517- private [this ] def getPatchOperationsForBatch (itemId : String , objectNode : ObjectNode ): CosmosPatchOperations = {
518- assert(writeConfig.patchConfigs.isDefined)
519- assert(cosmosPatchHelperOpt.isDefined)
520- val cosmosPatchHelper = cosmosPatchHelperOpt.get
521- cosmosPatchHelper.createCosmosPatchOperations(itemId, partitionKeyDefinition, objectNode)
522- }
523-
524511 private [this ] def finalFlushBatch (): Unit = {
525512 batchConstructionLock.synchronized {
526513 flushCurrentBatch()
527514 }
528515 }
529516
530- private [this ] def getPatchItemOperation (itemId : String ,
531- partitionKey : PartitionKey ,
532- partitionKeyDefinition : PartitionKeyDefinition ,
533- objectNode : ObjectNode ,
534- context : OperationContext ): CosmosItemOperation = {
535-
536- assert(writeConfig.patchConfigs.isDefined)
537- assert(cosmosPatchHelperOpt.isDefined)
538- val patchConfigs = writeConfig.patchConfigs.get
539- val cosmosPatchHelper = cosmosPatchHelperOpt.get
540-
541- val cosmosPatchOperations = cosmosPatchHelper.createCosmosPatchOperations(itemId, partitionKeyDefinition, objectNode)
542-
543- val requestOptions = new CosmosBulkPatchItemRequestOptions ()
544- if (patchConfigs.filter.isDefined && ! StringUtils .isEmpty(patchConfigs.filter.get)) {
545- requestOptions.setFilterPredicate(patchConfigs.filter.get)
546- }
547-
548- CosmosBulkOperations .getPatchItemOperation(itemId, partitionKey, cosmosPatchOperations, requestOptions, context)
549- }
550-
551517 // scalastyle:off method.length
552518 // scalastyle:off cyclomatic.complexity
553519 private [this ] def handleNonSuccessfulStatusCode
@@ -918,53 +884,26 @@ private class TransactionalBulkWriter
918884 }
919885
920886 private def shouldIgnore (statusCode : Int , subStatusCode : Int ): Boolean = {
921- val returnValue = writeConfig.itemWriteStrategy match {
922- case ItemWriteStrategy .ItemAppend => Exceptions .isResourceExistsException(statusCode)
923- case ItemWriteStrategy .ItemPatchIfExists => Exceptions .isNotFoundExceptionCore(statusCode, subStatusCode)
924- case ItemWriteStrategy .ItemDelete => Exceptions .isNotFoundExceptionCore(statusCode, subStatusCode)
925- case ItemWriteStrategy .ItemDeleteIfNotModified => Exceptions .isNotFoundExceptionCore(statusCode, subStatusCode) ||
926- Exceptions .isPreconditionFailedException(statusCode)
927- case ItemWriteStrategy .ItemOverwriteIfNotModified =>
928- Exceptions .isResourceExistsException(statusCode) ||
929- Exceptions .isNotFoundExceptionCore(statusCode, subStatusCode) ||
930- Exceptions .isPreconditionFailedException(statusCode)
931- case _ => false
932- }
933-
934- returnValue
887+ // Transactional batches only support ItemOverwrite - no errors to ignore
888+ false
935889 }
936890
937891 private def shouldRetry (statusCode : Int , subStatusCode : Int , operationContext : OperationContext ): Boolean = {
938- var returnValue = false
939- if (operationContext.attemptNumber < writeConfig.maxRetryCount) {
940- returnValue = writeConfig.itemWriteStrategy match {
941- case ItemWriteStrategy .ItemBulkUpdate =>
942- this .shouldRetryForItemPatchBulkUpdate(statusCode, subStatusCode)
943- // Upsert can return 404/0 in rare cases (when due to TTL expiration there is a race condition
944- case ItemWriteStrategy .ItemOverwrite =>
945- Exceptions .canBeTransientFailure(statusCode, subStatusCode) ||
946- statusCode == 0 || // Gateway mode reports inability to connect due to PoolAcquirePendingLimitException as status code 0
947- Exceptions .isNotFoundExceptionCore(statusCode, subStatusCode)
948- case _ =>
949- Exceptions .canBeTransientFailure(statusCode, subStatusCode) ||
950- statusCode == 0 // Gateway mode reports inability to connect due to PoolAcquirePendingLimitException as status code 0
951- }
952- }
892+ var returnValue = false
893+ if (operationContext.attemptNumber < writeConfig.maxRetryCount) {
894+ // Transactional batches only support ItemOverwrite (upsert)
895+ // Upsert can return 404/0 in rare cases (when due to TTL expiration there is a race condition)
896+ returnValue = Exceptions .canBeTransientFailure(statusCode, subStatusCode) ||
897+ statusCode == 0 || // Gateway mode reports inability to connect due to PoolAcquirePendingLimitException as status code 0
898+ Exceptions .isNotFoundExceptionCore(statusCode, subStatusCode)
899+ }
953900
954901 log.logDebug(s " Should retry statusCode ' $statusCode: $subStatusCode' -> $returnValue, " +
955902 s " Context: ${operationContext.toString} $getThreadInfo" )
956903
957904 returnValue
958905 }
959906
960- private def shouldRetryForItemPatchBulkUpdate (statusCode : Int , subStatusCode : Int ): Boolean = {
961- Exceptions .canBeTransientFailure(statusCode, subStatusCode) ||
962- statusCode == 0 || // Gateway mode reports inability to connect due to
963- // PoolAcquirePendingLimitException as status code 0
964- Exceptions .isResourceExistsException(statusCode) ||
965- Exceptions .isPreconditionFailedException(statusCode)
966- }
967-
968907 private def getId (objectNode : ObjectNode ) = {
969908 val idField = objectNode.get(CosmosConstants .Properties .Id )
970909 assume(idField != null && idField.isTextual)
0 commit comments