Skip to content
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,25 @@
import com.azure.cosmos.TestObject;
import com.azure.cosmos.implementation.OperationType;
import com.azure.cosmos.models.CosmosBatch;
import com.azure.cosmos.models.CosmosBulkOperationResponse;
import com.azure.cosmos.models.CosmosBulkOperations;
import com.azure.cosmos.models.CosmosChangeFeedRequestOptions;
import com.azure.cosmos.models.CosmosItemIdentity;
import com.azure.cosmos.models.CosmosItemOperation;
import com.azure.cosmos.models.CosmosPatchOperations;
import com.azure.cosmos.models.CosmosQueryRequestOptions;
import com.azure.cosmos.models.FeedRange;
import com.azure.cosmos.models.FeedResponse;
import com.azure.cosmos.models.PartitionKey;
import com.azure.cosmos.rx.TestSuiteBase;
import reactor.core.publisher.Flux;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import static org.assertj.core.api.Assertions.assertThat;

public abstract class FaultInjectionTestBase extends TestSuiteBase {
public FaultInjectionTestBase(CosmosClientBuilder cosmosClientBuilder) {
super(cosmosClientBuilder);
Expand All @@ -32,6 +39,24 @@ protected CosmosDiagnostics performDocumentOperation(
OperationType operationType,
TestObject createdItem,
boolean isReadMany) {

return performDocumentOperation(
cosmosAsyncContainer,
operationType,
createdItem,
isReadMany,
true,
false);
}

protected CosmosDiagnostics performDocumentOperation(
CosmosAsyncContainer cosmosAsyncContainer,
OperationType operationType,
TestObject createdItem,
boolean isReadMany,
boolean fetchFeedRangesBeforehandForChangeFeed,
boolean isBulkOperation) {

try {
if (operationType == OperationType.Query && !isReadMany) {
CosmosQueryRequestOptions queryRequestOptions = new CosmosQueryRequestOptions();
Expand Down Expand Up @@ -87,24 +112,61 @@ protected CosmosDiagnostics performDocumentOperation(
}

if (operationType == OperationType.Batch) {
CosmosBatch batch = CosmosBatch.createCosmosBatch(new PartitionKey(createdItem.getId()));

batch.upsertItemOperation(createdItem);
batch.readItemOperation(createdItem.getId());
if (isBulkOperation) {

List<CosmosItemOperation> cosmosItemOperations = new ArrayList<>();

CosmosItemOperation cosmosItemOperation = CosmosBulkOperations.getReadItemOperation(
createdItem.getId(),
new PartitionKey(createdItem.getId()),
TestObject.class);

cosmosItemOperations.add(cosmosItemOperation);

return cosmosAsyncContainer.executeCosmosBatch(batch).block().getDiagnostics();
Flux<CosmosItemOperation> operationsFlux = Flux.fromIterable(cosmosItemOperations);

CosmosBulkOperationResponse<Object> response = cosmosAsyncContainer.executeBulkOperations(operationsFlux).blockLast();

assertThat(response).isNotNull();

return response.getResponse().getCosmosDiagnostics();
} else {
CosmosBatch batch = CosmosBatch.createCosmosBatch(new PartitionKey(createdItem.getId()));

batch.upsertItemOperation(createdItem);
batch.readItemOperation(createdItem.getId());

return cosmosAsyncContainer.executeCosmosBatch(batch).block().getDiagnostics();
}
}
}

if (operationType == OperationType.ReadFeed) {
List<FeedRange> feedRanges = cosmosAsyncContainer.getFeedRanges().block();

if (fetchFeedRangesBeforehandForChangeFeed) {
List<FeedRange> feedRanges = cosmosAsyncContainer.getFeedRanges().block();

assertThat(feedRanges).isNotNull();
assertThat(feedRanges.size()).isGreaterThan(0);

CosmosChangeFeedRequestOptions changeFeedRequestOptions =
CosmosChangeFeedRequestOptions.createForProcessingFromBeginning(feedRanges.get(0));

FeedResponse<TestObject> firstPage = cosmosAsyncContainer
.queryChangeFeed(changeFeedRequestOptions, TestObject.class)
.byPage()
.blockLast();
return firstPage.getCosmosDiagnostics();
}

CosmosChangeFeedRequestOptions changeFeedRequestOptions =
CosmosChangeFeedRequestOptions.createForProcessingFromBeginning(feedRanges.get(0));
CosmosChangeFeedRequestOptions.createForProcessingFromBeginning(FeedRange.forFullRange());

FeedResponse<TestObject> firstPage = cosmosAsyncContainer
.queryChangeFeed(changeFeedRequestOptions, TestObject.class)
.byPage()
.blockFirst();
.blockLast();
return firstPage.getCosmosDiagnostics();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ public void staledException(int statusCode, int subStatusCode, boolean expectRet
null,
null,
sessionContainer,
TestUtils.mockDiagnosticsClientContext()
TestUtils.mockDiagnosticsClientContext(),
null
);

CosmosException exception = BridgeInternal.createCosmosException(statusCode);
Expand Down Expand Up @@ -88,7 +89,8 @@ public void suppressRetryForExternalCollectionRid() {
null,
customHeaders,
sessionContainer,
TestUtils.mockDiagnosticsClientContext()
TestUtils.mockDiagnosticsClientContext(),
null
);

InvalidPartitionException invalidPartitionException = new InvalidPartitionException();
Expand Down Expand Up @@ -125,7 +127,8 @@ public void cleanSessionToken() {
null,
null,
sessionContainer,
TestUtils.mockDiagnosticsClientContext()
TestUtils.mockDiagnosticsClientContext(),
null
);

InvalidPartitionException invalidPartitionException = new InvalidPartitionException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -674,6 +674,13 @@ public void setRequestUri(CosmosException cosmosException, Uri requestUri) {
public Uri getRequestUri(CosmosException cosmosException) {
return cosmosException.getRequestUri();
}

@Override
public void setSubStatusCode(CosmosException cosmosException, int subStatusCode) {
cosmosException.setSubStatusCode(subStatusCode);
}


});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1596,6 +1596,7 @@ public interface CosmosExceptionAccessor {
List<String> getFaultInjectionEvaluationResults(CosmosException cosmosException);
void setRequestUri(CosmosException cosmosException, Uri requestUri);
Uri getRequestUri(CosmosException cosmosException);
void setSubStatusCode(CosmosException cosmosException, int subStatusCode);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1272,7 +1272,9 @@ private <T> Flux<FeedResponse<T>> createQuery(
qryOptAccessor.getProperties(nonNullQueryOptions),
qryOptAccessor.getHeaders(nonNullQueryOptions),
this.sessionContainer,
diagnosticsFactory);
diagnosticsFactory,
ResourceType.Document
);

return
ObservableHelper.fluxInlineIfPossibleAsObs(
Expand Down Expand Up @@ -2605,7 +2607,8 @@ private Mono<ResourceResponse<Document>> createDocumentCore(
this.getRetryPolicyForPointOperation(
scopedDiagnosticsFactory,
nonNullRequestOptions,
collectionLink);
collectionLink
);

AtomicReference<RxDocumentServiceRequest> requestReference = new AtomicReference<>();

Expand Down Expand Up @@ -2992,7 +2995,8 @@ private Mono<ResourceResponse<Document>> upsertDocumentCore(
this.getRetryPolicyForPointOperation(
scopedDiagnosticsFactory,
nonNullRequestOptions,
collectionLink);;
collectionLink
);
AtomicReference<RxDocumentServiceRequest> requestReference = new AtomicReference<>();

Consumer<CosmosException> gwModeE2ETimeoutDiagnosticHandler
Expand Down Expand Up @@ -3134,7 +3138,8 @@ private Mono<ResourceResponse<Document>> replaceDocumentCore(
this.getRetryPolicyForPointOperation(
scopedDiagnosticsFactory,
nonNullRequestOptions,
Utils.getCollectionName(documentLink));
Utils.getCollectionName(documentLink)
);
AtomicReference<RxDocumentServiceRequest> requestReference = new AtomicReference<>();

Consumer<CosmosException> gwModeE2ETimeoutDiagnosticHandler
Expand Down Expand Up @@ -3467,7 +3472,8 @@ private Mono<ResourceResponse<Document>> patchDocumentCore(
this.getRetryPolicyForPointOperation(
scopedDiagnosticsFactory,
nonNullRequestOptions,
Utils.getCollectionName(documentLink));
Utils.getCollectionName(documentLink)
);

AtomicReference<RxDocumentServiceRequest> requestReference = new AtomicReference<>();

Expand Down Expand Up @@ -3680,7 +3686,8 @@ private Mono<ResourceResponse<Document>> deleteDocumentCore(
this.getRetryPolicyForPointOperation(
scopedDiagnosticsFactory,
nonNullRequestOptions,
Utils.getCollectionName(documentLink));
Utils.getCollectionName(documentLink)
);

AtomicReference<RxDocumentServiceRequest> requestReference = new AtomicReference<>();

Expand Down Expand Up @@ -3872,7 +3879,8 @@ private Mono<ResourceResponse<Document>> readDocumentCore(
this.getRetryPolicyForPointOperation(
scopedDiagnosticsFactory,
nonNullRequestOptions,
Utils.getCollectionName(documentLink));
Utils.getCollectionName(documentLink)
);

AtomicReference<RxDocumentServiceRequest> requestReference = new AtomicReference<>();

Expand Down Expand Up @@ -4021,7 +4029,9 @@ public <T> Mono<FeedResponse<T>> readMany(
qryOptAccessor.getProperties(state.getQueryOptions()),
qryOptAccessor.getHeaders(state.getQueryOptions()),
this.sessionContainer,
diagnosticsFactory);
diagnosticsFactory,
ResourceType.Document
);

return ObservableHelper
.inlineIfPossibleAsObs(
Expand Down Expand Up @@ -4718,7 +4728,9 @@ public <T> Flux<FeedResponse<T>> queryDocumentChangeFeedFromPagedFlux(
changeFeedOptionsAccessor.getProperties(state.getChangeFeedOptions()),
changeFeedOptionsAccessor.getHeaders(state.getChangeFeedOptions()),
this.sessionContainer,
diagnosticsFactory);
diagnosticsFactory,
ResourceType.Document
);

return ObservableHelper
.fluxInlineIfPossibleAsObs(
Expand All @@ -4743,6 +4755,17 @@ private <T> Flux<FeedResponse<T>> queryDocumentChangeFeedFromPagedFluxInternal(

return this.getCollectionCache()
.resolveByNameAsync(null, collectionLink, null)
.onErrorMap(throwable -> {
if (throwable instanceof CosmosException) {

CosmosException cosmosException = Utils.as(throwable, CosmosException.class);
BridgeInternal.setSubStatusCode(cosmosException, HttpConstants.SubStatusCodes.OWNER_RESOURCE_NOT_EXISTS);

return cosmosException;
}

return throwable;
})
.flatMapMany(collection -> {
if (collection == null) {
throw new IllegalStateException("Collection can not be null");
Expand Down Expand Up @@ -4863,7 +4886,9 @@ public <T> Flux<FeedResponse<T>> readAllDocuments(
qryOptAccessor.getProperties(effectiveOptions),
qryOptAccessor.getHeaders(effectiveOptions),
this.sessionContainer,
diagnosticsFactory);
diagnosticsFactory,
ResourceType.Document
);

Flux<FeedResponse<T>> innerFlux = ObservableHelper.fluxInlineIfPossibleAsObs(
() -> {
Expand Down Expand Up @@ -5192,7 +5217,9 @@ public Mono<CosmosBatchResponse> executeBatchRequest(String collectionLink,
nonNullRequestOptions.getProperties(),
nonNullRequestOptions.getHeaders(),
this.sessionContainer,
scopedDiagnosticsFactory);
scopedDiagnosticsFactory,
ResourceType.Document
);
}

final DocumentClientRetryPolicy finalRetryPolicy = documentClientRetryPolicy;
Expand Down Expand Up @@ -6629,7 +6656,9 @@ public Mono<List<FeedRange>> getFeedRanges(String collectionLink, boolean forceR
new HashMap<>(),
new HashMap<>(),
this.sessionContainer,
null);
null,
ResourceType.PartitionKeyRange
);

RxDocumentServiceRequest request = RxDocumentServiceRequest.create(
this,
Expand Down Expand Up @@ -8016,7 +8045,9 @@ private DocumentClientRetryPolicy getRetryPolicyForPointOperation(
requestOptions.getProperties(),
requestOptions.getHeaders(),
this.sessionContainer,
diagnosticsClientContext);
diagnosticsClientContext,
ResourceType.Document
);

return requestRetryPolicy;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ public class StaleResourceRetryPolicy extends DocumentClientRetryPolicy {

private final static Logger logger = LoggerFactory.getLogger(StaleResourceRetryPolicy.class);

private final static ImplementationBridgeHelpers.CosmosExceptionHelper.CosmosExceptionAccessor cosmosExceptionAccessor =
ImplementationBridgeHelpers.CosmosExceptionHelper.getCosmosExceptionAccessor();

private final RxCollectionCache clientCollectionCache;
private final DocumentClientRetryPolicy nextPolicy;
private final String collectionLink;
Expand All @@ -33,6 +36,7 @@ public class StaleResourceRetryPolicy extends DocumentClientRetryPolicy {
private RxDocumentServiceRequest request;
private final DiagnosticsClientContext diagnosticsClientContext;
private final AtomicReference<CosmosDiagnostics> cosmosDiagnosticsHolder;
private final ResourceType enclosingOperationTargetResourceType;

private volatile boolean retried = false;

Expand All @@ -43,7 +47,8 @@ public StaleResourceRetryPolicy(
Map<String, Object> requestOptionProperties,
Map<String, String> requestCustomHeaders,
ISessionContainer sessionContainer,
DiagnosticsClientContext diagnosticsClientContext) {
DiagnosticsClientContext diagnosticsClientContext,
ResourceType enclosingOperationTargetResourceType) {

this.clientCollectionCache = collectionCache;
this.nextPolicy = nextPolicy;
Expand All @@ -56,6 +61,8 @@ public StaleResourceRetryPolicy(

this.diagnosticsClientContext = diagnosticsClientContext;
this.cosmosDiagnosticsHolder = new AtomicReference<>(null); // will only create one if no request is bound to the retry policy

this.enclosingOperationTargetResourceType = enclosingOperationTargetResourceType;
}

@Override
Expand Down Expand Up @@ -120,6 +127,29 @@ public Mono<ShouldRetryResult> shouldRetry(Exception e) {
}

return Mono.just(ShouldRetryResult.retryAfter(Duration.ZERO));
})
.onErrorMap(throwable -> {

if (throwable instanceof CosmosException) {

CosmosException cosmosException = Utils.as(throwable, CosmosException.class);

if (this.request != null && !ResourceType.DocumentCollection.equals(this.request.getResourceType()) && Exceptions.isNotFound(cosmosException)) {
cosmosExceptionAccessor.setSubStatusCode(cosmosException, HttpConstants.SubStatusCodes.OWNER_RESOURCE_NOT_EXISTS);

return cosmosException;
}

if (this.enclosingOperationTargetResourceType != null && !ResourceType.DocumentCollection.equals(this.enclosingOperationTargetResourceType)) {
cosmosExceptionAccessor.setSubStatusCode(cosmosException, HttpConstants.SubStatusCodes.OWNER_RESOURCE_NOT_EXISTS);

return cosmosException;
}

return cosmosException;
}

return throwable;
});

} else {
Expand Down
Loading
Loading