diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosDiagnosticsE2ETest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosDiagnosticsE2ETest.java index 8bd8b741ac84..3b85b3220c85 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosDiagnosticsE2ETest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosDiagnosticsE2ETest.java @@ -5,20 +5,28 @@ import com.azure.core.util.Context; import com.azure.cosmos.implementation.ConsoleLoggingRegistryFactory; +import com.azure.cosmos.implementation.Exceptions; import com.azure.cosmos.implementation.OperationType; import com.azure.cosmos.implementation.ResourceType; +import com.azure.cosmos.models.CosmosChangeFeedRequestOptions; import com.azure.cosmos.models.CosmosClientTelemetryConfig; +import com.azure.cosmos.models.CosmosItemRequestOptions; import com.azure.cosmos.models.CosmosItemResponse; import com.azure.cosmos.models.CosmosMicrometerMetricsOptions; +import com.azure.cosmos.models.CosmosQueryRequestOptions; +import com.azure.cosmos.models.FeedRange; +import com.azure.cosmos.models.FeedResponse; import com.azure.cosmos.models.PartitionKey; import com.azure.cosmos.rx.TestSuiteBase; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.json.JsonReadFeature; +import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; import io.micrometer.core.instrument.MeterRegistry; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; import org.testng.annotations.Factory; import org.testng.annotations.Test; @@ -54,6 +62,17 @@ public void afterClass() { this.client.close(); } + @DataProvider + public static Object[][] operationTypeProvider() { + return new Object[][]{ + { OperationType.Read }, + { OperationType.Replace }, + { OperationType.Create }, + { OperationType.Delete }, + { OperationType.Query } + }; + } + public String resolveTestNameSuffix(Object[] row) { return ""; } @@ -179,12 +198,13 @@ public void defaultLoggerAndMetrics() { // with custom appender } - @Test(groups = { "simple", "emulator" }, timeOut = TIMEOUT) - public void delayedSampling() { + @Test(groups = { "simple", "emulator" }, dataProvider = "operationTypeProvider", timeOut = TIMEOUT) + public void delayedSampling(OperationType operationType) { MeterRegistry meterRegistry = ConsoleLoggingRegistryFactory.create(1); + CapturingLogger capturingLogger = new CapturingLogger(); CosmosClientTelemetryConfig clientTelemetryCfg = new CosmosClientTelemetryConfig() - .diagnosticsHandler(CosmosDiagnosticsHandler.DEFAULT_LOGGING_HANDLER) + .diagnosticsHandler(capturingLogger) .metricsOptions(new CosmosMicrometerMetricsOptions().meterRegistry(meterRegistry)); CosmosClientBuilder builder = this @@ -197,17 +217,28 @@ public void delayedSampling() { meterRegistry.clear(); meterRegistry.close(); + String id = UUID.randomUUID().toString(); + ObjectNode newItem = getDocumentDefinition(id); + container.createItem(newItem); + // change sample rate to 25% clientTelemetryCfg.sampleDiagnostics(0.25); - executeTestCase(container); + executeDocumentOperation(container, operationType, id, newItem); + int loggedMessageSizeBefore = capturingLogger.getLoggedMessages().size(); // reduce sample rate to 0 - disable all diagnostics clientTelemetryCfg.sampleDiagnostics(0); - executeTestCase(container); + executeDocumentOperation(container, operationType, id, newItem); + int loggedMessageSizeAfter = capturingLogger.getLoggedMessages().size(); + // verify when sample rate is 0, the diagnostics will not be logged + assertThat(loggedMessageSizeBefore).isEqualTo(loggedMessageSizeAfter); // set sample rate to 1 - enable all diagnostics (no sampling anymore) clientTelemetryCfg.sampleDiagnostics(1); - executeTestCase(container); + executeDocumentOperation(container, operationType, id, newItem); + loggedMessageSizeAfter = capturingLogger.getLoggedMessages().size(); + // Verify when sample rate is 1, the diagnostics will be logged + assertThat(loggedMessageSizeBefore + 1).isEqualTo(loggedMessageSizeAfter); // no assertions here - invocations for diagnostics handler are validated above // log4j event logging isn't validated in general in unit tests because it is too brittle to do so @@ -232,6 +263,49 @@ public void defaultLoggerWithLegacyOpenTelemetryTraces() { System.setProperty("COSMOS.USE_LEGACY_TRACING", "false"); } + private void executeDocumentOperation( + CosmosContainer cosmosContainer, + OperationType operationType, + String createdItemId, + ObjectNode createdItem) { + switch (operationType) { + case Query: + String query = String.format("SELECT * from c"); + CosmosQueryRequestOptions queryRequestOptions = new CosmosQueryRequestOptions(); + queryRequestOptions.setFeedRange(FeedRange.forLogicalPartition(new PartitionKey(createdItemId))); + Iterable> results = cosmosContainer.queryItems(query, queryRequestOptions, JsonNode.class).iterableByPage(); + results.forEach(t -> {}); + break; + case ReadFeed: + CosmosChangeFeedRequestOptions changeFeedRequestOptions = CosmosChangeFeedRequestOptions + .createForProcessingFromBeginning(FeedRange.forFullRange()); + cosmosContainer.queryChangeFeed(changeFeedRequestOptions, JsonNode.class).iterableByPage(); + break; + case Read: + cosmosContainer + .readItem(createdItemId, new PartitionKey(createdItemId), JsonNode.class); + break; + case Replace: + cosmosContainer + .replaceItem(createdItem, createdItemId, new PartitionKey(createdItemId), new CosmosItemRequestOptions()); + break; + case Delete: + try { + cosmosContainer.deleteItem(getDocumentDefinition(UUID.randomUUID().toString()), new CosmosItemRequestOptions()); + } catch (CosmosException e) { + if (!Exceptions.isNotFound(e)) { + throw e; + } + } + break; + case Create: + cosmosContainer.createItem(getDocumentDefinition(UUID.randomUUID().toString())); + break; + default: + throw new IllegalArgumentException("The operation type is not supported"); + } + } + private void executeTestCase(CosmosContainer container) { String id = UUID.randomUUID().toString(); CosmosItemResponse response = container.createItem( diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/DiagnosticsProvider.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/DiagnosticsProvider.java index 539c36fb59ab..67d9a563f6e8 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/DiagnosticsProvider.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/DiagnosticsProvider.java @@ -600,18 +600,14 @@ public Mono> traceEnabledCosmosItemResponsePublisher( * @param publisher publisher to run. * @return wrapped publisher. */ - public Flux runUnderSpanInContext(Flux publisher, CosmosPagedFluxOptions options) { + public Flux runUnderSpanInContext(Flux publisher) { + return propagatingFlux.flatMap(ignored -> publisher); + } + public boolean shouldSampleOutOperation(CosmosPagedFluxOptions options) { final double samplingRateSnapshot = clientTelemetryConfigAccessor.getSamplingRate(this.telemetryConfig); - options.setSamplingRateSnapshot(samplingRateSnapshot); - - if (shouldSampleOutOperation(samplingRateSnapshot)) { - return publisher; - } - - return propagatingFlux - .flatMap(ignored -> publisher); + return shouldSampleOutOperation(samplingRateSnapshot); } private boolean shouldSampleOutOperation(double samplingRate) { diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/util/CosmosPagedFlux.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/util/CosmosPagedFlux.java index a462d661033a..3d511a13d6fb 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/util/CosmosPagedFlux.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/util/CosmosPagedFlux.java @@ -145,7 +145,11 @@ private CosmosPagedFluxOptions createCosmosPagedFluxOptions() { return cosmosPagedFluxOptions; } - private Flux wrapWithTracingIfEnabled(CosmosPagedFluxOptions pagedFluxOptions, Flux publisher) { + private Flux> wrapWithTracingIfEnabled( + CosmosPagedFluxOptions pagedFluxOptions, + Flux> publisher, + AtomicLong feedResponseConsumerLatencyInNanos, + Context context) { DiagnosticsProvider tracerProvider = pagedFluxOptions.getDiagnosticsProvider(); if (tracerProvider == null || !tracerProvider.isEnabled()) { @@ -153,7 +157,82 @@ private Flux wrapWithTracingIfEnabled(CosmosPagedFluxOptions return publisher; } - return tracerProvider.runUnderSpanInContext(publisher, pagedFluxOptions); + if (tracerProvider.shouldSampleOutOperation(pagedFluxOptions)) { + return publisher; + } + + final CosmosDiagnosticsContext cosmosCtx = ctxAccessor.create( + pagedFluxOptions.getSpanName(), + pagedFluxOptions.getAccountTag(), + BridgeInternal.getServiceEndpoint(pagedFluxOptions.getCosmosAsyncClient()), + pagedFluxOptions.getDatabaseId(), + pagedFluxOptions.getContainerId(), + pagedFluxOptions.getResourceType(), + pagedFluxOptions.getOperationType(), + pagedFluxOptions.getOperationId(), + pagedFluxOptions.getEffectiveConsistencyLevel(), + pagedFluxOptions.getMaxItemCount(), + pagedFluxOptions.getDiagnosticsThresholds(), + null, + pagedFluxOptions.getConnectionMode(), + pagedFluxOptions.getUserAgent()); + ctxAccessor.setSamplingRateSnapshot(cosmosCtx, pagedFluxOptions.getSamplingRateSnapshot()); + + Flux> result = tracerProvider + .runUnderSpanInContext(publisher) + .doOnEach(signal -> { + FeedResponse response = signal.get(); + Context traceCtx = DiagnosticsProvider.getContextFromReactorOrNull(signal.getContextView()); + switch (signal.getType()) { + case ON_COMPLETE: + this.recordFeedResponse(pagedFluxOptions, traceCtx, tracerProvider, response, feedResponseConsumerLatencyInNanos); + + tracerProvider.recordFeedResponseConsumerLatency( + signal, + Duration.ofNanos(feedResponseConsumerLatencyInNanos.get())); + + tracerProvider.endSpan(traceCtx); + + break; + case ON_NEXT: + this.recordFeedResponse(pagedFluxOptions, traceCtx, tracerProvider, response, feedResponseConsumerLatencyInNanos); + + break; + + case ON_ERROR: + tracerProvider.recordFeedResponseConsumerLatency( + signal, + Duration.ofNanos(feedResponseConsumerLatencyInNanos.get())); + + // all info is extracted from CosmosException when applicable + tracerProvider.endSpan( + traceCtx, + signal.getThrowable() + ); + + break; + + default: + break; + } + }); + + return Flux + .deferContextual(reactorCtx -> result + .doOnCancel(() -> { + Context traceCtx = DiagnosticsProvider.getContextFromReactorOrNull(reactorCtx); + tracerProvider.endSpan(traceCtx); + }) + .doOnComplete(() -> { + Context traceCtx = DiagnosticsProvider.getContextFromReactorOrNull(reactorCtx); + tracerProvider.endSpan(traceCtx); + })) + .contextWrite( + DiagnosticsProvider.setContextInReactor( + tracerProvider.startSpan( + pagedFluxOptions.getSpanName(), + cosmosCtx, + context))); } private void recordFeedResponse( @@ -203,90 +282,15 @@ private Flux> byPage(CosmosPagedFluxOptions pagedFluxOptions, Co AtomicReference startTime = new AtomicReference<>(); AtomicLong feedResponseConsumerLatencyInNanos = new AtomicLong(0); - Flux> result = - wrapWithTracingIfEnabled( - pagedFluxOptions, this.optionsFluxFunction.apply(pagedFluxOptions)) - .doOnSubscribe(ignoredValue -> { - startTime.set(Instant.now()); - feedResponseConsumerLatencyInNanos.set(0); - }) - .doOnEach(signal -> { - - FeedResponse response = signal.get(); - Context traceCtx = DiagnosticsProvider.getContextFromReactorOrNull(signal.getContextView()); - DiagnosticsProvider tracerProvider = pagedFluxOptions.getDiagnosticsProvider(); - switch (signal.getType()) { - case ON_COMPLETE: - this.recordFeedResponse(pagedFluxOptions, traceCtx, tracerProvider, response, feedResponseConsumerLatencyInNanos); - - if (isTracerEnabled(tracerProvider)) { - tracerProvider.recordFeedResponseConsumerLatency( - signal, - Duration.ofNanos(feedResponseConsumerLatencyInNanos.get())); - - tracerProvider.endSpan(traceCtx); - } - - break; - case ON_NEXT: - this.recordFeedResponse(pagedFluxOptions, traceCtx, tracerProvider, response, feedResponseConsumerLatencyInNanos); - - break; - - case ON_ERROR: - if (isTracerEnabled(tracerProvider)) { - tracerProvider.recordFeedResponseConsumerLatency( - signal, - Duration.ofNanos(feedResponseConsumerLatencyInNanos.get())); - - // all info is extracted from CosmosException when applicable - tracerProvider.endSpan( - traceCtx, - signal.getThrowable() - ); - } - - break; - - default: - break; - }}); - - - final DiagnosticsProvider tracerProvider = pagedFluxOptions.getDiagnosticsProvider(); - final CosmosDiagnosticsContext cosmosCtx = ctxAccessor.create( - pagedFluxOptions.getSpanName(), - pagedFluxOptions.getAccountTag(), - BridgeInternal.getServiceEndpoint(pagedFluxOptions.getCosmosAsyncClient()), - pagedFluxOptions.getDatabaseId(), - pagedFluxOptions.getContainerId(), - pagedFluxOptions.getResourceType(), - pagedFluxOptions.getOperationType(), - pagedFluxOptions.getOperationId(), - pagedFluxOptions.getEffectiveConsistencyLevel(), - pagedFluxOptions.getMaxItemCount(), - pagedFluxOptions.getDiagnosticsThresholds(), - null, - pagedFluxOptions.getConnectionMode(), - pagedFluxOptions.getUserAgent()); - ctxAccessor.setSamplingRateSnapshot(cosmosCtx, pagedFluxOptions.getSamplingRateSnapshot()); - pagedFluxOptions.setDiagnosticsContext(cosmosCtx); - - return Flux - .deferContextual(reactorCtx -> result - .doOnCancel(() -> { - Context traceCtx = DiagnosticsProvider.getContextFromReactorOrNull(reactorCtx); - tracerProvider.endSpan(traceCtx); - }) - .doOnComplete(() -> { - Context traceCtx = DiagnosticsProvider.getContextFromReactorOrNull(reactorCtx); - tracerProvider.endSpan(traceCtx); - })) - .contextWrite(DiagnosticsProvider.setContextInReactor( - pagedFluxOptions.getDiagnosticsProvider().startSpan( - pagedFluxOptions.getSpanName(), - cosmosCtx, - context))); + return wrapWithTracingIfEnabled( + pagedFluxOptions, + this.optionsFluxFunction.apply(pagedFluxOptions), + feedResponseConsumerLatencyInNanos, + context) + .doOnSubscribe(ignoredValue -> { + startTime.set(Instant.now()); + feedResponseConsumerLatencyInNanos.set(0); + }); } private boolean isTracerEnabled(DiagnosticsProvider tracerProvider) {