@@ -124,7 +124,20 @@ func NewBatchSpanProcessor(exporter SpanExporter, options ...BatchSpanProcessorO
124
124
stopCh : make (chan struct {}),
125
125
}
126
126
127
- bsp .configureSelfObservability ()
127
+ if x .SelfObservability .Enabled () {
128
+ bsp .selfObservabilityEnabled = true
129
+ bsp .componentNameAttr = componentName ()
130
+
131
+ var err error
132
+ bsp .spansProcessedCounter , bsp .callbackRegistration , err = newBSPObs (
133
+ bsp .componentNameAttr ,
134
+ func () int64 { return int64 (len (bsp .queue )) },
135
+ int64 (bsp .o .MaxQueueSize ),
136
+ )
137
+ if err != nil {
138
+ otel .Handle (err )
139
+ }
140
+ }
128
141
129
142
bsp .stopWait .Add (1 )
130
143
go func () {
@@ -144,45 +157,49 @@ func nextProcessorID() int64 {
144
157
return processorIDCounter .Add (1 ) - 1
145
158
}
146
159
147
- // configureSelfObservability configures metrics for the batch span processor.
148
- func (bsp * batchSpanProcessor ) configureSelfObservability () {
149
- if ! x .SelfObservability .Enabled () {
150
- return
151
- }
152
- bsp .selfObservabilityEnabled = true
153
- bsp .componentNameAttr = semconv .OTelComponentName (
154
- fmt .Sprintf ("%s/%d" , otelconv .ComponentTypeBatchingSpanProcessor , nextProcessorID ()))
160
+ func componentName () attribute.KeyValue {
161
+ id := nextProcessorID ()
162
+ name := fmt .Sprintf ("%s/%d" , otelconv .ComponentTypeBatchingSpanProcessor , id )
163
+ return semconv .OTelComponentName (name )
164
+ }
165
+
166
+ // newBSPObs creates and returns a new set of metrics instruments and a
167
+ // registration for a BatchSpanProcessor. It is the caller's responsibility
168
+ // to unregister the registration when it is no longer needed.
169
+ func newBSPObs (
170
+ cmpnt attribute.KeyValue ,
171
+ qLen func () int64 ,
172
+ qMax int64 ,
173
+ ) (otelconv.SDKProcessorSpanProcessed , metric.Registration , error ) {
155
174
meter := otel .GetMeterProvider ().Meter (
156
175
selfObsScopeName ,
157
176
metric .WithInstrumentationVersion (sdk .Version ()),
158
177
metric .WithSchemaURL (semconv .SchemaURL ),
159
178
)
160
179
161
- queueCapacityUpDownCounter , err := otelconv .NewSDKProcessorSpanQueueCapacity (meter )
162
- if err != nil {
163
- otel .Handle (err )
164
- }
165
- queueSizeUpDownCounter , err := otelconv .NewSDKProcessorSpanQueueSize (meter )
166
- if err != nil {
167
- otel .Handle (err )
168
- }
169
- bsp .spansProcessedCounter , err = otelconv .NewSDKProcessorSpanProcessed (meter )
170
- if err != nil {
171
- otel .Handle (err )
172
- }
180
+ qCap , err := otelconv .NewSDKProcessorSpanQueueCapacity (meter )
181
+
182
+ qSize , e := otelconv .NewSDKProcessorSpanQueueSize (meter )
183
+ err = errors .Join (err , e )
184
+
185
+ spansProcessed , e := otelconv .NewSDKProcessorSpanProcessed (meter )
186
+ err = errors .Join (err , e )
173
187
174
- callbackAttributesOpt := metric .WithAttributes (bsp .componentNameAttr ,
175
- semconv .OTelComponentTypeBatchingSpanProcessor )
176
- bsp .callbackRegistration , err = meter .RegisterCallback (
188
+ cmpntT := semconv .OTelComponentTypeBatchingSpanProcessor
189
+ attrs := metric .WithAttributes (cmpnt , cmpntT )
190
+
191
+ reg , e := meter .RegisterCallback (
177
192
func (_ context.Context , o metric.Observer ) error {
178
- o .ObserveInt64 (queueSizeUpDownCounter .Inst (), int64 ( len ( bsp . queue )), callbackAttributesOpt )
179
- o .ObserveInt64 (queueCapacityUpDownCounter .Inst (), int64 ( bsp . o . MaxQueueSize ), callbackAttributesOpt )
193
+ o .ObserveInt64 (qSize .Inst (), qLen (), attrs )
194
+ o .ObserveInt64 (qCap .Inst (), qMax , attrs )
180
195
return nil
181
196
},
182
- queueSizeUpDownCounter .Inst (), queueCapacityUpDownCounter .Inst ())
183
- if err != nil {
184
- otel .Handle (err )
185
- }
197
+ qSize .Inst (),
198
+ qCap .Inst (),
199
+ )
200
+ err = errors .Join (err , e )
201
+
202
+ return spansProcessed , reg , err
186
203
}
187
204
188
205
// OnStart method does nothing.
0 commit comments