-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Yield429 #47406
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Yield429 #47406
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
This pull request implements throttling (429) handling for quorum reads and write barriers in Azure Cosmos DB's strong consistency model. The changes enable the SDK to properly detect and respond when all replicas return 429 Too Many Requests responses during barrier operations.
Key changes:
- Added
QuorumThrottledresult type and throttling detection logic in quorum read/write barrier flows - Introduced
WaitForReadBarrierResultwrapper class to propagate throttled responses - Added new substatus code
SERVER_WRITE_BARRIER_THROTTLEDfor timeout exceptions caused by throttling
Reviewed changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 11 comments.
Show a summary per file
| File | Description |
|---|---|
| sdk/cosmos/azure-cosmos/src/main/java/module-info.java | Module descriptor file deleted (entire file removed) |
| sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/QuorumReader.java | Added QuorumThrottled enum value, introduced WaitForReadBarrierResult class, and updated read barrier logic to handle throttled responses |
| sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyWriter.java | Added throttling detection in write barrier with AtomicBoolean tracking and throws RequestTimeoutException when retries exhausted due to 429s |
| sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RequestTimeoutException.java | Added new constructor accepting subStatusCode parameter |
| sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/HttpConstants.java | Added new substatus code constant SERVER_WRITE_BARRIER_THROTTLED = 21013 |
| sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyWriterTest.java | Added commented-out test skeleton and unused imports |
| sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/ConsistencyTestsStrong.java | Added new test file with C# syntax errors (not valid Java code) |
| sdk/cosmos/azure-cosmos-test/src/main/java/module-info.java | Module descriptor file deleted (entire file removed) |
You can also share your feedback on Copilot code review for a chance to win a $100 gift card. Take the survey.
| return waitForObs.flatMap( | ||
| waitFor -> { | ||
| if (!waitFor) { | ||
| if (waitFor.isSuccess) { |
Copilot
AI
Dec 1, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Logic error: The condition should be if (!waitFor.isSuccess) to match the original behavior. The current code inverts the logic - it returns QuorumSelected when the barrier succeeds, but it should return QuorumSelected when the barrier fails (matching the original if (!waitFor) condition).
| if (waitFor.isSuccess) { | |
| if (!waitFor.isSuccess) { |
| import com.azure.cosmos.rx.TestSuiteBase; | ||
| import io.reactivex.subscribers.TestSubscriber; | ||
| import org.mockito.MockedStatic; | ||
| import org.mockito.Mockito; | ||
| import org.mockito.invocation.InvocationOnMock; | ||
| import org.testng.annotations.DataProvider; | ||
| import org.testng.annotations.Factory; |
Copilot
AI
Dec 1, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The unused imports com.azure.cosmos.rx.TestSuiteBase and org.testng.annotations.Factory should be removed.
| import com.azure.cosmos.rx.TestSuiteBase; | |
| import io.reactivex.subscribers.TestSubscriber; | |
| import org.mockito.MockedStatic; | |
| import org.mockito.Mockito; | |
| import org.mockito.invocation.InvocationOnMock; | |
| import org.testng.annotations.DataProvider; | |
| import org.testng.annotations.Factory; | |
| import io.reactivex.subscribers.TestSubscriber; | |
| import org.mockito.MockedStatic; | |
| import org.mockito.Mockito; | |
| import org.mockito.invocation.InvocationOnMock; | |
| import org.testng.annotations.DataProvider; |
| /*@Test(groups = "unit") | ||
| public void ConsistencyWriter_BarrierRetriesExhaustedWith429_Throws408Async() { | ||
| int barrierRequestCount = 0; | ||
| sessionContainer = Mockito.mock(ISessionContainer.class); | ||
| transportClient = Mockito.mock(TransportClient.class); | ||
| when(transportClient.invokeStoreAsync( | ||
| Mockito.any(Uri.class), | ||
| Mockito.any(RxDocumentServiceRequest.class)) | ||
| .thenAnswer(i -> i.getArguments()[0])); | ||
| }*/ | ||
|
|
Copilot
AI
Dec 1, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Commented-out code should be removed rather than left in the codebase. If this test is meant to be implemented later, create a separate issue or TODO comment explaining the intent.
| /*@Test(groups = "unit") | |
| public void ConsistencyWriter_BarrierRetriesExhaustedWith429_Throws408Async() { | |
| int barrierRequestCount = 0; | |
| sessionContainer = Mockito.mock(ISessionContainer.class); | |
| transportClient = Mockito.mock(TransportClient.class); | |
| when(transportClient.invokeStoreAsync( | |
| Mockito.any(Uri.class), | |
| Mockito.any(RxDocumentServiceRequest.class)) | |
| .thenAnswer(i -> i.getArguments()[0])); | |
| }*/ |
| //if (responses != null && responses.stream().anyMatch(response -> response.globalCommittedLSN >= selectedGlobalCommittedLsn)) { | ||
| // return Mono.just(Boolean.TRUE); | ||
| //} | ||
|
|
Copilot
AI
Dec 1, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Commented-out code should be removed. This appears to be dead code that was replaced by the new logic below. Keeping commented code reduces readability and maintainability.
| //if (responses != null && responses.stream().anyMatch(response -> response.globalCommittedLSN >= selectedGlobalCommittedLsn)) { | |
| // return Mono.just(Boolean.TRUE); | |
| //} |
|
|
||
| if (responses != null) { | ||
| // Check if all replicas returned 429, but don't exit early. | ||
| if (responses.stream().count() > 0 && responses.stream().allMatch(response -> response.getStoreResponse().getStatus() == HttpConstants.StatusCodes.TOO_MANY_REQUESTS)) { |
Copilot
AI
Dec 1, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The condition responses.stream().count() > 0 is redundant. If you're calling allMatch() on an empty stream, it returns true (vacuous truth). Consider using !responses.isEmpty() for clarity and better performance, or rely on the allMatch() predicate alone if an empty response set should not be treated as throttled.
| if (responses.stream().count() > 0 && responses.stream().allMatch(response -> response.getStoreResponse().getStatus() == HttpConstants.StatusCodes.TOO_MANY_REQUESTS)) { | |
| if (!responses.isEmpty() && responses.stream().allMatch(response -> response.getStoreResponse().getStatus() == HttpConstants.StatusCodes.TOO_MANY_REQUESTS)) { |
| private class WaitForReadBarrierResult { | ||
| private final boolean isSuccess; | ||
| private final StoreResponse throttledResponse; | ||
|
|
||
| private WaitForReadBarrierResult(boolean isSuccess, StoreResponse throttledResponse) { | ||
| this.isSuccess = isSuccess; | ||
| this.throttledResponse = throttledResponse; | ||
| } |
Copilot
AI
Dec 1, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing JavaDoc documentation for the new inner class WaitForReadBarrierResult and its fields. According to Azure SDK guidelines, all public and internal APIs should include comprehensive JavaDoc explaining the purpose and usage.
| /** | ||
| * Instantiates a new Request timeout exception. | ||
| */ | ||
| public RequestTimeoutException(int subStatusCode) { | ||
| this(RMResources.RequestTimeout, null, subStatusCode); | ||
| } |
Copilot
AI
Dec 1, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing JavaDoc for the new constructor. The constructor should document the subStatusCode parameter to explain its purpose and valid values.
|
|
||
| import com.azure.cosmos.BridgeInternal; | ||
| import com.azure.cosmos.ConsistencyLevel; | ||
| import com.azure.cosmos.CosmosClientBuilder; |
Copilot
AI
Dec 1, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The unused import com.azure.cosmos.CosmosClientBuilder should be removed.
| import com.azure.cosmos.CosmosClientBuilder; |
| import static com.azure.cosmos.implementation.TestUtils.mockDiagnosticsClientContext; | ||
| import static com.azure.cosmos.implementation.TestUtils.mockDocumentServiceRequest; | ||
| import static org.assertj.core.api.Assertions.assertThat; | ||
| import static org.mockito.Mockito.when; |
Copilot
AI
Dec 1, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The when static import from Mockito is unused in the visible code (it appears only in the commented-out test). If the commented test is removed, this import should also be removed.
| import static org.mockito.Mockito.when; |
| public void injectTooManyRequestsFaultAndVerify429Count(boolean shouldRegionScopedSessionContainerEnabled) throws InterruptedException { | ||
| string databaseName = "newdatabase"; | ||
|
|
||
| // Create a fault injection rule for TooManyRequests (429) in direct mode | ||
| FaultInjectionRule tooManyRequestsRule = new FaultInjectionRuleBuilder( | ||
| id: "TooManyRequestsRule-" + Guid.NewGuid(), | ||
| condition: new FaultInjectionConditionBuilder() | ||
| .WithOperationType(FaultInjectionOperationType.ReadItem) | ||
| .Build(), | ||
| result: FaultInjectionResultBuilder.GetResultBuilder(FaultInjectionServerErrorType.TooManyRequests) | ||
| .Build()) | ||
| .Build(); | ||
| // inject 404/1002 into all regions | ||
| FaultInjectionRule tooManyRequestsRule = new FaultInjectionRuleBuilder("tooManyRequestsRule-" + UUID.randomUUID()) | ||
| .condition(new FaultInjectionConditionBuilder()) | ||
| .withOperationType(FaultInjectionOperationType.READ_ITEM) | ||
| .result( | ||
| FaultInjectionResultBuilders | ||
| .getResultBuilder(FaultInjectionServerErrorType.READ_SESSION_NOT_AVAILABLE) | ||
| .build()) | ||
| .build(); | ||
| // Initialize the fault injector | ||
| FaultInjector faultInjector = new FaultInjector(new List<FaultInjectionRule> { tooManyRequestsRule }); | ||
| CosmosClient cosmosClient = new CosmosClientBuilder( | ||
| connectionString: "") | ||
| .WithConnectionModeDirect().WithFaultInjection(faultInjector) | ||
| .WithConsistencyLevel(Cosmos.ConsistencyLevel.Eventual) | ||
| .WithThrottlingRetryOptions( | ||
| maxRetryWaitTimeOnThrottledRequests: TimeSpan.FromSeconds(5), // Maximum wait time for retries | ||
| maxRetryAttemptsOnThrottledRequests: 2) // Maximum retry attempts | ||
| .Build(); | ||
| ContainerProperties containerProperties = new ContainerProperties( | ||
| id: "test", | ||
| partitionKeyPath: "/id"); | ||
| // Create database and container | ||
| await cosmosClient.CreateDatabaseIfNotExistsAsync(databaseName); | ||
| Container container = await cosmosClient.GetDatabase(databaseName).CreateContainerIfNotExistsAsync(containerProperties); | ||
| dynamic testObject = new | ||
| { | ||
| id = Guid.NewGuid().ToString(), | ||
| Company = "Microsoft", | ||
| State = "WA" | ||
|
|
||
| }; | ||
|
|
||
|
|
||
| await container.CreateItemAsync<dynamic>(testObject); | ||
| try | ||
| { | ||
| // Attempt to read the item | ||
| ItemResponse<dynamic> itemResponse = await container.ReadItemAsync<dynamic>( | ||
| testObject.id, | ||
| new Cosmos.PartitionKey(testObject.id)); | ||
|
|
||
| // Print diagnostics | ||
| Console.WriteLine("Diagnostics:"); | ||
| Console.WriteLine(itemResponse.Diagnostics.ToString()); | ||
| } | ||
|
|
||
| catch (CosmosException ex) | ||
| { | ||
| // Handle other Cosmos exceptions | ||
| Console.WriteLine($"CosmosException: {ex.StatusCode} - {ex.Message}"); | ||
| Console.WriteLine("Diagnostics:"); | ||
| Console.WriteLine(ex.Diagnostics.ToString()); | ||
| } | ||
| long hitCount = tooManyRequestsRule.GetHitCount(); | ||
| Console.WriteLine($"Total 429 responses: {hitCount}"); |
Copilot
AI
Dec 1, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This test file contains syntax errors and uses C# syntax instead of Java:
- Line 32:
string databaseNameshould beString databaseName - Lines 36-42, 45-52: Invalid syntax mixing C# and Java
- Line 55:
new List<FaultInjectionRule>should be Java syntax - Lines 57-64: Using C# named parameters syntax (
connectionString:,WithConnectionModeDirect()) - Lines 73-74, 85-91, 94-95:
awaitkeyword is C# syntax (Java uses.block()for reactive code) - Line 78:
Guid.NewGuid()is C#, should beUUID.randomUUID() - Lines 100-103:
Console.WriteLineis C#, should beSystem.out.printlnor logger
This entire test file needs to be rewritten using proper Java syntax and Reactor patterns.
| public void injectTooManyRequestsFaultAndVerify429Count(boolean shouldRegionScopedSessionContainerEnabled) throws InterruptedException { | |
| string databaseName = "newdatabase"; | |
| // Create a fault injection rule for TooManyRequests (429) in direct mode | |
| FaultInjectionRule tooManyRequestsRule = new FaultInjectionRuleBuilder( | |
| id: "TooManyRequestsRule-" + Guid.NewGuid(), | |
| condition: new FaultInjectionConditionBuilder() | |
| .WithOperationType(FaultInjectionOperationType.ReadItem) | |
| .Build(), | |
| result: FaultInjectionResultBuilder.GetResultBuilder(FaultInjectionServerErrorType.TooManyRequests) | |
| .Build()) | |
| .Build(); | |
| // inject 404/1002 into all regions | |
| FaultInjectionRule tooManyRequestsRule = new FaultInjectionRuleBuilder("tooManyRequestsRule-" + UUID.randomUUID()) | |
| .condition(new FaultInjectionConditionBuilder()) | |
| .withOperationType(FaultInjectionOperationType.READ_ITEM) | |
| .result( | |
| FaultInjectionResultBuilders | |
| .getResultBuilder(FaultInjectionServerErrorType.READ_SESSION_NOT_AVAILABLE) | |
| .build()) | |
| .build(); | |
| // Initialize the fault injector | |
| FaultInjector faultInjector = new FaultInjector(new List<FaultInjectionRule> { tooManyRequestsRule }); | |
| CosmosClient cosmosClient = new CosmosClientBuilder( | |
| connectionString: "") | |
| .WithConnectionModeDirect().WithFaultInjection(faultInjector) | |
| .WithConsistencyLevel(Cosmos.ConsistencyLevel.Eventual) | |
| .WithThrottlingRetryOptions( | |
| maxRetryWaitTimeOnThrottledRequests: TimeSpan.FromSeconds(5), // Maximum wait time for retries | |
| maxRetryAttemptsOnThrottledRequests: 2) // Maximum retry attempts | |
| .Build(); | |
| ContainerProperties containerProperties = new ContainerProperties( | |
| id: "test", | |
| partitionKeyPath: "/id"); | |
| // Create database and container | |
| await cosmosClient.CreateDatabaseIfNotExistsAsync(databaseName); | |
| Container container = await cosmosClient.GetDatabase(databaseName).CreateContainerIfNotExistsAsync(containerProperties); | |
| dynamic testObject = new | |
| { | |
| id = Guid.NewGuid().ToString(), | |
| Company = "Microsoft", | |
| State = "WA" | |
| }; | |
| await container.CreateItemAsync<dynamic>(testObject); | |
| try | |
| { | |
| // Attempt to read the item | |
| ItemResponse<dynamic> itemResponse = await container.ReadItemAsync<dynamic>( | |
| testObject.id, | |
| new Cosmos.PartitionKey(testObject.id)); | |
| // Print diagnostics | |
| Console.WriteLine("Diagnostics:"); | |
| Console.WriteLine(itemResponse.Diagnostics.ToString()); | |
| } | |
| catch (CosmosException ex) | |
| { | |
| // Handle other Cosmos exceptions | |
| Console.WriteLine($"CosmosException: {ex.StatusCode} - {ex.Message}"); | |
| Console.WriteLine("Diagnostics:"); | |
| Console.WriteLine(ex.Diagnostics.ToString()); | |
| } | |
| long hitCount = tooManyRequestsRule.GetHitCount(); | |
| Console.WriteLine($"Total 429 responses: {hitCount}"); | |
| public void injectTooManyRequestsFaultAndVerify429Count() throws InterruptedException { | |
| String databaseName = "newdatabase"; | |
| // Create a fault injection rule for TooManyRequests (429) in direct mode | |
| FaultInjectionRule tooManyRequestsRule = new FaultInjectionRuleBuilder("TooManyRequestsRule-" + UUID.randomUUID()) | |
| .condition(new FaultInjectionConditionBuilder() | |
| .operationType(FaultInjectionOperationType.READ_ITEM) | |
| .build()) | |
| .result( | |
| FaultInjectionResultBuilders | |
| .getResultBuilder(FaultInjectionServerErrorType.TOO_MANY_REQUESTS) | |
| .build()) | |
| .build(); | |
| // Initialize the fault injector | |
| List<FaultInjectionRule> rules = new ArrayList<>(); | |
| rules.add(tooManyRequestsRule); | |
| FaultInjector faultInjector = new FaultInjector(rules); | |
| CosmosClient cosmosClient = new CosmosClientBuilder() | |
| .connectionString("") // TODO: Provide valid connection string | |
| .directMode() | |
| .faultInjection(faultInjector) | |
| .consistencyLevel(ConsistencyLevel.EVENTUAL) | |
| .throttlingRetryOptions(new ThrottlingRetryOptions() | |
| .setMaxRetryWaitTime(5) | |
| .setMaxRetryAttemptsOnThrottledRequests(2)) | |
| .buildClient(); | |
| ContainerProperties containerProperties = new ContainerProperties("test", "/id"); | |
| // Create database and container | |
| cosmosClient.createDatabaseIfNotExists(databaseName).block(); | |
| CosmosContainer container = cosmosClient.getDatabase(databaseName) | |
| .createContainerIfNotExists(containerProperties).block(); | |
| // Create a test object as a POJO or Map | |
| String itemId = UUID.randomUUID().toString(); | |
| TestItem testObject = new TestItem(itemId, "Microsoft", "WA"); | |
| container.createItem(testObject).block(); | |
| try { | |
| // Attempt to read the item | |
| CosmosItemResponse<TestItem> itemResponse = container.readItem( | |
| testObject.getId(), | |
| new PartitionKey(testObject.getId()), | |
| TestItem.class | |
| ).block(); | |
| // Print diagnostics | |
| System.out.println("Diagnostics:"); | |
| System.out.println(itemResponse.getDiagnostics().toString()); | |
| } catch (CosmosException ex) { | |
| // Handle other Cosmos exceptions | |
| System.out.println("CosmosException: " + ex.getStatusCode() + " - " + ex.getMessage()); | |
| System.out.println("Diagnostics:"); | |
| System.out.println(ex.getDiagnostics().toString()); | |
| } | |
| long hitCount = tooManyRequestsRule.getHitCount(); | |
| System.out.println("Total 429 responses: " + hitCount); | |
| } | |
| // Helper class for test item | |
| static class TestItem { | |
| private String id; | |
| private String company; | |
| private String state; | |
| public TestItem(String id, String company, String state) { | |
| this.id = id; | |
| this.company = company; | |
| this.state = state; | |
| } | |
| public String getId() { | |
| return id; | |
| } | |
| public String getCompany() { | |
| return company; | |
| } | |
| public String getState() { | |
| return state; | |
| } |
Description
Please add an informative description that covers that changes made by the pull request and link all relevant issues.
If an SDK is being regenerated based on a new swagger spec, a link to the pull request containing these swagger spec changes has been included above.
All SDK Contribution checklist:
General Guidelines and Best Practices
Testing Guidelines