From a0d68e7ee6b4b34722e3348cc3438c83735ba17d Mon Sep 17 00:00:00 2001 From: David Ashpole Date: Tue, 22 Jul 2025 17:53:29 +0000 Subject: [PATCH 1/8] add experimental batch span processor metrics --- CHANGELOG.md | 4 +- sdk/trace/batch_span_processor.go | 91 ++++++++- sdk/trace/batch_span_processor_test.go | 243 +++++++++++++++++++++++++ sdk/trace/provider.go | 1 + sdk/trace/tracer.go | 2 +- 5 files changed, 337 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5cc352f444c..fabb582c303 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -46,8 +46,8 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - The `go.opentelemetry.io/otel/semconv/v1.36.0` package. The package contains semantic conventions from the `v1.36.0` version of the OpenTelemetry Semantic Conventions. See the [migration documentation](./semconv/v1.36.0/MIGRATION.md) for information on how to upgrade from `go.opentelemetry.io/otel/semconv/v1.34.0.`(#7032) -- Add experimental self-observability span metrics in `go.opentelemetry.io/otel/sdk/trace`. - Check the `go.opentelemetry.io/otel/sdk/trace/internal/x` package documentation for more information. (#7027) +- Add experimental self-observability span and batch span processor metrics in `go.opentelemetry.io/otel/sdk/trace`. + Check the `go.opentelemetry.io/otel/sdk/trace/internal/x` package documentation for more information. (#7027, #6393) - Add native histogram exemplar support in `go.opentelemetry.io/otel/exporters/prometheus`. (#6772) ### Changed diff --git a/sdk/trace/batch_span_processor.go b/sdk/trace/batch_span_processor.go index a6e35919b76..bac3abce9e0 100644 --- a/sdk/trace/batch_span_processor.go +++ b/sdk/trace/batch_span_processor.go @@ -6,13 +6,20 @@ package trace // import "go.opentelemetry.io/otel/sdk/trace" import ( "context" "errors" + "fmt" "sync" "sync/atomic" "time" "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/internal/global" + "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/sdk" "go.opentelemetry.io/otel/sdk/internal/env" + "go.opentelemetry.io/otel/sdk/trace/internal/x" + semconv "go.opentelemetry.io/otel/semconv/v1.36.0" + "go.opentelemetry.io/otel/semconv/v1.36.0/otelconv" "go.opentelemetry.io/otel/trace" ) @@ -26,6 +33,11 @@ const ( DefaultMaxExportBatchSize = 512 ) +var ( + noError = otelconv.ErrorTypeAttr("") + queueFull = otelconv.ErrorTypeAttr("queue_full") +) + // BatchSpanProcessorOption configures a BatchSpanProcessor. type BatchSpanProcessorOption func(o *BatchSpanProcessorOptions) @@ -69,6 +81,11 @@ type batchSpanProcessor struct { queue chan ReadOnlySpan dropped uint32 + selfObservabilityEnabled bool + callbackRegistration metric.Registration + spansProcessedCounter otelconv.SDKProcessorSpanProcessed + componentNameAttr attribute.KeyValue + batch []ReadOnlySpan batchMutex sync.Mutex timer *time.Timer @@ -110,6 +127,8 @@ func NewBatchSpanProcessor(exporter SpanExporter, options ...BatchSpanProcessorO stopCh: make(chan struct{}), } + bsp.configureSelfObservability() + bsp.stopWait.Add(1) go func() { defer bsp.stopWait.Done() @@ -120,6 +139,55 @@ func NewBatchSpanProcessor(exporter SpanExporter, options ...BatchSpanProcessorO return bsp } +var processorID atomic.Int64 + +// nextProcessorID returns an identifier for this batch span processor, +// starting with 0 and incrementing by 1 each time it is called. +func nextProcessorID() int64 { + return processorID.Add(1) - 1 +} + +// configureSelfObservability configures metrics for the batch span processor. +func (bsp *batchSpanProcessor) configureSelfObservability() { + if !x.SelfObservability.Enabled() { + return + } + bsp.selfObservabilityEnabled = true + bsp.componentNameAttr = semconv.OTelComponentName( + fmt.Sprintf("%s/%d", otelconv.ComponentTypeBatchingSpanProcessor, nextProcessorID())) + meter := otel.GetMeterProvider().Meter( + selfObsScopeName, + metric.WithInstrumentationVersion(sdk.Version()), + metric.WithSchemaURL(semconv.SchemaURL), + ) + + queueCapacityUpDownCounter, err := otelconv.NewSDKProcessorSpanQueueCapacity(meter) + if err != nil { + otel.Handle(err) + } + queueSizeUpDownCounter, err := otelconv.NewSDKProcessorSpanQueueSize(meter) + if err != nil { + otel.Handle(err) + } + bsp.spansProcessedCounter, err = otelconv.NewSDKProcessorSpanProcessed(meter) + if err != nil { + otel.Handle(err) + } + + callabckAttributesOpt := metric.WithAttributes(bsp.componentNameAttr, + semconv.OTelComponentTypeBatchingSpanProcessor) + bsp.callbackRegistration, err = meter.RegisterCallback( + func(ctx context.Context, o metric.Observer) error { + o.ObserveInt64(queueSizeUpDownCounter.Inst(), int64(len(bsp.queue)), callabckAttributesOpt) + o.ObserveInt64(queueCapacityUpDownCounter.Inst(), int64(bsp.o.MaxQueueSize), callabckAttributesOpt) + return nil + }, + queueSizeUpDownCounter.Inst(), queueCapacityUpDownCounter.Inst()) + if err != nil { + otel.Handle(err) + } +} + // OnStart method does nothing. func (bsp *batchSpanProcessor) OnStart(context.Context, ReadWriteSpan) {} @@ -160,6 +228,9 @@ func (bsp *batchSpanProcessor) Shutdown(ctx context.Context) error { case <-ctx.Done(): err = ctx.Err() } + if bsp.selfObservabilityEnabled { + err = errors.Join(err, bsp.callbackRegistration.Unregister()) + } }) return err } @@ -272,6 +343,12 @@ func (bsp *batchSpanProcessor) exportSpans(ctx context.Context) error { if l := len(bsp.batch); l > 0 { global.Debug("exporting spans", "count", len(bsp.batch), "total_dropped", atomic.LoadUint32(&bsp.dropped)) + if bsp.selfObservabilityEnabled { + bsp.spansProcessedCounter.Add(ctx, int64(l), + bsp.componentNameAttr, + bsp.spansProcessedCounter.AttrComponentType(otelconv.ComponentTypeBatchingSpanProcessor), + bsp.spansProcessedCounter.AttrErrorType(noError)) + } err := bsp.e.ExportSpans(ctx, bsp.batch) // A new batch is always created after exporting, even if the batch failed to be exported. @@ -380,11 +457,17 @@ func (bsp *batchSpanProcessor) enqueueBlockOnQueueFull(ctx context.Context, sd R case bsp.queue <- sd: return true case <-ctx.Done(): + if bsp.selfObservabilityEnabled { + bsp.spansProcessedCounter.Add(ctx, 1, + bsp.componentNameAttr, + bsp.spansProcessedCounter.AttrComponentType(otelconv.ComponentTypeBatchingSpanProcessor), + bsp.spansProcessedCounter.AttrErrorType(queueFull)) + } return false } } -func (bsp *batchSpanProcessor) enqueueDrop(_ context.Context, sd ReadOnlySpan) bool { +func (bsp *batchSpanProcessor) enqueueDrop(ctx context.Context, sd ReadOnlySpan) bool { if !sd.SpanContext().IsSampled() { return false } @@ -394,6 +477,12 @@ func (bsp *batchSpanProcessor) enqueueDrop(_ context.Context, sd ReadOnlySpan) b return true default: atomic.AddUint32(&bsp.dropped, 1) + if bsp.selfObservabilityEnabled { + bsp.spansProcessedCounter.Add(ctx, 1, + bsp.componentNameAttr, + bsp.spansProcessedCounter.AttrComponentType(otelconv.ComponentTypeBatchingSpanProcessor), + bsp.spansProcessedCounter.AttrErrorType(queueFull)) + } } return false } diff --git a/sdk/trace/batch_span_processor_test.go b/sdk/trace/batch_span_processor_test.go index f3f4596e1b9..51d85f50341 100644 --- a/sdk/trace/batch_span_processor_test.go +++ b/sdk/trace/batch_span_processor_test.go @@ -9,13 +9,23 @@ import ( "errors" "fmt" "sync" + "sync/atomic" "testing" "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/sdk" + "go.opentelemetry.io/otel/sdk/instrumentation" "go.opentelemetry.io/otel/sdk/internal/env" + sdkmetric "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" + semconv "go.opentelemetry.io/otel/semconv/v1.36.0" + "go.opentelemetry.io/otel/semconv/v1.36.0/otelconv" "go.opentelemetry.io/otel/trace" ) @@ -633,3 +643,236 @@ func TestBatchSpanProcessorConcurrentSafe(t *testing.T) { wg.Wait() } + +func TestBatchSpanProcessorMetricsDisabled(t *testing.T) { + t.Setenv("OTEL_GO_X_SELF_OBSERVABILITY", "false") + tp := basicTracerProvider(t) + reader := sdkmetric.NewManualReader() + meterProvider := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader)) + otel.SetMeterProvider(meterProvider) + me := newBlockingExporter() + t.Cleanup(func() { assert.NoError(t, me.Shutdown(context.Background())) }) + bsp := NewBatchSpanProcessor( + me, + // Make sure timeout doesn't trigger during the test. + WithBatchTimeout(time.Hour), + WithMaxQueueSize(2), + WithMaxExportBatchSize(2), + ) + tp.RegisterSpanProcessor(bsp) + + tr := tp.Tracer("TestBatchSpanProcessorMetricsDisabled") + // Generate 2 spans, which export and block during the export call. + generateSpan(t, tr, testOption{genNumSpans: 2}) + me.waitForSpans(2) + + // Validate that there are no metrics produced. + gotMetrics := new(metricdata.ResourceMetrics) + assert.NoError(t, reader.Collect(context.Background(), gotMetrics)) + require.Empty(t, gotMetrics.ScopeMetrics) + // Generate 3 spans. 2 fill the queue, and 1 is dropped because the queue is full. + generateSpan(t, tr, testOption{genNumSpans: 3}) + // Validate that there are no metrics produced. + gotMetrics = new(metricdata.ResourceMetrics) + assert.NoError(t, reader.Collect(context.Background(), gotMetrics)) + require.Empty(t, gotMetrics.ScopeMetrics) +} + +func TestBatchSpanProcessorMetrics(t *testing.T) { + t.Setenv("OTEL_GO_X_SELF_OBSERVABILITY", "true") + tp := basicTracerProvider(t) + reader := sdkmetric.NewManualReader() + meterProvider := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader)) + otel.SetMeterProvider(meterProvider) + me := newBlockingExporter() + t.Cleanup(func() { assert.NoError(t, me.Shutdown(context.Background())) }) + bsp := NewBatchSpanProcessor( + me, + // Make sure timeout doesn't trigger during the test. + WithBatchTimeout(time.Hour), + WithMaxQueueSize(2), + WithMaxExportBatchSize(2), + ) + internalBsp := bsp.(*batchSpanProcessor) + tp.RegisterSpanProcessor(bsp) + + tr := tp.Tracer("TestBatchSpanProcessorMetrics") + // Generate 2 spans, which export and block during the export call. + generateSpan(t, tr, testOption{genNumSpans: 2}) + me.waitForSpans(2) + assertScopeMetrics(t, internalBsp.componentNameAttr, reader, + expectMetrics{queueCapacity: 2, queueSize: 0, successProcessed: 2}) + // Generate 3 spans. 2 fill the queue, and 1 is dropped because the queue is full. + generateSpan(t, tr, testOption{genNumSpans: 3}) + assertScopeMetrics(t, internalBsp.componentNameAttr, reader, + expectMetrics{queueCapacity: 2, queueSize: 2, queueFullProcessed: 1, successProcessed: 2}) +} + +func TestBatchSpanProcessorBlockingMetrics(t *testing.T) { + t.Setenv("OTEL_GO_X_SELF_OBSERVABILITY", "true") + tp := basicTracerProvider(t) + reader := sdkmetric.NewManualReader() + meterProvider := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader)) + otel.SetMeterProvider(meterProvider) + me := newBlockingExporter() + t.Cleanup(func() { assert.NoError(t, me.Shutdown(context.Background())) }) + bsp := NewBatchSpanProcessor( + me, + // Use WithBlocking so we can trigger a queueFull using ForceFlush. + WithBlocking(), + // Make sure timeout doesn't trigger during the test. + WithBatchTimeout(time.Hour), + WithMaxQueueSize(2), + WithMaxExportBatchSize(2), + ) + internalBsp := bsp.(*batchSpanProcessor) + tp.RegisterSpanProcessor(bsp) + + tr := tp.Tracer("TestBatchSpanProcessorBlockingMetrics") + // Generate 2 spans that are exported to the exporter, which blocks. + generateSpan(t, tr, testOption{genNumSpans: 2}) + me.waitForSpans(2) + assertScopeMetrics(t, internalBsp.componentNameAttr, reader, + expectMetrics{queueCapacity: 2, queueSize: 0, successProcessed: 2}) + // Generate 2 spans to fill the queue. + generateSpan(t, tr, testOption{genNumSpans: 2}) + go func() { + // Generate a span which blocks because the queue is full. + generateSpan(t, tr, testOption{genNumSpans: 1}) + }() + assertScopeMetrics(t, internalBsp.componentNameAttr, reader, + expectMetrics{queueCapacity: 2, queueSize: 2, successProcessed: 2}) + + // Use ForceFlush to force the span that is blocking on the full queue to be dropped. + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) + defer cancel() + assert.Error(t, tp.ForceFlush(ctx)) + assertScopeMetrics(t, internalBsp.componentNameAttr, reader, + expectMetrics{queueCapacity: 2, queueSize: 2, queueFullProcessed: 1, successProcessed: 2}) +} + +type expectMetrics struct { + queueCapacity int64 + queueSize int64 + successProcessed int64 + queueFullProcessed int64 +} + +func assertScopeMetrics(t *testing.T, componentNameAttr attribute.KeyValue, reader sdkmetric.Reader, + expectation expectMetrics, +) { + t.Helper() + gotMetrics := new(metricdata.ResourceMetrics) + assert.NoError(t, reader.Collect(context.Background(), gotMetrics)) + require.Len(t, gotMetrics.ScopeMetrics, 1) + sm := gotMetrics.ScopeMetrics[0] + assert.Equal(t, instrumentation.Scope{ + Name: selfObsScopeName, + Version: sdk.Version(), + SchemaURL: semconv.SchemaURL, + }, sm.Scope) + wantProcessedDataPoints := []metricdata.DataPoint[int64]{} + if expectation.successProcessed > 0 { + wantProcessedDataPoints = append(wantProcessedDataPoints, metricdata.DataPoint[int64]{ + Value: expectation.successProcessed, + Attributes: attribute.NewSet( + semconv.OTelComponentTypeBatchingSpanProcessor, + componentNameAttr, + semconv.ErrorTypeKey.String(string(noError)), + ), + }) + } + if expectation.queueFullProcessed > 0 { + wantProcessedDataPoints = append(wantProcessedDataPoints, metricdata.DataPoint[int64]{ + Value: expectation.queueFullProcessed, + Attributes: attribute.NewSet( + semconv.OTelComponentTypeBatchingSpanProcessor, + componentNameAttr, + semconv.ErrorTypeKey.String(string(queueFull)), + ), + }) + } + + // sm.Metrics also includes otel.sdk.span.live and otel.sdk.span.started + if len(wantProcessedDataPoints) > 0 { + require.Len(t, sm.Metrics, 5) + } else { + require.Len(t, sm.Metrics, 4) + } + + baseAttrs := attribute.NewSet( + semconv.OTelComponentTypeBatchingSpanProcessor, + componentNameAttr, + ) + + want := metricdata.Metrics{ + Name: otelconv.SDKProcessorSpanQueueCapacity{}.Name(), + Description: otelconv.SDKProcessorSpanQueueCapacity{}.Description(), + Unit: otelconv.SDKProcessorSpanQueueCapacity{}.Unit(), + Data: metricdata.Sum[int64]{ + DataPoints: []metricdata.DataPoint[int64]{{Attributes: baseAttrs, Value: expectation.queueCapacity}}, + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: false, + }, + } + metricdatatest.AssertEqual(t, want, sm.Metrics[0], metricdatatest.IgnoreTimestamp()) + + want = metricdata.Metrics{ + Name: otelconv.SDKProcessorSpanQueueSize{}.Name(), + Description: otelconv.SDKProcessorSpanQueueSize{}.Description(), + Unit: otelconv.SDKProcessorSpanQueueSize{}.Unit(), + Data: metricdata.Sum[int64]{ + DataPoints: []metricdata.DataPoint[int64]{{Attributes: baseAttrs, Value: expectation.queueSize}}, + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: false, + }, + } + metricdatatest.AssertEqual(t, want, sm.Metrics[1], metricdatatest.IgnoreTimestamp()) + + if len(wantProcessedDataPoints) > 0 { + want = metricdata.Metrics{ + Name: otelconv.SDKProcessorSpanProcessed{}.Name(), + Description: otelconv.SDKProcessorSpanProcessed{}.Description(), + Unit: otelconv.SDKProcessorSpanProcessed{}.Unit(), + Data: metricdata.Sum[int64]{ + DataPoints: wantProcessedDataPoints, + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + }, + } + metricdatatest.AssertEqual(t, want, sm.Metrics[2], metricdatatest.IgnoreTimestamp()) + } +} + +// blockingExporter blocks until the exported span is removed from the channel. +type blockingExporter struct { + shutdown chan struct{} + total atomic.Int32 +} + +func newBlockingExporter() *blockingExporter { + e := &blockingExporter{shutdown: make(chan struct{})} + return e +} + +func (e *blockingExporter) Shutdown(ctx context.Context) error { + select { + case <-e.shutdown: + default: + close(e.shutdown) + } + return ctx.Err() +} + +func (e *blockingExporter) ExportSpans(ctx context.Context, s []ReadOnlySpan) error { + e.total.Add(int32(len(s))) + <-e.shutdown + return ctx.Err() +} + +func (e *blockingExporter) waitForSpans(n int32) { + // Wait for all n spans to reach the export call + // nolint: revive // intentionally empty block + for e.total.Load() < n { + } +} diff --git a/sdk/trace/provider.go b/sdk/trace/provider.go index fd942d23e80..62e87fd631e 100644 --- a/sdk/trace/provider.go +++ b/sdk/trace/provider.go @@ -20,6 +20,7 @@ import ( const ( defaultTracerName = "go.opentelemetry.io/otel/sdk/tracer" + selfObsScopeName = "go.opentelemetry.io/otel/sdk/trace" ) // tracerProviderConfig. diff --git a/sdk/trace/tracer.go b/sdk/trace/tracer.go index ddb72cb77d4..9d0ff05a31b 100644 --- a/sdk/trace/tracer.go +++ b/sdk/trace/tracer.go @@ -39,7 +39,7 @@ func (tr *tracer) initSelfObservability() { tr.selfObservabilityEnabled = true mp := otel.GetMeterProvider() - m := mp.Meter("go.opentelemetry.io/otel/sdk/trace", + m := mp.Meter(selfObsScopeName, metric.WithInstrumentationVersion(sdk.Version()), metric.WithSchemaURL(semconv.SchemaURL)) From d20963d94b689d1757d4295511f9edccef8cd396 Mon Sep 17 00:00:00 2001 From: David Ashpole Date: Wed, 30 Jul 2025 14:37:33 +0000 Subject: [PATCH 2/8] address comments --- sdk/trace/batch_span_processor.go | 4 ++-- sdk/trace/batch_span_processor_test.go | 3 ++- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/sdk/trace/batch_span_processor.go b/sdk/trace/batch_span_processor.go index bac3abce9e0..2d6ec469047 100644 --- a/sdk/trace/batch_span_processor.go +++ b/sdk/trace/batch_span_processor.go @@ -139,12 +139,12 @@ func NewBatchSpanProcessor(exporter SpanExporter, options ...BatchSpanProcessorO return bsp } -var processorID atomic.Int64 +var processorIDCounter atomic.Int64 // nextProcessorID returns an identifier for this batch span processor, // starting with 0 and incrementing by 1 each time it is called. func nextProcessorID() int64 { - return processorID.Add(1) - 1 + return processorIDCounter.Add(1) - 1 } // configureSelfObservability configures metrics for the batch span processor. diff --git a/sdk/trace/batch_span_processor_test.go b/sdk/trace/batch_span_processor_test.go index 51d85f50341..6d546295521 100644 --- a/sdk/trace/batch_span_processor_test.go +++ b/sdk/trace/batch_span_processor_test.go @@ -8,6 +8,7 @@ import ( "encoding/binary" "errors" "fmt" + "runtime" "sync" "sync/atomic" "testing" @@ -872,7 +873,7 @@ func (e *blockingExporter) ExportSpans(ctx context.Context, s []ReadOnlySpan) er func (e *blockingExporter) waitForSpans(n int32) { // Wait for all n spans to reach the export call - // nolint: revive // intentionally empty block for e.total.Load() < n { + runtime.Gosched() } } From e0d4b487e5235790b2f7c626e6f1cf35437ed96a Mon Sep 17 00:00:00 2001 From: David Ashpole Date: Wed, 6 Aug 2025 16:55:48 +0000 Subject: [PATCH 3/8] remove noError --- sdk/trace/batch_span_processor.go | 8 ++------ sdk/trace/batch_span_processor_test.go | 1 - 2 files changed, 2 insertions(+), 7 deletions(-) diff --git a/sdk/trace/batch_span_processor.go b/sdk/trace/batch_span_processor.go index 2d6ec469047..845f4dfd548 100644 --- a/sdk/trace/batch_span_processor.go +++ b/sdk/trace/batch_span_processor.go @@ -33,10 +33,7 @@ const ( DefaultMaxExportBatchSize = 512 ) -var ( - noError = otelconv.ErrorTypeAttr("") - queueFull = otelconv.ErrorTypeAttr("queue_full") -) +var queueFull = otelconv.ErrorTypeAttr("queue_full") // BatchSpanProcessorOption configures a BatchSpanProcessor. type BatchSpanProcessorOption func(o *BatchSpanProcessorOptions) @@ -346,8 +343,7 @@ func (bsp *batchSpanProcessor) exportSpans(ctx context.Context) error { if bsp.selfObservabilityEnabled { bsp.spansProcessedCounter.Add(ctx, int64(l), bsp.componentNameAttr, - bsp.spansProcessedCounter.AttrComponentType(otelconv.ComponentTypeBatchingSpanProcessor), - bsp.spansProcessedCounter.AttrErrorType(noError)) + bsp.spansProcessedCounter.AttrComponentType(otelconv.ComponentTypeBatchingSpanProcessor)) } err := bsp.e.ExportSpans(ctx, bsp.batch) diff --git a/sdk/trace/batch_span_processor_test.go b/sdk/trace/batch_span_processor_test.go index 6d546295521..1e99795fbd8 100644 --- a/sdk/trace/batch_span_processor_test.go +++ b/sdk/trace/batch_span_processor_test.go @@ -779,7 +779,6 @@ func assertScopeMetrics(t *testing.T, componentNameAttr attribute.KeyValue, read Attributes: attribute.NewSet( semconv.OTelComponentTypeBatchingSpanProcessor, componentNameAttr, - semconv.ErrorTypeKey.String(string(noError)), ), }) } From 901fda839750dbe5b1f2b5da2ac499fdf572d2b4 Mon Sep 17 00:00:00 2001 From: David Ashpole Date: Wed, 6 Aug 2025 13:03:54 -0400 Subject: [PATCH 4/8] Fix lint --- sdk/trace/batch_span_processor.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/trace/batch_span_processor.go b/sdk/trace/batch_span_processor.go index 845f4dfd548..8a2de20f544 100644 --- a/sdk/trace/batch_span_processor.go +++ b/sdk/trace/batch_span_processor.go @@ -174,7 +174,7 @@ func (bsp *batchSpanProcessor) configureSelfObservability() { callabckAttributesOpt := metric.WithAttributes(bsp.componentNameAttr, semconv.OTelComponentTypeBatchingSpanProcessor) bsp.callbackRegistration, err = meter.RegisterCallback( - func(ctx context.Context, o metric.Observer) error { + func(_ context.Context, o metric.Observer) error { o.ObserveInt64(queueSizeUpDownCounter.Inst(), int64(len(bsp.queue)), callabckAttributesOpt) o.ObserveInt64(queueCapacityUpDownCounter.Inst(), int64(bsp.o.MaxQueueSize), callabckAttributesOpt) return nil From 8369e1553ced1df27045e4f70d581f2dddd711b3 Mon Sep 17 00:00:00 2001 From: David Ashpole Date: Thu, 7 Aug 2025 09:48:53 -0400 Subject: [PATCH 5/8] Update sdk/trace/batch_span_processor_test.go MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Robert Pająk --- sdk/trace/batch_span_processor_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/trace/batch_span_processor_test.go b/sdk/trace/batch_span_processor_test.go index 1e99795fbd8..920a3f019e9 100644 --- a/sdk/trace/batch_span_processor_test.go +++ b/sdk/trace/batch_span_processor_test.go @@ -759,7 +759,7 @@ type expectMetrics struct { queueFullProcessed int64 } -func assertScopeMetrics(t *testing.T, componentNameAttr attribute.KeyValue, reader sdkmetric.Reader, +func assertSelfObsScopeMetrics(t *testing.T, componentNameAttr attribute.KeyValue, reader sdkmetric.Reader, expectation expectMetrics, ) { t.Helper() From f1c41eb55f1aa38952a9f5935a26d7d062ddd6ba Mon Sep 17 00:00:00 2001 From: David Ashpole Date: Thu, 7 Aug 2025 14:44:20 +0000 Subject: [PATCH 6/8] assert against entire ScopeMetrics --- sdk/trace/batch_span_processor_test.go | 135 ++++++++++++++----------- 1 file changed, 74 insertions(+), 61 deletions(-) diff --git a/sdk/trace/batch_span_processor_test.go b/sdk/trace/batch_span_processor_test.go index 920a3f019e9..6e35bde4648 100644 --- a/sdk/trace/batch_span_processor_test.go +++ b/sdk/trace/batch_span_processor_test.go @@ -645,11 +645,22 @@ func TestBatchSpanProcessorConcurrentSafe(t *testing.T) { wg.Wait() } +// Drop metrics not being tested in this test. +var dropSpanMetricsView = sdkmetric.NewView( + sdkmetric.Instrument{ + Name: "otel.sdk.span.*", + }, + sdkmetric.Stream{Aggregation: sdkmetric.AggregationDrop{}}, +) + func TestBatchSpanProcessorMetricsDisabled(t *testing.T) { t.Setenv("OTEL_GO_X_SELF_OBSERVABILITY", "false") tp := basicTracerProvider(t) reader := sdkmetric.NewManualReader() - meterProvider := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader)) + meterProvider := sdkmetric.NewMeterProvider( + sdkmetric.WithReader(reader), + sdkmetric.WithView(dropSpanMetricsView), + ) otel.SetMeterProvider(meterProvider) me := newBlockingExporter() t.Cleanup(func() { assert.NoError(t, me.Shutdown(context.Background())) }) @@ -683,7 +694,10 @@ func TestBatchSpanProcessorMetrics(t *testing.T) { t.Setenv("OTEL_GO_X_SELF_OBSERVABILITY", "true") tp := basicTracerProvider(t) reader := sdkmetric.NewManualReader() - meterProvider := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader)) + meterProvider := sdkmetric.NewMeterProvider( + sdkmetric.WithReader(reader), + sdkmetric.WithView(dropSpanMetricsView), + ) otel.SetMeterProvider(meterProvider) me := newBlockingExporter() t.Cleanup(func() { assert.NoError(t, me.Shutdown(context.Background())) }) @@ -701,11 +715,11 @@ func TestBatchSpanProcessorMetrics(t *testing.T) { // Generate 2 spans, which export and block during the export call. generateSpan(t, tr, testOption{genNumSpans: 2}) me.waitForSpans(2) - assertScopeMetrics(t, internalBsp.componentNameAttr, reader, + assertSelfObsScopeMetrics(t, internalBsp.componentNameAttr, reader, expectMetrics{queueCapacity: 2, queueSize: 0, successProcessed: 2}) // Generate 3 spans. 2 fill the queue, and 1 is dropped because the queue is full. generateSpan(t, tr, testOption{genNumSpans: 3}) - assertScopeMetrics(t, internalBsp.componentNameAttr, reader, + assertSelfObsScopeMetrics(t, internalBsp.componentNameAttr, reader, expectMetrics{queueCapacity: 2, queueSize: 2, queueFullProcessed: 1, successProcessed: 2}) } @@ -713,7 +727,10 @@ func TestBatchSpanProcessorBlockingMetrics(t *testing.T) { t.Setenv("OTEL_GO_X_SELF_OBSERVABILITY", "true") tp := basicTracerProvider(t) reader := sdkmetric.NewManualReader() - meterProvider := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader)) + meterProvider := sdkmetric.NewMeterProvider( + sdkmetric.WithReader(reader), + sdkmetric.WithView(dropSpanMetricsView), + ) otel.SetMeterProvider(meterProvider) me := newBlockingExporter() t.Cleanup(func() { assert.NoError(t, me.Shutdown(context.Background())) }) @@ -733,7 +750,7 @@ func TestBatchSpanProcessorBlockingMetrics(t *testing.T) { // Generate 2 spans that are exported to the exporter, which blocks. generateSpan(t, tr, testOption{genNumSpans: 2}) me.waitForSpans(2) - assertScopeMetrics(t, internalBsp.componentNameAttr, reader, + assertSelfObsScopeMetrics(t, internalBsp.componentNameAttr, reader, expectMetrics{queueCapacity: 2, queueSize: 0, successProcessed: 2}) // Generate 2 spans to fill the queue. generateSpan(t, tr, testOption{genNumSpans: 2}) @@ -741,14 +758,14 @@ func TestBatchSpanProcessorBlockingMetrics(t *testing.T) { // Generate a span which blocks because the queue is full. generateSpan(t, tr, testOption{genNumSpans: 1}) }() - assertScopeMetrics(t, internalBsp.componentNameAttr, reader, + assertSelfObsScopeMetrics(t, internalBsp.componentNameAttr, reader, expectMetrics{queueCapacity: 2, queueSize: 2, successProcessed: 2}) // Use ForceFlush to force the span that is blocking on the full queue to be dropped. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) defer cancel() assert.Error(t, tp.ForceFlush(ctx)) - assertScopeMetrics(t, internalBsp.componentNameAttr, reader, + assertSelfObsScopeMetrics(t, internalBsp.componentNameAttr, reader, expectMetrics{queueCapacity: 2, queueSize: 2, queueFullProcessed: 1, successProcessed: 2}) } @@ -763,15 +780,36 @@ func assertSelfObsScopeMetrics(t *testing.T, componentNameAttr attribute.KeyValu expectation expectMetrics, ) { t.Helper() - gotMetrics := new(metricdata.ResourceMetrics) - assert.NoError(t, reader.Collect(context.Background(), gotMetrics)) - require.Len(t, gotMetrics.ScopeMetrics, 1) - sm := gotMetrics.ScopeMetrics[0] - assert.Equal(t, instrumentation.Scope{ - Name: selfObsScopeName, - Version: sdk.Version(), - SchemaURL: semconv.SchemaURL, - }, sm.Scope) + gotResourceMetrics := new(metricdata.ResourceMetrics) + assert.NoError(t, reader.Collect(context.Background(), gotResourceMetrics)) + + baseAttrs := attribute.NewSet( + semconv.OTelComponentTypeBatchingSpanProcessor, + componentNameAttr, + ) + wantMetrics := []metricdata.Metrics{ + { + Name: otelconv.SDKProcessorSpanQueueCapacity{}.Name(), + Description: otelconv.SDKProcessorSpanQueueCapacity{}.Description(), + Unit: otelconv.SDKProcessorSpanQueueCapacity{}.Unit(), + Data: metricdata.Sum[int64]{ + DataPoints: []metricdata.DataPoint[int64]{{Attributes: baseAttrs, Value: expectation.queueCapacity}}, + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: false, + }, + }, + { + Name: otelconv.SDKProcessorSpanQueueSize{}.Name(), + Description: otelconv.SDKProcessorSpanQueueSize{}.Description(), + Unit: otelconv.SDKProcessorSpanQueueSize{}.Unit(), + Data: metricdata.Sum[int64]{ + DataPoints: []metricdata.DataPoint[int64]{{Attributes: baseAttrs, Value: expectation.queueSize}}, + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: false, + }, + }, + } + wantProcessedDataPoints := []metricdata.DataPoint[int64]{} if expectation.successProcessed > 0 { wantProcessedDataPoints = append(wantProcessedDataPoints, metricdata.DataPoint[int64]{ @@ -793,55 +831,30 @@ func assertSelfObsScopeMetrics(t *testing.T, componentNameAttr attribute.KeyValu }) } - // sm.Metrics also includes otel.sdk.span.live and otel.sdk.span.started if len(wantProcessedDataPoints) > 0 { - require.Len(t, sm.Metrics, 5) - } else { - require.Len(t, sm.Metrics, 4) - } - - baseAttrs := attribute.NewSet( - semconv.OTelComponentTypeBatchingSpanProcessor, - componentNameAttr, - ) - - want := metricdata.Metrics{ - Name: otelconv.SDKProcessorSpanQueueCapacity{}.Name(), - Description: otelconv.SDKProcessorSpanQueueCapacity{}.Description(), - Unit: otelconv.SDKProcessorSpanQueueCapacity{}.Unit(), - Data: metricdata.Sum[int64]{ - DataPoints: []metricdata.DataPoint[int64]{{Attributes: baseAttrs, Value: expectation.queueCapacity}}, - Temporality: metricdata.CumulativeTemporality, - IsMonotonic: false, - }, + wantMetrics = append(wantMetrics, + metricdata.Metrics{ + Name: otelconv.SDKProcessorSpanProcessed{}.Name(), + Description: otelconv.SDKProcessorSpanProcessed{}.Description(), + Unit: otelconv.SDKProcessorSpanProcessed{}.Unit(), + Data: metricdata.Sum[int64]{ + DataPoints: wantProcessedDataPoints, + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + }, + }, + ) } - metricdatatest.AssertEqual(t, want, sm.Metrics[0], metricdatatest.IgnoreTimestamp()) - want = metricdata.Metrics{ - Name: otelconv.SDKProcessorSpanQueueSize{}.Name(), - Description: otelconv.SDKProcessorSpanQueueSize{}.Description(), - Unit: otelconv.SDKProcessorSpanQueueSize{}.Unit(), - Data: metricdata.Sum[int64]{ - DataPoints: []metricdata.DataPoint[int64]{{Attributes: baseAttrs, Value: expectation.queueSize}}, - Temporality: metricdata.CumulativeTemporality, - IsMonotonic: false, + wantScopeMetric := metricdata.ScopeMetrics{ + Scope: instrumentation.Scope{ + Name: selfObsScopeName, + Version: sdk.Version(), + SchemaURL: semconv.SchemaURL, }, + Metrics: wantMetrics, } - metricdatatest.AssertEqual(t, want, sm.Metrics[1], metricdatatest.IgnoreTimestamp()) - - if len(wantProcessedDataPoints) > 0 { - want = metricdata.Metrics{ - Name: otelconv.SDKProcessorSpanProcessed{}.Name(), - Description: otelconv.SDKProcessorSpanProcessed{}.Description(), - Unit: otelconv.SDKProcessorSpanProcessed{}.Unit(), - Data: metricdata.Sum[int64]{ - DataPoints: wantProcessedDataPoints, - Temporality: metricdata.CumulativeTemporality, - IsMonotonic: true, - }, - } - metricdatatest.AssertEqual(t, want, sm.Metrics[2], metricdatatest.IgnoreTimestamp()) - } + metricdatatest.AssertEqual(t, wantScopeMetric, gotResourceMetrics.ScopeMetrics[0], metricdatatest.IgnoreTimestamp()) } // blockingExporter blocks until the exported span is removed from the channel. From bd6e7b2243257e16595391c86885feae74dd6831 Mon Sep 17 00:00:00 2001 From: David Ashpole Date: Thu, 7 Aug 2025 14:55:37 +0000 Subject: [PATCH 7/8] waitForSpans respects cancellation --- sdk/trace/batch_span_processor_test.go | 23 ++++++++++++++++++----- 1 file changed, 18 insertions(+), 5 deletions(-) diff --git a/sdk/trace/batch_span_processor_test.go b/sdk/trace/batch_span_processor_test.go index 6e35bde4648..860e19f0818 100644 --- a/sdk/trace/batch_span_processor_test.go +++ b/sdk/trace/batch_span_processor_test.go @@ -676,7 +676,9 @@ func TestBatchSpanProcessorMetricsDisabled(t *testing.T) { tr := tp.Tracer("TestBatchSpanProcessorMetricsDisabled") // Generate 2 spans, which export and block during the export call. generateSpan(t, tr, testOption{genNumSpans: 2}) - me.waitForSpans(2) + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + assert.NoError(t, me.waitForSpans(ctx, 2)) // Validate that there are no metrics produced. gotMetrics := new(metricdata.ResourceMetrics) @@ -714,7 +716,9 @@ func TestBatchSpanProcessorMetrics(t *testing.T) { tr := tp.Tracer("TestBatchSpanProcessorMetrics") // Generate 2 spans, which export and block during the export call. generateSpan(t, tr, testOption{genNumSpans: 2}) - me.waitForSpans(2) + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + assert.NoError(t, me.waitForSpans(ctx, 2)) assertSelfObsScopeMetrics(t, internalBsp.componentNameAttr, reader, expectMetrics{queueCapacity: 2, queueSize: 0, successProcessed: 2}) // Generate 3 spans. 2 fill the queue, and 1 is dropped because the queue is full. @@ -749,7 +753,9 @@ func TestBatchSpanProcessorBlockingMetrics(t *testing.T) { tr := tp.Tracer("TestBatchSpanProcessorBlockingMetrics") // Generate 2 spans that are exported to the exporter, which blocks. generateSpan(t, tr, testOption{genNumSpans: 2}) - me.waitForSpans(2) + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + assert.NoError(t, me.waitForSpans(ctx, 2)) assertSelfObsScopeMetrics(t, internalBsp.componentNameAttr, reader, expectMetrics{queueCapacity: 2, queueSize: 0, successProcessed: 2}) // Generate 2 spans to fill the queue. @@ -762,7 +768,7 @@ func TestBatchSpanProcessorBlockingMetrics(t *testing.T) { expectMetrics{queueCapacity: 2, queueSize: 2, successProcessed: 2}) // Use ForceFlush to force the span that is blocking on the full queue to be dropped. - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) + ctx, cancel = context.WithTimeout(context.Background(), 10*time.Millisecond) defer cancel() assert.Error(t, tp.ForceFlush(ctx)) assertSelfObsScopeMetrics(t, internalBsp.componentNameAttr, reader, @@ -883,9 +889,16 @@ func (e *blockingExporter) ExportSpans(ctx context.Context, s []ReadOnlySpan) er return ctx.Err() } -func (e *blockingExporter) waitForSpans(n int32) { +func (e *blockingExporter) waitForSpans(ctx context.Context, n int32) error { // Wait for all n spans to reach the export call for e.total.Load() < n { + select { + case <-ctx.Done(): + return fmt.Errorf("timeout waiting for %d spans to be exported", n) + default: + // So the select will not block + } runtime.Gosched() } + return nil } From 33868c27894cb0016b202debbee86cc057cdc792 Mon Sep 17 00:00:00 2001 From: David Ashpole Date: Mon, 11 Aug 2025 09:54:44 -0400 Subject: [PATCH 8/8] Update sdk/trace/batch_span_processor_test.go MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Robert Pająk --- sdk/trace/batch_span_processor_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/trace/batch_span_processor_test.go b/sdk/trace/batch_span_processor_test.go index 860e19f0818..c2e596e312d 100644 --- a/sdk/trace/batch_span_processor_test.go +++ b/sdk/trace/batch_span_processor_test.go @@ -854,7 +854,7 @@ func assertSelfObsScopeMetrics(t *testing.T, componentNameAttr attribute.KeyValu wantScopeMetric := metricdata.ScopeMetrics{ Scope: instrumentation.Scope{ - Name: selfObsScopeName, + Name: "go.opentelemetry.io/otel/sdk/trace", Version: sdk.Version(), SchemaURL: semconv.SchemaURL, },