Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.cosmos.implementation;

import com.azure.cosmos.CosmosAsyncClient;
import com.azure.cosmos.CosmosAsyncContainer;
import com.azure.cosmos.CosmosClientBuilder;
import com.azure.cosmos.CosmosException;
import com.azure.cosmos.TestObject;
import com.azure.cosmos.models.PartitionKey;
import com.azure.cosmos.rx.TestSuiteBase;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.testng.annotations.AfterSuite;
import org.testng.annotations.BeforeSuite;
import org.testng.annotations.Factory;
import org.testng.annotations.Ignore;
import org.testng.annotations.Test;

import java.io.IOException;
import java.lang.reflect.Field;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Fail.fail;

@Ignore("MalformedResponseTests is only safe to run in isolation as it leverages Reflection to override the ObjectMapper instance responsible for deserialization.")
public class MalformedResponseTests extends TestSuiteBase {

@Factory(dataProvider = "clientBuildersWithSessionConsistency")
public MalformedResponseTests(CosmosClientBuilder clientBuilder) {
super(clientBuilder);
}

@BeforeSuite(groups = {"emulator"})
public void beforeSuite() {
System.setProperty("COSMOS.CHARSET_DECODER_ERROR_ACTION_ON_MALFORMED_INPUT", "IGNORE");
System.setProperty("COSMOS.IS_NON_PARSEABLE_DOCUMENT_LOGGING_ENABLED", "true");
super.beforeSuite();
}

/**
* Validate that a CosmosException is thrown with the appropriate status code and sub-status code
* when the response from the server is malformed and cannot be deserialized
* and fallback decoder is set / not set
* <p>
* NOTE: Run this test with MalformedResponseTests#beforeSuite and MalformedResponseTests#afterSuite commented out for no fallback decoder.
* NOTE: Run this test with MalformedResponseTests#beforeSuite and MalformedResponseTests#afterSuite enabled for fallback decoder.
* */
@Test(groups = { "emulator" })
public void validateCosmosExceptionThrownOnMalformedResponse() throws NoSuchFieldException, IllegalAccessException {

CosmosAsyncClient cosmosAsyncClient = null;
ObjectMapper originalMapper = null;

try {
cosmosAsyncClient = getClientBuilder()
.key(TestConfigurations.MASTER_KEY)
.endpoint(TestConfigurations.HOST)
.buildAsyncClient();
CosmosAsyncContainer cosmosAsyncContainer = getSharedSinglePartitionCosmosContainer(cosmosAsyncClient);

TestObject testObject = TestObject.create();
cosmosAsyncContainer.createItem(testObject).block();

Field field = Utils.class.getDeclaredField("simpleObjectMapper");
field.setAccessible(true);

// Save original
originalMapper = (ObjectMapper) field.get(null);

// Create a bad ObjectMapper
ObjectMapper badMapper = new FailingObjectMapper();
// Override
field.set(null, badMapper);

cosmosAsyncContainer.readItem(testObject.getId(), new PartitionKey(testObject.getMypk()), TestObject.class).block();
fail("The read operation should have failed");
} catch (CosmosException cosmosException) {
assertThat(cosmosException.getStatusCode()).isEqualTo(HttpConstants.StatusCodes.INTERNAL_SERVER_ERROR);
assertThat(cosmosException.getSubStatusCode()).isEqualTo(HttpConstants.SubStatusCodes.FAILED_TO_PARSE_SERVER_RESPONSE);
assertThat(cosmosException.getDiagnostics()).isNotNull();
assertThat(cosmosException.getResponseHeaders()).isNotNull();
assertThat(cosmosException.getResponseHeaders()).isNotEmpty();
} catch (IllegalAccessException e) {
fail("An IllegalAccessException shouldn't have occurred", e);
} finally {
// Restore original
Field field = Utils.class.getDeclaredField("simpleObjectMapper");
field.setAccessible(true);
field.set(null, originalMapper);
field.setAccessible(false);

safeClose(cosmosAsyncClient);
}
}

private class FailingObjectMapper extends ObjectMapper {
@Override
public JsonNode readTree(byte[] bytes) throws IOException {
throw new IOException("Simulated failure");
}

@Override
public JsonNode readTree(String content) throws JsonProcessingException {
throw new JsonParseException("Simulated failure");
}
}

@AfterSuite(groups = {"emulator"})
public void afterSuite() {
System.clearProperty("COSMOS.CHARSET_DECODER_ERROR_ACTION_ON_MALFORMED_INPUT");
System.clearProperty("COSMOS.IS_NON_PARSEABLE_DOCUMENT_LOGGING_ENABLED");
super.afterSuite();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@

package com.azure.cosmos.implementation.changefeed.epkversion;

import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.CosmosAsyncContainer;
import com.azure.cosmos.CosmosException;
import com.azure.cosmos.implementation.HttpConstants;
import com.azure.cosmos.implementation.PartitionKeyRangeIsSplittingException;
import com.azure.cosmos.implementation.changefeed.CancellationTokenSource;
import com.azure.cosmos.implementation.changefeed.ChangeFeedContextClient;
Expand All @@ -22,14 +25,18 @@
import com.azure.cosmos.implementation.feedranges.FeedRangePartitionKeyRangeImpl;
import com.azure.cosmos.models.ChangeFeedProcessorItem;
import com.azure.cosmos.models.FeedResponse;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonMappingException;
import org.mockito.Mockito;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.UUID;

import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
Expand All @@ -54,7 +61,7 @@ public void shouldCheckpointFor304WhenContinuationTokenChanges(
boolean shouldDoCheckpoint) {
ChangeFeedObserver<ChangeFeedProcessorItem> observerMock = Mockito.mock(ChangeFeedObserver.class);
ChangeFeedContextClient changeFeedContextClientMock = Mockito.mock(ChangeFeedContextClient.class);

// Setup initial state with continuation token
ChangeFeedStateV1 initialChangeFeedState = this.getChangeFeedStateWithContinuationTokens(1);

Expand Down Expand Up @@ -117,7 +124,6 @@ public void shouldCheckpointFor304WhenContinuationTokenChanges(
}
}


@Test(groups = "unit")
public void partitionSplitHappenOnFirstRequest() {
@SuppressWarnings("unchecked") ChangeFeedObserver<ChangeFeedProcessorItem> observerMock =
Expand Down Expand Up @@ -161,6 +167,70 @@ public void partitionSplitHappenOnFirstRequest() {
assertThat(feedRangeGoneException.getLastContinuation()).isEqualTo(leaseMock.getContinuationToken());
}

@Test(groups = "unit")
public void partitionProcessingErrorWhenInternalServerErrorIsHit() {

CosmosException parsingException = BridgeInternal.createCosmosException(
"A parsing error occurred.",
new IOException("An error occurred."),
new HashMap<>(),
HttpConstants.StatusCodes.INTERNAL_SERVER_ERROR,
null);

BridgeInternal.setSubStatusCode(parsingException, HttpConstants.SubStatusCodes.FAILED_TO_PARSE_SERVER_RESPONSE);

ChangeFeedObserver<ChangeFeedProcessorItem> observerMock = Mockito.mock(ChangeFeedObserver.class);
ChangeFeedContextClient changeFeedContextClientMock = Mockito.mock(ChangeFeedContextClient.class);

// Setup initial state with continuation token
ChangeFeedStateV1 initialChangeFeedState = this.getChangeFeedStateWithContinuationTokens(1);

CosmosAsyncContainer containerMock = Mockito.mock(CosmosAsyncContainer.class);
ProcessorSettings processorSettings = new ProcessorSettings(initialChangeFeedState, containerMock);
processorSettings.withMaxItemCount(10);

// Setup lease and checkpointer mocks
Lease leaseMock = Mockito.mock(ServiceItemLeaseV1.class);
Mockito.when(leaseMock.getContinuationToken()).thenReturn(initialChangeFeedState.toString());

PartitionCheckpointer partitionCheckpointer = Mockito.mock(PartitionCheckpointerImpl.class);

String lastContinuationToken = initialChangeFeedState.toString();

// Setup change feed request to throw parsing exception
Mockito
.when(changeFeedContextClientMock.createDocumentChangeFeedQuery(Mockito.any(), Mockito.any(), Mockito.any()))
.thenReturn(Flux.error(parsingException));

// Checkpointing mock setup
final ChangeFeedState continuationState = ChangeFeedState.fromString(lastContinuationToken);
Mockito.when(partitionCheckpointer.checkpointPartition(continuationState))
.thenReturn(Mono.empty());

// Create processor
PartitionProcessorImpl<ChangeFeedProcessorItem> partitionProcessor = new PartitionProcessorImpl<>(
observerMock,
changeFeedContextClientMock,
processorSettings,
partitionCheckpointer,
leaseMock,
ChangeFeedProcessorItem.class,
ChangeFeedMode.INCREMENTAL,
null);

StepVerifier
.create(partitionProcessor.run(new CancellationTokenSource().getToken()))
.verifyComplete();

// Verify that the PartitionProcessorImpl completed with the expected parsing exception
RuntimeException runtimeException = partitionProcessor.getResultException();
assertThat(runtimeException).isNotNull();
assertThat(runtimeException.getCause()).isInstanceOf(CosmosException.class);
CosmosException cosmosException = (CosmosException) runtimeException.getCause();
assertThat(cosmosException.getStatusCode()).isEqualTo(HttpConstants.StatusCodes.INTERNAL_SERVER_ERROR);
assertThat(cosmosException.getSubStatusCode()).isEqualTo(HttpConstants.SubStatusCodes.FAILED_TO_PARSE_SERVER_RESPONSE);
}

private ChangeFeedStateV1 getChangeFeedStateWithContinuationTokens(int tokenCount) {
String containerRid = "/cols/" + UUID.randomUUID();
String pkRangeId = UUID.randomUUID().toString();
Expand All @@ -184,7 +254,7 @@ private ChangeFeedStateV1 getChangeFeedStateWithContinuationTokens(int tokenCoun

continuationBuilder.append("],")
.append("\"PKRangeId\":\"").append(pkRangeId).append("\"}");

String continuationJson = continuationBuilder.toString();

FeedRangePartitionKeyRangeImpl feedRange = new FeedRangePartitionKeyRangeImpl(pkRangeId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@

package com.azure.cosmos.implementation.changefeed.pkversion;

import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.CosmosAsyncContainer;
import com.azure.cosmos.CosmosException;
import com.azure.cosmos.implementation.HttpConstants;
import com.azure.cosmos.implementation.PartitionKeyRangeIsSplittingException;
import com.azure.cosmos.implementation.changefeed.CancellationTokenSource;
import com.azure.cosmos.implementation.changefeed.ChangeFeedContextClient;
Expand All @@ -16,10 +19,12 @@
import com.azure.cosmos.implementation.changefeed.common.ChangeFeedStartFromInternal;
import com.azure.cosmos.implementation.changefeed.common.ChangeFeedState;
import com.azure.cosmos.implementation.changefeed.common.ChangeFeedStateV1;
import com.azure.cosmos.implementation.changefeed.epkversion.ServiceItemLeaseV1;
import com.azure.cosmos.implementation.changefeed.exceptions.FeedRangeGoneException;
import com.azure.cosmos.implementation.directconnectivity.ReflectionUtils;
import com.azure.cosmos.implementation.feedranges.FeedRangeContinuation;
import com.azure.cosmos.implementation.feedranges.FeedRangePartitionKeyRangeImpl;
import com.azure.cosmos.models.ChangeFeedProcessorItem;
import com.azure.cosmos.models.FeedResponse;
import com.fasterxml.jackson.databind.JsonNode;
import org.mockito.Mockito;
Expand All @@ -29,7 +34,9 @@
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.UUID;

import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
Expand Down Expand Up @@ -159,6 +166,62 @@ public void partitionSplitHappenOnFirstRequest() {
assertThat(feedRangeGoneException.getLastContinuation()).isEqualTo(leaseMock.getContinuationToken());
}

@Test(groups = {"unit"})
public void partitionProcessingErrorWhenInternalServerErrorIsHit() {

CosmosException parsingException = BridgeInternal.createCosmosException(
"A parsing error occurred.",
new IOException("An error occurred."),
new HashMap<>(),
HttpConstants.StatusCodes.INTERNAL_SERVER_ERROR,
null);

BridgeInternal.setSubStatusCode(parsingException, HttpConstants.SubStatusCodes.FAILED_TO_PARSE_SERVER_RESPONSE);

@SuppressWarnings("unchecked") ChangeFeedObserver<JsonNode> observerMock =
(ChangeFeedObserver<JsonNode>) Mockito.mock(ChangeFeedObserver.class);
ChangeFeedContextClient changeFeedContextClientMock = Mockito.mock(ChangeFeedContextClient.class);

// Setup change feed request to throw parsing exception
Mockito
.when(changeFeedContextClientMock.createDocumentChangeFeedQuery(Mockito.any(), Mockito.any(),
Mockito.any()))
.thenReturn(Flux.error(parsingException));

ChangeFeedState changeFeedState = this.getChangeFeedStateWithContinuationTokens(2);
CosmosAsyncContainer containerMock = Mockito.mock(CosmosAsyncContainer.class);
ProcessorSettings processorSettings = new ProcessorSettings(changeFeedState, containerMock);
processorSettings.withMaxItemCount(10);

Lease leaseMock = Mockito.mock(ServiceItemLease.class);
Mockito
.when(leaseMock.getContinuationToken())
.thenReturn(changeFeedState.getContinuation().getCurrentContinuationToken().getToken());

LeaseCheckpointer leaseCheckpointerMock = Mockito.mock(LeaseCheckpointer.class);
PartitionCheckpointer partitionCheckpointer = new PartitionCheckpointerImpl(leaseCheckpointerMock, leaseMock);

PartitionProcessorImpl partitionProcessor = new PartitionProcessorImpl(
observerMock,
changeFeedContextClientMock,
processorSettings,
partitionCheckpointer,
leaseMock,
null
);

StepVerifier.create(partitionProcessor.run(new CancellationTokenSource().getToken()))
.verifyComplete();

// Verify that the PartitionProcessorImpl completed with the expected parsing exception
RuntimeException runtimeException = partitionProcessor.getResultException();
assertThat(runtimeException).isNotNull();
assertThat(runtimeException.getCause()).isInstanceOf(CosmosException.class);
CosmosException cosmosException = (CosmosException) runtimeException.getCause();
assertThat(cosmosException.getStatusCode()).isEqualTo(HttpConstants.StatusCodes.INTERNAL_SERVER_ERROR);
assertThat(cosmosException.getSubStatusCode()).isEqualTo(HttpConstants.SubStatusCodes.FAILED_TO_PARSE_SERVER_RESPONSE);
}

private ChangeFeedStateV1 getChangeFeedStateWithContinuationTokens(int tokenCount) {
String containerRid = "/cols/" + UUID.randomUUID();
String pkRangeId = UUID.randomUUID().toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import org.testng.annotations.Ignore;
import org.testng.annotations.Test;

import java.util.HashMap;

public class JsonNodeStorePayloadTests {
@Test(groups = {"unit"})
@Ignore("fallbackCharsetDecoder will only be initialized during the first time when JsonNodeStorePayload loaded," +
Expand All @@ -26,7 +28,7 @@ public void parsingBytesWithInvalidUT8Bytes() {
try {
byte[] bytes = hexStringToByteArray(invalidHexString);
ByteBuf byteBuf = Unpooled.wrappedBuffer(bytes);
JsonNodeStorePayload jsonNodeStorePayload = new JsonNodeStorePayload(new ByteBufInputStream(byteBuf), bytes.length);
JsonNodeStorePayload jsonNodeStorePayload = new JsonNodeStorePayload(new ByteBufInputStream(byteBuf), bytes.length, new HashMap<>());
jsonNodeStorePayload.getPayload().toString();
} finally {
System.clearProperty("COSMOS.CHARSET_DECODER_ERROR_ACTION_ON_MALFORMED_INPUT");
Expand Down
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
* Changed timestamp format to be consistent in leases for CFP. - [PR 46784](https://github.com/Azure/azure-sdk-for-java/pull/46784)
* Added `MetadataThrottlingRetryPolicy` for `PartitionKeyRange` `RequestRateTooLargeException` handling. - [PR 46823](https://github.com/Azure/azure-sdk-for-java/pull/46823)
* Ensure effective `DirectConnectionConfig#setNetworkRequestTimeout` is set to at least 5 seconds. - [PR 47024](https://github.com/Azure/azure-sdk-for-java/pull/47024)
* Wrap JSON parsing exceptions as `CosmosException` to provide better context. - [PR 47040](https://github.com/Azure/azure-sdk-for-java/pull/47040)

### 4.74.0 (2025-09-05)

Expand Down
Loading