Skip to content

Commit 67b30e8

Browse files
authored
Enabled excludeRegions to be applied for QueryPlan calls. (Azure#45196)
* Enabled `excludeRegions` to be applied for `QueryPlan` calls. * Enabled `excludeRegions` to be applied for `QueryPlan` calls.
1 parent 73320ce commit 67b30e8

File tree

3 files changed

+186
-31
lines changed

3 files changed

+186
-31
lines changed

sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/ExcludeRegionTests.java

Lines changed: 175 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,11 @@
1111
import com.azure.cosmos.implementation.RxDocumentClientImpl;
1212
import com.azure.cosmos.implementation.directconnectivity.ReflectionUtils;
1313
import com.azure.cosmos.implementation.throughputControl.TestItem;
14+
import com.azure.cosmos.models.CosmosBatch;
15+
import com.azure.cosmos.models.CosmosBatchRequestOptions;
16+
import com.azure.cosmos.models.CosmosBatchResponse;
1417
import com.azure.cosmos.models.CosmosItemRequestOptions;
18+
import com.azure.cosmos.models.CosmosItemResponse;
1519
import com.azure.cosmos.models.CosmosPatchItemRequestOptions;
1620
import com.azure.cosmos.models.CosmosPatchOperations;
1721
import com.azure.cosmos.models.CosmosQueryRequestOptions;
@@ -32,6 +36,7 @@
3236
import org.testng.annotations.Factory;
3337
import org.testng.annotations.Test;
3438

39+
import java.time.Duration;
3540
import java.util.ArrayList;
3641
import java.util.Arrays;
3742
import java.util.Iterator;
@@ -46,6 +51,9 @@ public class ExcludeRegionTests extends TestSuiteBase {
4651
private CosmosAsyncContainer cosmosAsyncContainer;
4752
private List<String> preferredRegionList;
4853

54+
private static final CosmosEndToEndOperationLatencyPolicyConfig INF_E2E_TIMEOUT
55+
= new CosmosEndToEndOperationLatencyPolicyConfigBuilder(Duration.ofDays(100)).build();
56+
4957
@Factory(dataProvider = "clientBuildersWithSessionConsistency")
5058
public ExcludeRegionTests(CosmosClientBuilder clientBuilder) {
5159
super(clientBuilder);
@@ -89,24 +97,26 @@ public static Object[][] operationTypeArgProvider() {
8997
{ OperationType.Create },
9098
{ OperationType.Delete },
9199
{ OperationType.Query },
92-
{ OperationType.Patch }
100+
{ OperationType.Patch },
101+
{ OperationType.Batch }
93102
};
94103
}
95104

96105
@DataProvider(name = "faultInjectionArgProvider")
97106
public static Object[][] faultInjectionArgProvider() {
98-
return new Object[][]{
107+
return new Object[][] {
99108
{ OperationType.Read, FaultInjectionOperationType.READ_ITEM },
100109
{ OperationType.Replace, FaultInjectionOperationType.REPLACE_ITEM },
101110
{ OperationType.Create, FaultInjectionOperationType.CREATE_ITEM },
102111
{ OperationType.Delete, FaultInjectionOperationType.DELETE_ITEM },
103112
{ OperationType.Query, FaultInjectionOperationType.QUERY_ITEM },
104-
{ OperationType.Patch, FaultInjectionOperationType.PATCH_ITEM }
113+
{ OperationType.Patch, FaultInjectionOperationType.PATCH_ITEM },
114+
{ OperationType.Batch, FaultInjectionOperationType.BATCH_ITEM }
105115
};
106116
}
107117

108118
@Test(groups = {"multi-master"}, dataProvider = "operationTypeArgProvider", timeOut = TIMEOUT)
109-
public void excludeRegionTest_SkipFirstPreferredRegion(OperationType operationType) {
119+
public void excludeRegionTest_SkipFirstPreferredRegion(OperationType operationType) throws InterruptedException {
110120

111121
if (this.preferredRegionList.size() <= 1) {
112122
throw new SkipException("excludeRegionTest_SkipFirstPreferredRegion can only be tested for multi-master with multi-regions");
@@ -115,20 +125,23 @@ public void excludeRegionTest_SkipFirstPreferredRegion(OperationType operationTy
115125
TestItem createdItem = TestItem.createNewItem();
116126
this.cosmosAsyncContainer.createItem(createdItem).block();
117127

118-
CosmosDiagnostics cosmosDiagnostics = this.performDocumentOperation(cosmosAsyncContainer, operationType, createdItem, null);
119-
assertThat(cosmosDiagnostics.getContactedRegionNames().size()).isEqualTo(1);
120-
assertThat(cosmosDiagnostics.getContactedRegionNames()).containsAll(this.preferredRegionList.subList(0, 1));
128+
Thread.sleep(1000);
129+
130+
CosmosDiagnosticsContext cosmosDiagnosticsContextBeforeRegionExclusion
131+
= this.performDocumentOperation(cosmosAsyncContainer, operationType, createdItem, null, INF_E2E_TIMEOUT);
132+
133+
validateRegionsContacted(cosmosDiagnosticsContextBeforeRegionExclusion, this.preferredRegionList.subList(0, 1));
121134

122135
// now exclude the first preferred region
123-
cosmosDiagnostics =
136+
CosmosDiagnosticsContext cosmosDiagnosticsContextPostRegionExclusion =
124137
this.performDocumentOperation(
125138
cosmosAsyncContainer,
126139
operationType,
127140
createdItem,
128-
Arrays.asList(this.preferredRegionList.get(0)));
141+
Arrays.asList(this.preferredRegionList.get(0)),
142+
INF_E2E_TIMEOUT);
129143

130-
assertThat(cosmosDiagnostics.getContactedRegionNames().size()).isEqualTo(1);
131-
assertThat(cosmosDiagnostics.getContactedRegionNames()).containsAll(this.preferredRegionList.subList(1, 2));
144+
validateRegionsContacted(cosmosDiagnosticsContextPostRegionExclusion, this.preferredRegionList.subList(1, 2));
132145
}
133146

134147
@Test(groups = {"multi-master"}, dataProvider = "faultInjectionArgProvider", timeOut = TIMEOUT)
@@ -162,9 +175,8 @@ public void excludeRegionTest_readSessionNotAvailable(
162175
try {
163176
CosmosFaultInjectionHelper.configureFaultInjectionRules(this.cosmosAsyncContainer, Arrays.asList(serverErrorRule)).block();
164177
try {
165-
CosmosDiagnostics cosmosDiagnostics = this.performDocumentOperation(cosmosAsyncContainer, operationType, createdItem, null);
166-
assertThat(cosmosDiagnostics.getContactedRegionNames().size()).isEqualTo(2);
167-
assertThat(cosmosDiagnostics.getContactedRegionNames().containsAll(this.preferredRegionList.subList(0, 2)));
178+
CosmosDiagnosticsContext cosmosDiagnosticsContextBeforeRegionExclusion = this.performDocumentOperation(cosmosAsyncContainer, operationType, createdItem, null, INF_E2E_TIMEOUT);
179+
validateRegionsContacted(cosmosDiagnosticsContextBeforeRegionExclusion, this.preferredRegionList.subList(0, 2));
168180
} catch (CosmosException e) {
169181
fail("Request should succeeded in other regions");
170182
}
@@ -175,13 +187,19 @@ public void excludeRegionTest_readSessionNotAvailable(
175187
cosmosAsyncContainer,
176188
operationType,
177189
createdItem,
178-
this.preferredRegionList.subList(1, this.preferredRegionList.size()));
190+
this.preferredRegionList.subList(1, this.preferredRegionList.size()),
191+
INF_E2E_TIMEOUT);
179192

180193
fail("Request should have failed");
181194
} catch (CosmosException exception) {
182195
CosmosDiagnostics cosmosDiagnostics = exception.getDiagnostics();
183-
assertThat(cosmosDiagnostics.getContactedRegionNames().size()).isEqualTo(1);
184-
assertThat(cosmosDiagnostics.getContactedRegionNames().containsAll(this.preferredRegionList.subList(0, 1)));
196+
197+
assertThat(cosmosDiagnostics).isNotNull();
198+
199+
CosmosDiagnosticsContext cosmosDiagnosticsContextPostRegionExclusion
200+
= cosmosDiagnostics.getDiagnosticsContext();
201+
202+
validateRegionsContacted(cosmosDiagnosticsContextPostRegionExclusion, this.preferredRegionList.subList(0, 1));
185203
}
186204
} finally {
187205
serverErrorRule.disable();
@@ -208,19 +226,29 @@ private List<String> getPreferredRegionList(CosmosAsyncClient client) {
208226
return preferredRegionList;
209227
}
210228

211-
private CosmosDiagnostics performDocumentOperation(
229+
private CosmosDiagnosticsContext performDocumentOperation(
212230
CosmosAsyncContainer cosmosAsyncContainer,
213231
OperationType operationType,
214232
TestItem createdItem,
215-
List<String> excludeRegions) {
233+
List<String> excludeRegions,
234+
CosmosEndToEndOperationLatencyPolicyConfig cosmosEndToEndOperationLatencyPolicyConfig) {
235+
216236
if (operationType == OperationType.Query) {
217237
CosmosQueryRequestOptions queryRequestOptions = new CosmosQueryRequestOptions();
238+
239+
queryRequestOptions.setCosmosEndToEndOperationLatencyPolicyConfig(cosmosEndToEndOperationLatencyPolicyConfig);
240+
218241
String query = String.format("SELECT * from c where c.id = '%s'", createdItem.getId());
219242
queryRequestOptions.setExcludedRegions(excludeRegions);
220243
FeedResponse<TestItem> itemFeedResponse =
221244
cosmosAsyncContainer.queryItems(query, queryRequestOptions, TestItem.class).byPage().blockFirst();
222245

223-
return itemFeedResponse.getCosmosDiagnostics();
246+
assertThat(itemFeedResponse).isNotNull();
247+
CosmosDiagnostics cosmosDiagnostics = itemFeedResponse.getCosmosDiagnostics();
248+
249+
assertThat(cosmosDiagnostics).isNotNull();
250+
251+
return cosmosDiagnostics.getDiagnosticsContext();
224252
}
225253

226254
if (operationType == OperationType.Read
@@ -231,37 +259,109 @@ private CosmosDiagnostics performDocumentOperation(
231259
|| operationType == OperationType.Upsert) {
232260

233261
CosmosItemRequestOptions cosmosItemRequestOptions = new CosmosItemRequestOptions();
262+
263+
cosmosItemRequestOptions.setCosmosEndToEndOperationLatencyPolicyConfig(cosmosEndToEndOperationLatencyPolicyConfig);
234264
cosmosItemRequestOptions.setExcludedRegions(excludeRegions);
235265

236266
if (operationType == OperationType.Read) {
237267

238-
return cosmosAsyncContainer.readItem(
268+
try {
269+
Thread.sleep(1000);
270+
} catch (InterruptedException e) {
271+
throw new RuntimeException(e);
272+
}
273+
274+
CosmosItemResponse<TestItem> itemResponse = cosmosAsyncContainer.readItem(
239275
createdItem.getId(),
240276
new PartitionKey(createdItem.getMypk()),
241277
cosmosItemRequestOptions,
242-
TestItem.class).block().getDiagnostics();
278+
TestItem.class).block();
279+
280+
assertThat(itemResponse).isNotNull();
281+
assertThat(itemResponse.getDiagnostics()).isNotNull();
282+
283+
CosmosDiagnostics cosmosDiagnostics = itemResponse.getDiagnostics();
284+
285+
assertThat(cosmosDiagnostics).isNotNull();
286+
287+
return cosmosDiagnostics.getDiagnosticsContext();
243288
}
244289

245290
if (operationType == OperationType.Replace) {
246-
return cosmosAsyncContainer.replaceItem(
291+
292+
try {
293+
Thread.sleep(1000);
294+
} catch (InterruptedException e) {
295+
throw new RuntimeException(e);
296+
}
297+
298+
CosmosItemResponse<TestItem> itemResponse = cosmosAsyncContainer.replaceItem(
247299
createdItem,
248300
createdItem.getId(),
249301
new PartitionKey(createdItem.getMypk()),
250-
cosmosItemRequestOptions).block().getDiagnostics();
302+
cosmosItemRequestOptions).block();
303+
304+
assertThat(itemResponse).isNotNull();
305+
assertThat(itemResponse.getDiagnostics()).isNotNull();
306+
307+
CosmosDiagnostics cosmosDiagnostics = itemResponse.getDiagnostics();
308+
309+
assertThat(cosmosDiagnostics).isNotNull();
310+
311+
return cosmosDiagnostics.getDiagnosticsContext();
251312
}
252313

253314
if (operationType == OperationType.Delete) {
254-
TestItem toBeDeletedItem = TestItem.createNewItem();
255-
cosmosAsyncContainer.createItem(toBeDeletedItem).block();
256-
return cosmosAsyncContainer.deleteItem(toBeDeletedItem, cosmosItemRequestOptions).block().getDiagnostics();
315+
316+
TestItem itemToBeDeleted = TestItem.createNewItem();
317+
318+
cosmosAsyncContainer.createItem(itemToBeDeleted, cosmosItemRequestOptions).block();
319+
320+
try {
321+
Thread.sleep(1000);
322+
} catch (InterruptedException e) {
323+
throw new RuntimeException(e);
324+
}
325+
326+
CosmosItemResponse<Object> itemResponse
327+
= cosmosAsyncContainer.deleteItem(itemToBeDeleted, cosmosItemRequestOptions).block();
328+
329+
assertThat(itemResponse).isNotNull();
330+
assertThat(itemResponse.getDiagnostics()).isNotNull();
331+
332+
CosmosDiagnostics cosmosDiagnostics = itemResponse.getDiagnostics();
333+
334+
assertThat(cosmosDiagnostics).isNotNull();
335+
336+
return cosmosDiagnostics.getDiagnosticsContext();
257337
}
258338

259339
if (operationType == OperationType.Create) {
260-
return cosmosAsyncContainer.createItem(TestItem.createNewItem(), cosmosItemRequestOptions).block().getDiagnostics();
340+
CosmosItemResponse<TestItem> itemResponse = cosmosAsyncContainer
341+
.createItem(TestItem.createNewItem(), cosmosItemRequestOptions).block();
342+
343+
assertThat(itemResponse).isNotNull();
344+
assertThat(itemResponse.getDiagnostics()).isNotNull();
345+
346+
CosmosDiagnostics cosmosDiagnostics = itemResponse.getDiagnostics();
347+
348+
assertThat(cosmosDiagnostics).isNotNull();
349+
350+
return cosmosDiagnostics.getDiagnosticsContext();
261351
}
262352

263353
if (operationType == OperationType.Upsert) {
264-
return cosmosAsyncContainer.upsertItem(TestItem.createNewItem(), cosmosItemRequestOptions).block().getDiagnostics();
354+
CosmosItemResponse<TestItem> itemResponse
355+
= cosmosAsyncContainer.upsertItem(TestItem.createNewItem(), cosmosItemRequestOptions).block();
356+
357+
assertThat(itemResponse).isNotNull();
358+
assertThat(itemResponse.getDiagnostics()).isNotNull();
359+
360+
CosmosDiagnostics cosmosDiagnostics = itemResponse.getDiagnostics();
361+
362+
assertThat(cosmosDiagnostics).isNotNull();
363+
364+
return cosmosDiagnostics.getDiagnosticsContext();
265365
}
266366

267367
if (operationType == OperationType.Patch) {
@@ -271,13 +371,57 @@ private CosmosDiagnostics performDocumentOperation(
271371
.add("/newPath", "newPath");
272372

273373
CosmosPatchItemRequestOptions patchItemRequestOptions = new CosmosPatchItemRequestOptions();
374+
375+
patchItemRequestOptions.setCosmosEndToEndOperationLatencyPolicyConfig(cosmosEndToEndOperationLatencyPolicyConfig);
274376
patchItemRequestOptions.setExcludedRegions(excludeRegions);
275-
return cosmosAsyncContainer
377+
378+
CosmosItemResponse<TestItem> itemResponse = cosmosAsyncContainer
276379
.patchItem(createdItem.getId(), new PartitionKey(createdItem.getMypk()), patchOperations, patchItemRequestOptions, TestItem.class)
277-
.block().getDiagnostics();
380+
.block();
381+
382+
assertThat(itemResponse).isNotNull();
383+
assertThat(itemResponse.getDiagnostics()).isNotNull();
384+
385+
CosmosDiagnostics cosmosDiagnostics = itemResponse.getDiagnostics();
386+
387+
assertThat(cosmosDiagnostics).isNotNull();
388+
389+
return cosmosDiagnostics.getDiagnosticsContext();
278390
}
279391
}
280392

393+
if (operationType == OperationType.Batch) {
394+
395+
CosmosBatchRequestOptions cosmosBatchRequestOptions = new CosmosBatchRequestOptions();
396+
397+
cosmosBatchRequestOptions.setExcludedRegions(excludeRegions);
398+
399+
TestItem testItem = TestItem.createNewItem();
400+
PartitionKey partitionKey = new PartitionKey(testItem.getMypk());
401+
402+
CosmosBatch cosmosBatch = CosmosBatch.createCosmosBatch(partitionKey);
403+
cosmosBatch.createItemOperation(testItem);
404+
405+
CosmosBatchResponse cosmosBatchResponse
406+
= cosmosAsyncContainer.executeCosmosBatch(cosmosBatch, cosmosBatchRequestOptions).block();
407+
408+
assertThat(cosmosBatchResponse).isNotNull();
409+
assertThat(cosmosBatchResponse.getDiagnostics()).isNotNull();
410+
411+
CosmosDiagnostics cosmosDiagnostics = cosmosBatchResponse.getDiagnostics();
412+
413+
assertThat(cosmosDiagnostics).isNotNull();
414+
415+
return cosmosDiagnostics.getDiagnosticsContext();
416+
}
417+
281418
throw new IllegalArgumentException("The operation type is not supported");
282419
}
420+
421+
private static void validateRegionsContacted(CosmosDiagnosticsContext diagnosticsContext, List<String> expectedRegionsContacted) {
422+
assertThat(diagnosticsContext).isNotNull();
423+
assertThat(diagnosticsContext.getContactedRegionNames()).isNotNull();
424+
assertThat(diagnosticsContext.getContactedRegionNames().size()).isEqualTo(expectedRegionsContacted.size());
425+
assertThat(diagnosticsContext.getContactedRegionNames()).containsAll(expectedRegionsContacted);
426+
}
283427
}

sdk/cosmos/azure-cosmos/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
#### Bugs Fixed
1212
* Fixed the fail back flow where not all partitions were failing back to original first preferred region for Per-Partition Circuit Breaker. - [PR 44099](https://github.com/Azure/azure-sdk-for-java/pull/44099)
1313
* Fixed diagnostics issue where operations in Gateway mode hitting end-to-end timeout would not capture diagnostics correctly. - [PR 44099](https://github.com/Azure/azure-sdk-for-java/pull/44099)
14+
* Enabled `excludeRegions` to be applied for `QueryPlan` calls. - [PR 45196](https://github.com/Azure/azure-sdk-for-java/pull/45196)
1415

1516
#### Other Changes
1617
* Added the `vectorIndexShardKeys` to the vectorIndexSpec for QuantizedFlat and DiskANN vector search. - [PR 44007](https://github.com/Azure/azure-sdk-for-java/pull/44007)

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/QueryPlanRetriever.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import reactor.core.publisher.Mono;
2626

2727
import java.util.HashMap;
28+
import java.util.List;
2829
import java.util.Map;
2930
import java.util.function.BiFunction;
3031
import java.util.function.Supplier;
@@ -102,10 +103,19 @@ static Mono<PartitionedQueryExecutionInfo> getQueryPlanThroughGatewayAsync(Diagn
102103
CosmosEndToEndOperationLatencyPolicyConfig end2EndConfig = qryOptAccessor
103104
.getImpl(nonNullRequestOptions)
104105
.getCosmosEndToEndLatencyPolicyConfig();
106+
107+
List<String> excludeRegions = qryOptAccessor
108+
.getImpl(nonNullRequestOptions)
109+
.getExcludedRegions();
110+
105111
if (end2EndConfig != null) {
106112
queryPlanRequest.requestContext.setEndToEndOperationLatencyPolicyConfig(end2EndConfig);
107113
}
108114

115+
if (excludeRegions != null && !excludeRegions.isEmpty()) {
116+
queryPlanRequest.requestContext.setExcludeRegions(excludeRegions);
117+
}
118+
109119
BiFunction<Supplier<DocumentClientRetryPolicy>, RxDocumentServiceRequest, Mono<PartitionedQueryExecutionInfo>> executeFunc =
110120
(retryPolicyFactory, req) -> {
111121
DocumentClientRetryPolicy retryPolicyInstance = retryPolicyFactory.get();

0 commit comments

Comments
 (0)