@@ -5,13 +5,20 @@ package trace // import "go.opentelemetry.io/otel/sdk/trace"
55
66import (
77 "context"
8+ "errors"
9+ "fmt"
810 "sync"
911 "sync/atomic"
1012 "time"
1113
1214 "go.opentelemetry.io/otel"
15+ "go.opentelemetry.io/otel/attribute"
1316 "go.opentelemetry.io/otel/internal/global"
17+ "go.opentelemetry.io/otel/metric"
18+ "go.opentelemetry.io/otel/metric/noop"
1419 "go.opentelemetry.io/otel/sdk/internal/env"
20+ "go.opentelemetry.io/otel/sdk/internal/x"
21+ semconv "go.opentelemetry.io/otel/semconv/v1.30.0"
1522 "go.opentelemetry.io/otel/trace"
1623)
1724
@@ -21,6 +28,19 @@ const (
2128 DefaultScheduleDelay = 5000
2229 DefaultExportTimeout = 30000
2330 DefaultMaxExportBatchSize = 512
31+
32+ queueSizeMetricName = "otel.sdk.processor.span.queue.size"
33+ queueSizeMetricDescription = "The number of spans in the queue of a given instance of an SDK span processor"
34+ queueCapacityMetricName = "otel.sdk.processor.span.queue.capacity"
35+ queueCapacityMetricDescription = "The maximum number of spans the queue of a given instance of an SDK span processor can hold"
36+ spansProcessedMetricName = "otel.sdk.processor.span.processed.count"
37+ spansProcessedMetricDescription = "The number of spans for which the processing has finished, either successful or failed"
38+ spanCountUnit = "{span}"
39+ )
40+
// componentTypeKey and componentNameKey are the attribute keys
// ("otel.component.type" and "otel.component.name") attached to the
// self-observability measurements recorded by the batch span processor.
41+ var (
42+ componentTypeKey = attribute .Key ("otel.component.type" )
43+ componentNameKey = attribute .Key ("otel.component.name" )
2444)
2545
2646// BatchSpanProcessorOption configures a BatchSpanProcessor.
@@ -66,6 +86,12 @@ type batchSpanProcessor struct {
6686 queue chan ReadOnlySpan
6787 dropped uint32
6888
89+ callbackRegistration metric.Registration
90+ spansProcessedCounter metric.Int64Counter
91+ successAttributes metric.MeasurementOption
92+ alreadyShutdownAttributes metric.MeasurementOption
93+ queueFullAttributes metric.MeasurementOption
94+
6995 batch []ReadOnlySpan
7096 batchMutex sync.Mutex
7197 timer * time.Timer
@@ -111,6 +137,8 @@ func NewBatchSpanProcessor(exporter SpanExporter, options ...BatchSpanProcessorO
111137 stopCh : make (chan struct {}),
112138 }
113139
140+ bsp .configureSelfObservability ()
141+
114142 bsp .stopWait .Add (1 )
115143 go func () {
116144 defer bsp .stopWait .Done ()
@@ -121,13 +149,74 @@ func NewBatchSpanProcessor(exporter SpanExporter, options ...BatchSpanProcessorO
121149 return bsp
122150}
123151
// processorID counts how many batch span processor identifiers have been
// handed out so far.
var processorID atomic.Uint64

// nextProcessorID hands out sequential identifiers for batch span
// processors: the first call yields 0 and every later call yields a value one
// greater than the previous call.
func nextProcessorID() int64 {
	issued := processorID.Add(1)
	return int64(issued - 1)
}
159+
160+ // configureSelfObservability configures metrics for the batch span processor.
161+ func (bsp * batchSpanProcessor ) configureSelfObservability () {
162+ mp := otel .GetMeterProvider ()
163+ if ! x .SelfObservability .Enabled () {
164+ mp = metric .MeterProvider (noop .NewMeterProvider ())
165+ }
166+ meter := mp .Meter (
167+ selfObsScopeName ,
168+ metric .WithInstrumentationVersion (version ()),
169+ )
170+
171+ queueCapacityUpDownCounter , err := meter .Int64ObservableUpDownCounter (queueCapacityMetricName ,
172+ metric .WithUnit (spanCountUnit ),
173+ metric .WithDescription (queueCapacityMetricDescription ),
174+ )
175+ if err != nil {
176+ otel .Handle (err )
177+ }
178+ queueSizeUpDownCounter , err := meter .Int64ObservableUpDownCounter (queueSizeMetricName ,
179+ metric .WithUnit (spanCountUnit ),
180+ metric .WithDescription (queueSizeMetricDescription ),
181+ )
182+ if err != nil {
183+ otel .Handle (err )
184+ }
185+ bsp .spansProcessedCounter , err = meter .Int64Counter (spansProcessedMetricName ,
186+ metric .WithUnit (spanCountUnit ),
187+ metric .WithDescription (spansProcessedMetricDescription ),
188+ )
189+ if err != nil {
190+ otel .Handle (err )
191+ }
192+
193+ componentTypeAttr := componentTypeKey .String ("batching_span_processor" )
194+ componentNameAttr := componentNameKey .String (fmt .Sprintf ("batching_span_processor/%d" , nextProcessorID ()))
195+ bsp .successAttributes = metric .WithAttributes (componentNameAttr , componentTypeAttr , semconv .ErrorTypeKey .String ("" ))
196+ bsp .alreadyShutdownAttributes = metric .WithAttributes (componentNameAttr , componentTypeAttr , semconv .ErrorTypeKey .String ("already_shutdown" ))
197+ bsp .queueFullAttributes = metric .WithAttributes (componentNameAttr , componentTypeAttr , semconv .ErrorTypeKey .String ("queue_full" ))
198+ callabckAttributesOpt := metric .WithAttributes (componentNameAttr , componentTypeAttr )
199+ bsp .callbackRegistration , err = meter .RegisterCallback (
200+ func (ctx context.Context , o metric.Observer ) error {
201+ o .ObserveInt64 (queueSizeUpDownCounter , int64 (len (bsp .queue )), callabckAttributesOpt )
202+ o .ObserveInt64 (queueCapacityUpDownCounter , int64 (bsp .o .MaxQueueSize ), callabckAttributesOpt )
203+ return nil
204+ },
205+ queueSizeUpDownCounter , queueCapacityUpDownCounter )
206+ if err != nil {
207+ otel .Handle (err )
208+ }
209+ }
210+
124211// OnStart method does nothing.
125212func (bsp * batchSpanProcessor ) OnStart (parent context.Context , s ReadWriteSpan ) {}
126213
127214// OnEnd method enqueues a ReadOnlySpan for later processing.
128215func (bsp * batchSpanProcessor ) OnEnd (s ReadOnlySpan ) {
216+ ctx := context .Background ()
129217 // Do not enqueue spans after Shutdown.
130218 if bsp .stopped .Load () {
219+ bsp .spansProcessedCounter .Add (ctx , 1 , bsp .alreadyShutdownAttributes )
131220 return
132221 }
133222
@@ -162,7 +251,7 @@ func (bsp *batchSpanProcessor) Shutdown(ctx context.Context) error {
162251 err = ctx .Err ()
163252 }
164253 })
165- return err
254+ return errors . Join ( err , bsp . callbackRegistration . Unregister ())
166255}
167256
168257type forceFlushSpan struct {
@@ -273,6 +362,7 @@ func (bsp *batchSpanProcessor) exportSpans(ctx context.Context) error {
273362
274363 if l := len (bsp .batch ); l > 0 {
275364 global .Debug ("exporting spans" , "count" , len (bsp .batch ), "total_dropped" , atomic .LoadUint32 (& bsp .dropped ))
365+ bsp .spansProcessedCounter .Add (ctx , int64 (len (bsp .batch )), bsp .successAttributes )
276366 err := bsp .e .ExportSpans (ctx , bsp .batch )
277367
278368 // A new batch is always created after exporting, even if the batch failed to be exported.
@@ -381,11 +471,12 @@ func (bsp *batchSpanProcessor) enqueueBlockOnQueueFull(ctx context.Context, sd R
381471 case bsp .queue <- sd :
382472 return true
383473 case <- ctx .Done ():
474+ bsp .spansProcessedCounter .Add (ctx , 1 , bsp .queueFullAttributes )
384475 return false
385476 }
386477}
387478
388- func (bsp * batchSpanProcessor ) enqueueDrop (_ context.Context , sd ReadOnlySpan ) bool {
479+ func (bsp * batchSpanProcessor ) enqueueDrop (ctx context.Context , sd ReadOnlySpan ) bool {
389480 if ! sd .SpanContext ().IsSampled () {
390481 return false
391482 }
@@ -395,6 +486,7 @@ func (bsp *batchSpanProcessor) enqueueDrop(_ context.Context, sd ReadOnlySpan) b
395486 return true
396487 default :
397488 atomic .AddUint32 (& bsp .dropped , 1 )
489+ bsp .spansProcessedCounter .Add (ctx , 1 , bsp .queueFullAttributes )
398490 }
399491 return false
400492}
0 commit comments