Skip to content

Commit 10f11a9

Browse files
committed
WIP
1 parent 93e2253 commit 10f11a9

File tree

5 files changed

+299
-62
lines changed

5 files changed

+299
-62
lines changed

CHANGELOG.md

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
4242
- Add `ErrorType` attribute helper function to the `go.opentelemetry.io/otel/semconv/v1.34.0` package. (#6962)
4343
- Add `WithAllowKeyDuplication` in `go.opentelemetry.io/otel/sdk/log` which can be used to disable deduplication for log records. (#6968)
4444
- Add `Clone` method to `Record` in `go.opentelemetry.io/otel/log` that returns a copy of the record with no shared state. (#7001)
45+
- Add `OTEL_GO_X_SELF_OBSERVABILITY` environment variable to control whether self-observability metrics and traces are produced by SDKs. (#6393)
46+
- Add experimental `otel.sdk.processor.span.queue.size`, `otel.sdk.processor.span.queue.capacity`, and `otel.sdk.processor.span.processed.count` metrics to the trace batch span processor in `go.opentelemetry.io/otel/sdk/trace`. (#6393)
4547

4648
### Changed
4749

@@ -163,11 +165,6 @@ The next release will require at least [Go 1.23].
163165

164166
### Added
165167

166-
- Add `OTEL_GO_X_SELF_OBSERVABILITY` environment variable to control whether self-observability metrics and traces are produced by SDKs. (#TODO)
167-
- Add experimental `otel.sdk.processor.span.queue.size`, `otel.sdk.processor.span.queue.capacity`, and `otel.sdk.processor.span.processed.count` metrics to the trace batch span processor in `go.opentelemetry.io/otel/sdk/trace`. (#TODO)
168-
169-
### Fixed
170-
171168
- Add `ValueFromAttribute` and `KeyValueFromAttribute` in `go.opentelemetry.io/otel/log`. (#6180)
172169
- Add `EventName` and `SetEventName` to `Record` in `go.opentelemetry.io/otel/log`. (#6187)
173170
- Add `EventName` to `RecordFactory` in `go.opentelemetry.io/otel/log/logtest`. (#6187)

sdk/go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ require (
1111
github.com/stretchr/testify v1.10.0
1212
go.opentelemetry.io/otel v1.37.0
1313
go.opentelemetry.io/otel/metric v1.37.0
14+
go.opentelemetry.io/otel/sdk/metric v1.34.0
1415
go.opentelemetry.io/otel/trace v1.37.0
1516
go.uber.org/goleak v1.3.0
1617
golang.org/x/sys v0.34.0

sdk/go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOf
2121
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
2222
go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA=
2323
go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A=
24+
go.opentelemetry.io/otel/sdk/metric v1.34.0 h1:5CeK9ujjbFVL5c1PhLuStg1wxA7vQv7ce1EK0Gyvahk=
25+
go.opentelemetry.io/otel/sdk/metric v1.34.0/go.mod h1:jQ/r8Ze28zRKoNRdkjCZxfs6YvBTG1+YIqyFVFYec5w=
2426
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
2527
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
2628
golang.org/x/sys v0.34.0 h1:H5Y5sJ2L2JRdyv7ROF1he/lPdvFsd0mJHFw2ThKHxLA=

sdk/trace/batch_span_processor.go

Lines changed: 46 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,10 @@ import (
1515
"go.opentelemetry.io/otel/attribute"
1616
"go.opentelemetry.io/otel/internal/global"
1717
"go.opentelemetry.io/otel/metric"
18-
"go.opentelemetry.io/otel/metric/noop"
1918
"go.opentelemetry.io/otel/sdk/internal/env"
2019
"go.opentelemetry.io/otel/sdk/internal/x"
21-
semconv "go.opentelemetry.io/otel/semconv/v1.30.0"
20+
semconv "go.opentelemetry.io/otel/semconv/v1.34.0"
21+
"go.opentelemetry.io/otel/semconv/v1.34.0/otelconv"
2222
"go.opentelemetry.io/otel/trace"
2323
)
2424

@@ -30,19 +30,11 @@ const (
3030
// DefaultExportTimeout is the duration after which an export is cancelled, in milliseconds.
3131
DefaultExportTimeout = 30000
3232
DefaultMaxExportBatchSize = 512
33-
34-
queueSizeMetricName = "otel.sdk.processor.span.queue.size"
35-
queueSizeMetricDescription = "The number of spans in the queue of a given instance of an SDK span processor"
36-
queueCapacityMetricName = "otel.sdk.processor.span.queue.capacity"
37-
queueCapacityMetricDescription = "The maximum number of spans the queue of a given instance of an SDK span processor can hold"
38-
spansProcessedMetricName = "otel.sdk.processor.span.processed.count"
39-
spansProcessedMetricDescription = "The number of spans for which the processing has finished, either successful or failed"
40-
spanCountUnit = "{span}"
4133
)
4234

4335
var (
44-
componentTypeKey = attribute.Key("otel.component.type")
45-
componentNameKey = attribute.Key("otel.component.name")
36+
noError = otelconv.ErrorTypeAttr("")
37+
queueFull = otelconv.ErrorTypeAttr("queue_full")
4638
)
4739

4840
// BatchSpanProcessorOption configures a BatchSpanProcessor.
@@ -77,9 +69,6 @@ type BatchSpanProcessorOptions struct {
7769
// Blocking option should be used carefully as it can severely affect the performance of an
7870
// application.
7971
BlockOnQueueFull bool
80-
81-
// meterProvider is the meterProvider used to record self-observability metrics.
82-
meterProvider metric.MeterProvider
8372
}
8473

8574
// batchSpanProcessor is a SpanProcessor that batches asynchronously-received
@@ -91,11 +80,10 @@ type batchSpanProcessor struct {
9180
queue chan ReadOnlySpan
9281
dropped uint32
9382

94-
callbackRegistration metric.Registration
95-
spansProcessedCounter metric.Int64Counter
96-
successAttributes metric.MeasurementOption
97-
alreadyShutdownAttributes metric.MeasurementOption
98-
queueFullAttributes metric.MeasurementOption
83+
selfObservabilityEnabled bool
84+
callbackRegistration metric.Registration
85+
spansProcessedCounter otelconv.SDKProcessorSpanProcessed
86+
componentNameAttr attribute.KeyValue
9987

10088
batch []ReadOnlySpan
10189
batchMutex sync.Mutex
@@ -154,53 +142,48 @@ func NewBatchSpanProcessor(exporter SpanExporter, options ...BatchSpanProcessorO
154142
return bsp
155143
}
156144

157-
var processorID atomic.Uint64
145+
var processorID atomic.Int64
158146

159147
// nextProcessorID returns an identifier for this batch span processor,
160148
// starting with 0 and incrementing by 1 each time it is called.
161149
func nextProcessorID() int64 {
162-
return int64(processorID.Add(1) - 1)
150+
return processorID.Add(1) - 1
163151
}
164152

165153
// configureSelfObservability configures metrics for the batch span processor.
166154
func (bsp *batchSpanProcessor) configureSelfObservability() {
167-
mp := otel.GetMeterProvider()
168155
if !x.SelfObservability.Enabled() {
169-
mp = metric.MeterProvider(noop.NewMeterProvider())
156+
return
170157
}
171-
meter := mp.Meter(
158+
bsp.selfObservabilityEnabled = true
159+
bsp.componentNameAttr = semconv.OTelComponentName(fmt.Sprintf("%s/%d", otelconv.ComponentTypeBatchingSpanProcessor, nextProcessorID()))
160+
meter := otel.GetMeterProvider().Meter(
172161
selfObsScopeName,
173162
metric.WithInstrumentationVersion(version()),
174163
)
175164

176-
queueCapacityUpDownCounter, err := meter.Int64ObservableUpDownCounter(queueCapacityMetricName,
177-
metric.WithUnit(spanCountUnit),
178-
metric.WithDescription(queueCapacityMetricDescription),
165+
// TODO(#7029): switch to otelconv.NewSDKProcessorSpanQueueCapacity after asynchronous instruments are supported.
166+
queueCapacityUpDownCounter, err := meter.Int64ObservableUpDownCounter(otelconv.SDKProcessorSpanQueueCapacity{}.Name(),
167+
metric.WithUnit(otelconv.SDKProcessorSpanQueueCapacity{}.Unit()),
168+
metric.WithDescription(otelconv.SDKProcessorSpanQueueCapacity{}.Description()),
179169
)
180170
if err != nil {
181171
otel.Handle(err)
182172
}
183-
queueSizeUpDownCounter, err := meter.Int64ObservableUpDownCounter(queueSizeMetricName,
184-
metric.WithUnit(spanCountUnit),
185-
metric.WithDescription(queueSizeMetricDescription),
173+
// TODO(#7029): switch to otelconv.NewSDKProcessorSpanQueueSize after asynchronous instruments are supported.
174+
queueSizeUpDownCounter, err := meter.Int64ObservableUpDownCounter(otelconv.SDKProcessorSpanQueueSize{}.Name(),
175+
metric.WithUnit(otelconv.SDKProcessorSpanQueueSize{}.Unit()),
176+
metric.WithDescription(otelconv.SDKProcessorSpanQueueSize{}.Description()),
186177
)
187178
if err != nil {
188179
otel.Handle(err)
189180
}
190-
bsp.spansProcessedCounter, err = meter.Int64Counter(spansProcessedMetricName,
191-
metric.WithUnit(spanCountUnit),
192-
metric.WithDescription(spansProcessedMetricDescription),
193-
)
181+
bsp.spansProcessedCounter, err = otelconv.NewSDKProcessorSpanProcessed(meter)
194182
if err != nil {
195183
otel.Handle(err)
196184
}
197185

198-
componentTypeAttr := componentTypeKey.String("batching_span_processor")
199-
componentNameAttr := componentNameKey.String(fmt.Sprintf("batching_span_processor/%d", nextProcessorID()))
200-
bsp.successAttributes = metric.WithAttributes(componentNameAttr, componentTypeAttr, semconv.ErrorTypeKey.String(""))
201-
bsp.alreadyShutdownAttributes = metric.WithAttributes(componentNameAttr, componentTypeAttr, semconv.ErrorTypeKey.String("already_shutdown"))
202-
bsp.queueFullAttributes = metric.WithAttributes(componentNameAttr, componentTypeAttr, semconv.ErrorTypeKey.String("queue_full"))
203-
callabckAttributesOpt := metric.WithAttributes(componentNameAttr, componentTypeAttr)
186+
callabckAttributesOpt := metric.WithAttributes(bsp.componentNameAttr, semconv.OTelComponentTypeBatchingSpanProcessor)
204187
bsp.callbackRegistration, err = meter.RegisterCallback(
205188
func(ctx context.Context, o metric.Observer) error {
206189
o.ObserveInt64(queueSizeUpDownCounter, int64(len(bsp.queue)), callabckAttributesOpt)
@@ -218,10 +201,8 @@ func (bsp *batchSpanProcessor) OnStart(parent context.Context, s ReadWriteSpan)
218201

219202
// OnEnd method enqueues a ReadOnlySpan for later processing.
220203
func (bsp *batchSpanProcessor) OnEnd(s ReadOnlySpan) {
221-
ctx := context.Background()
222204
// Do not enqueue spans after Shutdown.
223205
if bsp.stopped.Load() {
224-
bsp.spansProcessedCounter.Add(ctx, 1, bsp.alreadyShutdownAttributes)
225206
return
226207
}
227208

@@ -256,7 +237,10 @@ func (bsp *batchSpanProcessor) Shutdown(ctx context.Context) error {
256237
err = ctx.Err()
257238
}
258239
})
259-
return errors.Join(err, bsp.callbackRegistration.Unregister())
240+
if bsp.selfObservabilityEnabled {
241+
err = errors.Join(err, bsp.callbackRegistration.Unregister())
242+
}
243+
return err
260244
}
261245

262246
type forceFlushSpan struct {
@@ -352,16 +336,6 @@ func WithBlocking() BatchSpanProcessorOption {
352336
}
353337
}
354338

355-
// withMeterProvider allows configuring the meterProvider used for recording
356-
// self-observability metrics during testing.
357-
func withMeterProvider(provider metric.MeterProvider) BatchSpanProcessorOption {
358-
return func(o *BatchSpanProcessorOptions) {
359-
if provider != nil {
360-
o.meterProvider = provider
361-
}
362-
}
363-
}
364-
365339
// exportSpans is a subroutine of processing and draining the queue.
366340
func (bsp *batchSpanProcessor) exportSpans(ctx context.Context) error {
367341
bsp.timer.Reset(bsp.o.BatchTimeout)
@@ -377,7 +351,12 @@ func (bsp *batchSpanProcessor) exportSpans(ctx context.Context) error {
377351

378352
if l := len(bsp.batch); l > 0 {
379353
global.Debug("exporting spans", "count", len(bsp.batch), "total_dropped", atomic.LoadUint32(&bsp.dropped))
380-
bsp.spansProcessedCounter.Add(ctx, int64(len(bsp.batch)), bsp.successAttributes)
354+
if bsp.selfObservabilityEnabled {
355+
bsp.spansProcessedCounter.Add(ctx, int64(l),
356+
bsp.componentNameAttr,
357+
bsp.spansProcessedCounter.AttrComponentType(otelconv.ComponentTypeBatchingSpanProcessor),
358+
bsp.spansProcessedCounter.AttrErrorType(noError))
359+
}
381360
err := bsp.e.ExportSpans(ctx, bsp.batch)
382361

383362
// A new batch is always created after exporting, even if the batch failed to be exported.
@@ -486,7 +465,12 @@ func (bsp *batchSpanProcessor) enqueueBlockOnQueueFull(ctx context.Context, sd R
486465
case bsp.queue <- sd:
487466
return true
488467
case <-ctx.Done():
489-
bsp.spansProcessedCounter.Add(ctx, 1, bsp.queueFullAttributes)
468+
if bsp.selfObservabilityEnabled {
469+
bsp.spansProcessedCounter.Add(ctx, 1,
470+
bsp.componentNameAttr,
471+
bsp.spansProcessedCounter.AttrComponentType(otelconv.ComponentTypeBatchingSpanProcessor),
472+
bsp.spansProcessedCounter.AttrErrorType(queueFull))
473+
}
490474
return false
491475
}
492476
}
@@ -501,7 +485,12 @@ func (bsp *batchSpanProcessor) enqueueDrop(ctx context.Context, sd ReadOnlySpan)
501485
return true
502486
default:
503487
atomic.AddUint32(&bsp.dropped, 1)
504-
bsp.spansProcessedCounter.Add(ctx, 1, bsp.queueFullAttributes)
488+
if bsp.selfObservabilityEnabled {
489+
bsp.spansProcessedCounter.Add(ctx, 1,
490+
bsp.componentNameAttr,
491+
bsp.spansProcessedCounter.AttrComponentType(otelconv.ComponentTypeBatchingSpanProcessor),
492+
bsp.spansProcessedCounter.AttrErrorType(queueFull))
493+
}
505494
}
506495
return false
507496
}

0 commit comments

Comments
 (0)