Skip to content

Implement semconv span processor health metrics #7441

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,2 +1,9 @@
Comparing source compatibility of opentelemetry-sdk-trace-1.52.0-SNAPSHOT.jar against opentelemetry-sdk-trace-1.51.0.jar
No changes.
*** MODIFIED CLASS: PUBLIC FINAL io.opentelemetry.sdk.trace.export.BatchSpanProcessorBuilder (not serializable)
=== CLASS FILE FORMAT VERSION: 52.0 <- 52.0
+++ NEW METHOD: PUBLIC(+) io.opentelemetry.sdk.trace.export.BatchSpanProcessorBuilder setInternalTelemetryVersion(io.opentelemetry.sdk.common.InternalTelemetryVersion)
+++ NEW METHOD: PUBLIC(+) io.opentelemetry.sdk.trace.export.BatchSpanProcessorBuilder setMeterProvider(java.util.function.Supplier<io.opentelemetry.api.metrics.MeterProvider>)
*** MODIFIED CLASS: PUBLIC FINAL io.opentelemetry.sdk.trace.export.SimpleSpanProcessorBuilder (not serializable)
=== CLASS FILE FORMAT VERSION: 52.0 <- 52.0
+++ NEW METHOD: PUBLIC(+) io.opentelemetry.sdk.trace.export.SimpleSpanProcessorBuilder setInternalTelemetryVersion(io.opentelemetry.sdk.common.InternalTelemetryVersion)
+++ NEW METHOD: PUBLIC(+) io.opentelemetry.sdk.trace.export.SimpleSpanProcessorBuilder setMeterProvider(java.util.function.Supplier<io.opentelemetry.api.metrics.MeterProvider>)
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public final class GrpcExporter<T extends Marshaler> {
public GrpcExporter(
GrpcSender<T> grpcSender,
InternalTelemetryVersion internalTelemetryVersion,
StandardComponentId componentId,
StandardComponentId<StandardComponentId.ExporterType> componentId,
Supplier<MeterProvider> meterProviderSupplier,
String endpoint) {
this.type = componentId.getStandardType().signal().logFriendlyName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public final class HttpExporter<T extends Marshaler> {
private final ExporterInstrumentation exporterMetrics;

public HttpExporter(
StandardComponentId componentId,
StandardComponentId<StandardComponentId.ExporterType> componentId,
HttpSender httpSender,
Supplier<MeterProvider> meterProviderSupplier,
InternalTelemetryVersion internalTelemetryVersion,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public class ExporterInstrumentation {
public ExporterInstrumentation(
InternalTelemetryVersion schema,
Supplier<MeterProvider> meterProviderSupplier,
StandardComponentId componentId,
StandardComponentId<StandardComponentId.ExporterType> componentId,
String endpoint) {

Signal signal = componentId.getStandardType().signal();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ void testInternalTelemetry(StandardComponentId.ExporterType exporterType) {
try (SdkMeterProvider meterProvider =
SdkMeterProvider.builder().registerMetricReader(inMemoryMetrics).build()) {

StandardComponentId id = ComponentId.generateLazy(exporterType);
StandardComponentId<StandardComponentId.ExporterType> id =
ComponentId.generateLazy(exporterType);

Attributes expectedAttributes =
Attributes.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ void testInternalTelemetry(StandardComponentId.ExporterType exporterType) {
try (SdkMeterProvider meterProvider =
SdkMeterProvider.builder().registerMetricReader(inMemoryMetrics).build()) {

StandardComponentId id = ComponentId.generateLazy(exporterType);
StandardComponentId<StandardComponentId.ExporterType> id =
ComponentId.generateLazy(exporterType);

Attributes expectedAttributes =
Attributes.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ public static ComponentId generateLazy(String componentType) {
return new Lazy(componentType);
}

public static StandardComponentId generateLazy(StandardComponentId.ExporterType exporterType) {
return new StandardComponentId(exporterType);
public static <T extends StandardComponentId.StandardType> StandardComponentId<T> generateLazy(
T componentType) {
return new StandardComponentId<>(componentType);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,22 @@
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
* at any time.
*/
public class StandardComponentId extends ComponentId.Lazy {
public class StandardComponentId<T extends StandardComponentId.StandardType>
extends ComponentId.Lazy {

/**
* This class is internal and is hence not for public use. Its APIs are unstable and can change at
* any time.
*/
public enum ExporterType {
public interface StandardType {
String typeName();
}

/**
* This class is internal and is hence not for public use. Its APIs are unstable and can change at
* any time.
*/
public enum ExporterType implements StandardType {
OTLP_GRPC_SPAN_EXPORTER("otlp_grpc_span_exporter", Signal.SPAN),
OTLP_HTTP_SPAN_EXPORTER("otlp_http_span_exporter", Signal.SPAN),
OTLP_HTTP_JSON_SPAN_EXPORTER("otlp_http_json_span_exporter", Signal.SPAN),
Expand Down Expand Up @@ -47,16 +56,41 @@ public enum ExporterType {
public Signal signal() {
return signal;
}

@Override
public String typeName() {
return value;
}
}

/**
* This class is internal and is hence not for public use. Its APIs are unstable and can change at
* any time.
*/
public enum SpanProcessorType implements StandardType {
BATCH_SPAN_PROCESSOR("batching_span_processor"),
SIMPLE_SPAN_PROCESSOR("simple_span_processor");

final String value;

SpanProcessorType(String value) {
this.value = value;
}

@Override
public String typeName() {
return value;
}
}

private final ExporterType standardType;
private final T standardType;

StandardComponentId(ExporterType standardType) {
super(standardType.value);
StandardComponentId(T standardType) {
super(standardType.typeName());
this.standardType = standardType;
}

public ExporterType getStandardType() {
public T getStandardType() {
return standardType;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,27 @@ void testStandardTypesUpToDate() {
assertThat(StandardComponentId.ExporterType.OTLP_HTTP_JSON_LOG_EXPORTER.value)
.isEqualTo(
OtelIncubatingAttributes.OtelComponentTypeIncubatingValues.OTLP_HTTP_JSON_LOG_EXPORTER);
// TODO: uncomment as soon as available in semconv release
// assertThat(ComponentId.StandardType.OTLP_GRPC_METRIC_EXPORTER.value)
//
// .isEqualTo(OtelIncubatingAttributes.OtelComponentTypeIncubatingValues.OTLP_GRPC_METRIC_EXPORTER);
// assertThat(ComponentId.StandardType.OTLP_HTTP_METRIC_EXPORTER.value)
//
// .isEqualTo(OtelIncubatingAttributes.OtelComponentTypeIncubatingValues.OTLP_HTTP_METRIC_EXPORTER);
// assertThat(ComponentId.StandardType.OTLP_HTTP_JSON_METRIC_EXPORTER.value)
//
// .isEqualTo(OtelIncubatingAttributes.OtelComponentTypeIncubatingValues.OTLP_HTTP_JSON_METRIC_EXPORTER);
// assertThat(ComponentId.StandardType.ZIPKIN_HTTP_SPAN_EXPORTER.value)
assertThat(StandardComponentId.ExporterType.OTLP_GRPC_METRIC_EXPORTER.value)
.isEqualTo(
OtelIncubatingAttributes.OtelComponentTypeIncubatingValues.OTLP_GRPC_METRIC_EXPORTER);
assertThat(StandardComponentId.ExporterType.OTLP_HTTP_METRIC_EXPORTER.value)
.isEqualTo(
OtelIncubatingAttributes.OtelComponentTypeIncubatingValues.OTLP_HTTP_METRIC_EXPORTER);
assertThat(StandardComponentId.ExporterType.OTLP_HTTP_JSON_METRIC_EXPORTER.value)
.isEqualTo(
OtelIncubatingAttributes.OtelComponentTypeIncubatingValues
.OTLP_HTTP_JSON_METRIC_EXPORTER);
// TODO: uncomment when released in new semconv version
// assertThat(StandardComponentId.ExporterType.ZIPKIN_HTTP_SPAN_EXPORTER.value)
// .isEqualTo(
//
// .isEqualTo(OtelIncubatingAttributes.OtelComponentTypeIncubatingValues.ZIPKIN_HTTP_SPAN_EXPORTER);
// OtelIncubatingAttributes.OtelComponentTypeIncubatingValues.ZIPKIN_HTTP_SPAN_EXPORTER);

assertThat(StandardComponentId.SpanProcessorType.BATCH_SPAN_PROCESSOR.value)
.isEqualTo(
OtelIncubatingAttributes.OtelComponentTypeIncubatingValues.BATCHING_SPAN_PROCESSOR);
assertThat(StandardComponentId.SpanProcessorType.SIMPLE_SPAN_PROCESSOR.value)
.isEqualTo(
OtelIncubatingAttributes.OtelComponentTypeIncubatingValues.SIMPLE_SPAN_PROCESSOR);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,6 @@

package io.opentelemetry.sdk.trace.export;

import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.LongCounter;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.api.metrics.MeterProvider;
import io.opentelemetry.context.Context;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.internal.DaemonThreadFactory;
Expand All @@ -19,6 +14,7 @@
import io.opentelemetry.sdk.trace.SpanProcessor;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.internal.JcTools;
import io.opentelemetry.sdk.trace.internal.metrics.SpanProcessorMetrics;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -47,15 +43,12 @@ public final class BatchSpanProcessor implements SpanProcessor {

private static final String WORKER_THREAD_NAME =
BatchSpanProcessor.class.getSimpleName() + "_WorkerThread";
private static final AttributeKey<String> SPAN_PROCESSOR_TYPE_LABEL =
AttributeKey.stringKey("processorType");
private static final AttributeKey<Boolean> SPAN_PROCESSOR_DROPPED_LABEL =
AttributeKey.booleanKey("dropped");
private static final String SPAN_PROCESSOR_TYPE_VALUE = BatchSpanProcessor.class.getSimpleName();

private final boolean exportUnsampledSpans;
private final Worker worker;
private final AtomicBoolean isShutdown = new AtomicBoolean(false);
private final long queueCapacity;
private final SpanProcessorMetrics metrics;

/**
* Returns a new Builder for {@link BatchSpanProcessor}.
Expand All @@ -71,16 +64,17 @@ public static BatchSpanProcessorBuilder builder(SpanExporter spanExporter) {
BatchSpanProcessor(
SpanExporter spanExporter,
boolean exportUnsampledSpans,
MeterProvider meterProvider,
SpanProcessorMetrics metrics,
long scheduleDelayNanos,
int maxQueueSize,
int maxExportBatchSize,
long exporterTimeoutNanos) {
this.exportUnsampledSpans = exportUnsampledSpans;
this.queueCapacity = maxQueueSize;
this.metrics = metrics;
this.worker =
new Worker(
spanExporter,
meterProvider,
scheduleDelayNanos,
maxExportBatchSize,
exporterTimeoutNanos,
Expand All @@ -99,6 +93,7 @@ public boolean isStartRequired() {

@Override
public void onEnd(ReadableSpan span) {
metrics.startRecordingQueueMetrics(worker.queue::size, () -> queueCapacity);
if (span != null && (exportUnsampledSpans || span.getSpanContext().isSampled())) {
worker.addSpan(span);
}
Expand All @@ -114,7 +109,7 @@ public CompletableResultCode shutdown() {
if (isShutdown.getAndSet(true)) {
return CompletableResultCode.ofSuccess();
}
return worker.shutdown();
return worker.shutdown().whenComplete(metrics::close);
}

@Override
Expand Down Expand Up @@ -159,11 +154,7 @@ public String toString() {

// Worker is a thread that batches multiple spans and calls the registered SpanExporter to export
// the data.
private static final class Worker implements Runnable {

private final LongCounter processedSpansCounter;
private final Attributes droppedAttrs;
private final Attributes exportedAttrs;
private final class Worker implements Runnable {

private final SpanExporter spanExporter;
private final long scheduleDelayNanos;
Expand All @@ -188,7 +179,6 @@ private static final class Worker implements Runnable {

private Worker(
SpanExporter spanExporter,
MeterProvider meterProvider,
long scheduleDelayNanos,
int maxExportBatchSize,
long exporterTimeoutNanos,
Expand All @@ -199,44 +189,12 @@ private Worker(
this.exporterTimeoutNanos = exporterTimeoutNanos;
this.queue = queue;
this.signal = new ArrayBlockingQueue<>(1);
Meter meter = meterProvider.meterBuilder("io.opentelemetry.sdk.trace").build();
meter
.gaugeBuilder("queueSize")
.ofLongs()
.setDescription("The number of items queued")
.setUnit("1")
.buildWithCallback(
result ->
result.record(
queue.size(),
Attributes.of(SPAN_PROCESSOR_TYPE_LABEL, SPAN_PROCESSOR_TYPE_VALUE)));
processedSpansCounter =
meter
.counterBuilder("processedSpans")
.setUnit("1")
.setDescription(
"The number of spans processed by the BatchSpanProcessor. "
+ "[dropped=true if they were dropped due to high throughput]")
.build();
droppedAttrs =
Attributes.of(
SPAN_PROCESSOR_TYPE_LABEL,
SPAN_PROCESSOR_TYPE_VALUE,
SPAN_PROCESSOR_DROPPED_LABEL,
true);
exportedAttrs =
Attributes.of(
SPAN_PROCESSOR_TYPE_LABEL,
SPAN_PROCESSOR_TYPE_VALUE,
SPAN_PROCESSOR_DROPPED_LABEL,
false);

this.batch = new ArrayList<>(this.maxExportBatchSize);
}

private void addSpan(ReadableSpan span) {
if (!queue.offer(span)) {
processedSpansCounter.add(1, droppedAttrs);
metrics.recordSpansProcessed(1L, SpanProcessorMetrics.QUEUE_FULL_DROPPED_ERROR_TYPE);
} else {
if (queueSize.incrementAndGet() >= spansNeeded.get()) {
signal.offer(true);
Expand Down Expand Up @@ -341,10 +299,11 @@ private void exportCurrentBatch() {
}

try {
metrics.recordSpansProcessed(batch.size(), null);
CompletableResultCode result = spanExporter.export(Collections.unmodifiableList(batch));
result.join(exporterTimeoutNanos, TimeUnit.NANOSECONDS);
if (result.isSuccess()) {
processedSpansCounter.add(batch.size(), exportedAttrs);
metrics.recordSpansExportedSuccessfully(batch.size());
} else {
logger.log(Level.FINE, "Exporter failed");
}
Expand Down
Loading
Loading