Skip to content

Commit fcc3417

Browse files
dashpole and pellared authored
sdk/trace: self-observability: batch span processor metrics (#6393)
Fixes #7005. Adds the `otel.sdk.processor.span.queue.size`, `otel.sdk.processor.span.queue.capacity`, and `otel.sdk.processor.span.processed.count` metrics to the trace batch span processor. These are defined in https://github.com/open-telemetry/semantic-conventions/blob/cb11bb9bac24f4b0e95ad0f61ce01813d8ceada8/docs/otel/sdk-metrics.md, and are experimental. Because of this, the metrics are behind the OTEL_GO_X_SELF_OBSERVABILITY feature gate. Given that the feature is experimental, it always uses the global MeterProvider when enabled. --------- Co-authored-by: Robert Pająk <[email protected]>
1 parent c5e68b2 commit fcc3417

File tree

5 files changed

+359
-4
lines changed

5 files changed

+359
-4
lines changed

CHANGELOG.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,8 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
4646
- The `go.opentelemetry.io/otel/semconv/v1.36.0` package.
4747
The package contains semantic conventions from the `v1.36.0` version of the OpenTelemetry Semantic Conventions.
4848
See the [migration documentation](./semconv/v1.36.0/MIGRATION.md) for information on how to upgrade from `go.opentelemetry.io/otel/semconv/v1.34.0`. (#7032)
49-
- Add experimental self-observability span metrics in `go.opentelemetry.io/otel/sdk/trace`.
50-
Check the `go.opentelemetry.io/otel/sdk/trace/internal/x` package documentation for more information. (#7027)
49+
- Add experimental self-observability span and batch span processor metrics in `go.opentelemetry.io/otel/sdk/trace`.
50+
Check the `go.opentelemetry.io/otel/sdk/trace/internal/x` package documentation for more information. (#7027, #6393)
5151
- Add native histogram exemplar support in `go.opentelemetry.io/otel/exporters/prometheus`. (#6772)
5252
- Add experimental self-observability log metrics in `go.opentelemetry.io/otel/sdk/log`.
5353
Check the `go.opentelemetry.io/otel/sdk/log/internal/x` package documentation for more information. (#7121)

sdk/trace/batch_span_processor.go

Lines changed: 86 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,20 @@ package trace // import "go.opentelemetry.io/otel/sdk/trace"
66
import (
77
"context"
88
"errors"
9+
"fmt"
910
"sync"
1011
"sync/atomic"
1112
"time"
1213

1314
"go.opentelemetry.io/otel"
15+
"go.opentelemetry.io/otel/attribute"
1416
"go.opentelemetry.io/otel/internal/global"
17+
"go.opentelemetry.io/otel/metric"
18+
"go.opentelemetry.io/otel/sdk"
1519
"go.opentelemetry.io/otel/sdk/internal/env"
20+
"go.opentelemetry.io/otel/sdk/trace/internal/x"
21+
semconv "go.opentelemetry.io/otel/semconv/v1.36.0"
22+
"go.opentelemetry.io/otel/semconv/v1.36.0/otelconv"
1623
"go.opentelemetry.io/otel/trace"
1724
)
1825

@@ -26,6 +33,8 @@ const (
2633
DefaultMaxExportBatchSize = 512
2734
)
2835

36+
// queueFull is the error type attribute attached to the spans-processed
// counter when a span could not be placed on the processor's queue.
var queueFull = otelconv.ErrorTypeAttr("queue_full")
37+
2938
// BatchSpanProcessorOption configures a BatchSpanProcessor.
3039
type BatchSpanProcessorOption func(o *BatchSpanProcessorOptions)
3140

@@ -69,6 +78,11 @@ type batchSpanProcessor struct {
6978
queue chan ReadOnlySpan
7079
dropped uint32
7180

81+
selfObservabilityEnabled bool
82+
callbackRegistration metric.Registration
83+
spansProcessedCounter otelconv.SDKProcessorSpanProcessed
84+
componentNameAttr attribute.KeyValue
85+
7286
batch []ReadOnlySpan
7387
batchMutex sync.Mutex
7488
timer *time.Timer
@@ -110,6 +124,8 @@ func NewBatchSpanProcessor(exporter SpanExporter, options ...BatchSpanProcessorO
110124
stopCh: make(chan struct{}),
111125
}
112126

127+
bsp.configureSelfObservability()
128+
113129
bsp.stopWait.Add(1)
114130
go func() {
115131
defer bsp.stopWait.Done()
@@ -120,6 +136,55 @@ func NewBatchSpanProcessor(exporter SpanExporter, options ...BatchSpanProcessorO
120136
return bsp
121137
}
122138

139+
// processorIDCounter tracks how many processor identifiers have been issued.
var processorIDCounter atomic.Int64

// nextProcessorID hands out identifiers for batch span processors. The first
// call yields 0 and every subsequent call yields a value one greater than the
// previous one.
func nextProcessorID() int64 {
	issued := processorIDCounter.Add(1)
	return issued - 1
}
146+
147+
// configureSelfObservability configures metrics for the batch span processor.
148+
func (bsp *batchSpanProcessor) configureSelfObservability() {
149+
if !x.SelfObservability.Enabled() {
150+
return
151+
}
152+
bsp.selfObservabilityEnabled = true
153+
bsp.componentNameAttr = semconv.OTelComponentName(
154+
fmt.Sprintf("%s/%d", otelconv.ComponentTypeBatchingSpanProcessor, nextProcessorID()))
155+
meter := otel.GetMeterProvider().Meter(
156+
selfObsScopeName,
157+
metric.WithInstrumentationVersion(sdk.Version()),
158+
metric.WithSchemaURL(semconv.SchemaURL),
159+
)
160+
161+
queueCapacityUpDownCounter, err := otelconv.NewSDKProcessorSpanQueueCapacity(meter)
162+
if err != nil {
163+
otel.Handle(err)
164+
}
165+
queueSizeUpDownCounter, err := otelconv.NewSDKProcessorSpanQueueSize(meter)
166+
if err != nil {
167+
otel.Handle(err)
168+
}
169+
bsp.spansProcessedCounter, err = otelconv.NewSDKProcessorSpanProcessed(meter)
170+
if err != nil {
171+
otel.Handle(err)
172+
}
173+
174+
callabckAttributesOpt := metric.WithAttributes(bsp.componentNameAttr,
175+
semconv.OTelComponentTypeBatchingSpanProcessor)
176+
bsp.callbackRegistration, err = meter.RegisterCallback(
177+
func(_ context.Context, o metric.Observer) error {
178+
o.ObserveInt64(queueSizeUpDownCounter.Inst(), int64(len(bsp.queue)), callabckAttributesOpt)
179+
o.ObserveInt64(queueCapacityUpDownCounter.Inst(), int64(bsp.o.MaxQueueSize), callabckAttributesOpt)
180+
return nil
181+
},
182+
queueSizeUpDownCounter.Inst(), queueCapacityUpDownCounter.Inst())
183+
if err != nil {
184+
otel.Handle(err)
185+
}
186+
}
187+
123188
// OnStart method does nothing.
124189
func (*batchSpanProcessor) OnStart(context.Context, ReadWriteSpan) {}
125190

@@ -160,6 +225,9 @@ func (bsp *batchSpanProcessor) Shutdown(ctx context.Context) error {
160225
case <-ctx.Done():
161226
err = ctx.Err()
162227
}
228+
if bsp.selfObservabilityEnabled {
229+
err = errors.Join(err, bsp.callbackRegistration.Unregister())
230+
}
163231
})
164232
return err
165233
}
@@ -272,6 +340,11 @@ func (bsp *batchSpanProcessor) exportSpans(ctx context.Context) error {
272340

273341
if l := len(bsp.batch); l > 0 {
274342
global.Debug("exporting spans", "count", len(bsp.batch), "total_dropped", atomic.LoadUint32(&bsp.dropped))
343+
if bsp.selfObservabilityEnabled {
344+
bsp.spansProcessedCounter.Add(ctx, int64(l),
345+
bsp.componentNameAttr,
346+
bsp.spansProcessedCounter.AttrComponentType(otelconv.ComponentTypeBatchingSpanProcessor))
347+
}
275348
err := bsp.e.ExportSpans(ctx, bsp.batch)
276349

277350
// A new batch is always created after exporting, even if the batch failed to be exported.
@@ -380,11 +453,17 @@ func (bsp *batchSpanProcessor) enqueueBlockOnQueueFull(ctx context.Context, sd R
380453
case bsp.queue <- sd:
381454
return true
382455
case <-ctx.Done():
456+
if bsp.selfObservabilityEnabled {
457+
bsp.spansProcessedCounter.Add(ctx, 1,
458+
bsp.componentNameAttr,
459+
bsp.spansProcessedCounter.AttrComponentType(otelconv.ComponentTypeBatchingSpanProcessor),
460+
bsp.spansProcessedCounter.AttrErrorType(queueFull))
461+
}
383462
return false
384463
}
385464
}
386465

387-
func (bsp *batchSpanProcessor) enqueueDrop(_ context.Context, sd ReadOnlySpan) bool {
466+
func (bsp *batchSpanProcessor) enqueueDrop(ctx context.Context, sd ReadOnlySpan) bool {
388467
if !sd.SpanContext().IsSampled() {
389468
return false
390469
}
@@ -394,6 +473,12 @@ func (bsp *batchSpanProcessor) enqueueDrop(_ context.Context, sd ReadOnlySpan) b
394473
return true
395474
default:
396475
atomic.AddUint32(&bsp.dropped, 1)
476+
if bsp.selfObservabilityEnabled {
477+
bsp.spansProcessedCounter.Add(ctx, 1,
478+
bsp.componentNameAttr,
479+
bsp.spansProcessedCounter.AttrComponentType(otelconv.ComponentTypeBatchingSpanProcessor),
480+
bsp.spansProcessedCounter.AttrErrorType(queueFull))
481+
}
397482
}
398483
return false
399484
}

0 commit comments

Comments
 (0)