@@ -6,13 +6,20 @@ package trace // import "go.opentelemetry.io/otel/sdk/trace"
66import (
77 "context"
88 "errors"
9+ "fmt"
910 "sync"
1011 "sync/atomic"
1112 "time"
1213
1314 "go.opentelemetry.io/otel"
15+ "go.opentelemetry.io/otel/attribute"
1416 "go.opentelemetry.io/otel/internal/global"
17+ "go.opentelemetry.io/otel/metric"
18+ "go.opentelemetry.io/otel/sdk"
1519 "go.opentelemetry.io/otel/sdk/internal/env"
20+ "go.opentelemetry.io/otel/sdk/trace/internal/x"
21+ semconv "go.opentelemetry.io/otel/semconv/v1.36.0"
22+ "go.opentelemetry.io/otel/semconv/v1.36.0/otelconv"
1623 "go.opentelemetry.io/otel/trace"
1724)
1825
@@ -26,6 +33,11 @@ const (
2633 DefaultMaxExportBatchSize = 512
2734)
2835
36+ var (
37+ noError = otelconv .ErrorTypeAttr ("" )
38+ queueFull = otelconv .ErrorTypeAttr ("queue_full" )
39+ )
40+
2941// BatchSpanProcessorOption configures a BatchSpanProcessor.
3042type BatchSpanProcessorOption func (o * BatchSpanProcessorOptions )
3143
@@ -69,6 +81,11 @@ type batchSpanProcessor struct {
6981 queue chan ReadOnlySpan
7082 dropped uint32
7183
84+ selfObservabilityEnabled bool
85+ callbackRegistration metric.Registration
86+ spansProcessedCounter otelconv.SDKProcessorSpanProcessed
87+ componentNameAttr attribute.KeyValue
88+
7289 batch []ReadOnlySpan
7390 batchMutex sync.Mutex
7491 timer * time.Timer
@@ -110,6 +127,8 @@ func NewBatchSpanProcessor(exporter SpanExporter, options ...BatchSpanProcessorO
110127 stopCh : make (chan struct {}),
111128 }
112129
130+ bsp .configureSelfObservability ()
131+
113132 bsp .stopWait .Add (1 )
114133 go func () {
115134 defer bsp .stopWait .Done ()
@@ -120,6 +139,55 @@ func NewBatchSpanProcessor(exporter SpanExporter, options ...BatchSpanProcessorO
120139 return bsp
121140}
122141
142+ var processorID atomic.Int64
143+
144+ // nextProcessorID returns an identifier for this batch span processor,
145+ // starting with 0 and incrementing by 1 each time it is called.
146+ func nextProcessorID () int64 {
147+ return processorID .Add (1 ) - 1
148+ }
149+
150+ // configureSelfObservability configures metrics for the batch span processor.
151+ func (bsp * batchSpanProcessor ) configureSelfObservability () {
152+ if ! x .SelfObservability .Enabled () {
153+ return
154+ }
155+ bsp .selfObservabilityEnabled = true
156+ bsp .componentNameAttr = semconv .OTelComponentName (
157+ fmt .Sprintf ("%s/%d" , otelconv .ComponentTypeBatchingSpanProcessor , nextProcessorID ()))
158+ meter := otel .GetMeterProvider ().Meter (
159+ selfObsScopeName ,
160+ metric .WithInstrumentationVersion (sdk .Version ()),
161+ metric .WithSchemaURL (semconv .SchemaURL ),
162+ )
163+
164+ queueCapacityUpDownCounter , err := otelconv .NewSDKProcessorSpanQueueCapacity (meter )
165+ if err != nil {
166+ otel .Handle (err )
167+ }
168+ queueSizeUpDownCounter , err := otelconv .NewSDKProcessorSpanQueueSize (meter )
169+ if err != nil {
170+ otel .Handle (err )
171+ }
172+ bsp .spansProcessedCounter , err = otelconv .NewSDKProcessorSpanProcessed (meter )
173+ if err != nil {
174+ otel .Handle (err )
175+ }
176+
177+ callabckAttributesOpt := metric .WithAttributes (bsp .componentNameAttr ,
178+ semconv .OTelComponentTypeBatchingSpanProcessor )
179+ bsp .callbackRegistration , err = meter .RegisterCallback (
180+ func (ctx context.Context , o metric.Observer ) error {
181+ o .ObserveInt64 (queueSizeUpDownCounter .Inst (), int64 (len (bsp .queue )), callabckAttributesOpt )
182+ o .ObserveInt64 (queueCapacityUpDownCounter .Inst (), int64 (bsp .o .MaxQueueSize ), callabckAttributesOpt )
183+ return nil
184+ },
185+ queueSizeUpDownCounter .Inst (), queueCapacityUpDownCounter .Inst ())
186+ if err != nil {
187+ otel .Handle (err )
188+ }
189+ }
190+
123191// OnStart method does nothing.
124192func (bsp * batchSpanProcessor ) OnStart (parent context.Context , s ReadWriteSpan ) {}
125193
@@ -160,6 +228,9 @@ func (bsp *batchSpanProcessor) Shutdown(ctx context.Context) error {
160228 case <- ctx .Done ():
161229 err = ctx .Err ()
162230 }
231+ if bsp .selfObservabilityEnabled {
232+ err = errors .Join (err , bsp .callbackRegistration .Unregister ())
233+ }
163234 })
164235 return err
165236}
@@ -272,6 +343,12 @@ func (bsp *batchSpanProcessor) exportSpans(ctx context.Context) error {
272343
273344 if l := len (bsp .batch ); l > 0 {
274345 global .Debug ("exporting spans" , "count" , len (bsp .batch ), "total_dropped" , atomic .LoadUint32 (& bsp .dropped ))
346+ if bsp .selfObservabilityEnabled {
347+ bsp .spansProcessedCounter .Add (ctx , int64 (l ),
348+ bsp .componentNameAttr ,
349+ bsp .spansProcessedCounter .AttrComponentType (otelconv .ComponentTypeBatchingSpanProcessor ),
350+ bsp .spansProcessedCounter .AttrErrorType (noError ))
351+ }
275352 err := bsp .e .ExportSpans (ctx , bsp .batch )
276353
277354 // A new batch is always created after exporting, even if the batch failed to be exported.
@@ -380,11 +457,17 @@ func (bsp *batchSpanProcessor) enqueueBlockOnQueueFull(ctx context.Context, sd R
380457 case bsp .queue <- sd :
381458 return true
382459 case <- ctx .Done ():
460+ if bsp .selfObservabilityEnabled {
461+ bsp .spansProcessedCounter .Add (ctx , 1 ,
462+ bsp .componentNameAttr ,
463+ bsp .spansProcessedCounter .AttrComponentType (otelconv .ComponentTypeBatchingSpanProcessor ),
464+ bsp .spansProcessedCounter .AttrErrorType (queueFull ))
465+ }
383466 return false
384467 }
385468}
386469
387- func (bsp * batchSpanProcessor ) enqueueDrop (_ context.Context , sd ReadOnlySpan ) bool {
470+ func (bsp * batchSpanProcessor ) enqueueDrop (ctx context.Context , sd ReadOnlySpan ) bool {
388471 if ! sd .SpanContext ().IsSampled () {
389472 return false
390473 }
@@ -394,6 +477,12 @@ func (bsp *batchSpanProcessor) enqueueDrop(_ context.Context, sd ReadOnlySpan) b
394477 return true
395478 default :
396479 atomic .AddUint32 (& bsp .dropped , 1 )
480+ if bsp .selfObservabilityEnabled {
481+ bsp .spansProcessedCounter .Add (ctx , 1 ,
482+ bsp .componentNameAttr ,
483+ bsp .spansProcessedCounter .AttrComponentType (otelconv .ComponentTypeBatchingSpanProcessor ),
484+ bsp .spansProcessedCounter .AttrErrorType (queueFull ))
485+ }
397486 }
398487 return false
399488}
0 commit comments