diff --git a/docs/apidiffs/current_vs_latest/opentelemetry-sdk-trace.txt b/docs/apidiffs/current_vs_latest/opentelemetry-sdk-trace.txt index e4b6afee82a..efec3f2cb5b 100644 --- a/docs/apidiffs/current_vs_latest/opentelemetry-sdk-trace.txt +++ b/docs/apidiffs/current_vs_latest/opentelemetry-sdk-trace.txt @@ -1,2 +1,9 @@ Comparing source compatibility of opentelemetry-sdk-trace-1.52.0-SNAPSHOT.jar against opentelemetry-sdk-trace-1.51.0.jar -No changes. \ No newline at end of file +*** MODIFIED CLASS: PUBLIC FINAL io.opentelemetry.sdk.trace.export.BatchSpanProcessorBuilder (not serializable) + === CLASS FILE FORMAT VERSION: 52.0 <- 52.0 + +++ NEW METHOD: PUBLIC(+) io.opentelemetry.sdk.trace.export.BatchSpanProcessorBuilder setInternalTelemetryVersion(io.opentelemetry.sdk.common.InternalTelemetryVersion) + +++ NEW METHOD: PUBLIC(+) io.opentelemetry.sdk.trace.export.BatchSpanProcessorBuilder setMeterProvider(java.util.function.Supplier) +*** MODIFIED CLASS: PUBLIC FINAL io.opentelemetry.sdk.trace.export.SimpleSpanProcessorBuilder (not serializable) + === CLASS FILE FORMAT VERSION: 52.0 <- 52.0 + +++ NEW METHOD: PUBLIC(+) io.opentelemetry.sdk.trace.export.SimpleSpanProcessorBuilder setInternalTelemetryVersion(io.opentelemetry.sdk.common.InternalTelemetryVersion) + +++ NEW METHOD: PUBLIC(+) io.opentelemetry.sdk.trace.export.SimpleSpanProcessorBuilder setMeterProvider(java.util.function.Supplier) diff --git a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/GrpcExporter.java b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/GrpcExporter.java index 3c8de2f541c..94af8c75fc2 100644 --- a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/GrpcExporter.java +++ b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/GrpcExporter.java @@ -45,7 +45,7 @@ public final class GrpcExporter { public GrpcExporter( GrpcSender grpcSender, InternalTelemetryVersion internalTelemetryVersion, - StandardComponentId componentId, + StandardComponentId componentId, Supplier meterProviderSupplier, String endpoint) { this.type = componentId.getStandardType().signal().logFriendlyName(); diff --git a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/http/HttpExporter.java b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/http/HttpExporter.java index 07533a95fea..d28fc6a88c2 100644 --- a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/http/HttpExporter.java +++ b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/http/HttpExporter.java @@ -40,7 +40,7 @@ public final class HttpExporter { private final ExporterInstrumentation exporterMetrics; public HttpExporter( - StandardComponentId componentId, + StandardComponentId componentId, HttpSender httpSender, Supplier meterProviderSupplier, InternalTelemetryVersion internalTelemetryVersion, diff --git a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/metrics/ExporterInstrumentation.java b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/metrics/ExporterInstrumentation.java index a6ae53481a9..c776a2f1abf 100644 --- a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/metrics/ExporterInstrumentation.java +++ b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/metrics/ExporterInstrumentation.java @@ -28,7 +28,7 @@ public class ExporterInstrumentation { public ExporterInstrumentation( InternalTelemetryVersion schema, Supplier meterProviderSupplier, - StandardComponentId componentId, + StandardComponentId componentId, String endpoint) { Signal signal = componentId.getStandardType().signal(); diff --git a/exporters/common/src/test/java/io/opentelemetry/exporter/internal/grpc/GrpcExporterTest.java b/exporters/common/src/test/java/io/opentelemetry/exporter/internal/grpc/GrpcExporterTest.java index 0ea1fa4a10e..61c859e3b5b 100644 --- a/exporters/common/src/test/java/io/opentelemetry/exporter/internal/grpc/GrpcExporterTest.java +++ b/exporters/common/src/test/java/io/opentelemetry/exporter/internal/grpc/GrpcExporterTest.java @@ -77,7 +77,8 @@ void testInternalTelemetry(StandardComponentId.ExporterType exporterType) { try (SdkMeterProvider meterProvider = SdkMeterProvider.builder().registerMetricReader(inMemoryMetrics).build()) { - StandardComponentId id = ComponentId.generateLazy(exporterType); + StandardComponentId id = + ComponentId.generateLazy(exporterType); Attributes expectedAttributes = Attributes.builder() diff --git a/exporters/common/src/test/java/io/opentelemetry/exporter/internal/http/HttpExporterTest.java b/exporters/common/src/test/java/io/opentelemetry/exporter/internal/http/HttpExporterTest.java index 7dcfa3de68e..2ea0563c0d4 100644 --- a/exporters/common/src/test/java/io/opentelemetry/exporter/internal/http/HttpExporterTest.java +++ b/exporters/common/src/test/java/io/opentelemetry/exporter/internal/http/HttpExporterTest.java @@ -72,7 +72,8 @@ void testInternalTelemetry(StandardComponentId.ExporterType exporterType) { try (SdkMeterProvider meterProvider = SdkMeterProvider.builder().registerMetricReader(inMemoryMetrics).build()) { - StandardComponentId id = ComponentId.generateLazy(exporterType); + StandardComponentId id = + ComponentId.generateLazy(exporterType); Attributes expectedAttributes = Attributes.builder() diff --git a/sdk/common/src/main/java/io/opentelemetry/sdk/internal/ComponentId.java b/sdk/common/src/main/java/io/opentelemetry/sdk/internal/ComponentId.java index 13621ffc5d0..f3dc0618d6a 100644 --- a/sdk/common/src/main/java/io/opentelemetry/sdk/internal/ComponentId.java +++ b/sdk/common/src/main/java/io/opentelemetry/sdk/internal/ComponentId.java @@ -62,7 +62,8 @@ public static ComponentId generateLazy(String componentType) { return new Lazy(componentType); } - public static StandardComponentId generateLazy(StandardComponentId.ExporterType exporterType) { - return new StandardComponentId(exporterType); + public static StandardComponentId generateLazy( + T componentType) { + return new StandardComponentId<>(componentType); } } diff --git a/sdk/common/src/main/java/io/opentelemetry/sdk/internal/StandardComponentId.java b/sdk/common/src/main/java/io/opentelemetry/sdk/internal/StandardComponentId.java index d7396d82879..37e34289ea3 100644 --- a/sdk/common/src/main/java/io/opentelemetry/sdk/internal/StandardComponentId.java +++ b/sdk/common/src/main/java/io/opentelemetry/sdk/internal/StandardComponentId.java @@ -11,13 +11,22 @@ *

This class is internal and is hence not for public use. Its APIs are unstable and can change * at any time. */ -public class StandardComponentId extends ComponentId.Lazy { +public class StandardComponentId + extends ComponentId.Lazy { /** * This class is internal and is hence not for public use. Its APIs are unstable and can change at * any time. */ - public enum ExporterType { + public interface StandardType { + String typeName(); + } + + /** + * This class is internal and is hence not for public use. Its APIs are unstable and can change at + * any time. + */ + public enum ExporterType implements StandardType { OTLP_GRPC_SPAN_EXPORTER("otlp_grpc_span_exporter", Signal.SPAN), OTLP_HTTP_SPAN_EXPORTER("otlp_http_span_exporter", Signal.SPAN), OTLP_HTTP_JSON_SPAN_EXPORTER("otlp_http_json_span_exporter", Signal.SPAN), @@ -47,16 +56,41 @@ public enum ExporterType { public Signal signal() { return signal; } + + @Override + public String typeName() { + return value; + } + } + + /** + * This class is internal and is hence not for public use. Its APIs are unstable and can change at + * any time. + */ + public enum SpanProcessorType implements StandardType { + BATCH_SPAN_PROCESSOR("batching_span_processor"), + SIMPLE_SPAN_PROCESSOR("simple_span_processor"); + + final String value; + + SpanProcessorType(String value) { + this.value = value; + } + + @Override + public String typeName() { + return value; + } } - private final ExporterType standardType; + private final T standardType; - StandardComponentId(ExporterType standardType) { - super(standardType.value); + StandardComponentId(T standardType) { + super(standardType.typeName()); this.standardType = standardType; } - public ExporterType getStandardType() { + public T getStandardType() { return standardType; } } diff --git a/sdk/common/src/test/java/io/opentelemetry/sdk/internal/ComponentIdTest.java b/sdk/common/src/test/java/io/opentelemetry/sdk/internal/ComponentIdTest.java index 27b06463040..901ddef6cab 100644 --- a/sdk/common/src/test/java/io/opentelemetry/sdk/internal/ComponentIdTest.java +++ b/sdk/common/src/test/java/io/opentelemetry/sdk/internal/ComponentIdTest.java @@ -33,18 +33,27 @@ void testStandardTypesUpToDate() { assertThat(StandardComponentId.ExporterType.OTLP_HTTP_JSON_LOG_EXPORTER.value) .isEqualTo( OtelIncubatingAttributes.OtelComponentTypeIncubatingValues.OTLP_HTTP_JSON_LOG_EXPORTER); - // TODO: uncomment as soon as available in semconv release - // assertThat(ComponentId.StandardType.OTLP_GRPC_METRIC_EXPORTER.value) - // - // .isEqualTo(OtelIncubatingAttributes.OtelComponentTypeIncubatingValues.OTLP_GRPC_METRIC_EXPORTER); - // assertThat(ComponentId.StandardType.OTLP_HTTP_METRIC_EXPORTER.value) - // - // .isEqualTo(OtelIncubatingAttributes.OtelComponentTypeIncubatingValues.OTLP_HTTP_METRIC_EXPORTER); - // assertThat(ComponentId.StandardType.OTLP_HTTP_JSON_METRIC_EXPORTER.value) - // - // .isEqualTo(OtelIncubatingAttributes.OtelComponentTypeIncubatingValues.OTLP_HTTP_JSON_METRIC_EXPORTER); - // assertThat(ComponentId.StandardType.ZIPKIN_HTTP_SPAN_EXPORTER.value) + assertThat(StandardComponentId.ExporterType.OTLP_GRPC_METRIC_EXPORTER.value) + .isEqualTo( + OtelIncubatingAttributes.OtelComponentTypeIncubatingValues.OTLP_GRPC_METRIC_EXPORTER); + assertThat(StandardComponentId.ExporterType.OTLP_HTTP_METRIC_EXPORTER.value) + .isEqualTo( + OtelIncubatingAttributes.OtelComponentTypeIncubatingValues.OTLP_HTTP_METRIC_EXPORTER); + assertThat(StandardComponentId.ExporterType.OTLP_HTTP_JSON_METRIC_EXPORTER.value) + .isEqualTo( + OtelIncubatingAttributes.OtelComponentTypeIncubatingValues + .OTLP_HTTP_JSON_METRIC_EXPORTER); + // TODO: uncomment when released in new semconv version + // assertThat(StandardComponentId.ExporterType.ZIPKIN_HTTP_SPAN_EXPORTER.value) + // .isEqualTo( // - // .isEqualTo(OtelIncubatingAttributes.OtelComponentTypeIncubatingValues.ZIPKIN_HTTP_SPAN_EXPORTER); + // OtelIncubatingAttributes.OtelComponentTypeIncubatingValues.ZIPKIN_HTTP_SPAN_EXPORTER); + + assertThat(StandardComponentId.SpanProcessorType.BATCH_SPAN_PROCESSOR.value) + .isEqualTo( + OtelIncubatingAttributes.OtelComponentTypeIncubatingValues.BATCHING_SPAN_PROCESSOR); + assertThat(StandardComponentId.SpanProcessorType.SIMPLE_SPAN_PROCESSOR.value) + .isEqualTo( + OtelIncubatingAttributes.OtelComponentTypeIncubatingValues.SIMPLE_SPAN_PROCESSOR); } } diff --git a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java index cdedb5abb1c..ed8939b62cf 100644 --- a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java +++ b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java @@ -5,11 +5,6 @@ package io.opentelemetry.sdk.trace.export; -import io.opentelemetry.api.common.AttributeKey; -import io.opentelemetry.api.common.Attributes; -import io.opentelemetry.api.metrics.LongCounter; -import io.opentelemetry.api.metrics.Meter; -import io.opentelemetry.api.metrics.MeterProvider; import io.opentelemetry.context.Context; import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.internal.DaemonThreadFactory; @@ -19,6 +14,7 @@ import io.opentelemetry.sdk.trace.SpanProcessor; import io.opentelemetry.sdk.trace.data.SpanData; import io.opentelemetry.sdk.trace.internal.JcTools; +import io.opentelemetry.sdk.trace.internal.metrics.SpanProcessorMetrics; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -47,15 +43,12 @@ public final class BatchSpanProcessor implements SpanProcessor { private static final String WORKER_THREAD_NAME = BatchSpanProcessor.class.getSimpleName() + "_WorkerThread"; - private static final AttributeKey SPAN_PROCESSOR_TYPE_LABEL = - AttributeKey.stringKey("processorType"); - private static final AttributeKey SPAN_PROCESSOR_DROPPED_LABEL = - AttributeKey.booleanKey("dropped"); - private static final String SPAN_PROCESSOR_TYPE_VALUE = BatchSpanProcessor.class.getSimpleName(); private final boolean exportUnsampledSpans; private final Worker worker; private final AtomicBoolean isShutdown = new AtomicBoolean(false); + private final long queueCapacity; + private final SpanProcessorMetrics metrics; /** * Returns a new Builder for {@link BatchSpanProcessor}. @@ -71,16 +64,17 @@ public static BatchSpanProcessorBuilder builder(SpanExporter spanExporter) { BatchSpanProcessor( SpanExporter spanExporter, boolean exportUnsampledSpans, - MeterProvider meterProvider, + SpanProcessorMetrics metrics, long scheduleDelayNanos, int maxQueueSize, int maxExportBatchSize, long exporterTimeoutNanos) { this.exportUnsampledSpans = exportUnsampledSpans; + this.queueCapacity = maxQueueSize; + this.metrics = metrics; this.worker = new Worker( spanExporter, - meterProvider, scheduleDelayNanos, maxExportBatchSize, exporterTimeoutNanos, @@ -99,6 +93,7 @@ public boolean isStartRequired() { @Override public void onEnd(ReadableSpan span) { + metrics.startRecordingQueueMetrics(worker.queue::size, () -> queueCapacity); if (span != null && (exportUnsampledSpans || span.getSpanContext().isSampled())) { worker.addSpan(span); } @@ -114,7 +109,7 @@ public CompletableResultCode shutdown() { if (isShutdown.getAndSet(true)) { return CompletableResultCode.ofSuccess(); } - return worker.shutdown(); + return worker.shutdown().whenComplete(metrics::close); } @Override @@ -159,11 +154,7 @@ public String toString() { // Worker is a thread that batches multiple spans and calls the registered SpanExporter to export // the data. - private static final class Worker implements Runnable { - - private final LongCounter processedSpansCounter; - private final Attributes droppedAttrs; - private final Attributes exportedAttrs; + private final class Worker implements Runnable { private final SpanExporter spanExporter; private final long scheduleDelayNanos; @@ -188,7 +179,6 @@ private static final class Worker implements Runnable { private Worker( SpanExporter spanExporter, - MeterProvider meterProvider, long scheduleDelayNanos, int maxExportBatchSize, long exporterTimeoutNanos, @@ -199,44 +189,12 @@ private Worker( this.exporterTimeoutNanos = exporterTimeoutNanos; this.queue = queue; this.signal = new ArrayBlockingQueue<>(1); - Meter meter = meterProvider.meterBuilder("io.opentelemetry.sdk.trace").build(); - meter - .gaugeBuilder("queueSize") - .ofLongs() - .setDescription("The number of items queued") - .setUnit("1") - .buildWithCallback( - result -> - result.record( - queue.size(), - Attributes.of(SPAN_PROCESSOR_TYPE_LABEL, SPAN_PROCESSOR_TYPE_VALUE))); - processedSpansCounter = - meter - .counterBuilder("processedSpans") - .setUnit("1") - .setDescription( - "The number of spans processed by the BatchSpanProcessor. " - + "[dropped=true if they were dropped due to high throughput]") - .build(); - droppedAttrs = - Attributes.of( - SPAN_PROCESSOR_TYPE_LABEL, - SPAN_PROCESSOR_TYPE_VALUE, - SPAN_PROCESSOR_DROPPED_LABEL, - true); - exportedAttrs = - Attributes.of( - SPAN_PROCESSOR_TYPE_LABEL, - SPAN_PROCESSOR_TYPE_VALUE, - SPAN_PROCESSOR_DROPPED_LABEL, - false); - this.batch = new ArrayList<>(this.maxExportBatchSize); } private void addSpan(ReadableSpan span) { if (!queue.offer(span)) { - processedSpansCounter.add(1, droppedAttrs); + metrics.recordSpansProcessed(1L, SpanProcessorMetrics.QUEUE_FULL_DROPPED_ERROR_TYPE); } else { if (queueSize.incrementAndGet() >= spansNeeded.get()) { signal.offer(true); @@ -341,10 +299,11 @@ private void exportCurrentBatch() { } try { + metrics.recordSpansProcessed(batch.size(), null); CompletableResultCode result = spanExporter.export(Collections.unmodifiableList(batch)); result.join(exporterTimeoutNanos, TimeUnit.NANOSECONDS); if (result.isSuccess()) { - processedSpansCounter.add(batch.size(), exportedAttrs); + metrics.recordSpansExportedSuccessfully(batch.size()); } else { logger.log(Level.FINE, "Exporter failed"); } diff --git a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorBuilder.java b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorBuilder.java index d89083ded06..2bff134878e 100644 --- a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorBuilder.java +++ b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorBuilder.java @@ -9,8 +9,11 @@ import static java.util.Objects.requireNonNull; import io.opentelemetry.api.metrics.MeterProvider; +import io.opentelemetry.sdk.common.InternalTelemetryVersion; +import io.opentelemetry.sdk.trace.internal.metrics.SpanProcessorMetrics; import java.time.Duration; import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; import java.util.logging.Level; import java.util.logging.Logger; @@ -33,7 +36,8 @@ public final class BatchSpanProcessorBuilder { private int maxQueueSize = DEFAULT_MAX_QUEUE_SIZE; private int maxExportBatchSize = DEFAULT_MAX_EXPORT_BATCH_SIZE; private long exporterTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(DEFAULT_EXPORT_TIMEOUT_MILLIS); - private MeterProvider meterProvider = MeterProvider.noop(); + private Supplier meterProviderSupplier = MeterProvider::noop; + private InternalTelemetryVersion internalTelemetryVersion = InternalTelemetryVersion.LEGACY; BatchSpanProcessorBuilder(SpanExporter spanExporter) { this.spanExporter = requireNonNull(spanExporter, "spanExporter"); @@ -139,13 +143,34 @@ public BatchSpanProcessorBuilder setMaxExportBatchSize(int maxExportBatchSize) { return this; } + /** + * Sets the {@link InternalTelemetryVersion} defining which self-monitoring metrics this processor + * collects. + */ + public BatchSpanProcessorBuilder setInternalTelemetryVersion( + InternalTelemetryVersion internalTelemetryVersion) { + requireNonNull(internalTelemetryVersion, "internalTelemetryVersion"); + this.internalTelemetryVersion = internalTelemetryVersion; + return this; + } + + /** + * Sets the {@link MeterProvider} to use to collect metrics related to batch export. If not set, + * metrics will not be collected. + */ + public BatchSpanProcessorBuilder setMeterProvider(Supplier meterProviderSupplier) { + requireNonNull(meterProviderSupplier, "meterProviderSupplier"); + this.meterProviderSupplier = meterProviderSupplier; + return this; + } + /** * Sets the {@link MeterProvider} to use to collect metrics related to batch export. If not set, * metrics will not be collected. */ public BatchSpanProcessorBuilder setMeterProvider(MeterProvider meterProvider) { requireNonNull(meterProvider, "meterProvider"); - this.meterProvider = meterProvider; + setMeterProvider(() -> meterProvider); return this; } @@ -171,7 +196,8 @@ public BatchSpanProcessor build() { return new BatchSpanProcessor( spanExporter, exportUnsampledSpans, - meterProvider, + SpanProcessorMetrics.createForBatchProcessor( + internalTelemetryVersion, meterProviderSupplier), scheduleDelayNanos, maxQueueSize, maxExportBatchSize, diff --git a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/SimpleSpanProcessor.java b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/SimpleSpanProcessor.java index f543e25353e..ec912c86b1d 100644 --- a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/SimpleSpanProcessor.java +++ b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/SimpleSpanProcessor.java @@ -13,6 +13,7 @@ import io.opentelemetry.sdk.trace.ReadableSpan; import io.opentelemetry.sdk.trace.SpanProcessor; import io.opentelemetry.sdk.trace.data.SpanData; +import io.opentelemetry.sdk.trace.internal.metrics.SpanProcessorMetrics; import java.util.Collections; import java.util.List; import java.util.Set; @@ -36,6 +37,7 @@ public final class SimpleSpanProcessor implements SpanProcessor { private static final Logger logger = Logger.getLogger(SimpleSpanProcessor.class.getName()); private final SpanExporter spanExporter; + private final SpanProcessorMetrics metrics; private final boolean exportUnsampledSpans; private final Set pendingExports = Collections.newSetFromMap(new ConcurrentHashMap<>()); @@ -68,8 +70,10 @@ public static SimpleSpanProcessorBuilder builder(SpanExporter exporter) { return new SimpleSpanProcessorBuilder(exporter); } - SimpleSpanProcessor(SpanExporter spanExporter, boolean exportUnsampledSpans) { + SimpleSpanProcessor( + SpanExporter spanExporter, SpanProcessorMetrics metrics, boolean exportUnsampledSpans) { this.spanExporter = requireNonNull(spanExporter, "spanExporter"); + this.metrics = metrics; this.exportUnsampledSpans = exportUnsampledSpans; } @@ -88,6 +92,7 @@ public void onEnd(ReadableSpan span) { if (span != null && (exportUnsampledSpans || span.getSpanContext().isSampled())) { try { List spans = Collections.singletonList(span.toSpanData()); + metrics.recordSpansProcessed(1, null); CompletableResultCode result; synchronized (exporterLock) { diff --git a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/SimpleSpanProcessorBuilder.java b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/SimpleSpanProcessorBuilder.java index de9f3f9152a..e45e23c0622 100644 --- a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/SimpleSpanProcessorBuilder.java +++ b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/SimpleSpanProcessorBuilder.java @@ -7,6 +7,12 @@ import static java.util.Objects.requireNonNull; +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.metrics.MeterProvider; +import io.opentelemetry.sdk.common.InternalTelemetryVersion; +import io.opentelemetry.sdk.trace.internal.metrics.SpanProcessorMetrics; +import java.util.function.Supplier; + /** * Builder class for {@link SimpleSpanProcessor}. * @@ -15,6 +21,8 @@ public final class SimpleSpanProcessorBuilder { private final SpanExporter spanExporter; private boolean exportUnsampledSpans = false; + private Supplier meterProviderSupplier = GlobalOpenTelemetry::getMeterProvider; + private InternalTelemetryVersion internalTelemetryVersion = InternalTelemetryVersion.LEGACY; SimpleSpanProcessorBuilder(SpanExporter spanExporter) { this.spanExporter = requireNonNull(spanExporter, "spanExporter"); @@ -29,12 +37,38 @@ public SimpleSpanProcessorBuilder setExportUnsampledSpans(boolean exportUnsample return this; } + /** + * Sets the {@link InternalTelemetryVersion} defining which self-monitoring metrics this processor + * collects. + */ + public SimpleSpanProcessorBuilder setInternalTelemetryVersion( + InternalTelemetryVersion internalTelemetryVersion) { + requireNonNull(internalTelemetryVersion, "internalTelemetryVersion"); + this.internalTelemetryVersion = internalTelemetryVersion; + return this; + } + + /** + * Sets the {@link MeterProvider} to use to collect metrics related to batch export. If not set, + * metrics will not be collected. + */ + public SimpleSpanProcessorBuilder setMeterProvider( + Supplier meterProviderSupplier) { + requireNonNull(meterProviderSupplier, "meterProviderSupplier"); + this.meterProviderSupplier = meterProviderSupplier; + return this; + } + /** * Returns a new {@link SimpleSpanProcessor} with the configuration of this builder. * * @return a new {@link SimpleSpanProcessor}. */ public SimpleSpanProcessor build() { - return new SimpleSpanProcessor(spanExporter, exportUnsampledSpans); + return new SimpleSpanProcessor( + spanExporter, + SpanProcessorMetrics.createForSimpleProcessor( + internalTelemetryVersion, meterProviderSupplier), + exportUnsampledSpans); } } diff --git a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/internal/metrics/LegacyBatchSpanProcessorMetrics.java b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/internal/metrics/LegacyBatchSpanProcessorMetrics.java new file mode 100644 index 00000000000..58b20a27edb --- /dev/null +++ b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/internal/metrics/LegacyBatchSpanProcessorMetrics.java @@ -0,0 +1,123 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.trace.internal.metrics; + +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.LongCounter; +import io.opentelemetry.api.metrics.Meter; +import io.opentelemetry.api.metrics.MeterProvider; +import io.opentelemetry.api.metrics.ObservableLongGauge; +import io.opentelemetry.sdk.trace.export.BatchSpanProcessor; +import java.util.function.LongSupplier; +import java.util.function.Supplier; +import javax.annotation.Nullable; + +class LegacyBatchSpanProcessorMetrics implements SpanProcessorMetrics { + + public static final AttributeKey SPAN_PROCESSOR_TYPE_LABEL = + AttributeKey.stringKey("processorType"); + public static final AttributeKey SPAN_PROCESSOR_DROPPED_LABEL = + AttributeKey.booleanKey("dropped"); + public static final String SPAN_PROCESSOR_TYPE_VALUE = BatchSpanProcessor.class.getSimpleName(); + + private static final Attributes DROPPED_ATTRIBS = + Attributes.of( + LegacyBatchSpanProcessorMetrics.SPAN_PROCESSOR_TYPE_LABEL, + LegacyBatchSpanProcessorMetrics.SPAN_PROCESSOR_TYPE_VALUE, + LegacyBatchSpanProcessorMetrics.SPAN_PROCESSOR_DROPPED_LABEL, + true); + + private static final Attributes EXPORTED_ATTRIBS = + Attributes.of( + LegacyBatchSpanProcessorMetrics.SPAN_PROCESSOR_TYPE_LABEL, + LegacyBatchSpanProcessorMetrics.SPAN_PROCESSOR_TYPE_VALUE, + LegacyBatchSpanProcessorMetrics.SPAN_PROCESSOR_DROPPED_LABEL, + false); + + private final Supplier meterProviderSupplier; + + @Nullable private volatile LongCounter processedSpans; + + private volatile boolean queueMetricInitialized = false; + + @Nullable private ObservableLongGauge queueSize; + + LegacyBatchSpanProcessorMetrics(Supplier meterProviderSupplier) { + this.meterProviderSupplier = meterProviderSupplier; + } + + private Meter meter() { + MeterProvider meterProvider = meterProviderSupplier.get(); + if (meterProvider == null) { + meterProvider = MeterProvider.noop(); + } + return meterProvider.get("io.opentelemetry.sdk.trace"); + } + + private LongCounter processedSpans() { + LongCounter processedSpans = this.processedSpans; + if (processedSpans == null) { + processedSpans = + meter() + .counterBuilder("processedSpans") + .setUnit("1") + .setDescription( + "The number of spans processed by the BatchSpanProcessor. " + + "[dropped=true if they were dropped due to high throughput]") + .build(); + this.processedSpans = processedSpans; + } + return processedSpans; + } + + @Override + public void recordSpansProcessed(long count, @Nullable String errorType) { + // Only used by legacy metrics for dropped spans + if (errorType != null) { + processedSpans().add(count, DROPPED_ATTRIBS); + } + } + + @Override + public void recordSpansExportedSuccessfully(long count) { + processedSpans().add(count, EXPORTED_ATTRIBS); + } + + @Override + public void startRecordingQueueMetrics( + LongSupplier queueSizeSupplier, LongSupplier queueCapacitySupplier) { + if (queueMetricInitialized) { + return; + } + synchronized (this) { + if (queueMetricInitialized) { + return; + } + this.queueSize = + meter() + .gaugeBuilder("queueSize") + .ofLongs() + .setDescription("The number of items queued") + .setUnit("1") + .buildWithCallback( + result -> + result.record( + queueSizeSupplier.getAsLong(), + Attributes.of( + LegacyBatchSpanProcessorMetrics.SPAN_PROCESSOR_TYPE_LABEL, + LegacyBatchSpanProcessorMetrics.SPAN_PROCESSOR_TYPE_VALUE))); + } + } + + @Override + public synchronized void close() { + queueMetricInitialized = true; // to prevent initialization after close + if (queueSize != null) { + queueSize.close(); + } + } +} diff --git a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/internal/metrics/NoopSpanProcessorMetrics.java b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/internal/metrics/NoopSpanProcessorMetrics.java new file mode 100644 index 00000000000..1ba3b363ce4 --- /dev/null +++ b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/internal/metrics/NoopSpanProcessorMetrics.java @@ -0,0 +1,27 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.trace.internal.metrics; + +import java.util.function.LongSupplier; +import javax.annotation.Nullable; + +class NoopSpanProcessorMetrics implements SpanProcessorMetrics { + + static final NoopSpanProcessorMetrics INSTANCE = new NoopSpanProcessorMetrics(); + + @Override + public void recordSpansProcessed(long count, @Nullable String errorType) {} + + @Override + public void recordSpansExportedSuccessfully(long count) {} + + @Override + public void startRecordingQueueMetrics( + LongSupplier queueSizeSupplier, LongSupplier queueCapacitySupplier) {} + + @Override + public void close() {} +} diff --git a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/internal/metrics/SemConvSpanProcessorMetrics.java b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/internal/metrics/SemConvSpanProcessorMetrics.java new file mode 100644 index 00000000000..eeb87d7e494 --- /dev/null +++ b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/internal/metrics/SemConvSpanProcessorMetrics.java @@ -0,0 +1,132 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.trace.internal.metrics; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.common.AttributesBuilder; +import io.opentelemetry.api.metrics.LongCounter; +import io.opentelemetry.api.metrics.Meter; +import io.opentelemetry.api.metrics.MeterProvider; +import io.opentelemetry.api.metrics.ObservableLongUpDownCounter; +import io.opentelemetry.sdk.internal.ComponentId; +import io.opentelemetry.sdk.internal.SemConvAttributes; +import java.util.function.LongSupplier; +import java.util.function.Supplier; +import javax.annotation.Nullable; + +class SemConvSpanProcessorMetrics implements SpanProcessorMetrics { + + private volatile boolean queueMetricsStarted = false; + + @Nullable private ObservableLongUpDownCounter queueCapacity; + @Nullable private ObservableLongUpDownCounter queueSize; + @Nullable private LongCounter processed; + + private final Supplier meterProviderSupplier; + private final ComponentId componentId; + + @Nullable private volatile Attributes attributes = null; + + SemConvSpanProcessorMetrics( + Supplier meterProviderSupplier, ComponentId componentId) { + this.meterProviderSupplier = meterProviderSupplier; + this.componentId = componentId; + } + + private Attributes attributes() { + // attributes are initialized lazily to trigger lazy initialization of the componentId + Attributes attribs = this.attributes; + if (attribs == null) { + AttributesBuilder builder = Attributes.builder(); + builder.put(SemConvAttributes.OTEL_COMPONENT_TYPE, componentId.getTypeName()); + builder.put(SemConvAttributes.OTEL_COMPONENT_NAME, componentId.getComponentName()); + attribs = builder.build(); + this.attributes = attribs; + } + return attribs; + } + + private Meter meter() { + MeterProvider meterProvider = meterProviderSupplier.get(); + if (meterProvider == null) { + meterProvider = MeterProvider.noop(); + } + return meterProvider.get("io.opentelemetry.processor." + componentId.getTypeName()); + } + + private LongCounter processed() { + LongCounter processed = this.processed; + if (processed == null) { + processed = + meter() + .counterBuilder("otel.sdk.processor.span.processed") + .setUnit("{span}") + .setDescription("The number of spans for which the processing has finished") + .build(); + this.processed = processed; + } + return processed; + } + + @Override + public void startRecordingQueueMetrics( + LongSupplier queueSizeSupplier, LongSupplier queueCapacitySupplier) { + if (queueMetricsStarted) { + return; + } + synchronized (this) { + if (queueMetricsStarted) { + return; + } + queueSize = + meter() + .upDownCounterBuilder("otel.sdk.processor.span.queue.size") + .setUnit("{span}") + .setDescription( + "The number of spans in the queue of a given instance of an SDK span processor") + .buildWithCallback( + measurement -> { + measurement.record(queueSizeSupplier.getAsLong(), attributes()); + }); + queueCapacity = + meter() + .upDownCounterBuilder("otel.sdk.processor.span.queue.capacity") + .setUnit("{span}") + .setDescription( + "The maximum number of spans the queue of a given instance of an SDK span processor can hold") + .buildWithCallback( + measurement -> { + measurement.record(queueCapacitySupplier.getAsLong(), attributes()); + }); + queueMetricsStarted = true; + } + } + + @Override + public synchronized void close() { + queueMetricsStarted = true; // prevent initialization after close + if (queueCapacity != null) { + queueCapacity.close(); + } + if (queueSize != null) { + queueSize.close(); + } + } + + @Override + public void recordSpansProcessed(long count, @Nullable String errorType) { + Attributes attribs = attributes(); + if (errorType != null) { + attribs = attribs.toBuilder().put(SemConvAttributes.ERROR_TYPE, errorType).build(); + } + processed().add(count, attribs); + } + + @Override + public void recordSpansExportedSuccessfully(long count) { + // Not used by semconv metrics + } +} diff --git a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/internal/metrics/SpanProcessorMetrics.java b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/internal/metrics/SpanProcessorMetrics.java new file mode 100644 index 00000000000..5adce7da6be --- /dev/null +++ b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/internal/metrics/SpanProcessorMetrics.java @@ -0,0 +1,72 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.trace.internal.metrics; + +import io.opentelemetry.api.metrics.MeterProvider; +import io.opentelemetry.sdk.common.InternalTelemetryVersion; +import io.opentelemetry.sdk.internal.ComponentId; +import io.opentelemetry.sdk.internal.StandardComponentId; +import java.util.function.LongSupplier; +import java.util.function.Supplier; +import javax.annotation.Nullable; + +/** + * This class is internal and is hence not for public use. Its APIs are unstable and can change at + * any time. + */ +public interface SpanProcessorMetrics extends AutoCloseable { + + /** This value is defined in the semantic conventions. */ + String QUEUE_FULL_DROPPED_ERROR_TYPE = "queue_full"; + + static SpanProcessorMetrics noop() { + return NoopSpanProcessorMetrics.INSTANCE; + } + + static SpanProcessorMetrics createForBatchProcessor( + InternalTelemetryVersion version, Supplier meterProvider) { + switch (version) { + case LEGACY: + return new LegacyBatchSpanProcessorMetrics(meterProvider); + case LATEST: + return new SemConvSpanProcessorMetrics( + meterProvider, + ComponentId.generateLazy(StandardComponentId.SpanProcessorType.BATCH_SPAN_PROCESSOR)); + } + throw new IllegalStateException("Unhandled case: " + version); + } + + static SpanProcessorMetrics createForSimpleProcessor( + InternalTelemetryVersion version, Supplier meterProvider) { + switch (version) { + case LEGACY: + return SpanProcessorMetrics.noop(); // no legacy metrics for simple span processor + case LATEST: + return new SemConvSpanProcessorMetrics( + meterProvider, + ComponentId.generateLazy(StandardComponentId.SpanProcessorType.SIMPLE_SPAN_PROCESSOR)); + } + throw new IllegalStateException("Unhandled case: " + version); + } + + void recordSpansProcessed(long count, @Nullable String errorType); + + void recordSpansExportedSuccessfully(long count); + + /** + * Can be called multiple times and concurrently, but only the first invocation will have an + * effect. + */ + void startRecordingQueueMetrics( + LongSupplier queueSizeSupplier, LongSupplier queueCapacitySupplier); + + /** + * Must be called if {@link #startRecordingQueueMetrics(LongSupplier, LongSupplier)} was called, + * otherwise not required. + */ + @Override + void close(); +} diff --git a/sdk/trace/src/test/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorTest.java b/sdk/trace/src/test/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorTest.java index 89aff56d9c7..254bd9a0ebf 100644 --- a/sdk/trace/src/test/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorTest.java +++ b/sdk/trace/src/test/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorTest.java @@ -5,7 +5,7 @@ package io.opentelemetry.sdk.trace.export; -import static org.assertj.core.api.Assertions.assertThat; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat; import static org.assertj.core.api.Assertions.assertThatCode; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.awaitility.Awaitility.await; @@ -16,13 +16,22 @@ import static org.mockito.Mockito.reset; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.internal.GuardedBy; +import io.opentelemetry.api.metrics.MeterProvider; import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.Tracer; import io.opentelemetry.internal.testing.slf4j.SuppressLogger; import io.opentelemetry.sdk.common.CompletableResultCode; +import io.opentelemetry.sdk.common.InternalTelemetryVersion; +import io.opentelemetry.sdk.internal.SemConvAttributes; +import io.opentelemetry.sdk.metrics.SdkMeterProvider; +import io.opentelemetry.sdk.metrics.data.MetricData; +import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader; import io.opentelemetry.sdk.trace.ReadableSpan; import io.opentelemetry.sdk.trace.SdkTracerProvider; import io.opentelemetry.sdk.trace.data.SpanData; @@ -36,6 +45,8 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; import javax.annotation.Nullable; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -44,6 +55,7 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.ArgumentMatchers; import org.mockito.Mock; +import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoSettings; import org.mockito.quality.Strictness; @@ -681,6 +693,275 @@ void exporterThrowsNonRuntimeException() { await().untilAsserted(() -> assertThat(batchSpanProcessor.getQueue()).isEmpty()); } + @Test + @SuppressWarnings("unchecked") + void verifyMeterProviderTouchedLazily() { + Supplier mockSupplier = Mockito.mock(Supplier.class); + + BatchSpanProcessor batchSpanProcessor = + BatchSpanProcessor.builder(mockSpanExporter) + .setScheduleDelay( + 1, TimeUnit.HOURS) // don't use scheduled exporting to not mess with the test + .setMaxExportBatchSize(3) + .setMaxQueueSize(4) + .setMeterProvider(mockSupplier) + .build(); + + verifyNoInteractions(mockSupplier); + + batchSpanProcessor.shutdown(); + } + + @Test + void verifySemConvMetrics() { + InMemoryMetricReader metricReader = InMemoryMetricReader.create(); + + try (SdkMeterProvider meterProvider = + SdkMeterProvider.builder().registerMetricReader(metricReader).build(); + BatchSpanProcessor batchSpanProcessor = + BatchSpanProcessor.builder(mockSpanExporter) + .setScheduleDelay( + 1, TimeUnit.HOURS) // don't use scheduled exporting to not mess with the test + .setMaxExportBatchSize(3) + .setMaxQueueSize(4) + .setInternalTelemetryVersion(InternalTelemetryVersion.LATEST) + .setMeterProvider(meterProvider) + .build()) { + + sdkTracerProvider = SdkTracerProvider.builder().addSpanProcessor(batchSpanProcessor).build(); + + CompletableResultCode blockedResultCode = new CompletableResultCode(); + when(mockSpanExporter.export(any())).thenReturn(blockedResultCode); + + createEndedSpan("span1"); + createEndedSpan("span2"); + + AtomicReference componentNameHolder = new AtomicReference<>(); + + Collection metricData = metricReader.collectAllMetrics(); + assertThat(metricData) + .hasSize(2) + .anySatisfy( + metric -> + assertThat(metric) + .hasName("otel.sdk.processor.span.queue.size") + .hasLongSumSatisfying( + size -> + size.hasPointsSatisfying( + point -> + point + .hasValue(2) + .hasAttributesSatisfying( + attribs -> { + assertThat(attribs.size()).isEqualTo(2); + assertThat( + attribs.get( + SemConvAttributes.OTEL_COMPONENT_TYPE)) + .isEqualTo("batching_span_processor"); + String componentName = + attribs.get( + SemConvAttributes.OTEL_COMPONENT_NAME); + assertThat(componentName) + .matches("batching_span_processor/\\d+"); + componentNameHolder.set(componentName); + })))); + + Attributes expectedAttribs = + Attributes.builder() + .put(SemConvAttributes.OTEL_COMPONENT_TYPE, "batching_span_processor") + .put(SemConvAttributes.OTEL_COMPONENT_NAME, componentNameHolder.get()) + .build(); + + assertThat(metricData) + .anySatisfy( + metric -> + assertThat(metric) + .hasName("otel.sdk.processor.span.queue.capacity") + .hasLongSumSatisfying( + capacity -> + capacity.hasPointsSatisfying( + point -> point.hasValue(4).hasAttributes(expectedAttribs)))); + + createEndedSpan("span3"); // triggers export, queue is empty afterwards + createEndedSpan("span4"); + createEndedSpan("span5"); + createEndedSpan("span6"); + createEndedSpan("span7"); + createEndedSpan("span8"); // dropped due to full queue + + assertThat(metricReader.collectAllMetrics()) + .hasSize(3) + .anySatisfy( + metric -> + assertThat(metric) + .hasName("otel.sdk.processor.span.queue.capacity") + .hasLongSumSatisfying( + capacity -> + capacity.hasPointsSatisfying( + point -> point.hasValue(4).hasAttributes(expectedAttribs)))) + .anySatisfy( + metric -> + assertThat(metric) + .hasName("otel.sdk.processor.span.queue.size") + .hasLongSumSatisfying( + capacity -> + capacity.hasPointsSatisfying( + point -> point.hasValue(4).hasAttributes(expectedAttribs)))) + .anySatisfy( + metric -> + assertThat(metric) + .hasName("otel.sdk.processor.span.processed") + .hasLongSumSatisfying( + processed -> + processed.hasPointsSatisfying( + point -> point.hasValue(3).hasAttributes(expectedAttribs), + point -> + point + .hasValue(1) + .hasAttributes( + expectedAttribs.toBuilder() + .put(SemConvAttributes.ERROR_TYPE, "queue_full") + .build())))); + + // Stop blocking to allow a fast termination of the test case + blockedResultCode.succeed(); + } + } + + @Test + void verifyLegacyMetrics() { + InMemoryMetricReader metricReader = InMemoryMetricReader.create(); + + try (SdkMeterProvider meterProvider = + SdkMeterProvider.builder().registerMetricReader(metricReader).build(); + BatchSpanProcessor batchSpanProcessor = + BatchSpanProcessor.builder(mockSpanExporter) + .setScheduleDelay( + 1, TimeUnit.HOURS) // don't use scheduled exporting to not mess with the test + .setMaxExportBatchSize(3) + .setMaxQueueSize(4) + .setMeterProvider(meterProvider) + .build()) { + + // Create tracer provider with our processor + sdkTracerProvider = SdkTracerProvider.builder().addSpanProcessor(batchSpanProcessor).build(); + + // Create and end spans to trigger metrics recording + createEndedSpan("span1"); + createEndedSpan("span2"); + + // Collect metrics and verify them + Attributes typeAttributes = + Attributes.of( + AttributeKey.stringKey("processorType"), BatchSpanProcessor.class.getSimpleName()); + assertThat(metricReader.collectAllMetrics()) + .hasSize(1) + .anySatisfy( + metric -> + assertThat(metric) + .hasName("queueSize") + .hasLongGaugeSatisfying( + size -> + size.hasPointsSatisfying( + point -> point.hasValue(2).hasAttributes(typeAttributes)))); + + when(mockSpanExporter.export(any())).thenReturn(CompletableResultCode.ofSuccess()); + + // Supply one more span to trigger an export + createEndedSpan("span3"); + + // We have to wait until after the export is finished for the metrics to be published + await() + .atMost(Duration.ofSeconds(5)) + .untilAsserted( + () -> + assertThat(metricReader.collectAllMetrics()) + .hasSize(2) + .anySatisfy( + metric -> + assertThat(metric) + .hasName("queueSize") + .hasLongGaugeSatisfying( + size -> + size.hasPointsSatisfying( + point -> + point.hasValue(0).hasAttributes(typeAttributes)))) + .anySatisfy( + metric -> + assertThat(metric) + .hasName("processedSpans") + .hasLongSumSatisfying( + count -> + count.hasPointsSatisfying( + point -> + point + .hasValue(3) + .hasAttributes( + typeAttributes.toBuilder() + .put( + AttributeKey.booleanKey( + "dropped"), + false) + .build()))))); + + // Now we fill trigger another export and then have the exporter block, + // to in turn cause a full queue and spans to be dropped + + CompletableResultCode blockedResultCode = new CompletableResultCode(); + when(mockSpanExporter.export(any())).thenReturn(blockedResultCode); + + createEndedSpan("span4"); + createEndedSpan("span5"); + createEndedSpan("span6"); + + await() + .atMost(Duration.ofSeconds(5)) + .untilAsserted(() -> verify(mockSpanExporter, times(2)).export(any())); + + // Now fill up the queue and have a span dropped + createEndedSpan("span6"); + createEndedSpan("span7"); + createEndedSpan("span8"); + createEndedSpan("span9"); + createEndedSpan("span10"); + + assertThat(metricReader.collectAllMetrics()) + .hasSize(2) + .anySatisfy( + metric -> + assertThat(metric) + .hasName("queueSize") + .hasLongGaugeSatisfying( + size -> + size.hasPointsSatisfying( + point -> point.hasValue(4).hasAttributes(typeAttributes)))) + .anySatisfy( + metric -> + assertThat(metric) + .hasName("processedSpans") + .hasLongSumSatisfying( + count -> + count.hasPointsSatisfying( + point -> + point + .hasValue(3) + .hasAttributes( + typeAttributes.toBuilder() + .put(AttributeKey.booleanKey("dropped"), false) + .build()), + point -> + point + .hasValue(1) + .hasAttributes( + typeAttributes.toBuilder() + .put(AttributeKey.booleanKey("dropped"), true) + .build())))); + + // Stop blocking to allow a fast termination of the test case + blockedResultCode.succeed(); + } + } + private static final class BlockingSpanExporter implements SpanExporter { final Object monitor = new Object(); diff --git a/sdk/trace/src/test/java/io/opentelemetry/sdk/trace/export/SimpleSpanProcessorTest.java b/sdk/trace/src/test/java/io/opentelemetry/sdk/trace/export/SimpleSpanProcessorTest.java index 62b1d59a84f..e4cf1c1cc96 100644 --- a/sdk/trace/src/test/java/io/opentelemetry/sdk/trace/export/SimpleSpanProcessorTest.java +++ b/sdk/trace/src/test/java/io/opentelemetry/sdk/trace/export/SimpleSpanProcessorTest.java @@ -16,6 +16,7 @@ import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; +import io.opentelemetry.api.metrics.MeterProvider; import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.SpanContext; import io.opentelemetry.api.trace.SpanId; @@ -25,6 +26,11 @@ import io.opentelemetry.api.trace.Tracer; import io.opentelemetry.context.Context; import io.opentelemetry.sdk.common.CompletableResultCode; +import io.opentelemetry.sdk.common.InternalTelemetryVersion; +import io.opentelemetry.sdk.internal.SemConvAttributes; +import io.opentelemetry.sdk.metrics.SdkMeterProvider; +import io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions; +import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader; import io.opentelemetry.sdk.trace.ReadWriteSpan; import io.opentelemetry.sdk.trace.ReadableSpan; import io.opentelemetry.sdk.trace.SdkTracerProvider; @@ -36,10 +42,12 @@ import io.opentelemetry.sdk.trace.samplers.SamplingResult; import java.util.Collections; import java.util.List; +import java.util.function.Supplier; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; +import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoSettings; import org.mockito.quality.Strictness; @@ -281,4 +289,76 @@ void getSpanExporter() { assertThat(((SimpleSpanProcessor) SimpleSpanProcessor.create(spanExporter)).getSpanExporter()) .isSameAs(spanExporter); } + + @Test + @SuppressWarnings("unchecked") + void verifyMetricsDisabledByDefault() { + Supplier mockSupplier = Mockito.mock(Supplier.class); + + when(spanExporter.export(any())).thenReturn(CompletableResultCode.ofSuccess()); + + SpanData spanData = TestUtils.makeBasicSpan(); + when(readableSpan.getSpanContext()).thenReturn(SAMPLED_SPAN_CONTEXT); + when(readableSpan.toSpanData()).thenReturn(spanData); + + SimpleSpanProcessor processor = + SimpleSpanProcessor.builder(spanExporter).setMeterProvider(mockSupplier).build(); + + processor.onEnd(readableSpan); + + verifyNoInteractions(mockSupplier); + } + + @Test + void verifySemConvMetrics() { + InMemoryMetricReader metricReader = InMemoryMetricReader.create(); + + try (SdkMeterProvider meterProvider = + SdkMeterProvider.builder().registerMetricReader(metricReader).build()) { + + SpanData spanData = TestUtils.makeBasicSpan(); + when(readableSpan.getSpanContext()).thenReturn(SAMPLED_SPAN_CONTEXT); + when(readableSpan.toSpanData()).thenReturn(spanData); + + SimpleSpanProcessor processor = + SimpleSpanProcessor.builder(spanExporter) + .setMeterProvider(() -> meterProvider) + .setInternalTelemetryVersion(InternalTelemetryVersion.LATEST) + .build(); + + CompletableResultCode blockedResultCode = new CompletableResultCode(); + when(spanExporter.export(any())).thenReturn(blockedResultCode); + + processor.onEnd(readableSpan); + + assertThat(metricReader.collectAllMetrics()) + .hasSize(1) + .anySatisfy( + metric -> + OpenTelemetryAssertions.assertThat(metric) + .hasName("otel.sdk.processor.span.processed") + .hasLongSumSatisfying( + size -> + size.hasPointsSatisfying( + point -> + point + .hasValue(1) + .hasAttributesSatisfying( + attribs -> { + assertThat(attribs.size()).isEqualTo(2); + assertThat( + attribs.get( + SemConvAttributes.OTEL_COMPONENT_TYPE)) + .isEqualTo("simple_span_processor"); + String componentName = + attribs.get( + SemConvAttributes.OTEL_COMPONENT_NAME); + assertThat(componentName) + .matches("simple_span_processor/\\d+"); + })))); + + // Stop blocking to allow a fast termination of the test case + blockedResultCode.succeed(); + } + } }