Skip to content

Commit aee2a77

Browse files
xinlian12annie-mac
andauthored
RecreateContainerForQueryFix (#45930)
* initial change for StaledResourceException --------- Co-authored-by: annie-mac <[email protected]>
1 parent e0a847c commit aee2a77

39 files changed

+1676
-705
lines changed

sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosExceptionTest.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
import com.azure.cosmos.implementation.ConflictException;
88
import com.azure.cosmos.implementation.ForbiddenException;
99
import com.azure.cosmos.implementation.GoneException;
10-
import com.azure.cosmos.implementation.HttpConstants;
1110
import com.azure.cosmos.implementation.InternalServerErrorException;
1211
import com.azure.cosmos.implementation.InvalidPartitionException;
1312
import com.azure.cosmos.implementation.LockedException;

sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/faultinjection/FaultInjectionServerErrorRuleOnDirectTests.java

Lines changed: 33 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -885,6 +885,7 @@ public void faultInjectionServerErrorRuleTests_ServerErrorResponse(
885885
.times(1)
886886
.build()
887887
)
888+
.hitLimit(1) // for read feed staled resource exception, need to also configure this to limit the total injection count
888889
.duration(Duration.ofMinutes(5))
889890
.build();
890891

@@ -1380,31 +1381,46 @@ private void validateFaultInjectionRuleApplied(
13801381
String ruleId,
13811382
boolean canRetryOnFaultInjectedError) throws JsonProcessingException {
13821383

1383-
List<ObjectNode> diagnosticsNode = new ArrayList<>();
1384-
if ((operationType == OperationType.Query || operationType == OperationType.ReadFeed) && canRetryOnFaultInjectedError) {
1385-
ObjectNode cosmosDiagnosticsNode = (ObjectNode) Utils.getSimpleObjectMapper().readTree(cosmosDiagnostics.toString());
1386-
for (JsonNode node : cosmosDiagnosticsNode.get("clientSideRequestStatistics")) {
1387-
diagnosticsNode.add((ObjectNode) node);
1384+
List<ObjectNode> clientSideRequestStatisticsNodes = new ArrayList<>();
1385+
assertThat(cosmosDiagnostics.getDiagnosticsContext()).isNotNull();
1386+
1387+
for (CosmosDiagnostics diagnostics : cosmosDiagnostics.getDiagnosticsContext().getDiagnostics()) {
1388+
if (operationType == OperationType.Query && canRetryOnFaultInjectedError) {
1389+
ObjectNode cosmosDiagnosticsNode = (ObjectNode) Utils.getSimpleObjectMapper().readTree(diagnostics.toString());
1390+
if (cosmosDiagnosticsNode.has("clientSideRequestStatistics")) { // query plan diagnostics will not have clientSideRequestStatistics
1391+
for (JsonNode node : cosmosDiagnosticsNode.get("clientSideRequestStatistics")) {
1392+
clientSideRequestStatisticsNodes.add((ObjectNode) node);
1393+
}
1394+
}
1395+
} else {
1396+
clientSideRequestStatisticsNodes.add((ObjectNode) Utils.getSimpleObjectMapper().readTree(diagnostics.toString()));
13881397
}
1389-
} else {
1390-
diagnosticsNode.add((ObjectNode) Utils.getSimpleObjectMapper().readTree(cosmosDiagnostics.toString()));
13911398
}
13921399

1393-
for (ObjectNode diagnosticNode : diagnosticsNode) {
1400+
List<JsonNode> responseStatisticsNodes = new ArrayList<>();
1401+
for (ObjectNode diagnosticNode : clientSideRequestStatisticsNodes) {
13941402
JsonNode responseStatisticsList = diagnosticNode.get("responseStatisticsList");
13951403
assertThat(responseStatisticsList.isArray()).isTrue();
13961404

1397-
if (canRetryOnFaultInjectedError) {
1398-
assertThat(responseStatisticsList.size()).isGreaterThanOrEqualTo(2);
1399-
} else {
1400-
assertThat(responseStatisticsList.size()).isOne();
1405+
for (JsonNode responseStatisticsNode : responseStatisticsList) {
1406+
responseStatisticsNodes.add(responseStatisticsNode);
14011407
}
1402-
JsonNode storeResult = responseStatisticsList.get(0).get("storeResult");
1403-
assertThat(storeResult).isNotNull();
1404-
assertThat(storeResult.get("statusCode").asInt()).isEqualTo(statusCode);
1405-
assertThat(storeResult.get("subStatusCode").asInt()).isEqualTo(subStatusCode);
1406-
assertThat(storeResult.get("faultInjectionRuleId").asText()).isEqualTo(ruleId);
14071408
}
1409+
1410+
if (canRetryOnFaultInjectedError) {
1411+
assertThat(responseStatisticsNodes.size()).isGreaterThanOrEqualTo(2);
1412+
} else {
1413+
assertThat(responseStatisticsNodes.size()).isOne();
1414+
}
1415+
1416+
assertThat(responseStatisticsNodes.stream().anyMatch(responseStatisticsNode -> {
1417+
JsonNode storeResultNode = responseStatisticsNode.get("storeResult");
1418+
assertThat(storeResultNode).isNotNull();
1419+
1420+
return (storeResultNode.get("statusCode").asInt() == statusCode)
1421+
&& (storeResultNode.get("subStatusCode").asInt() == subStatusCode)
1422+
&& (storeResultNode.has("faultInjectionRuleId") && storeResultNode.get("faultInjectionRuleId").asText().equals(ruleId));
1423+
})).isTrue();
14081424
}
14091425

14101426
private void validateNoFaultInjectionApplied(

sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/FeedResponseListValidator.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -359,6 +359,11 @@ public void validate(List<FeedResponse<T>> feedList) {
359359
return this;
360360
}
361361

362+
public Builder<T> withValidator(FeedResponseListValidator<T> validator) {
363+
validators.add(validator);
364+
return this;
365+
}
366+
362367
private <T> Resource getResource(T response) {
363368
if (response instanceof Resource
364369
|| response instanceof CosmosConflictProperties

sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/SessionTest.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -305,8 +305,14 @@ public void partitionedSessionToken(boolean isNameBased) throws NoSuchMethodExce
305305
SinglePartitionKeyServerBatchRequest serverBatchRequest =
306306
(SinglePartitionKeyServerBatchRequest) method.invoke(SinglePartitionKeyServerBatchRequest.class, new PartitionKey(document.getId()),
307307
itemBatchOperations);
308-
spyClient.executeBatchRequest(getCollectionLink(isNameBased), serverBatchRequest,
309-
new RequestOptions(), false).block();
308+
spyClient
309+
.executeBatchRequest(
310+
getCollectionLink(isNameBased),
311+
serverBatchRequest,
312+
new RequestOptions(),
313+
false,
314+
true)
315+
.block();
310316
assertThat(getSessionTokensInRequests().size()).isEqualTo(1);
311317
assertThat(getSessionTokensInRequests().get(0)).isNotEmpty();
312318
assertThat(getSessionTokensInRequests().get(0)).doesNotContain(","); // making sure we have only one scope session token
Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
package com.azure.cosmos.implementation;
5+
6+
import com.azure.cosmos.BridgeInternal;
7+
import com.azure.cosmos.CosmosException;
8+
import com.azure.cosmos.implementation.caches.RxCollectionCache;
9+
import org.mockito.Mockito;
10+
import org.testng.annotations.DataProvider;
11+
import org.testng.annotations.Test;
12+
import reactor.core.publisher.Mono;
13+
14+
import java.util.HashMap;
15+
import java.util.Map;
16+
17+
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
18+
import static org.mockito.Mockito.doNothing;
19+
import static org.mockito.Mockito.verify;
20+
21+
public class StaleResourceExceptionRetryPolicyTest {
22+
@DataProvider(name = "exceptionProvider")
23+
public Object[][] exceptionProvider() {
24+
return new Object[][] {
25+
//status code, subStatusCode, expectRetry
26+
{ HttpConstants.StatusCodes.GONE, HttpConstants.SubStatusCodes.NAME_CACHE_IS_STALE, true },
27+
{ HttpConstants.StatusCodes.BADREQUEST, HttpConstants.SubStatusCodes.INCORRECT_CONTAINER_RID_SUB_STATUS, true },
28+
{ HttpConstants.StatusCodes.GONE, HttpConstants.SubStatusCodes.CLOSED_CLIENT, false }
29+
};
30+
}
31+
32+
@Test(groups = "unit", dataProvider = "exceptionProvider")
33+
public void staledException(int statusCode, int subStatusCode, boolean expectRetry) {
34+
String testCollectionLink = "/dbs/test/colls/staledExceptionTest";
35+
DocumentCollection documentCollection = new DocumentCollection();
36+
documentCollection.setId("staledExceptionTest");
37+
documentCollection.setResourceId("staledExceptionTestRid");
38+
39+
RxCollectionCache rxCollectionCache = Mockito.mock(RxCollectionCache.class);
40+
Mockito
41+
.when(rxCollectionCache.resolveByNameAsync(Mockito.any(), Mockito.any(), Mockito.isNull()))
42+
.thenReturn(Mono.just(documentCollection));
43+
doNothing().when(rxCollectionCache).refresh(Mockito.any(), Mockito.any(), Mockito.isNull());
44+
45+
ISessionContainer sessionContainer = Mockito.mock(ISessionContainer.class);
46+
47+
StaleResourceRetryPolicy staleResourceRetryPolicy = new StaleResourceRetryPolicy(
48+
rxCollectionCache,
49+
null,
50+
testCollectionLink,
51+
null,
52+
null,
53+
sessionContainer,
54+
TestUtils.mockDiagnosticsClientContext()
55+
);
56+
57+
CosmosException exception = BridgeInternal.createCosmosException(statusCode);
58+
BridgeInternal.setSubStatusCode(exception, subStatusCode);
59+
ShouldRetryResult shouldRetryResult = staleResourceRetryPolicy.shouldRetry(exception).block();
60+
assertThat(shouldRetryResult.shouldRetry).isEqualTo(expectRetry);
61+
62+
shouldRetryResult = staleResourceRetryPolicy.shouldRetry(exception).block();
63+
assertThat(shouldRetryResult.shouldRetry).isFalse();
64+
}
65+
66+
@Test(groups = "unit")
67+
public void suppressRetryForExternalCollectionRid() {
68+
String testCollectionLink = "/dbs/test/colls/staledExceptionTest";
69+
DocumentCollection documentCollection = new DocumentCollection();
70+
documentCollection.setId("staledExceptionTest");
71+
documentCollection.setResourceId("staledExceptionTestRid");
72+
73+
RxCollectionCache rxCollectionCache = Mockito.mock(RxCollectionCache.class);
74+
Mockito
75+
.when(rxCollectionCache.resolveByNameAsync(Mockito.any(), Mockito.any(), Mockito.isNull()))
76+
.thenReturn(Mono.just(documentCollection));
77+
doNothing().when(rxCollectionCache).refresh(Mockito.any(), Mockito.any(), Mockito.isNull());
78+
79+
ISessionContainer sessionContainer = Mockito.mock(ISessionContainer.class);
80+
81+
Map<String, String> customHeaders = new HashMap<>();
82+
customHeaders.put(HttpConstants.HttpHeaders.INTENDED_COLLECTION_RID_HEADER, "staledExceptionTestRid");
83+
84+
StaleResourceRetryPolicy staleResourceRetryPolicy = new StaleResourceRetryPolicy(
85+
rxCollectionCache,
86+
null,
87+
testCollectionLink,
88+
null,
89+
customHeaders,
90+
sessionContainer,
91+
TestUtils.mockDiagnosticsClientContext()
92+
);
93+
94+
InvalidPartitionException invalidPartitionException = new InvalidPartitionException();
95+
ShouldRetryResult shouldRetryResult = staleResourceRetryPolicy.shouldRetry(invalidPartitionException).block();
96+
assertThat(shouldRetryResult.shouldRetry).isFalse();
97+
}
98+
99+
@Test(groups = "unit")
100+
public void cleanSessionToken() {
101+
String testCollectionLink = "/dbs/test/colls/staledExceptionTest";
102+
DocumentCollection documentCollection = new DocumentCollection();
103+
documentCollection.setId("staledExceptionTest");
104+
documentCollection.setResourceId("staledExceptionTestRid");
105+
106+
DocumentCollection documentCollectionAfterRefresh = new DocumentCollection();
107+
documentCollectionAfterRefresh.setId(documentCollection.getId());
108+
documentCollectionAfterRefresh.setResourceId(documentCollection.getResourceId() + "refreshed");
109+
110+
RxCollectionCache rxCollectionCache = Mockito.mock(RxCollectionCache.class);
111+
Mockito
112+
.when(rxCollectionCache.resolveByNameAsync(Mockito.any(), Mockito.any(), Mockito.isNull()))
113+
.thenReturn(Mono.just(documentCollection))
114+
.thenReturn(Mono.just(documentCollectionAfterRefresh));
115+
116+
doNothing().when(rxCollectionCache).refresh(Mockito.any(), Mockito.any(), Mockito.isNull());
117+
118+
ISessionContainer sessionContainer = Mockito.mock(ISessionContainer.class);
119+
doNothing().when(sessionContainer).clearTokenByResourceId(documentCollection.getResourceId());
120+
121+
StaleResourceRetryPolicy staleResourceRetryPolicy = new StaleResourceRetryPolicy(
122+
rxCollectionCache,
123+
null,
124+
testCollectionLink,
125+
null,
126+
null,
127+
sessionContainer,
128+
TestUtils.mockDiagnosticsClientContext()
129+
);
130+
131+
InvalidPartitionException invalidPartitionException = new InvalidPartitionException();
132+
ShouldRetryResult shouldRetryResult = staleResourceRetryPolicy.shouldRetry(invalidPartitionException).block();
133+
assertThat(shouldRetryResult.shouldRetry).isTrue();
134+
verify(rxCollectionCache, Mockito.times(1)).refresh(Mockito.any(), Mockito.any(), Mockito.isNull());
135+
verify(sessionContainer, Mockito.times(1)).clearTokenByResourceId(documentCollection.getResourceId());
136+
}
137+
}

sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/changefeed/epkversion/PartitionProcessorHelperTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ public void createChangeFeedRequestOptionsForChangeFeedState(ChangeFeedMode chan
5151
ImplementationBridgeHelpers
5252
.CosmosChangeFeedRequestOptionsHelper
5353
.getCosmosChangeFeedRequestOptionsAccessor()
54-
.getHeader(cosmosChangeFeedRequestOptions);
54+
.getHeaders(cosmosChangeFeedRequestOptions);
5555

5656
assertThat(customOptions).isNotNull();
5757
assertThat(customOptions.get(HttpConstants.HttpHeaders.CHANGE_FEED_WIRE_FORMAT_VERSION)).isEqualTo(
@@ -94,7 +94,7 @@ public void createForProcessingFromContinuation(ChangeFeedMode changeFeedMode) {
9494
ImplementationBridgeHelpers
9595
.CosmosChangeFeedRequestOptionsHelper
9696
.getCosmosChangeFeedRequestOptionsAccessor()
97-
.getHeader(cosmosChangeFeedRequestOptions);
97+
.getHeaders(cosmosChangeFeedRequestOptions);
9898

9999
assertThat(customOptions).isNotNull();
100100
assertThat(customOptions.get(HttpConstants.HttpHeaders.CHANGE_FEED_WIRE_FORMAT_VERSION)).isEqualTo(

sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/GoneAndRetryWithRetryPolicyTest.java

Lines changed: 5 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -262,31 +262,18 @@ public void shouldRetryWithPartitionIsMigratingException() {
262262
}
263263

264264
/**
265-
* Retry with InvalidPartitionException
265+
* Should not retry with InvalidPartitionException
266266
*/
267267
@Test(groups = { "unit" }, timeOut = TIMEOUT)
268-
public void shouldRetryWithInvalidPartitionException() {
268+
public void shouldNotRetryWithInvalidPartitionException() {
269269
RxDocumentServiceRequest request = RxDocumentServiceRequest.create(
270270
mockDiagnosticsClientContext(),
271271
OperationType.Read,
272272
ResourceType.Document);
273273
GoneAndRetryWithRetryPolicy goneAndRetryWithRetryPolicy = new GoneAndRetryWithRetryPolicy(request, 30);
274-
Mono<ShouldRetryResult> singleShouldRetry = goneAndRetryWithRetryPolicy
275-
.shouldRetry(new InvalidPartitionException());
276-
ShouldRetryResult shouldRetryResult = singleShouldRetry.block();
277-
assertThat(shouldRetryResult.shouldRetry).isTrue();
278-
assertThat(request.requestContext.quorumSelectedLSN).isEqualTo(-1);
279-
assertThat(request.requestContext.resolvedPartitionKeyRange).isNull();
280-
assertThat(request.requestContext.globalCommittedSelectedLSN).isEqualTo(-1);
281-
assertThat(shouldRetryResult.policyArg.getValue0()).isFalse();
282-
283-
goneAndRetryWithRetryPolicy.shouldRetry(new InvalidPartitionException()).block();
284-
// It will retry max till 3 attempts
285-
shouldRetryResult = goneAndRetryWithRetryPolicy.shouldRetry(new InvalidPartitionException()).block();
286-
assertThat(shouldRetryResult.shouldRetry).isFalse();
287-
CosmosException clientException = (CosmosException) shouldRetryResult.exception;
288-
assertThat(clientException.getStatusCode()).isEqualTo(HttpConstants.StatusCodes.SERVICE_UNAVAILABLE);
289-
274+
ShouldRetryResult singleShouldRetry = goneAndRetryWithRetryPolicy
275+
.shouldRetry(new InvalidPartitionException()).block();
276+
assertThat(singleShouldRetry.shouldRetry).isFalse();
290277
}
291278

292279
/**

0 commit comments

Comments
 (0)