Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,8 @@ private void validateRuleOnGatewayConnection() {
.isMetadataOperationType(this.condition)) {
if (serverErrorResult.getServerErrorType() != FaultInjectionServerErrorType.TOO_MANY_REQUEST
&& serverErrorResult.getServerErrorType() != FaultInjectionServerErrorType.RESPONSE_DELAY
&& serverErrorResult.getServerErrorType() != FaultInjectionServerErrorType.CONNECTION_DELAY) {
&& serverErrorResult.getServerErrorType() != FaultInjectionServerErrorType.CONNECTION_DELAY
&& serverErrorResult.getServerErrorType() != FaultInjectionServerErrorType.CHANNEL_EXCEPTION) {

throw new IllegalArgumentException("Error type " + serverErrorResult.getServerErrorType() + " is not supported for rule with metadata request");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,5 +70,10 @@ public enum FaultInjectionServerErrorType {
* this will swap role of the primary replica and the first secondary replica (p -> s, s -> p).
* The scramble address rule will only stop when there is force address refresh happened.
*/
SCRAMBLE_ADDRESS
SCRAMBLE_ADDRESS,

/***
* this will simulate a http socket exception
*/
CHANNEL_EXCEPTION
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ public class FaultInjectionRuleStore {
private final Set<FaultInjectionConnectionErrorRule> connectionErrorRuleSet = ConcurrentHashMap.newKeySet();
private final Set<FaultInjectionServerErrorRule> serverResponseReduceLocalLSNRuleSet = ConcurrentHashMap.newKeySet();
private final Set<FaultInjectionServerErrorRule> scrambleAddressRuleSet = ConcurrentHashMap.newKeySet();
private final Set<FaultInjectionServerErrorRule> gatewayChannelErrorRuleSet = ConcurrentHashMap.newKeySet();


private final FaultInjectionRuleProcessor ruleProcessor;

Expand Down Expand Up @@ -79,6 +81,9 @@ public Mono<IFaultInjectionRuleInternal> configureFaultInjectionRule(FaultInject
case SCRAMBLE_ADDRESS_AND_REDUCE:
this.scrambleAddressRuleSet.add(serverErrorRule);
break;
case CHANNEL_EXCEPTION:
this.gatewayChannelErrorRuleSet.add(serverErrorRule);
break;
default:
this.serverResponseErrorRuleSet.add(serverErrorRule);
break;
Expand Down Expand Up @@ -115,6 +120,17 @@ public FaultInjectionServerErrorRule findServerResponseErrorRule(FaultInjectionR
return null;
}

public FaultInjectionServerErrorRule findGatewayChannelError(FaultInjectionRequestArgs requestArgs) {
for (FaultInjectionServerErrorRule serverResponseDelayRule : this.gatewayChannelErrorRuleSet) {
if (serverResponseDelayRule.getConnectionType() == GATEWAY
&& serverResponseDelayRule.isApplicable(requestArgs)) {
return serverResponseDelayRule;
}
}

return null;
}

public FaultInjectionServerErrorRule findServerConnectionDelayRule(FaultInjectionRequestArgs requestArgs) {
FaultInjectionConnectionType connectionType =
requestArgs instanceof RntbdFaultInjectionRequestArgs ? DIRECT : GATEWAY;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,21 @@ public boolean injectServerResponseError(
return false;
}

public boolean injectGatewayChannelError(FaultInjectionRequestArgs faultInjectionRequestArgs) {
FaultInjectionServerErrorRule serverResponseErrorRule = this.ruleStore.findGatewayChannelError(faultInjectionRequestArgs);

if (serverResponseErrorRule != null) {
faultInjectionRequestArgs.getServiceRequest().faultInjectionRequestContext
.applyFaultInjectionRule(
faultInjectionRequestArgs.getTransportRequestId(),
serverResponseErrorRule.getId());

return true;
}

return false;
}

@Override
public boolean injectServerConnectionDelay(
FaultInjectionRequestArgs faultInjectionRequestArgs,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,20 @@
import com.azure.cosmos.implementation.ResourceType;
import com.azure.cosmos.implementation.RxDocumentServiceRequest;
import com.azure.cosmos.implementation.StoreResponseBuilder;
import com.azure.cosmos.implementation.TestConfigurations;
import com.azure.cosmos.implementation.Utils;
import com.azure.cosmos.implementation.VectorSessionToken;
import com.azure.cosmos.implementation.guava25.collect.ImmutableList;
import com.azure.cosmos.implementation.throughputControl.TestItem;
import com.azure.cosmos.models.CosmosBatch;
import com.azure.cosmos.models.CosmosBatchRequestOptions;
import com.azure.cosmos.models.CosmosBatchResponse;
import com.azure.cosmos.models.CosmosItemResponse;
import com.azure.cosmos.models.FeedRange;
import com.azure.cosmos.models.PartitionKey;
import com.azure.cosmos.test.faultinjection.CosmosFaultInjectionHelper;
import com.azure.cosmos.test.faultinjection.FaultInjectionConditionBuilder;
import com.azure.cosmos.test.faultinjection.FaultInjectionConnectionType;
import com.azure.cosmos.test.faultinjection.FaultInjectionEndpointBuilder;
import com.azure.cosmos.test.faultinjection.FaultInjectionOperationType;
import com.azure.cosmos.test.faultinjection.FaultInjectionResultBuilders;
Expand All @@ -50,9 +56,11 @@
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.math.BigDecimal;
import java.math.RoundingMode;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
Expand Down Expand Up @@ -389,9 +397,160 @@ public void fabianm_Manual_Repro_Test() {
return Mono.just(response);
})
.block();
}
}

@Test
public void batchTest() {
CosmosAsyncClient client = new CosmosClientBuilder()
.endpoint(TestConfigurations.HOST)
// add back the key
.key(TestConfigurations.MASTER_KEY)
.buildAsyncClient();

}
CosmosAsyncContainer container = client
.getDatabase("TestDatabase")
.getContainer("FiveTransTestContainer");

TestItem testItem = TestItem.createNewItem();
CosmosBatch batch = CosmosBatch.createCosmosBatch(new PartitionKey(testItem.getId()));
batch.createItemOperation(testItem);
batch.readItemOperation(testItem.getId());

FaultInjectionRule scrambledAddressForBatch = new FaultInjectionRuleBuilder("scrambledAddresses")
.condition(
new FaultInjectionConditionBuilder()
.operationType(FaultInjectionOperationType.BATCH_ITEM)
.build())
.result(
FaultInjectionResultBuilders.getResultBuilder(FaultInjectionServerErrorType.SCRAMBLE_ADDRESS)
.build()
)
.build();

CosmosFaultInjectionHelper.configureFaultInjectionRules(container, Arrays.asList(scrambledAddressForBatch)).block();

container.executeCosmosBatch(batch, new CosmosBatchRequestOptions())
.flatMap(response -> {
logger.info("Succeeded");
logger.info(response.getDiagnostics().toString());
return Mono.empty();
})
.onErrorResume(throwable -> {
logger.info("Final status code {}", ((CosmosException)throwable).getStatusCode());
logger.info(
((CosmosException)throwable).getDiagnostics().getDiagnosticsContext().toJson());
return Mono.empty();
})
.block();
}

@Test
public void addressTests() {
CosmosAsyncClient client = new CosmosClientBuilder()
.endpoint(TestConfigurations.HOST)
// add back the key
.key(TestConfigurations.MASTER_KEY)
.buildAsyncClient();

CosmosAsyncContainer container = client
.getDatabase("TestDatabase")
.getContainer("FiveTransTestContainer");

TestItem testItem = TestItem.createNewItem();
CosmosBatch batch = CosmosBatch.createCosmosBatch(new PartitionKey(testItem.getId()));
batch.createItemOperation(testItem);
batch.readItemOperation(testItem.getId());
container.executeCosmosBatch(batch, new CosmosBatchRequestOptions()).block();

FaultInjectionRule addressRefreshRule = new FaultInjectionRuleBuilder("addressRefreshRule")
.condition(
new FaultInjectionConditionBuilder()
.operationType(FaultInjectionOperationType.METADATA_REQUEST_ADDRESS_REFRESH)
.connectionType(FaultInjectionConnectionType.GATEWAY)
.build())
.result(
FaultInjectionResultBuilders.getResultBuilder(FaultInjectionServerErrorType.CONNECTION_DELAY)
.delay(Duration.ofSeconds(70))
.build()
)
.build();

FaultInjectionRule batch410 = new FaultInjectionRuleBuilder("batch410")
.condition(
new FaultInjectionConditionBuilder()
.operationType(FaultInjectionOperationType.BATCH_ITEM)
.build())
.result(
FaultInjectionResultBuilders.getResultBuilder(FaultInjectionServerErrorType.SCRAMBLE_ADDRESS)
.build()
)
.build();

CosmosFaultInjectionHelper.configureFaultInjectionRules(container, Arrays.asList(addressRefreshRule, batch410)).block();
// start the request in a different thread
// Mono.just(this)
// .flatMap(t -> container.executeCosmosBatch(batch, new CosmosBatchRequestOptions()))
// .flatMap(response -> {
// logger.info("Succeeded");
// logger.info(response.getDiagnostics().toString());
// return Mono.empty();
// })
// .onErrorResume(throwable -> {
// logger.info("Final status code {}", ((CosmosException)throwable).getStatusCode());
// logger.info(
// ((CosmosException)throwable).getDiagnostics().getDiagnosticsContext().toJson());
// return Mono.empty();
// })
// .subscribeOn(Schedulers.boundedElastic())
// .subscribe();

container.executeCosmosBatch(batch, new CosmosBatchRequestOptions())
.flatMap(response -> {
logger.info("Succeeded");
logger.info(response.getDiagnostics().toString());
return Mono.empty();
})
.onErrorResume(throwable -> {
logger.info("Final status code {}", ((CosmosException)throwable).getStatusCode());
logger.info(
((CosmosException)throwable).getDiagnostics().getDiagnosticsContext().toJson());
return Mono.empty();
})
.block();


logger.info("Starting the second request");
container.executeCosmosBatch(batch, new CosmosBatchRequestOptions())
.flatMap(response -> {
logger.info("Succeeded");
logger.info(response.getDiagnostics().toString());
return Mono.empty();
})
.onErrorResume(throwable -> {
logger.info("Final status code {}", ((CosmosException)throwable).getStatusCode());
logger.info(
((CosmosException)throwable).getDiagnostics().getDiagnosticsContext().toJson());
return Mono.empty();
})
.block();
//
// for (int i = 0; i < 10; i++) {
// addressRefreshRule.disable();
// container.executeCosmosBatch(batch, new CosmosBatchRequestOptions())
// .flatMap(response -> {
// logger.info("Succeeded");
// logger.info(response.getDiagnostics().toString());
// return Mono.empty();
// })
// .onErrorResume(throwable -> {
// logger.info("Final status code {}", ((CosmosException)throwable).getStatusCode());
// logger.info(
// ((CosmosException)throwable).getDiagnostics().getDiagnosticsContext().toJson());
// return Mono.empty();
// })
// .block();
// }
}

@Test(groups = "unit")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,23 @@ public static Uri getPrimaryUri(RxDocumentServiceRequest request, List<AddressIn
}

public Mono<List<AddressInformation>> resolveAddressesAsync(RxDocumentServiceRequest request, boolean forceAddressRefresh) {
// if there already has address being refreshed, then stop checking scramble address rule
Pair<Boolean, Boolean> shouldScrambleAddresses =
request.faultInjectionRequestContext.getAddressForceRefreshed() ?
Pair.of(false, false) : this.addressInjector.shouldScrambleAddresses(request);

Mono<List<AddressInformation>> resolvedAddressesObs =
(this.addressResolver.resolveAsync(request, forceAddressRefresh))
.map(addresses -> {
if (shouldScrambleAddresses.getLeft()) {
return scrambleAddresses(
Arrays.stream(addresses).collect(Collectors.toList()),
shouldScrambleAddresses.getRight())
.toArray(new AddressInformation[0]);
}

return addresses;
})
.map(addresses -> Arrays.stream(addresses)
.filter(address -> !Strings.isNullOrEmpty(address.getPhysicalUri().getURIAsString()) && Strings.areEqualIgnoreCase(address.getProtocolScheme(), this.protocol.scheme()))
.collect(Collectors.toList()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -410,9 +410,6 @@ private Mono<List<Address>> getServerAddressesViaGatewayInternalAsync(RxDocument
JavaStreamUtils.toString(partitionKeyRangeIds, ","));
}

// track address refresh has happened, this is only meant to be used for fault injection validation
request.faultInjectionRequestContext.recordAddressForceRefreshed(forceRefresh);

String entryUrl = PathsHelper.generatePath(ResourceType.Document, collectionRid, true);
HashMap<String, String> addressQuery = new HashMap<>();

Expand Down Expand Up @@ -517,6 +514,9 @@ private Mono<List<Address>> getServerAddressesViaGatewayInternalAsync(RxDocument
null,
httpRequest.reactorNettyRequestRecord().getTransportRequestId());

// track address refresh has happened, this is only meant to be used for fault injection validation
request.faultInjectionRequestContext.recordAddressForceRefreshed(forceRefresh);

return dsr.getQueryResponse(null, Address.class);
}).onErrorResume(throwable -> {
Throwable unwrappedException = reactor.core.Exceptions.unwrap(throwable);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ public Pair<Boolean, Boolean> shouldScrambleAddresses(RxDocumentServiceRequest r
}

FaultInjectionRequestArgs createFaultInjectionRequestArgs(RxDocumentServiceRequest rxDocumentServiceRequest) {
return new FaultInjectionRequestArgs(0, null, false, rxDocumentServiceRequest) {
boolean isPrimary = rxDocumentServiceRequest.isReadOnlyRequest() ? false : true;
return new FaultInjectionRequestArgs(0, null, isPrimary, rxDocumentServiceRequest) {
@Override
public List<String> getPartitionKeyRangeIds() {
if(rxDocumentServiceRequest.getPartitionKeyRangeIdentity() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.azure.cosmos.implementation.http.ReactorNettyRequestRecord;
import com.azure.cosmos.implementation.routing.PartitionKeyInternal;
import com.azure.cosmos.implementation.routing.PartitionKeyInternalHelper;
import io.netty.channel.ChannelException;
import io.netty.channel.ConnectTimeoutException;
import io.netty.handler.timeout.ReadTimeoutException;
import reactor.core.publisher.Mono;
Expand Down Expand Up @@ -150,6 +151,10 @@ public Mono<HttpResponse> injectGatewayErrors(
return Mono.error(exceptionToBeInjected.v);
}

if (this.injectGatewayChannelError(faultInjectionRequestArgs)) {
return Mono.error(new ChannelException("Fake channel exception"));
}

if (this.injectGatewayServerConnectionDelay(faultInjectionRequestArgs, delayToBeInjected)) {
Duration connectionAcquireTimeout = this.configs.getConnectionAcquireTimeout();
if (delayToBeInjected.v.toMillis() >= connectionAcquireTimeout.toMillis()) {
Expand Down Expand Up @@ -221,6 +226,15 @@ private boolean injectGatewayServerResponseError(
return false;
}

private boolean injectGatewayChannelError(FaultInjectionRequestArgs faultInjectionRequestArgs) {
for (IServerErrorInjector serverErrorInjector : faultInjectors) {
if(serverErrorInjector.injectGatewayChannelError(faultInjectionRequestArgs)) {
return true;
}
}
return false;
}

private boolean injectGatewayServerConnectionDelay(
FaultInjectionRequestArgs faultInjectionRequestArgs,
Utils.ValueHolder<Duration> delayToBeInjected) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ boolean injectServerResponseError(
FaultInjectionRequestArgs faultInjectionRequestArgs,
Utils.ValueHolder<CosmosException> injectedException);

boolean injectGatewayChannelError(FaultInjectionRequestArgs faultInjectionRequestArgs);

/***
* Inject server connection delay error.
*
Expand Down
Loading