Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.cosmos.implementation;

import com.azure.cosmos.CosmosAsyncClient;
import com.azure.cosmos.CosmosAsyncContainer;
import com.azure.cosmos.CosmosClientBuilder;
import com.azure.cosmos.CosmosException;
import com.azure.cosmos.TestObject;
import com.azure.cosmos.models.PartitionKey;
import com.azure.cosmos.rx.TestSuiteBase;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.testng.annotations.AfterSuite;
import org.testng.annotations.BeforeSuite;
import org.testng.annotations.Factory;
import org.testng.annotations.Ignore;
import org.testng.annotations.Test;

import java.io.IOException;
import java.lang.reflect.Field;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Fail.fail;

@Ignore("MalformedResponseTests is safe to run in isolation as it leverages Reflection to override the deserializer.")
public class MalformedResponseTests extends TestSuiteBase {

@Factory(dataProvider = "clientBuildersWithSessionConsistency")
public MalformedResponseTests(CosmosClientBuilder clientBuilder) {
super(clientBuilder);
}

@BeforeSuite(groups = {"emulator"}, alwaysRun = true)
public void beforeSuite() {
System.setProperty("COSMOS.CHARSET_DECODER_ERROR_ACTION_ON_MALFORMED_INPUT", "IGNORE");
super.beforeSuite();
}

@Test(groups = { "emulator" })
public void validateCosmosExceptionThrownOnMalformedResponse() throws NoSuchFieldException, IllegalAccessException {

CosmosAsyncClient cosmosAsyncClient = null;
ObjectMapper originalMapper = null;

try {
cosmosAsyncClient = getClientBuilder()
.key(TestConfigurations.MASTER_KEY)
.endpoint(TestConfigurations.HOST)
.buildAsyncClient();
CosmosAsyncContainer cosmosAsyncContainer = getSharedSinglePartitionCosmosContainer(cosmosAsyncClient);

TestObject testObject = TestObject.create();
cosmosAsyncContainer.createItem(testObject).block();

Field field = Utils.class.getDeclaredField("simpleObjectMapper");
field.setAccessible(true);

// Save original
originalMapper = (ObjectMapper) field.get(null);

// Create a bad ObjectMapper
ObjectMapper badMapper = new FailingObjectMapper();
// Override
field.set(null, badMapper);

cosmosAsyncContainer.readItem(testObject.getId(), new PartitionKey(testObject.getMypk()), TestObject.class).block();
fail("The read operation should have failed");
} catch (CosmosException cosmosException) {
validate(cosmosException);
} catch (IllegalAccessException e) {
fail("An IllegalAccessException shouldn't have occurred", e);
} finally {
// Restore original
Field field = Utils.class.getDeclaredField("simpleObjectMapper");
field.setAccessible(true);
field.set(null, originalMapper);
field.setAccessible(false);
}
}

@Test(groups = {"emulator"})
public void validateCosmosExceptionThrownOnMalformedResponseWhenFallbackDecoderSet() throws NoSuchFieldException, IllegalAccessException {

CosmosAsyncClient cosmosAsyncClient = null;
ObjectMapper originalMapper = null;

try {

cosmosAsyncClient = getClientBuilder()
.key(TestConfigurations.MASTER_KEY)
.endpoint(TestConfigurations.HOST)
.buildAsyncClient();
CosmosAsyncContainer cosmosAsyncContainer = getSharedSinglePartitionCosmosContainer(cosmosAsyncClient);

TestObject testObject = TestObject.create();
cosmosAsyncContainer.createItem(testObject).block();

Field field = Utils.class.getDeclaredField("simpleObjectMapper");
field.setAccessible(true);

// Save original
originalMapper = (ObjectMapper) field.get(null);

// Create a bad ObjectMapper
ObjectMapper badMapper = new FailingObjectMapper();
// Override
field.set(null, badMapper);

cosmosAsyncContainer.readItem(testObject.getId(), new PartitionKey(testObject.getMypk()), TestObject.class).block();
fail("The read operation should have failed");
} catch (CosmosException cosmosException) {
validate(cosmosException);
} catch (IllegalAccessException e) {
fail("An IllegalAccessException shouldn't have occurred", e);
} finally {
// Restore original
Field field = Utils.class.getDeclaredField("simpleObjectMapper");
field.setAccessible(true);
field.set(null, originalMapper);
field.setAccessible(false);
}
}

private class FailingObjectMapper extends ObjectMapper {
@Override
public JsonNode readTree(byte[] bytes) throws IOException {
throw new IOException("Simulated failure");
}

@Override
public JsonNode readTree(String content) throws JsonProcessingException {
throw new JsonParseException("Simulated failure");
}
}

@AfterSuite(groups = {"emulator"}, alwaysRun = true)
public void afterSuite() {
System.setProperty("COSMOS.CHARSET_DECODER_ERROR_ACTION_ON_MALFORMED_INPUT", "IGNORE");
super.afterSuite();
}

private static void validate(CosmosException cosmosException) {
assertThat(cosmosException.getStatusCode()).isEqualTo(HttpConstants.StatusCodes.INTERNAL_SERVER_ERROR);
assertThat(cosmosException.getSubStatusCode()).isEqualTo(HttpConstants.SubStatusCodes.FAILED_TO_PARSE_SERVER_RESPONSE);
assertThat(cosmosException.getDiagnostics()).isNotNull();
assertThat(cosmosException.getResponseHeaders()).isNotNull();
assertThat(cosmosException.getResponseHeaders()).isNotEmpty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import org.testng.annotations.Ignore;
import org.testng.annotations.Test;

import java.util.HashMap;

public class JsonNodeStorePayloadTests {
@Test(groups = {"unit"})
@Ignore("fallbackCharsetDecoder will only be initialized during the first time when JsonNodeStorePayload loaded," +
Expand All @@ -26,7 +28,7 @@ public void parsingBytesWithInvalidUT8Bytes() {
try {
byte[] bytes = hexStringToByteArray(invalidHexString);
ByteBuf byteBuf = Unpooled.wrappedBuffer(bytes);
JsonNodeStorePayload jsonNodeStorePayload = new JsonNodeStorePayload(new ByteBufInputStream(byteBuf), bytes.length);
JsonNodeStorePayload jsonNodeStorePayload = new JsonNodeStorePayload(new ByteBufInputStream(byteBuf), bytes.length, new HashMap<>());
jsonNodeStorePayload.getPayload().toString();
} finally {
System.clearProperty("COSMOS.CHARSET_DECODER_ERROR_ACTION_ON_MALFORMED_INPUT");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,10 @@ public class Configs {
private static final String HTTP2_MAX_CONCURRENT_STREAMS = "COSMOS.HTTP2_MAX_CONCURRENT_STREAMS";
private static final String HTTP2_MAX_CONCURRENT_STREAMS_VARIABLE = "COSMOS_HTTP2_MAX_CONCURRENT_STREAMS";

private static final boolean DEFAULT_SHOULD_LOG_NON_PARSEABLE_PAYLOADS = false;
private static final String SHOULD_LOG_NON_PARSEABLE_PAYLOADS = "COSMOS.SHOULD_LOG_NON_PARSEABLE_PAYLOADS";
private static final String SHOULD_LOG_NON_PARSEABLE_PAYLOADS_VARIABLE = "COSMOS_SHOULD_LOG_NON_PARSEABLE_PAYLOADS";

public static final String APPLICATIONINSIGHTS_CONNECTION_STRING = "applicationinsights.connection.string";
public static final String APPLICATIONINSIGHTS_CONNECTION_STRING_VARIABLE = "APPLICATIONINSIGHTS_CONNECTION_STRING";

Expand Down Expand Up @@ -1240,4 +1244,14 @@ public static EnumSet<AttributeNamingScheme> getDefaultOtelSpanAttributeNamingSc

return AttributeNamingScheme.parse(DEFAULT_OTEL_SPAN_ATTRIBUTE_NAMING_SCHEME);
}

public static boolean isNonParseableDocumentLoggingEnabled() {
String isNonParseableDocumentLoggingEnabledAsString = System.getProperty(
SHOULD_LOG_NON_PARSEABLE_PAYLOADS,
firstNonNull(
emptyToNull(System.getenv().get(SHOULD_LOG_NON_PARSEABLE_PAYLOADS_VARIABLE)),
String.valueOf(DEFAULT_SHOULD_LOG_NON_PARSEABLE_PAYLOADS)));

return Boolean.parseBoolean(isNonParseableDocumentLoggingEnabledAsString);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,7 @@ public static class SubStatusCodes {
public static final int SERVER_GENERATED_503 = 21008;
public static final int NO_VALID_STORE_RESPONSE = 21009;
public static final int SERVER_GENERATED_408 = 21010;
public static final int FAILED_TO_PARSE_SERVER_RESPONSE = 21011;
}

public static class HeaderValues {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -475,18 +475,19 @@ private Mono<RxDocumentServiceResponse> toDocumentServiceResponse(Mono<HttpRespo
Throwable unwrappedException = reactor.core.Exceptions.unwrap(throwable);
if (!(unwrappedException instanceof Exception)) {
// fatal error
logger.error("Unexpected failure {}", unwrappedException.getMessage(), unwrappedException);
logger.error("Unexpected failure " + unwrappedException.getMessage(), unwrappedException);
return Mono.error(unwrappedException);
}

Exception exception = (Exception) unwrappedException;
CosmosException dce;
if (!(exception instanceof CosmosException)) {
// wrap in CosmosException
logger.warn("Network failure", exception);

int statusCode = 0;
if (WebExceptionUtility.isNetworkFailure(exception)) {

// wrap in CosmosException
logger.warn("Network failure", exception);

if (WebExceptionUtility.isReadTimeoutException(exception)) {
statusCode = HttpConstants.StatusCodes.REQUEST_TIMEOUT;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.ConsistencyLevel;
import com.azure.cosmos.CosmosException;
import com.azure.cosmos.CosmosItemSerializer;
import com.azure.cosmos.implementation.apachecommons.lang.StringUtils;
import com.azure.cosmos.models.CosmosChangeFeedRequestOptions;
Expand Down Expand Up @@ -44,7 +45,6 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.regex.Pattern;
Expand Down Expand Up @@ -814,4 +814,19 @@ public static Duration min(Duration duration1, Duration duration2) {
return duration1.compareTo(duration2) < 0 ? duration1 : duration2;
}
}

public static CosmosException createCosmosException(int statusCode, int substatusCode, Exception nestedException, Map<String, String> responseHeaders) {

// TODO: Review adding resource address
CosmosException exceptionToThrow = BridgeInternal.createCosmosException(
nestedException.getMessage(),
nestedException,
responseHeaders,
statusCode,
Strings.Emtpy);

BridgeInternal.setSubStatusCode(exceptionToThrow, substatusCode);

return exceptionToThrow;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package com.azure.cosmos.implementation.directconnectivity;

import com.azure.cosmos.implementation.Configs;
import com.azure.cosmos.implementation.HttpConstants;
import com.azure.cosmos.implementation.Utils;
import com.fasterxml.jackson.databind.JsonNode;
import io.netty.buffer.ByteBufInputStream;
Expand All @@ -16,49 +17,79 @@
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Map;

public class JsonNodeStorePayload implements StorePayload<JsonNode> {
private static final Logger logger = LoggerFactory.getLogger(JsonNodeStorePayload.class);
private static final CharsetDecoder fallbackCharsetDecoder = getFallbackCharsetDecoder();
private final int responsePayloadSize;
private final JsonNode jsonValue;

public JsonNodeStorePayload(ByteBufInputStream bufferStream, int readableBytes) {
public JsonNodeStorePayload(ByteBufInputStream bufferStream, int readableBytes, Map<String, String> responseHeaders) {
if (readableBytes > 0) {
this.responsePayloadSize = readableBytes;
this.jsonValue = fromJson(bufferStream, readableBytes);
this.jsonValue = fromJson(bufferStream, readableBytes, responseHeaders);
} else {
this.responsePayloadSize = 0;
this.jsonValue = null;
}
}

private static JsonNode fromJson(ByteBufInputStream bufferStream, int readableBytes) {
private static JsonNode fromJson(ByteBufInputStream bufferStream, int readableBytes, Map<String, String> responseHeaders) {
byte[] bytes = new byte[readableBytes];
try {
bufferStream.read(bytes);
return Utils.getSimpleObjectMapper().readTree(bytes);
} catch (IOException e) {
if (fallbackCharsetDecoder != null) {
logger.warn("Unable to parse JSON, fallback to use customized charset decoder.", e);
return fromJsonWithFallbackCharsetDecoder(bytes);
return fromJsonWithFallbackCharsetDecoder(bytes, responseHeaders);
} else {
throw new IllegalStateException("Unable to parse JSON.", e);

if (Configs.isNonParseableDocumentLoggingEnabled()) {
String documentSample = Base64.getEncoder().encodeToString(bytes);
logger.error("Failed to parse JSON document. No customized charset decoder configured. Document in Base64 format: [" + documentSample + "]", e);
} else {
logger.error("Failed to parse JSON document. No customized charset decoder configured.");
}

IllegalStateException innerException = new IllegalStateException("Unable to parse JSON.", e);

throw Utils.createCosmosException(
HttpConstants.StatusCodes.INTERNAL_SERVER_ERROR,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Initially I was inclined to ask for changing this to 400 Bad Request - indicating that application is responsible for invalid json. That said, service should validate json validity before even storing it - so, 500 also seems reasonable. Worth discussing a bit more broadly IMO.

HttpConstants.SubStatusCodes.FAILED_TO_PARSE_SERVER_RESPONSE,
innerException,
responseHeaders);
}
}
}

private static JsonNode fromJsonWithFallbackCharsetDecoder(byte[] bytes) {
private static JsonNode fromJsonWithFallbackCharsetDecoder(byte[] bytes, Map<String, String> responseHeaders) {
try {
String sanitizedJson = fallbackCharsetDecoder.decode(ByteBuffer.wrap(bytes)).toString();
return Utils.getSimpleObjectMapper().readTree(sanitizedJson);
} catch (IOException e) {
throw new IllegalStateException(

if (Configs.isNonParseableDocumentLoggingEnabled()) {
String documentSample = Base64.getEncoder().encodeToString(bytes);
logger.error("Failed to parse JSON document even after applying fallback charset decoder. Document in Base64 format: [" + documentSample + "]", e);
} else {
logger.error("Failed to parse JSON document even after applying fallback charset decoder.");
}

Exception nestedException = new IllegalStateException(
String.format(
"Unable to parse JSON with fallback charset decoder[OnMalformedInput %s, OnUnmappedCharacter %s]",
Configs.getCharsetDecoderErrorActionOnMalformedInput(),
Configs.getCharsetDecoderErrorActionOnUnmappedCharacter()),
e);

throw Utils.createCosmosException(
HttpConstants.StatusCodes.INTERNAL_SERVER_ERROR,
HttpConstants.SubStatusCodes.FAILED_TO_PARSE_SERVER_RESPONSE,
nestedException,
responseHeaders);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public StoreResponse(
replicaStatusList = new HashMap<>();
if (contentStream != null) {
try {
this.responsePayload = new JsonNodeStorePayload(contentStream, responsePayloadLength);
this.responsePayload = new JsonNodeStorePayload(contentStream, responsePayloadLength, headerMap);
}
finally {
try {
Expand Down