Skip to content
Draft

Yield429 #47406

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 0 additions & 9 deletions sdk/cosmos/azure-cosmos-test/src/main/java/module-info.java

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.cosmos.implementation;

import com.azure.cosmos.*;
import com.azure.cosmos.implementation.clienttelemetry.ClientTelemetry;
import com.azure.cosmos.implementation.directconnectivity.ReflectionUtils;
import com.azure.cosmos.implementation.directconnectivity.WFConstants;
import com.azure.cosmos.implementation.routing.PartitionKeyInternal;
import com.azure.cosmos.models.CosmosClientTelemetryConfig;
import com.azure.cosmos.models.CosmosQueryRequestOptions;
import com.azure.cosmos.models.FeedResponse;
import com.azure.cosmos.models.PartitionKey;
import com.azure.cosmos.test.faultinjection.*;
import org.apache.commons.lang3.Range;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

import static org.assertj.core.api.Assertions.assertThat;

public class ConsistencyTestsStrong {

@Test(groups = {"direct"})
public void injectTooManyRequestsFaultAndVerify429Count(boolean shouldRegionScopedSessionContainerEnabled) throws InterruptedException {
string databaseName = "newdatabase";

// Create a fault injection rule for TooManyRequests (429) in direct mode
FaultInjectionRule tooManyRequestsRule = new FaultInjectionRuleBuilder(
id: "TooManyRequestsRule-" + Guid.NewGuid(),
condition: new FaultInjectionConditionBuilder()
.WithOperationType(FaultInjectionOperationType.ReadItem)
.Build(),
result: FaultInjectionResultBuilder.GetResultBuilder(FaultInjectionServerErrorType.TooManyRequests)
.Build())
.Build();
// inject 404/1002 into all regions
FaultInjectionRule tooManyRequestsRule = new FaultInjectionRuleBuilder("tooManyRequestsRule-" + UUID.randomUUID())
.condition(new FaultInjectionConditionBuilder())
.withOperationType(FaultInjectionOperationType.READ_ITEM)
.result(
FaultInjectionResultBuilders
.getResultBuilder(FaultInjectionServerErrorType.READ_SESSION_NOT_AVAILABLE)
.build())
.build();
// Initialize the fault injector
FaultInjector faultInjector = new FaultInjector(new List<FaultInjectionRule> { tooManyRequestsRule });
CosmosClient cosmosClient = new CosmosClientBuilder(
connectionString: "")
.WithConnectionModeDirect().WithFaultInjection(faultInjector)
.WithConsistencyLevel(Cosmos.ConsistencyLevel.Eventual)
.WithThrottlingRetryOptions(
maxRetryWaitTimeOnThrottledRequests: TimeSpan.FromSeconds(5), // Maximum wait time for retries
maxRetryAttemptsOnThrottledRequests: 2) // Maximum retry attempts
.Build();
ContainerProperties containerProperties = new ContainerProperties(
id: "test",
partitionKeyPath: "/id");
// Create database and container
await cosmosClient.CreateDatabaseIfNotExistsAsync(databaseName);
Container container = await cosmosClient.GetDatabase(databaseName).CreateContainerIfNotExistsAsync(containerProperties);
dynamic testObject = new
{
id = Guid.NewGuid().ToString(),
Company = "Microsoft",
State = "WA"

};


await container.CreateItemAsync<dynamic>(testObject);
try
{
// Attempt to read the item
ItemResponse<dynamic> itemResponse = await container.ReadItemAsync<dynamic>(
testObject.id,
new Cosmos.PartitionKey(testObject.id));

// Print diagnostics
Console.WriteLine("Diagnostics:");
Console.WriteLine(itemResponse.Diagnostics.ToString());
}

catch (CosmosException ex)
{
// Handle other Cosmos exceptions
Console.WriteLine($"CosmosException: {ex.StatusCode} - {ex.Message}");
Console.WriteLine("Diagnostics:");
Console.WriteLine(ex.Diagnostics.ToString());
}
long hitCount = tooManyRequestsRule.GetHitCount();
Console.WriteLine($"Total 429 responses: {hitCount}");
Comment on lines +31 to +106
Copy link

Copilot AI Dec 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test file contains syntax errors and uses C# syntax instead of Java:

  • Line 32: string databaseName should be String databaseName
  • Lines 36-42, 45-52: Invalid syntax mixing C# and Java
  • Line 55: new List<FaultInjectionRule> should be Java syntax
  • Lines 57-64: Using C# named parameters syntax (connectionString:, WithConnectionModeDirect())
  • Lines 73-74, 85-91, 94-95: await keyword is C# syntax (Java uses .block() for reactive code)
  • Line 78: Guid.NewGuid() is C#, should be UUID.randomUUID()
  • Lines 100-103: Console.WriteLine is C#, should be System.out.println or logger

This entire test file needs to be rewritten using proper Java syntax and Reactor patterns.

Suggested change
public void injectTooManyRequestsFaultAndVerify429Count(boolean shouldRegionScopedSessionContainerEnabled) throws InterruptedException {
string databaseName = "newdatabase";
// Create a fault injection rule for TooManyRequests (429) in direct mode
FaultInjectionRule tooManyRequestsRule = new FaultInjectionRuleBuilder(
id: "TooManyRequestsRule-" + Guid.NewGuid(),
condition: new FaultInjectionConditionBuilder()
.WithOperationType(FaultInjectionOperationType.ReadItem)
.Build(),
result: FaultInjectionResultBuilder.GetResultBuilder(FaultInjectionServerErrorType.TooManyRequests)
.Build())
.Build();
// inject 404/1002 into all regions
FaultInjectionRule tooManyRequestsRule = new FaultInjectionRuleBuilder("tooManyRequestsRule-" + UUID.randomUUID())
.condition(new FaultInjectionConditionBuilder())
.withOperationType(FaultInjectionOperationType.READ_ITEM)
.result(
FaultInjectionResultBuilders
.getResultBuilder(FaultInjectionServerErrorType.READ_SESSION_NOT_AVAILABLE)
.build())
.build();
// Initialize the fault injector
FaultInjector faultInjector = new FaultInjector(new List<FaultInjectionRule> { tooManyRequestsRule });
CosmosClient cosmosClient = new CosmosClientBuilder(
connectionString: "")
.WithConnectionModeDirect().WithFaultInjection(faultInjector)
.WithConsistencyLevel(Cosmos.ConsistencyLevel.Eventual)
.WithThrottlingRetryOptions(
maxRetryWaitTimeOnThrottledRequests: TimeSpan.FromSeconds(5), // Maximum wait time for retries
maxRetryAttemptsOnThrottledRequests: 2) // Maximum retry attempts
.Build();
ContainerProperties containerProperties = new ContainerProperties(
id: "test",
partitionKeyPath: "/id");
// Create database and container
await cosmosClient.CreateDatabaseIfNotExistsAsync(databaseName);
Container container = await cosmosClient.GetDatabase(databaseName).CreateContainerIfNotExistsAsync(containerProperties);
dynamic testObject = new
{
id = Guid.NewGuid().ToString(),
Company = "Microsoft",
State = "WA"
};
await container.CreateItemAsync<dynamic>(testObject);
try
{
// Attempt to read the item
ItemResponse<dynamic> itemResponse = await container.ReadItemAsync<dynamic>(
testObject.id,
new Cosmos.PartitionKey(testObject.id));
// Print diagnostics
Console.WriteLine("Diagnostics:");
Console.WriteLine(itemResponse.Diagnostics.ToString());
}
catch (CosmosException ex)
{
// Handle other Cosmos exceptions
Console.WriteLine($"CosmosException: {ex.StatusCode} - {ex.Message}");
Console.WriteLine("Diagnostics:");
Console.WriteLine(ex.Diagnostics.ToString());
}
long hitCount = tooManyRequestsRule.GetHitCount();
Console.WriteLine($"Total 429 responses: {hitCount}");
public void injectTooManyRequestsFaultAndVerify429Count() throws InterruptedException {
String databaseName = "newdatabase";
// Create a fault injection rule for TooManyRequests (429) in direct mode
FaultInjectionRule tooManyRequestsRule = new FaultInjectionRuleBuilder("TooManyRequestsRule-" + UUID.randomUUID())
.condition(new FaultInjectionConditionBuilder()
.operationType(FaultInjectionOperationType.READ_ITEM)
.build())
.result(
FaultInjectionResultBuilders
.getResultBuilder(FaultInjectionServerErrorType.TOO_MANY_REQUESTS)
.build())
.build();
// Initialize the fault injector
List<FaultInjectionRule> rules = new ArrayList<>();
rules.add(tooManyRequestsRule);
FaultInjector faultInjector = new FaultInjector(rules);
CosmosClient cosmosClient = new CosmosClientBuilder()
.connectionString("") // TODO: Provide valid connection string
.directMode()
.faultInjection(faultInjector)
.consistencyLevel(ConsistencyLevel.EVENTUAL)
.throttlingRetryOptions(new ThrottlingRetryOptions()
.setMaxRetryWaitTime(5)
.setMaxRetryAttemptsOnThrottledRequests(2))
.buildClient();
ContainerProperties containerProperties = new ContainerProperties("test", "/id");
// Create database and container
cosmosClient.createDatabaseIfNotExists(databaseName).block();
CosmosContainer container = cosmosClient.getDatabase(databaseName)
.createContainerIfNotExists(containerProperties).block();
// Create a test object as a POJO or Map
String itemId = UUID.randomUUID().toString();
TestItem testObject = new TestItem(itemId, "Microsoft", "WA");
container.createItem(testObject).block();
try {
// Attempt to read the item
CosmosItemResponse<TestItem> itemResponse = container.readItem(
testObject.getId(),
new PartitionKey(testObject.getId()),
TestItem.class
).block();
// Print diagnostics
System.out.println("Diagnostics:");
System.out.println(itemResponse.getDiagnostics().toString());
} catch (CosmosException ex) {
// Handle other Cosmos exceptions
System.out.println("CosmosException: " + ex.getStatusCode() + " - " + ex.getMessage());
System.out.println("Diagnostics:");
System.out.println(ex.getDiagnostics().toString());
}
long hitCount = tooManyRequestsRule.getHitCount();
System.out.println("Total 429 responses: " + hitCount);
}
// Helper class for test item
static class TestItem {
private String id;
private String company;
private String state;
public TestItem(String id, String company, String state) {
this.id = id;
this.company = company;
this.state = state;
}
public String getId() {
return id;
}
public String getCompany() {
return company;
}
public String getState() {
return state;
}

Copilot uses AI. Check for mistakes.
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.ConsistencyLevel;
import com.azure.cosmos.CosmosClientBuilder;
Copy link

Copilot AI Dec 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The unused import com.azure.cosmos.CosmosClientBuilder should be removed.

Suggested change
import com.azure.cosmos.CosmosClientBuilder;

Copilot uses AI. Check for mistakes.
import com.azure.cosmos.implementation.ClientSideRequestStatistics;
import com.azure.cosmos.implementation.DiagnosticsClientContext;
import com.azure.cosmos.implementation.FailureValidator;
Expand All @@ -21,11 +22,13 @@
import com.azure.cosmos.implementation.StoreResponseBuilder;
import com.azure.cosmos.implementation.Utils;
import com.azure.cosmos.implementation.guava25.collect.ImmutableList;
import com.azure.cosmos.rx.TestSuiteBase;
import io.reactivex.subscribers.TestSubscriber;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Factory;
Comment on lines +25 to +31
Copy link

Copilot AI Dec 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The unused imports com.azure.cosmos.rx.TestSuiteBase and org.testng.annotations.Factory should be removed.

Suggested change
import com.azure.cosmos.rx.TestSuiteBase;
import io.reactivex.subscribers.TestSubscriber;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Factory;
import io.reactivex.subscribers.TestSubscriber;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.testng.annotations.DataProvider;

Copilot uses AI. Check for mistakes.
import org.testng.annotations.Test;
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.Mono;
Expand All @@ -46,6 +49,7 @@
import static com.azure.cosmos.implementation.TestUtils.mockDiagnosticsClientContext;
import static com.azure.cosmos.implementation.TestUtils.mockDocumentServiceRequest;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.when;
Copy link

Copilot AI Dec 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The when static import from Mockito is unused in the visible code (it appears only in the commented-out test). If the commented test is removed, this import should also be removed.

Suggested change
import static org.mockito.Mockito.when;

Copilot uses AI. Check for mistakes.

public class ConsistencyWriterTest {
private final static DiagnosticsClientContext clientContext = mockDiagnosticsClientContext();
Expand Down Expand Up @@ -77,6 +81,19 @@ public Object[][] storeResponseArgProvider() {
};
}

/*@Test(groups = "unit")
public void ConsistencyWriter_BarrierRetriesExhaustedWith429_Throws408Async() {
int barrierRequestCount = 0;

sessionContainer = Mockito.mock(ISessionContainer.class);

transportClient = Mockito.mock(TransportClient.class);
when(transportClient.invokeStoreAsync(
Mockito.any(Uri.class),
Mockito.any(RxDocumentServiceRequest.class))
.thenAnswer(i -> i.getArguments()[0]));
}*/

Comment on lines +84 to +96
Copy link

Copilot AI Dec 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Commented-out code should be removed rather than left in the codebase. If this test is meant to be implemented later, create a separate issue or TODO comment explaining the intent.

Suggested change
/*@Test(groups = "unit")
public void ConsistencyWriter_BarrierRetriesExhaustedWith429_Throws408Async() {
int barrierRequestCount = 0;
sessionContainer = Mockito.mock(ISessionContainer.class);
transportClient = Mockito.mock(TransportClient.class);
when(transportClient.invokeStoreAsync(
Mockito.any(Uri.class),
Mockito.any(RxDocumentServiceRequest.class))
.thenAnswer(i -> i.getArguments()[0]));
}*/

Copilot uses AI. Check for mistakes.
@Test(groups = "unit", dataProvider = "exceptionArgProvider")
public void exception(Exception ex, Class<Exception> klass, int expectedStatusCode, Integer expectedSubStatusCode) {
TransportClientWrapper transportClientWrapper = new TransportClientWrapper.Builder.ReplicaResponseBuilder
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,7 @@ public static class SubStatusCodes {
public static final int NO_VALID_STORE_RESPONSE = 21009;
public static final int SERVER_GENERATED_408 = 21010;
public static final int FAILED_TO_PARSE_SERVER_RESPONSE = 21011;
public static final int SERVER_WRITE_BARRIER_THROTTLED = 21013;
}

public static class HeaderValues {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,13 @@
*/
public class RequestTimeoutException extends CosmosException {

/**
* Instantiates a new Request timeout exception.
*/
public RequestTimeoutException(int subStatusCode) {
this(RMResources.RequestTimeout, null, subStatusCode);
}
Comment on lines +20 to +25
Copy link

Copilot AI Dec 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing JavaDoc for the new constructor. The constructor should document the subStatusCode parameter to explain its purpose and valid values.

Copilot uses AI. Check for mistakes.

/**
* Instantiates a new Request timeout exception.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import com.azure.cosmos.implementation.Strings;
import com.azure.cosmos.implementation.Utils;
import com.azure.cosmos.implementation.apachecommons.collections.ComparatorUtils;
import com.azure.cosmos.implementation.directconnectivity.rntbd.ClosedClientTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Exceptions;
Expand All @@ -46,6 +45,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -386,6 +386,7 @@ Mono<StoreResponse> barrierForGlobalStrong(RxDocumentServiceRequest request, Sto

private Mono<Boolean> waitForWriteBarrierAsync(RxDocumentServiceRequest barrierRequest, long selectedGlobalCommittedLsn) {
AtomicInteger writeBarrierRetryCount = new AtomicInteger(ConsistencyWriter.MAX_NUMBER_OF_WRITE_BARRIER_READ_RETRIES);
AtomicBoolean lastAttemptWasThrottled = new AtomicBoolean(false);
AtomicLong maxGlobalCommittedLsnReceived = new AtomicLong(0);
return Flux.defer(() -> {
if (barrierRequest.requestContext.timeoutHelper.isElapsed()) {
Expand All @@ -403,8 +404,35 @@ private Mono<Boolean> waitForWriteBarrierAsync(RxDocumentServiceRequest barrierR
false /*forceReadAll*/);
return storeResultListObs.flatMap(
responses -> {
if (responses != null && responses.stream().anyMatch(response -> response.globalCommittedLSN >= selectedGlobalCommittedLsn)) {
return Mono.just(Boolean.TRUE);
//if (responses != null && responses.stream().anyMatch(response -> response.globalCommittedLSN >= selectedGlobalCommittedLsn)) {
// return Mono.just(Boolean.TRUE);
//}

Comment on lines +407 to +410
Copy link

Copilot AI Dec 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Commented-out code should be removed. This appears to be dead code that was replaced by the new logic below. Keeping commented code reduces readability and maintainability.

Suggested change
//if (responses != null && responses.stream().anyMatch(response -> response.globalCommittedLSN >= selectedGlobalCommittedLsn)) {
// return Mono.just(Boolean.TRUE);
//}

Copilot uses AI. Check for mistakes.
if (responses != null) {
// Check if all replicas returned 429, but don't exit early.
if (responses.stream().count() > 0 && responses.stream().allMatch(response -> response.getStoreResponse().getStatus() == HttpConstants.StatusCodes.TOO_MANY_REQUESTS)) {
Copy link

Copilot AI Dec 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The condition responses.stream().count() > 0 is redundant. If you're calling allMatch() on an empty stream, it returns true (vacuous truth). Consider using !responses.isEmpty() for clarity and better performance, or rely on the allMatch() predicate alone if an empty response set should not be treated as throttled.

Suggested change
if (responses.stream().count() > 0 && responses.stream().allMatch(response -> response.getStoreResponse().getStatus() == HttpConstants.StatusCodes.TOO_MANY_REQUESTS)) {
if (!responses.isEmpty() && responses.stream().allMatch(response -> response.getStoreResponse().getStatus() == HttpConstants.StatusCodes.TOO_MANY_REQUESTS)) {

Copilot uses AI. Check for mistakes.
if (logger.isDebugEnabled()) {
logger.debug("waitForWriteBarrierAsync: All replicas returned 429 Too Many Requests. Continuing retries." +
"StatusCode: {}, SubStatusCode: {}, PkRangeId :{}.",
responses.get(0).getStoreResponse().getStatus(),
responses.get(0).getStoreResponse().getSubStatusCode(),
responses.get(0).getStoreResponse().getPartitionKeyRangeId());
}
lastAttemptWasThrottled.set(true);
} else {
lastAttemptWasThrottled.set(false);
}

// Check if any response satisfies the barrier condition
if (responses.stream().anyMatch(response -> response.globalCommittedLSN >= selectedGlobalCommittedLsn)) {
if (logger.isDebugEnabled()) {
logger.debug("ConsistencyWriter: WaitForWriteBarrierAsync - Barrier met. Responses: {}",
responses.stream().map(StoreResult::toString).collect(Collectors.joining("; ")));
}
return Mono.just(Boolean.TRUE);
}
} else {
lastAttemptWasThrottled.set(false);
}

//get max global committed lsn from current batch of responses, then update if greater than max of all batches.
Expand All @@ -416,6 +444,14 @@ private Mono<Boolean> waitForWriteBarrierAsync(RxDocumentServiceRequest barrierR
//only refresh on first barrier call, set to false for subsequent attempts.
barrierRequest.requestContext.forceRefreshAddressCache = false;

if (lastAttemptWasThrottled.get())
{
if (logger.isDebugEnabled()) {
logger.debug("ConsistencyWriter: Write barrier failed after all retries due to consistent throttling (429). Throwing RequestTimeoutException (408).");
}
throw new RequestTimeoutException(HttpConstants.SubStatusCodes.SERVER_WRITE_BARRIER_THROTTLED);
}

//get max global committed lsn from current batch of responses, then update if greater than max of all batches.
if (writeBarrierRetryCount.getAndDecrement() == 0) {
if (logger.isDebugEnabled() && responses != null) {
Expand Down
Loading
Loading