Commit 0b53398

Partition level circuit breaker (#39265)
* Fixing CI pipeline.
* Added skeletal classes.
* Added skeletal classes and method calls.
* Added skeletal classes and method calls.
* Added skeletal flow for marking a partition as unavailable for read / write.
* Added skeletal flow for marking a partition as unavailable for read / write.
* Adding skeletal methods to GlobalPartitionEndpointManagerForCircuitBreaker.
* Adding skeletal methods to GlobalPartitionEndpointManagerForCircuitBreaker.
* Updated CHANGELOG.
* Added class to error track on a per-region basis.
* Refactor region health transitions.
* Added class to error track on a per-region basis.
* Adapt point operations to bookmark failures.
* Wiring region feedback handling for point operations.
* Fixing compilation errors.
* Added partitionKeyRange detection for point operations in document client layer.
* Updated CHANGELOG.md.
* Fixing partition state transition logic.
* Modify logger level.
* Adding a way to exclude partition-level unavailable regions.
* Adding a way to exclude partition-level unavailable regions for queries.
* Adding circuit breaking for 408s in point operations.
* Handling 408 cases for queries.
* Adding shared state for point and query operations with availability strategy enabled.
* Implementing 408 handling for partition-level circuit breaker.
* Implementing 408 handling for partition-level circuit breaker.
* Implementing 408 handling for partition-level circuit breaker.
* Implementing 408 handling for partition-level circuit breaker.
* Code refactor.
* Test changes and multi-container fixes.
* Integrate readMany for partition-level circuit breaker.
* Fixing merge.
* Fixing CI pipeline.
* Fixing CI pipeline.
* Fixing CI pipeline.
* Fixing CI pipeline.
* Integrating circuit breaking behavior for change feed.
* Integrating circuit breaking behavior for change feed.
* Added separate exception / success counters for write / non-write operations.
* Added separate exception / success counters for write / non-write operations.
* Added separate exception / success counters for write / non-write operations.
* Adding `Healthy` status.
* Adding `CosmosDiagnostics` for change feed `FeedResponse`.
* Refactoring.
* Refactoring.
* Refactoring.
* Fixing CI pipeline.
* Fixing CI pipeline.
* Fixing CI pipeline.
* Fixing CI pipeline.
* Fixing CI pipeline.
* Fixing CI pipeline.
* Fixing CI pipeline.
* Refactoring.
* Refactoring.
* Fixing CI pipeline.
* Fixing CI pipeline.
* Handle server-generated 500 errors for circuit breaking purposes.
* Handling stale collection cache with retries.
* Handling stale collection cache with retries.
* Adding tests.
* Adding tests.
* Adding tests.
* Fixing CI pipeline.
* Fixing faulty merge.
* Fixing CI pipeline.
* Fixing CI pipeline.
* Fixing CI pipeline.
* Fixing CI pipeline.
* Fixing CI pipeline.
* Fixing CI pipeline.
* Fixing CI pipeline.
* Fixing live tests pipeline.
* Fixing live tests pipeline.
* Refactoring.
* Refactoring.
* Refactoring.
* Refactoring.
* Fixing tests.
* Fixing tests.
* Refactoring.
* Refactoring.
* Refactoring.
* Refactoring.
* Adding code comments.
* Adding code comments.
* Modifying way to configure circuit breaker thresholds.
* Adding capability to specify alternate account to upload benchmark results.
* Fixing merge.
* Reacting to review comments.
* Refactoring tests.
* Fixing tests.
* Fixing tests.
* Fixing tests.
* Reacting to review comments.
* Reacting to review comments.
* Reacting to review comments.
* Reacting to review comments.
* Fixing tests.
* Fixing merge conflicts.
* Refactoring.
* Refactoring.
* Refactoring.
* Reacting to review comments.
* Reacting to review comments.
* Reacting to review comments.
* Reacting to review comments.
* Fixing live tests pipeline.
* Fixing live tests pipeline.
* Fixing live tests pipeline.
* Fixing live tests pipeline.
* Fixing live tests pipeline.
* Fixing CI pipeline.
* Fixing CI pipeline.
* Fixing CI pipeline.
* Refactoring.
* Refactoring.
* Refactoring.
* Refactoring.
* Refactoring.
* Refactoring.
* Refactoring.
* Refactoring and reacting to review comments.
* Refactoring.
* Refactoring.
* Refactoring.
* Fixing test pipeline.
* Reacting to review comments.
* Modify test pipeline timeout.
* Reacting to review comments.
* Reacting to review comments.
* Reacting to review comments.
* Reacting to review comments.
* Reacting to review comments.
* Fixing CI pipeline.
* Fixing CI pipeline.
* Fixing CI pipeline.
* Fixing CI pipeline.
* Updated CHANGELOG.md.
* Added code comments.
1 parent c562205 commit 0b53398

94 files changed: +8381 -580 lines changed


sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/AsyncBenchmark.java

Lines changed: 26 additions & 1 deletion
@@ -36,6 +36,7 @@
 import com.codahale.metrics.jvm.MemoryUsageGaugeSet;
 import io.micrometer.core.instrument.MeterRegistry;
 import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.mpierce.metrics.reservoir.hdrhistogram.HdrHistogramResetOnSnapshotReservoir;
 import org.reactivestreams.Subscription;
 import org.slf4j.Logger;
@@ -88,6 +89,16 @@ abstract class AsyncBenchmark<T> {
         logger = LoggerFactory.getLogger(this.getClass());
         configuration = cfg;
 
+        if (configuration.isPartitionLevelCircuitBreakerEnabled()) {
+            System.setProperty(
+                "COSMOS.PARTITION_LEVEL_CIRCUIT_BREAKER_CONFIG",
+                "{\"isPartitionLevelCircuitBreakerEnabled\": true, "
+                    + "\"circuitBreakerType\": \"CONSECUTIVE_EXCEPTION_COUNT_BASED\","
+                    + "\"consecutiveExceptionCountToleratedForReads\": 10,"
+                    + "\"consecutiveExceptionCountToleratedForWrites\": 5,"
+                    + "}");
+        }
+
         CosmosClientBuilder cosmosClientBuilder = new CosmosClientBuilder()
             .endpoint(cfg.getServiceEndpoint())
             .key(cfg.getMasterKey())
@@ -138,8 +149,11 @@ abstract class AsyncBenchmark<T> {
             cosmosClientBuilder = cosmosClientBuilder.gatewayMode(gatewayConnectionConfig);
         }
 
-        CosmosClient syncClient = cosmosClientBuilder.buildClient();
         cosmosClient = cosmosClientBuilder.buildAsyncClient();
+        CosmosClient syncClient = cosmosClientBuilder
+            .endpoint(StringUtils.isNotEmpty(configuration.getServiceEndpointForRunResultsUploadAccount()) ? configuration.getServiceEndpointForRunResultsUploadAccount() : configuration.getServiceEndpoint())
+            .key(StringUtils.isNotEmpty(configuration.getMasterKeyForRunResultsUploadAccount()) ? configuration.getMasterKeyForRunResultsUploadAccount() : configuration.getMasterKey())
+            .buildClient();
 
         try {
             cosmosAsyncDatabase = cosmosClient.getDatabase(this.configuration.getDatabaseId());
@@ -168,6 +182,17 @@ abstract class AsyncBenchmark<T> {
                 ).block();
 
                 cosmosAsyncContainer = cosmosAsyncDatabase.getContainer(this.configuration.getCollectionId());
+
+                // add some delay to allow container to be created across multiple regions
+                // container creation across regions is an async operation
+                // without the delay a container may not be available to process reads / writes
+
+                try {
+                    Thread.sleep(30_000);
+                } catch (Exception exception) {
+                    throw new RuntimeException(exception);
+                }
+
                 logger.info("Collection {} is created for this test", this.configuration.getCollectionId());
                 collectionCreated = true;
             } else {
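
The `System.setProperty` hunk above assembles the circuit breaker configuration by hand, and as written the literal leaves a comma before the closing brace. Whether the SDK's parser tolerates that is not shown in this diff. The following is a minimal sketch, assuming a strict JSON parser, of producing the same keys and values without the dangling comma. The class `CircuitBreakerConfigSketch` and its helpers are hypothetical and not part of the commit; only the property name and the JSON keys are taken from the diff.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper, not part of the commit: builds the config payload
// from a map so separators are placed only between entries.
public class CircuitBreakerConfigSketch {

    // Renders a flat map as a JSON object. Strings are quoted, other values
    // are printed as-is. No escaping is done, so this only suits simple
    // keys and values like the ones used in the diff.
    static String toJson(Map<String, Object> fields) {
        StringJoiner joiner = new StringJoiner(", ", "{", "}");
        for (Map.Entry<String, Object> entry : fields.entrySet()) {
            Object value = entry.getValue();
            String rendered = (value instanceof String) ? "\"" + value + "\"" : String.valueOf(value);
            joiner.add("\"" + entry.getKey() + "\": " + rendered);
        }
        return joiner.toString();
    }

    // Keys and the breaker type are copied from the diff; thresholds are parameters.
    static String buildConfig(int toleratedForReads, int toleratedForWrites) {
        Map<String, Object> fields = new LinkedHashMap<>();
        fields.put("isPartitionLevelCircuitBreakerEnabled", true);
        fields.put("circuitBreakerType", "CONSECUTIVE_EXCEPTION_COUNT_BASED");
        fields.put("consecutiveExceptionCountToleratedForReads", toleratedForReads);
        fields.put("consecutiveExceptionCountToleratedForWrites", toleratedForWrites);
        return toJson(fields);
    }

    public static void main(String[] args) {
        String json = buildConfig(10, 5);
        System.setProperty("COSMOS.PARTITION_LEVEL_CIRCUIT_BREAKER_CONFIG", json);
        System.out.println(json);
    }
}
```

Building from an ordered map keeps the key order of the original literal while letting `StringJoiner` handle the separators.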

sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/Configuration.java

Lines changed: 21 additions & 0 deletions
@@ -47,6 +47,12 @@ public class Configuration {
     @Parameter(names = "-masterKey", description = "Master Key")
     private String masterKey;
 
+    @Parameter(names = "-serviceEndpointForResultsUploadAccount", description = "Service Endpoint for run results upload account")
+    private String serviceEndpointForRunResultsUploadAccount;
+
+    @Parameter(names = "-masterKeyForResultsUploadAccount", description = "Master Key for run results upload account")
+    private String masterKeyForRunResultsUploadAccount;
+
     @Parameter(names = "-databaseId", description = "Database ID")
     private String databaseId;
 
@@ -137,6 +143,9 @@ public class Configuration {
     @Parameter(names = "-isRegionScopedSessionContainerEnabled", description = "A flag to denote whether region scoped session container is enabled")
     private String isRegionScopedSessionContainerEnabled = String.valueOf(false);
 
+    @Parameter(names = "isPartitionLevelCircuitBreakerEnabled", description = "A flag to denote whether partition level circuit breaker is enabled.")
+    private String isPartitionLevelCircuitBreakerEnabled = String.valueOf(true);
+
     @Parameter(names = "-operation", description = "Type of Workload:\n"
         + "\tReadThroughput- run a READ workload that prints only throughput *\n"
         + "\tReadThroughputWithMultipleClients - run a READ workload that prints throughput and latency for multiple client read.*\n"
@@ -397,6 +406,14 @@ public String getMasterKey() {
         return masterKey;
     }
 
+    public String getServiceEndpointForRunResultsUploadAccount() {
+        return serviceEndpointForRunResultsUploadAccount;
+    }
+
+    public String getMasterKeyForRunResultsUploadAccount() {
+        return masterKeyForRunResultsUploadAccount;
+    }
+
     public String getApplicationName() {
         return applicationName;
     }
@@ -639,6 +656,10 @@ public boolean isRegionScopedSessionContainerEnabled() {
         return Boolean.parseBoolean(isRegionScopedSessionContainerEnabled);
     }
 
+    public boolean isPartitionLevelCircuitBreakerEnabled() {
+        return Boolean.parseBoolean(isPartitionLevelCircuitBreakerEnabled);
+    }
+
     public void tryGetValuesFromSystem() {
         serviceEndpoint = StringUtils.defaultString(Strings.emptyToNull(System.getenv().get("SERVICE_END_POINT")),
             serviceEndpoint);
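
Both benchmark classes choose the results-upload endpoint and key with the same `StringUtils.isNotEmpty(...) ? override : default` ternary over the getters added in this file. The following is a small sketch of that fallback rule in plain Java. The helper name `firstNonEmpty` and the endpoint values are hypothetical. The null-or-empty check mirrors what `StringUtils.isNotEmpty` does, so a whitespace-only override would still count as set.

```java
// Hypothetical helper, not part of the commit: picks the upload-account
// value when one is configured, otherwise falls back to the main account.
public class UploadAccountFallbackSketch {

    // Returns `preferred` unless it is null or the empty string.
    static String firstNonEmpty(String preferred, String fallback) {
        return (preferred != null && !preferred.isEmpty()) ? preferred : fallback;
    }

    public static void main(String[] args) {
        // Placeholder endpoints for illustration only.
        String mainEndpoint = "https://main-account.example";
        String uploadEndpoint = null; // option not passed on the command line

        // With no override configured, results go to the main account.
        System.out.println(firstNonEmpty(uploadEndpoint, mainEndpoint));

        // With an override configured, results go to the upload account.
        System.out.println(firstNonEmpty("https://upload-account.example", mainEndpoint));
    }
}
```

Pulling the rule into one helper would keep the endpoint and key selection consistent between `AsyncBenchmark` and `SyncBenchmark`, which currently repeat the ternary.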

sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/SyncBenchmark.java

Lines changed: 27 additions & 1 deletion
@@ -32,6 +32,7 @@
 import com.codahale.metrics.jvm.MemoryUsageGaugeSet;
 import io.micrometer.core.instrument.MeterRegistry;
 import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.mpierce.metrics.reservoir.hdrhistogram.HdrHistogramResetOnSnapshotReservoir;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -114,6 +115,16 @@ public T apply(T o, Throwable throwable) {
         configuration = cfg;
         logger = LoggerFactory.getLogger(this.getClass());
 
+        if (configuration.isPartitionLevelCircuitBreakerEnabled()) {
+            System.setProperty(
+                "COSMOS.PARTITION_LEVEL_CIRCUIT_BREAKER_CONFIG",
+                "{\"isPartitionLevelCircuitBreakerEnabled\": true, "
+                    + "\"circuitBreakerType\": \"CONSECUTIVE_EXCEPTION_COUNT_BASED\","
+                    + "\"consecutiveExceptionCountToleratedForReads\": 10,"
+                    + "\"consecutiveExceptionCountToleratedForWrites\": 5,"
+                    + "}");
+        }
+
         CosmosClientBuilder cosmosClientBuilder = new CosmosClientBuilder()
             .endpoint(cfg.getServiceEndpoint())
             .preferredRegions(cfg.getPreferredRegionsList())
@@ -147,6 +158,11 @@ public T apply(T o, Throwable throwable) {
         }
 
         cosmosClient = cosmosClientBuilder.buildClient();
+        CosmosClient syncClient = cosmosClientBuilder
+            .endpoint(StringUtils.isNotEmpty(configuration.getServiceEndpointForRunResultsUploadAccount()) ? configuration.getServiceEndpointForRunResultsUploadAccount() : configuration.getServiceEndpoint())
+            .key(StringUtils.isNotEmpty(configuration.getMasterKeyForRunResultsUploadAccount()) ? configuration.getMasterKeyForRunResultsUploadAccount() : configuration.getMasterKey())
+            .buildClient();
+
         try {
             cosmosDatabase = cosmosClient.getDatabase(this.configuration.getDatabaseId());
             cosmosDatabase.read();
@@ -171,6 +187,16 @@ public T apply(T o, Throwable throwable) {
                     ThroughputProperties.createManualThroughput(this.configuration.getThroughput()));
                 cosmosContainer = cosmosDatabase.getContainer(this.configuration.getCollectionId());
                 logger.info("Collection {} is created for this test", this.configuration.getCollectionId());
+
+                // add some delay to allow container to be created across multiple regions
+                // container creation across regions is an async operation
+                // without the delay a container may not be available to process reads / writes
+                try {
+                    Thread.sleep(30_000);
+                } catch (Exception exception) {
+                    throw new RuntimeException(exception);
+                }
+
                 collectionCreated = true;
             } else {
                 throw e;
@@ -236,7 +262,7 @@ public T apply(T o, Throwable throwable) {
             resultReporter = CosmosTotalResultReporter
                 .forRegistry(
                     metricsRegistry,
-                    cosmosClient.getDatabase(configuration.getResultUploadDatabase()).getContainer(configuration.getResultUploadContainer()),
+                    syncClient.getDatabase(configuration.getResultUploadDatabase()).getContainer(configuration.getResultUploadContainer()),
                     configuration)
                 .convertRatesTo(TimeUnit.SECONDS)
                 .convertDurationsTo(TimeUnit.MILLISECONDS).build();
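
Both benchmark classes pause for a fixed 30 seconds after creating the container and wrap any `Exception` from `Thread.sleep` in a `RuntimeException`. The following is a minimal sketch, not the commit's code, of the same pause that catches only `InterruptedException` and restores the thread's interrupt flag so callers can still see the interruption. The class and method names are hypothetical, and the short duration in `main` is only there to keep the example quick.

```java
import java.time.Duration;

// Hypothetical helper, not part of the commit: a fixed delay that
// preserves the interrupt status instead of wrapping it.
public class ReplicationDelaySketch {

    // Sleeps for `delay`. Returns true if the full delay elapsed and false
    // if the thread was interrupted, in which case the interrupt flag is
    // set again before returning.
    static boolean pause(Duration delay) {
        try {
            Thread.sleep(delay.toMillis());
            return true;
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        // The benchmarks use 30_000 ms; a short value is used here.
        boolean completed = pause(Duration.ofMillis(50));
        System.out.println("pause completed: " + completed);
    }
}
```

A fixed delay is a simple approach. Polling the container until it responds in each region would be an alternative, but nothing in this diff indicates how the SDK would expose that, so it is left out of the sketch.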

sdk/cosmos/azure-cosmos-tests/pom.xml

Lines changed: 63 additions & 0 deletions
@@ -465,6 +465,69 @@ Licensed under the MIT License.
         </plugins>
       </build>
     </profile>
+    <profile>
+      <!-- integration tests, requires Cosmos DB endpoint with multi master support -->
+      <id>circuit-breaker-read-all-read-many</id>
+      <properties>
+        <test.groups>circuit-breaker-read-all-read-many</test.groups>
+      </properties>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-failsafe-plugin</artifactId>
+            <version>3.2.5</version> <!-- {x-version-update;org.apache.maven.plugins:maven-failsafe-plugin;external_dependency} -->
+            <configuration>
+              <suiteXmlFiles>
+                <suiteXmlFile>src/test/resources/circuit-breaker-read-all-read-many-testng.xml</suiteXmlFile>
+              </suiteXmlFiles>
+            </configuration>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+    <profile>
+      <!-- integration tests, requires Cosmos DB endpoint with multi master support -->
+      <id>circuit-breaker-misc-direct</id>
+      <properties>
+        <test.groups>circuit-breaker-misc-direct</test.groups>
+      </properties>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-failsafe-plugin</artifactId>
+            <version>3.2.5</version> <!-- {x-version-update;org.apache.maven.plugins:maven-failsafe-plugin;external_dependency} -->
+            <configuration>
+              <suiteXmlFiles>
+                <suiteXmlFile>src/test/resources/circuit-breaker-misc-direct-testng.xml</suiteXmlFile>
+              </suiteXmlFiles>
+            </configuration>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+    <profile>
+      <!-- integration tests, requires Cosmos DB endpoint with multi master support -->
+      <id>circuit-breaker-misc-gateway</id>
+      <properties>
+        <test.groups>circuit-breaker-misc-gateway</test.groups>
+      </properties>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-failsafe-plugin</artifactId>
+            <version>3.2.5</version> <!-- {x-version-update;org.apache.maven.plugins:maven-failsafe-plugin;external_dependency} -->
+            <configuration>
+              <suiteXmlFiles>
+                <suiteXmlFile>src/test/resources/circuit-breaker-misc-gateway-testng.xml</suiteXmlFile>
+              </suiteXmlFiles>
+            </configuration>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
     <profile>
       <!-- integration tests, requires Cosmos DB endpoint with multi master support -->
       <id>flaky-multi-master</id>
