Skip to content

Commit e4ab314

Browse files
authored
Encapsulate SDK BatchSpanProcessor observability (#7332)
Split from #7316

[Follow guidelines](https://github.com/open-telemetry/opentelemetry-go/blob/a5dcd68ebb2f3669f7685ac7b0f3f1624251a381/CONTRIBUTING.md#encapsulation) and move instrumentation into its own type.

### Benchmarks

#### Added `sdk/trace/internal/observ` benchmarks

```
goos: linux
goarch: amd64
pkg: go.opentelemetry.io/otel/sdk/trace/internal/observ
cpu: Intel(R) Core(TM) i7-8550U CPU @ 1.80GHz
                         │ enc-trace-sdk-bsp-obs.out │
                         │          sec/op           │
BSP/Processed-8                          0.6394n ±  2%
BSP/ProcessedQueueFull-8                 0.6806n ±  3%
BSP/Callback-8                            3.591µ ± 12%
geomean                                   11.60n

                         │ enc-trace-sdk-bsp-obs.out │
                         │           B/op            │
BSP/Processed-8                            0.000 ± 0%
BSP/ProcessedQueueFull-8                   0.000 ± 0%
BSP/Callback-8                           2.626Ki ± 0%
geomean                                             ¹
¹ summaries must be >0 to compute geomean

                         │ enc-trace-sdk-bsp-obs.out │
                         │         allocs/op         │
BSP/Processed-8                            0.000 ± 0%
BSP/ProcessedQueueFull-8                   0.000 ± 0%
BSP/Callback-8                             16.00 ± 0%
geomean                                             ¹
¹ summaries must be >0 to compute geomean
```

#### Existing `sdk/trace` benchmarks

None
1 parent 6243f21 commit e4ab314

File tree

6 files changed

+508
-103
lines changed

6 files changed

+508
-103
lines changed

sdk/trace/batch_span_processor.go

Lines changed: 18 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -6,20 +6,14 @@ package trace // import "go.opentelemetry.io/otel/sdk/trace"
66
import (
77
"context"
88
"errors"
9-
"fmt"
109
"sync"
1110
"sync/atomic"
1211
"time"
1312

1413
"go.opentelemetry.io/otel"
15-
"go.opentelemetry.io/otel/attribute"
1614
"go.opentelemetry.io/otel/internal/global"
17-
"go.opentelemetry.io/otel/metric"
18-
"go.opentelemetry.io/otel/sdk"
1915
"go.opentelemetry.io/otel/sdk/internal/env"
20-
"go.opentelemetry.io/otel/sdk/trace/internal/x"
21-
semconv "go.opentelemetry.io/otel/semconv/v1.37.0"
22-
"go.opentelemetry.io/otel/semconv/v1.37.0/otelconv"
16+
"go.opentelemetry.io/otel/sdk/trace/internal/observ"
2317
"go.opentelemetry.io/otel/trace"
2418
)
2519

@@ -33,8 +27,6 @@ const (
3327
DefaultMaxExportBatchSize = 512
3428
)
3529

36-
var queueFull = otelconv.ErrorTypeAttr("queue_full")
37-
3830
// BatchSpanProcessorOption configures a BatchSpanProcessor.
3931
type BatchSpanProcessorOption func(o *BatchSpanProcessorOptions)
4032

@@ -78,10 +70,7 @@ type batchSpanProcessor struct {
7870
queue chan ReadOnlySpan
7971
dropped uint32
8072

81-
observabilityEnabled bool
82-
callbackRegistration metric.Registration
83-
spansProcessedCounter otelconv.SDKProcessorSpanProcessed
84-
componentNameAttr attribute.KeyValue
73+
inst *observ.BSP
8574

8675
batch []ReadOnlySpan
8776
batchMutex sync.Mutex
@@ -124,19 +113,14 @@ func NewBatchSpanProcessor(exporter SpanExporter, options ...BatchSpanProcessorO
124113
stopCh: make(chan struct{}),
125114
}
126115

127-
if x.Observability.Enabled() {
128-
bsp.observabilityEnabled = true
129-
bsp.componentNameAttr = componentName()
130-
131-
var err error
132-
bsp.spansProcessedCounter, bsp.callbackRegistration, err = newBSPObs(
133-
bsp.componentNameAttr,
134-
func() int64 { return int64(len(bsp.queue)) },
135-
int64(bsp.o.MaxQueueSize),
136-
)
137-
if err != nil {
138-
otel.Handle(err)
139-
}
116+
var err error
117+
bsp.inst, err = observ.NewBSP(
118+
nextProcessorID(),
119+
func() int64 { return int64(len(bsp.queue)) },
120+
int64(bsp.o.MaxQueueSize),
121+
)
122+
if err != nil {
123+
otel.Handle(err)
140124
}
141125

142126
bsp.stopWait.Add(1)
@@ -157,51 +141,6 @@ func nextProcessorID() int64 {
157141
return processorIDCounter.Add(1) - 1
158142
}
159143

160-
func componentName() attribute.KeyValue {
161-
id := nextProcessorID()
162-
name := fmt.Sprintf("%s/%d", otelconv.ComponentTypeBatchingSpanProcessor, id)
163-
return semconv.OTelComponentName(name)
164-
}
165-
166-
// newBSPObs creates and returns a new set of metrics instruments and a
167-
// registration for a BatchSpanProcessor. It is the caller's responsibility
168-
// to unregister the registration when it is no longer needed.
169-
func newBSPObs(
170-
cmpnt attribute.KeyValue,
171-
qLen func() int64,
172-
qMax int64,
173-
) (otelconv.SDKProcessorSpanProcessed, metric.Registration, error) {
174-
meter := otel.GetMeterProvider().Meter(
175-
obsScopeName,
176-
metric.WithInstrumentationVersion(sdk.Version()),
177-
metric.WithSchemaURL(semconv.SchemaURL),
178-
)
179-
180-
qCap, err := otelconv.NewSDKProcessorSpanQueueCapacity(meter)
181-
182-
qSize, e := otelconv.NewSDKProcessorSpanQueueSize(meter)
183-
err = errors.Join(err, e)
184-
185-
spansProcessed, e := otelconv.NewSDKProcessorSpanProcessed(meter)
186-
err = errors.Join(err, e)
187-
188-
cmpntT := semconv.OTelComponentTypeBatchingSpanProcessor
189-
attrs := metric.WithAttributes(cmpnt, cmpntT)
190-
191-
reg, e := meter.RegisterCallback(
192-
func(_ context.Context, o metric.Observer) error {
193-
o.ObserveInt64(qSize.Inst(), qLen(), attrs)
194-
o.ObserveInt64(qCap.Inst(), qMax, attrs)
195-
return nil
196-
},
197-
qSize.Inst(),
198-
qCap.Inst(),
199-
)
200-
err = errors.Join(err, e)
201-
202-
return spansProcessed, reg, err
203-
}
204-
205144
// OnStart method does nothing.
206145
func (*batchSpanProcessor) OnStart(context.Context, ReadWriteSpan) {}
207146

@@ -242,8 +181,8 @@ func (bsp *batchSpanProcessor) Shutdown(ctx context.Context) error {
242181
case <-ctx.Done():
243182
err = ctx.Err()
244183
}
245-
if bsp.observabilityEnabled {
246-
err = errors.Join(err, bsp.callbackRegistration.Unregister())
184+
if bsp.inst != nil {
185+
err = errors.Join(err, bsp.inst.Shutdown())
247186
}
248187
})
249188
return err
@@ -357,10 +296,8 @@ func (bsp *batchSpanProcessor) exportSpans(ctx context.Context) error {
357296

358297
if l := len(bsp.batch); l > 0 {
359298
global.Debug("exporting spans", "count", len(bsp.batch), "total_dropped", atomic.LoadUint32(&bsp.dropped))
360-
if bsp.observabilityEnabled {
361-
bsp.spansProcessedCounter.Add(ctx, int64(l),
362-
bsp.componentNameAttr,
363-
bsp.spansProcessedCounter.AttrComponentType(otelconv.ComponentTypeBatchingSpanProcessor))
299+
if bsp.inst != nil {
300+
bsp.inst.Processed(ctx, int64(l))
364301
}
365302
err := bsp.e.ExportSpans(ctx, bsp.batch)
366303

@@ -470,11 +407,8 @@ func (bsp *batchSpanProcessor) enqueueBlockOnQueueFull(ctx context.Context, sd R
470407
case bsp.queue <- sd:
471408
return true
472409
case <-ctx.Done():
473-
if bsp.observabilityEnabled {
474-
bsp.spansProcessedCounter.Add(ctx, 1,
475-
bsp.componentNameAttr,
476-
bsp.spansProcessedCounter.AttrComponentType(otelconv.ComponentTypeBatchingSpanProcessor),
477-
bsp.spansProcessedCounter.AttrErrorType(queueFull))
410+
if bsp.inst != nil {
411+
bsp.inst.ProcessedQueueFull(ctx, 1)
478412
}
479413
return false
480414
}
@@ -490,11 +424,8 @@ func (bsp *batchSpanProcessor) enqueueDrop(ctx context.Context, sd ReadOnlySpan)
490424
return true
491425
default:
492426
atomic.AddUint32(&bsp.dropped, 1)
493-
if bsp.observabilityEnabled {
494-
bsp.spansProcessedCounter.Add(ctx, 1,
495-
bsp.componentNameAttr,
496-
bsp.spansProcessedCounter.AttrComponentType(otelconv.ComponentTypeBatchingSpanProcessor),
497-
bsp.spansProcessedCounter.AttrErrorType(queueFull))
427+
if bsp.inst != nil {
428+
bsp.inst.ProcessedQueueFull(ctx, 1)
498429
}
499430
}
500431
return false

sdk/trace/batch_span_processor_test.go

Lines changed: 43 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,14 @@ import (
2525
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
2626
"go.opentelemetry.io/otel/sdk/metric/metricdata"
2727
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
28+
"go.opentelemetry.io/otel/sdk/trace/internal/observ"
2829
semconv "go.opentelemetry.io/otel/semconv/v1.37.0"
2930
"go.opentelemetry.io/otel/semconv/v1.37.0/otelconv"
3031
"go.opentelemetry.io/otel/trace"
3132
)
3233

34+
const componentID = 0
35+
3336
type testBatchExporter struct {
3437
mu sync.Mutex
3538
spans []ReadOnlySpan
@@ -693,6 +696,9 @@ func TestBatchSpanProcessorMetricsDisabled(t *testing.T) {
693696
}
694697

695698
func TestBatchSpanProcessorMetrics(t *testing.T) {
699+
// Reset for deterministic component ID.
700+
processorIDCounter.Store(componentID)
701+
696702
t.Setenv("OTEL_GO_X_OBSERVABILITY", "true")
697703
tp := basicTracerProvider(t)
698704
reader := sdkmetric.NewManualReader()
@@ -710,7 +716,6 @@ func TestBatchSpanProcessorMetrics(t *testing.T) {
710716
WithMaxQueueSize(2),
711717
WithMaxExportBatchSize(2),
712718
)
713-
internalBsp := bsp.(*batchSpanProcessor)
714719
tp.RegisterSpanProcessor(bsp)
715720

716721
tr := tp.Tracer("TestBatchSpanProcessorMetrics")
@@ -719,15 +724,25 @@ func TestBatchSpanProcessorMetrics(t *testing.T) {
719724
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
720725
defer cancel()
721726
assert.NoError(t, me.waitForSpans(ctx, 2))
722-
assertObsScopeMetrics(t, internalBsp.componentNameAttr, reader,
723-
expectMetrics{queueCapacity: 2, queueSize: 0, successProcessed: 2})
727+
assertObsScopeMetrics(t, reader, expectMetrics{
728+
queueCapacity: 2,
729+
queueSize: 0,
730+
successProcessed: 2,
731+
})
724732
// Generate 3 spans. 2 fill the queue, and 1 is dropped because the queue is full.
725733
generateSpan(t, tr, testOption{genNumSpans: 3})
726-
assertObsScopeMetrics(t, internalBsp.componentNameAttr, reader,
727-
expectMetrics{queueCapacity: 2, queueSize: 2, queueFullProcessed: 1, successProcessed: 2})
734+
assertObsScopeMetrics(t, reader, expectMetrics{
735+
queueCapacity: 2,
736+
queueSize: 2,
737+
queueFullProcessed: 1,
738+
successProcessed: 2,
739+
})
728740
}
729741

730742
func TestBatchSpanProcessorBlockingMetrics(t *testing.T) {
743+
// Reset for deterministic component ID.
744+
processorIDCounter.Store(componentID)
745+
731746
t.Setenv("OTEL_GO_X_OBSERVABILITY", "true")
732747
tp := basicTracerProvider(t)
733748
reader := sdkmetric.NewManualReader()
@@ -747,7 +762,6 @@ func TestBatchSpanProcessorBlockingMetrics(t *testing.T) {
747762
WithMaxQueueSize(2),
748763
WithMaxExportBatchSize(2),
749764
)
750-
internalBsp := bsp.(*batchSpanProcessor)
751765
tp.RegisterSpanProcessor(bsp)
752766

753767
tr := tp.Tracer("TestBatchSpanProcessorBlockingMetrics")
@@ -756,23 +770,33 @@ func TestBatchSpanProcessorBlockingMetrics(t *testing.T) {
756770
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
757771
defer cancel()
758772
assert.NoError(t, me.waitForSpans(ctx, 2))
759-
assertObsScopeMetrics(t, internalBsp.componentNameAttr, reader,
760-
expectMetrics{queueCapacity: 2, queueSize: 0, successProcessed: 2})
773+
assertObsScopeMetrics(t, reader, expectMetrics{
774+
queueCapacity: 2,
775+
queueSize: 0,
776+
successProcessed: 2,
777+
})
761778
// Generate 2 spans to fill the queue.
762779
generateSpan(t, tr, testOption{genNumSpans: 2})
763780
go func() {
764781
// Generate a span which blocks because the queue is full.
765782
generateSpan(t, tr, testOption{genNumSpans: 1})
766783
}()
767-
assertObsScopeMetrics(t, internalBsp.componentNameAttr, reader,
768-
expectMetrics{queueCapacity: 2, queueSize: 2, successProcessed: 2})
784+
assertObsScopeMetrics(t, reader, expectMetrics{
785+
queueCapacity: 2,
786+
queueSize: 2,
787+
successProcessed: 2,
788+
})
769789

770790
// Use ForceFlush to force the span that is blocking on the full queue to be dropped.
771791
ctx, cancel = context.WithTimeout(context.Background(), 10*time.Millisecond)
772792
defer cancel()
773793
assert.Error(t, tp.ForceFlush(ctx))
774-
assertObsScopeMetrics(t, internalBsp.componentNameAttr, reader,
775-
expectMetrics{queueCapacity: 2, queueSize: 2, queueFullProcessed: 1, successProcessed: 2})
794+
assertObsScopeMetrics(t, reader, expectMetrics{
795+
queueCapacity: 2,
796+
queueSize: 2,
797+
queueFullProcessed: 1,
798+
successProcessed: 2,
799+
})
776800
}
777801

778802
type expectMetrics struct {
@@ -782,13 +806,16 @@ type expectMetrics struct {
782806
queueFullProcessed int64
783807
}
784808

785-
func assertObsScopeMetrics(t *testing.T, componentNameAttr attribute.KeyValue, reader sdkmetric.Reader,
809+
func assertObsScopeMetrics(
810+
t *testing.T,
811+
reader sdkmetric.Reader,
786812
expectation expectMetrics,
787813
) {
788814
t.Helper()
789815
gotResourceMetrics := new(metricdata.ResourceMetrics)
790816
assert.NoError(t, reader.Collect(context.Background(), gotResourceMetrics))
791817

818+
componentNameAttr := observ.BSPComponentName(componentID)
792819
baseAttrs := attribute.NewSet(
793820
semconv.OTelComponentTypeBatchingSpanProcessor,
794821
componentNameAttr,
@@ -832,7 +859,7 @@ func assertObsScopeMetrics(t *testing.T, componentNameAttr attribute.KeyValue, r
832859
Attributes: attribute.NewSet(
833860
semconv.OTelComponentTypeBatchingSpanProcessor,
834861
componentNameAttr,
835-
semconv.ErrorTypeKey.String(string(queueFull)),
862+
observ.ErrQueueFull,
836863
),
837864
})
838865
}
@@ -854,9 +881,9 @@ func assertObsScopeMetrics(t *testing.T, componentNameAttr attribute.KeyValue, r
854881

855882
wantScopeMetric := metricdata.ScopeMetrics{
856883
Scope: instrumentation.Scope{
857-
Name: "go.opentelemetry.io/otel/sdk/trace",
884+
Name: observ.ScopeName,
858885
Version: sdk.Version(),
859-
SchemaURL: semconv.SchemaURL,
886+
SchemaURL: observ.SchemaURL,
860887
},
861888
Metrics: wantMetrics,
862889
}

0 commit comments

Comments
 (0)