@@ -6,13 +6,19 @@ package trace // import "go.opentelemetry.io/otel/sdk/trace"
66import (
77 "context"
88 "errors"
9+ "fmt"
910 "sync"
1011 "sync/atomic"
1112 "time"
1213
1314 "go.opentelemetry.io/otel"
15+ "go.opentelemetry.io/otel/attribute"
1416 "go.opentelemetry.io/otel/internal/global"
17+ "go.opentelemetry.io/otel/metric"
18+ "go.opentelemetry.io/otel/metric/noop"
1519 "go.opentelemetry.io/otel/sdk/internal/env"
20+ "go.opentelemetry.io/otel/sdk/internal/x"
21+ semconv "go.opentelemetry.io/otel/semconv/v1.30.0"
1622 "go.opentelemetry.io/otel/trace"
1723)
1824
@@ -24,6 +30,19 @@ const (
2430 // DefaultExportTimeout is the duration after which an export is cancelled, in milliseconds.
2531 DefaultExportTimeout = 30000
2632 DefaultMaxExportBatchSize = 512
33+
34+ queueSizeMetricName = "otel.sdk.processor.span.queue.size"
35+ queueSizeMetricDescription = "The number of spans in the queue of a given instance of an SDK span processor"
36+ queueCapacityMetricName = "otel.sdk.processor.span.queue.capacity"
37+ queueCapacityMetricDescription = "The maximum number of spans the queue of a given instance of an SDK span processor can hold"
38+ spansProcessedMetricName = "otel.sdk.processor.span.processed.count"
39+ spansProcessedMetricDescription = "The number of spans for which the processing has finished, either successful or failed"
40+ spanCountUnit = "{span}"
41+ )
42+
43+ var (
44+ componentTypeKey = attribute .Key ("otel.component.type" )
45+ componentNameKey = attribute .Key ("otel.component.name" )
2746)
2847
2948// BatchSpanProcessorOption configures a BatchSpanProcessor.
@@ -69,6 +88,12 @@ type batchSpanProcessor struct {
6988 queue chan ReadOnlySpan
7089 dropped uint32
7190
91+ callbackRegistration metric.Registration
92+ spansProcessedCounter metric.Int64Counter
93+ successAttributes metric.MeasurementOption
94+ alreadyShutdownAttributes metric.MeasurementOption
95+ queueFullAttributes metric.MeasurementOption
96+
7297 batch []ReadOnlySpan
7398 batchMutex sync.Mutex
7499 timer * time.Timer
@@ -114,6 +139,8 @@ func NewBatchSpanProcessor(exporter SpanExporter, options ...BatchSpanProcessorO
114139 stopCh : make (chan struct {}),
115140 }
116141
142+ bsp .configureSelfObservability ()
143+
117144 bsp .stopWait .Add (1 )
118145 go func () {
119146 defer bsp .stopWait .Done ()
@@ -124,13 +151,74 @@ func NewBatchSpanProcessor(exporter SpanExporter, options ...BatchSpanProcessorO
124151 return bsp
125152}
126153
154+ var processorID atomic.Uint64
155+
156+ // nextProcessorID returns an identifier for this batch span processor,
157+ // starting with 0 and incrementing by 1 each time it is called.
158+ func nextProcessorID () int64 {
159+ return int64 (processorID .Add (1 ) - 1 )
160+ }
161+
162+ // configureSelfObservability configures metrics for the batch span processor.
163+ func (bsp * batchSpanProcessor ) configureSelfObservability () {
164+ mp := otel .GetMeterProvider ()
165+ if ! x .SelfObservability .Enabled () {
166+ mp = metric .MeterProvider (noop .NewMeterProvider ())
167+ }
168+ meter := mp .Meter (
169+ selfObsScopeName ,
170+ metric .WithInstrumentationVersion (version ()),
171+ )
172+
173+ queueCapacityUpDownCounter , err := meter .Int64ObservableUpDownCounter (queueCapacityMetricName ,
174+ metric .WithUnit (spanCountUnit ),
175+ metric .WithDescription (queueCapacityMetricDescription ),
176+ )
177+ if err != nil {
178+ otel .Handle (err )
179+ }
180+ queueSizeUpDownCounter , err := meter .Int64ObservableUpDownCounter (queueSizeMetricName ,
181+ metric .WithUnit (spanCountUnit ),
182+ metric .WithDescription (queueSizeMetricDescription ),
183+ )
184+ if err != nil {
185+ otel .Handle (err )
186+ }
187+ bsp .spansProcessedCounter , err = meter .Int64Counter (spansProcessedMetricName ,
188+ metric .WithUnit (spanCountUnit ),
189+ metric .WithDescription (spansProcessedMetricDescription ),
190+ )
191+ if err != nil {
192+ otel .Handle (err )
193+ }
194+
195+ componentTypeAttr := componentTypeKey .String ("batching_span_processor" )
196+ componentNameAttr := componentNameKey .String (fmt .Sprintf ("batching_span_processor/%d" , nextProcessorID ()))
197+ bsp .successAttributes = metric .WithAttributes (componentNameAttr , componentTypeAttr , semconv .ErrorTypeKey .String ("" ))
198+ bsp .alreadyShutdownAttributes = metric .WithAttributes (componentNameAttr , componentTypeAttr , semconv .ErrorTypeKey .String ("already_shutdown" ))
199+ bsp .queueFullAttributes = metric .WithAttributes (componentNameAttr , componentTypeAttr , semconv .ErrorTypeKey .String ("queue_full" ))
200+ callabckAttributesOpt := metric .WithAttributes (componentNameAttr , componentTypeAttr )
201+ bsp .callbackRegistration , err = meter .RegisterCallback (
202+ func (ctx context.Context , o metric.Observer ) error {
203+ o .ObserveInt64 (queueSizeUpDownCounter , int64 (len (bsp .queue )), callabckAttributesOpt )
204+ o .ObserveInt64 (queueCapacityUpDownCounter , int64 (bsp .o .MaxQueueSize ), callabckAttributesOpt )
205+ return nil
206+ },
207+ queueSizeUpDownCounter , queueCapacityUpDownCounter )
208+ if err != nil {
209+ otel .Handle (err )
210+ }
211+ }
212+
127213// OnStart method does nothing.
128214func (bsp * batchSpanProcessor ) OnStart (parent context.Context , s ReadWriteSpan ) {}
129215
130216// OnEnd method enqueues a ReadOnlySpan for later processing.
131217func (bsp * batchSpanProcessor ) OnEnd (s ReadOnlySpan ) {
218+ ctx := context .Background ()
132219 // Do not enqueue spans after Shutdown.
133220 if bsp .stopped .Load () {
221+ bsp .spansProcessedCounter .Add (ctx , 1 , bsp .alreadyShutdownAttributes )
134222 return
135223 }
136224
@@ -165,7 +253,7 @@ func (bsp *batchSpanProcessor) Shutdown(ctx context.Context) error {
165253 err = ctx .Err ()
166254 }
167255 })
168- return err
256+ return errors . Join ( err , bsp . callbackRegistration . Unregister ())
169257}
170258
171259type forceFlushSpan struct {
@@ -276,6 +364,7 @@ func (bsp *batchSpanProcessor) exportSpans(ctx context.Context) error {
276364
277365 if l := len (bsp .batch ); l > 0 {
278366 global .Debug ("exporting spans" , "count" , len (bsp .batch ), "total_dropped" , atomic .LoadUint32 (& bsp .dropped ))
367+ bsp .spansProcessedCounter .Add (ctx , int64 (len (bsp .batch )), bsp .successAttributes )
279368 err := bsp .e .ExportSpans (ctx , bsp .batch )
280369
281370 // A new batch is always created after exporting, even if the batch failed to be exported.
@@ -384,11 +473,12 @@ func (bsp *batchSpanProcessor) enqueueBlockOnQueueFull(ctx context.Context, sd R
384473 case bsp .queue <- sd :
385474 return true
386475 case <- ctx .Done ():
476+ bsp .spansProcessedCounter .Add (ctx , 1 , bsp .queueFullAttributes )
387477 return false
388478 }
389479}
390480
391- func (bsp * batchSpanProcessor ) enqueueDrop (_ context.Context , sd ReadOnlySpan ) bool {
481+ func (bsp * batchSpanProcessor ) enqueueDrop (ctx context.Context , sd ReadOnlySpan ) bool {
392482 if ! sd .SpanContext ().IsSampled () {
393483 return false
394484 }
@@ -398,6 +488,7 @@ func (bsp *batchSpanProcessor) enqueueDrop(_ context.Context, sd ReadOnlySpan) b
398488 return true
399489 default :
400490 atomic .AddUint32 (& bsp .dropped , 1 )
491+ bsp .spansProcessedCounter .Add (ctx , 1 , bsp .queueFullAttributes )
401492 }
402493 return false
403494}
0 commit comments