diff --git a/sdk/cosmos/azure-cosmos-test/src/main/java/com/azure/cosmos/test/faultinjection/FaultInjectionRuleBuilder.java b/sdk/cosmos/azure-cosmos-test/src/main/java/com/azure/cosmos/test/faultinjection/FaultInjectionRuleBuilder.java index 303d5ee93328..25d6e4a3c75f 100644 --- a/sdk/cosmos/azure-cosmos-test/src/main/java/com/azure/cosmos/test/faultinjection/FaultInjectionRuleBuilder.java +++ b/sdk/cosmos/azure-cosmos-test/src/main/java/com/azure/cosmos/test/faultinjection/FaultInjectionRuleBuilder.java @@ -171,7 +171,8 @@ private void validateRuleOnGatewayConnection() { .isMetadataOperationType(this.condition)) { if (serverErrorResult.getServerErrorType() != FaultInjectionServerErrorType.TOO_MANY_REQUEST && serverErrorResult.getServerErrorType() != FaultInjectionServerErrorType.RESPONSE_DELAY - && serverErrorResult.getServerErrorType() != FaultInjectionServerErrorType.CONNECTION_DELAY) { + && serverErrorResult.getServerErrorType() != FaultInjectionServerErrorType.CONNECTION_DELAY + && serverErrorResult.getServerErrorType() != FaultInjectionServerErrorType.CHANNEL_EXCEPTION) { throw new IllegalArgumentException("Error type " + serverErrorResult.getServerErrorType() + " is not supported for rule with metadata request"); } diff --git a/sdk/cosmos/azure-cosmos-test/src/main/java/com/azure/cosmos/test/faultinjection/FaultInjectionServerErrorType.java b/sdk/cosmos/azure-cosmos-test/src/main/java/com/azure/cosmos/test/faultinjection/FaultInjectionServerErrorType.java index 45e0d4f20955..3dac98512bc8 100644 --- a/sdk/cosmos/azure-cosmos-test/src/main/java/com/azure/cosmos/test/faultinjection/FaultInjectionServerErrorType.java +++ b/sdk/cosmos/azure-cosmos-test/src/main/java/com/azure/cosmos/test/faultinjection/FaultInjectionServerErrorType.java @@ -70,5 +70,10 @@ public enum FaultInjectionServerErrorType { * this will swap role of the primary replica and the first secondary replica (p -> s, s -> p). * The scramble address rule will only stop when there is force address refresh happened. */ - SCRAMBLE_ADDRESS + SCRAMBLE_ADDRESS, + + /*** + * this will simulate a http socket exception + */ + CHANNEL_EXCEPTION } diff --git a/sdk/cosmos/azure-cosmos-test/src/main/java/com/azure/cosmos/test/implementation/faultinjection/FaultInjectionRuleStore.java b/sdk/cosmos/azure-cosmos-test/src/main/java/com/azure/cosmos/test/implementation/faultinjection/FaultInjectionRuleStore.java index 3e09c30cf0ef..5688fb3eb449 100644 --- a/sdk/cosmos/azure-cosmos-test/src/main/java/com/azure/cosmos/test/implementation/faultinjection/FaultInjectionRuleStore.java +++ b/sdk/cosmos/azure-cosmos-test/src/main/java/com/azure/cosmos/test/implementation/faultinjection/FaultInjectionRuleStore.java @@ -29,6 +29,8 @@ public class FaultInjectionRuleStore { private final Set connectionErrorRuleSet = ConcurrentHashMap.newKeySet(); private final Set serverResponseReduceLocalLSNRuleSet = ConcurrentHashMap.newKeySet(); private final Set scrambleAddressRuleSet = ConcurrentHashMap.newKeySet(); + private final Set gatewayChannelErrorRuleSet = ConcurrentHashMap.newKeySet(); + private final FaultInjectionRuleProcessor ruleProcessor; @@ -79,6 +81,9 @@ public Mono configureFaultInjectionRule(FaultInject case SCRAMBLE_ADDRESS_AND_REDUCE: this.scrambleAddressRuleSet.add(serverErrorRule); break; + case CHANNEL_EXCEPTION: + this.gatewayChannelErrorRuleSet.add(serverErrorRule); + break; default: this.serverResponseErrorRuleSet.add(serverErrorRule); break; @@ -115,6 +120,17 @@ public FaultInjectionServerErrorRule findServerResponseErrorRule(FaultInjectionR return null; } + public FaultInjectionServerErrorRule findGatewayChannelError(FaultInjectionRequestArgs requestArgs) { + for (FaultInjectionServerErrorRule serverResponseDelayRule : this.gatewayChannelErrorRuleSet) { + if (serverResponseDelayRule.getConnectionType() == GATEWAY + && serverResponseDelayRule.isApplicable(requestArgs)) { + return serverResponseDelayRule; + } + } + + return null; + } + public FaultInjectionServerErrorRule findServerConnectionDelayRule(FaultInjectionRequestArgs requestArgs) { FaultInjectionConnectionType connectionType = requestArgs instanceof RntbdFaultInjectionRequestArgs ? DIRECT : GATEWAY; diff --git a/sdk/cosmos/azure-cosmos-test/src/main/java/com/azure/cosmos/test/implementation/faultinjection/ServerErrorInjector.java b/sdk/cosmos/azure-cosmos-test/src/main/java/com/azure/cosmos/test/implementation/faultinjection/ServerErrorInjector.java index f548b27952aa..08f6f54f3c1d 100644 --- a/sdk/cosmos/azure-cosmos-test/src/main/java/com/azure/cosmos/test/implementation/faultinjection/ServerErrorInjector.java +++ b/sdk/cosmos/azure-cosmos-test/src/main/java/com/azure/cosmos/test/implementation/faultinjection/ServerErrorInjector.java @@ -105,6 +105,21 @@ public boolean injectServerResponseError( return false; } + public boolean injectGatewayChannelError(FaultInjectionRequestArgs faultInjectionRequestArgs) { + FaultInjectionServerErrorRule serverResponseErrorRule = this.ruleStore.findGatewayChannelError(faultInjectionRequestArgs); + + if (serverResponseErrorRule != null) { + faultInjectionRequestArgs.getServiceRequest().faultInjectionRequestContext + .applyFaultInjectionRule( + faultInjectionRequestArgs.getTransportRequestId(), + serverResponseErrorRule.getId()); + + return true; + } + + return false; + } + @Override public boolean injectServerConnectionDelay( FaultInjectionRequestArgs faultInjectionRequestArgs, diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyReaderTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyReaderTest.java index ce3e4f4f0af5..858e9e08ef29 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyReaderTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyReaderTest.java @@ -25,14 +25,20 @@ import com.azure.cosmos.implementation.ResourceType; import com.azure.cosmos.implementation.RxDocumentServiceRequest; import com.azure.cosmos.implementation.StoreResponseBuilder; +import com.azure.cosmos.implementation.TestConfigurations; import com.azure.cosmos.implementation.Utils; import com.azure.cosmos.implementation.VectorSessionToken; import com.azure.cosmos.implementation.guava25.collect.ImmutableList; +import com.azure.cosmos.implementation.throughputControl.TestItem; +import com.azure.cosmos.models.CosmosBatch; +import com.azure.cosmos.models.CosmosBatchRequestOptions; +import com.azure.cosmos.models.CosmosBatchResponse; import com.azure.cosmos.models.CosmosItemResponse; import com.azure.cosmos.models.FeedRange; import com.azure.cosmos.models.PartitionKey; import com.azure.cosmos.test.faultinjection.CosmosFaultInjectionHelper; import com.azure.cosmos.test.faultinjection.FaultInjectionConditionBuilder; +import com.azure.cosmos.test.faultinjection.FaultInjectionConnectionType; import com.azure.cosmos.test.faultinjection.FaultInjectionEndpointBuilder; import com.azure.cosmos.test.faultinjection.FaultInjectionOperationType; import com.azure.cosmos.test.faultinjection.FaultInjectionResultBuilders; @@ -50,9 +56,11 @@ import org.testng.annotations.DataProvider; import org.testng.annotations.Test; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; import java.math.BigDecimal; import java.math.RoundingMode; +import java.time.Duration; import java.util.Arrays; import java.util.List; import java.util.UUID; @@ -389,9 +397,160 @@ public void fabianm_Manual_Repro_Test() { return Mono.just(response); }) .block(); + } + } + @Test + public void batchTest() { + CosmosAsyncClient client = new CosmosClientBuilder() + .endpoint(TestConfigurations.HOST) + // add back the key + .key(TestConfigurations.MASTER_KEY) + .buildAsyncClient(); - } + CosmosAsyncContainer container = client + .getDatabase("TestDatabase") + .getContainer("FiveTransTestContainer"); + + TestItem testItem = TestItem.createNewItem(); + CosmosBatch batch = CosmosBatch.createCosmosBatch(new PartitionKey(testItem.getId())); + batch.createItemOperation(testItem); + batch.readItemOperation(testItem.getId()); + + FaultInjectionRule scrambledAddressForBatch = new FaultInjectionRuleBuilder("scrambledAddresses") + .condition( + new FaultInjectionConditionBuilder() + .operationType(FaultInjectionOperationType.BATCH_ITEM) + .build()) + .result( + FaultInjectionResultBuilders.getResultBuilder(FaultInjectionServerErrorType.SCRAMBLE_ADDRESS) + .build() + ) + .build(); + + CosmosFaultInjectionHelper.configureFaultInjectionRules(container, Arrays.asList(scrambledAddressForBatch)).block(); + + container.executeCosmosBatch(batch, new CosmosBatchRequestOptions()) + .flatMap(response -> { + logger.info("Succeeded"); + logger.info(response.getDiagnostics().toString()); + return Mono.empty(); + }) + .onErrorResume(throwable -> { + logger.info("Final status code {}", ((CosmosException)throwable).getStatusCode()); + logger.info( + ((CosmosException)throwable).getDiagnostics().getDiagnosticsContext().toJson()); + return Mono.empty(); + }) + .block(); + } + + @Test + public void addressTests() { + CosmosAsyncClient client = new CosmosClientBuilder() + .endpoint(TestConfigurations.HOST) + // add back the key + .key(TestConfigurations.MASTER_KEY) + .buildAsyncClient(); + + CosmosAsyncContainer container = client + .getDatabase("TestDatabase") + .getContainer("FiveTransTestContainer"); + + TestItem testItem = TestItem.createNewItem(); + CosmosBatch batch = CosmosBatch.createCosmosBatch(new PartitionKey(testItem.getId())); + batch.createItemOperation(testItem); + batch.readItemOperation(testItem.getId()); + container.executeCosmosBatch(batch, new CosmosBatchRequestOptions()).block(); + + FaultInjectionRule addressRefreshRule = new FaultInjectionRuleBuilder("addressRefreshRule") + .condition( + new FaultInjectionConditionBuilder() + .operationType(FaultInjectionOperationType.METADATA_REQUEST_ADDRESS_REFRESH) + .connectionType(FaultInjectionConnectionType.GATEWAY) + .build()) + .result( + FaultInjectionResultBuilders.getResultBuilder(FaultInjectionServerErrorType.CONNECTION_DELAY) + .delay(Duration.ofSeconds(70)) + .build() + ) + .build(); + + FaultInjectionRule batch410 = new FaultInjectionRuleBuilder("batch410") + .condition( + new FaultInjectionConditionBuilder() + .operationType(FaultInjectionOperationType.BATCH_ITEM) + .build()) + .result( + FaultInjectionResultBuilders.getResultBuilder(FaultInjectionServerErrorType.SCRAMBLE_ADDRESS) + .build() + ) + .build(); + + CosmosFaultInjectionHelper.configureFaultInjectionRules(container, Arrays.asList(addressRefreshRule, batch410)).block(); + // start the request in a different thread +// Mono.just(this) +// .flatMap(t -> container.executeCosmosBatch(batch, new CosmosBatchRequestOptions())) +// .flatMap(response -> { +// logger.info("Succeeded"); +// logger.info(response.getDiagnostics().toString()); +// return Mono.empty(); +// }) +// .onErrorResume(throwable -> { +// logger.info("Final status code {}", ((CosmosException)throwable).getStatusCode()); +// logger.info( +// ((CosmosException)throwable).getDiagnostics().getDiagnosticsContext().toJson()); +// return Mono.empty(); +// }) +// .subscribeOn(Schedulers.boundedElastic()) +// .subscribe(); + + container.executeCosmosBatch(batch, new CosmosBatchRequestOptions()) + .flatMap(response -> { + logger.info("Succeeded"); + logger.info(response.getDiagnostics().toString()); + return Mono.empty(); + }) + .onErrorResume(throwable -> { + logger.info("Final status code {}", ((CosmosException)throwable).getStatusCode()); + logger.info( + ((CosmosException)throwable).getDiagnostics().getDiagnosticsContext().toJson()); + return Mono.empty(); + }) + .block(); + + + logger.info("Starting the second request"); + container.executeCosmosBatch(batch, new CosmosBatchRequestOptions()) + .flatMap(response -> { + logger.info("Succeeded"); + logger.info(response.getDiagnostics().toString()); + return Mono.empty(); + }) + .onErrorResume(throwable -> { + logger.info("Final status code {}", ((CosmosException)throwable).getStatusCode()); + logger.info( + ((CosmosException)throwable).getDiagnostics().getDiagnosticsContext().toJson()); + return Mono.empty(); + }) + .block(); +// +// for (int i = 0; i < 10; i++) { +// addressRefreshRule.disable(); +// container.executeCosmosBatch(batch, new CosmosBatchRequestOptions()) +// .flatMap(response -> { +// logger.info("Succeeded"); +// logger.info(response.getDiagnostics().toString()); +// return Mono.empty(); +// }) +// .onErrorResume(throwable -> { +// logger.info("Final status code {}", ((CosmosException)throwable).getStatusCode()); +// logger.info( +// ((CosmosException)throwable).getDiagnostics().getDiagnosticsContext().toJson()); +// return Mono.empty(); +// }) +// .block(); +// } } @Test(groups = "unit") diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/AddressSelector.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/AddressSelector.java index cdc634be5eea..0ab2cd3baec1 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/AddressSelector.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/AddressSelector.java @@ -171,8 +171,23 @@ public static Uri getPrimaryUri(RxDocumentServiceRequest request, List> resolveAddressesAsync(RxDocumentServiceRequest request, boolean forceAddressRefresh) { + // if there already has address being refreshed, then stop checking scramble address rule + Pair shouldScrambleAddresses = + request.faultInjectionRequestContext.getAddressForceRefreshed() ? + Pair.of(false, false) : this.addressInjector.shouldScrambleAddresses(request); + Mono> resolvedAddressesObs = (this.addressResolver.resolveAsync(request, forceAddressRefresh)) + .map(addresses -> { + if (shouldScrambleAddresses.getLeft()) { + return scrambleAddresses( + Arrays.stream(addresses).collect(Collectors.toList()), + shouldScrambleAddresses.getRight()) + .toArray(new AddressInformation[0]); + } + + return addresses; + }) .map(addresses -> Arrays.stream(addresses) .filter(address -> !Strings.isNullOrEmpty(address.getPhysicalUri().getURIAsString()) && Strings.areEqualIgnoreCase(address.getProtocolScheme(), this.protocol.scheme())) .collect(Collectors.toList())); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GatewayAddressCache.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GatewayAddressCache.java index 764a306bb04c..435d7127b121 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GatewayAddressCache.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GatewayAddressCache.java @@ -410,9 +410,6 @@ private Mono> getServerAddressesViaGatewayInternalAsync(RxDocument JavaStreamUtils.toString(partitionKeyRangeIds, ",")); } - // track address refresh has happened, this is only meant to be used for fault injection validation - request.faultInjectionRequestContext.recordAddressForceRefreshed(forceRefresh); - String entryUrl = PathsHelper.generatePath(ResourceType.Document, collectionRid, true); HashMap addressQuery = new HashMap<>(); @@ -517,6 +514,9 @@ private Mono> getServerAddressesViaGatewayInternalAsync(RxDocument null, httpRequest.reactorNettyRequestRecord().getTransportRequestId()); + // track address refresh has happened, this is only meant to be used for fault injection validation + request.faultInjectionRequestContext.recordAddressForceRefreshed(forceRefresh); + return dsr.getQueryResponse(null, Address.class); }).onErrorResume(throwable -> { Throwable unwrappedException = reactor.core.Exceptions.unwrap(throwable); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/faultinjection/AddressInjector.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/faultinjection/AddressInjector.java index 6eba4516d73b..db5fa14698ef 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/faultinjection/AddressInjector.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/faultinjection/AddressInjector.java @@ -33,7 +33,8 @@ public Pair shouldScrambleAddresses(RxDocumentServiceRequest r } FaultInjectionRequestArgs createFaultInjectionRequestArgs(RxDocumentServiceRequest rxDocumentServiceRequest) { - return new FaultInjectionRequestArgs(0, null, false, rxDocumentServiceRequest) { + boolean isPrimary = rxDocumentServiceRequest.isReadOnlyRequest() ? false : true; + return new FaultInjectionRequestArgs(0, null, isPrimary, rxDocumentServiceRequest) { @Override public List getPartitionKeyRangeIds() { if(rxDocumentServiceRequest.getPartitionKeyRangeIdentity() != null) { diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/faultinjection/GatewayServerErrorInjector.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/faultinjection/GatewayServerErrorInjector.java index 7c93bf31d04c..83a18a824d5a 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/faultinjection/GatewayServerErrorInjector.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/faultinjection/GatewayServerErrorInjector.java @@ -19,6 +19,7 @@ import com.azure.cosmos.implementation.http.ReactorNettyRequestRecord; import com.azure.cosmos.implementation.routing.PartitionKeyInternal; import com.azure.cosmos.implementation.routing.PartitionKeyInternalHelper; +import io.netty.channel.ChannelException; import io.netty.channel.ConnectTimeoutException; import io.netty.handler.timeout.ReadTimeoutException; import reactor.core.publisher.Mono; @@ -150,6 +151,10 @@ public Mono injectGatewayErrors( return Mono.error(exceptionToBeInjected.v); } + if (this.injectGatewayChannelError(faultInjectionRequestArgs)) { + return Mono.error(new ChannelException("Fake channel exception")); + } + if (this.injectGatewayServerConnectionDelay(faultInjectionRequestArgs, delayToBeInjected)) { Duration connectionAcquireTimeout = this.configs.getConnectionAcquireTimeout(); if (delayToBeInjected.v.toMillis() >= connectionAcquireTimeout.toMillis()) { @@ -221,6 +226,15 @@ private boolean injectGatewayServerResponseError( return false; } + private boolean injectGatewayChannelError(FaultInjectionRequestArgs faultInjectionRequestArgs) { + for (IServerErrorInjector serverErrorInjector : faultInjectors) { + if(serverErrorInjector.injectGatewayChannelError(faultInjectionRequestArgs)) { + return true; + } + } + return false; + } + private boolean injectGatewayServerConnectionDelay( FaultInjectionRequestArgs faultInjectionRequestArgs, Utils.ValueHolder delayToBeInjected) { diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/faultinjection/IServerErrorInjector.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/faultinjection/IServerErrorInjector.java index 01f92d6c113c..ecb9cc0e5ceb 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/faultinjection/IServerErrorInjector.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/faultinjection/IServerErrorInjector.java @@ -48,6 +48,8 @@ boolean injectServerResponseError( FaultInjectionRequestArgs faultInjectionRequestArgs, Utils.ValueHolder injectedException); + boolean injectGatewayChannelError(FaultInjectionRequestArgs faultInjectionRequestArgs); + /*** * Inject server connection delay error. *