Skip to content

Commit 1ccad8b

Browse files
committed
WIP
1 parent 93e2253 commit 1ccad8b

File tree

5 files changed

+293
-51
lines changed

5 files changed

+293
-51
lines changed

CHANGELOG.md

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
4242
- Add `ErrorType` attribute helper function to the `go.opentelemetry.io/otel/semconv/v1.34.0` package. (#6962)
4343
- Add `WithAllowKeyDuplication` in `go.opentelemetry.io/otel/sdk/log` which can be used to disable deduplication for log records. (#6968)
4444
- Add `Clone` method to `Record` in `go.opentelemetry.io/otel/log` that returns a copy of the record with no shared state. (#7001)
45+
- Add `OTEL_GO_X_SELF_OBSERVABILITY` environment variable to control whether self-observability metrics and traces are produced by SDKs. (#6393)
46+
- Add experimental `otel.sdk.processor.span.queue.size`, `otel.sdk.processor.span.queue.capacity`, and `otel.sdk.processor.span.processed.count` metrics to the trace batch span processor in `go.opentelemetry.io/otel/sdk/trace`. (#6393)
4547

4648
### Changed
4749

@@ -163,11 +165,6 @@ The next release will require at least [Go 1.23].
163165

164166
### Added
165167

166-
- Add `OTEL_GO_X_SELF_OBSERVABILITY` environment variable to control whether self-observability metrics and traces are produced by SDKs. (#TODO)
167-
- Add experimental `otel.sdk.processor.span.queue.size`, `otel.sdk.processor.span.queue.capacity`, and `otel.sdk.processor.span.processed.count` metrics to the trace batch span processor in `go.opentelemetry.io/otel/sdk/trace`. (#TODO)
168-
169-
### Fixed
170-
171168
- Add `ValueFromAttribute` and `KeyValueFromAttribute` in `go.opentelemetry.io/otel/log`. (#6180)
172169
- Add `EventName` and `SetEventName` to `Record` in `go.opentelemetry.io/otel/log`. (#6187)
173170
- Add `EventName` to `RecordFactory` in `go.opentelemetry.io/otel/log/logtest`. (#6187)

sdk/go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ require (
1111
github.com/stretchr/testify v1.10.0
1212
go.opentelemetry.io/otel v1.37.0
1313
go.opentelemetry.io/otel/metric v1.37.0
14+
go.opentelemetry.io/otel/sdk/metric v1.34.0
1415
go.opentelemetry.io/otel/trace v1.37.0
1516
go.uber.org/goleak v1.3.0
1617
golang.org/x/sys v0.34.0

sdk/go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOf
2121
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
2222
go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA=
2323
go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A=
24+
go.opentelemetry.io/otel/sdk/metric v1.34.0 h1:5CeK9ujjbFVL5c1PhLuStg1wxA7vQv7ce1EK0Gyvahk=
25+
go.opentelemetry.io/otel/sdk/metric v1.34.0/go.mod h1:jQ/r8Ze28zRKoNRdkjCZxfs6YvBTG1+YIqyFVFYec5w=
2426
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
2527
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
2628
golang.org/x/sys v0.34.0 h1:H5Y5sJ2L2JRdyv7ROF1he/lPdvFsd0mJHFw2ThKHxLA=

sdk/trace/batch_span_processor.go

Lines changed: 40 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@ import (
1818
"go.opentelemetry.io/otel/metric/noop"
1919
"go.opentelemetry.io/otel/sdk/internal/env"
2020
"go.opentelemetry.io/otel/sdk/internal/x"
21-
semconv "go.opentelemetry.io/otel/semconv/v1.30.0"
21+
semconv "go.opentelemetry.io/otel/semconv/v1.34.0"
22+
"go.opentelemetry.io/otel/semconv/v1.34.0/otelconv"
2223
"go.opentelemetry.io/otel/trace"
2324
)
2425

@@ -30,19 +31,11 @@ const (
3031
// DefaultExportTimeout is the duration after which an export is cancelled, in milliseconds.
3132
DefaultExportTimeout = 30000
3233
DefaultMaxExportBatchSize = 512
33-
34-
queueSizeMetricName = "otel.sdk.processor.span.queue.size"
35-
queueSizeMetricDescription = "The number of spans in the queue of a given instance of an SDK span processor"
36-
queueCapacityMetricName = "otel.sdk.processor.span.queue.capacity"
37-
queueCapacityMetricDescription = "The maximum number of spans the queue of a given instance of an SDK span processor can hold"
38-
spansProcessedMetricName = "otel.sdk.processor.span.processed.count"
39-
spansProcessedMetricDescription = "The number of spans for which the processing has finished, either successful or failed"
40-
spanCountUnit = "{span}"
4134
)
4235

4336
var (
44-
componentTypeKey = attribute.Key("otel.component.type")
45-
componentNameKey = attribute.Key("otel.component.name")
37+
noError = otelconv.ErrorTypeAttr("")
38+
queueFull = otelconv.ErrorTypeAttr("queue_full")
4639
)
4740

4841
// BatchSpanProcessorOption configures a BatchSpanProcessor.
@@ -91,11 +84,9 @@ type batchSpanProcessor struct {
9184
queue chan ReadOnlySpan
9285
dropped uint32
9386

94-
callbackRegistration metric.Registration
95-
spansProcessedCounter metric.Int64Counter
96-
successAttributes metric.MeasurementOption
97-
alreadyShutdownAttributes metric.MeasurementOption
98-
queueFullAttributes metric.MeasurementOption
87+
callbackRegistration metric.Registration
88+
spansProcessedCounter otelconv.SDKProcessorSpanProcessed
89+
componentNameAttr attribute.KeyValue
9990

10091
batch []ReadOnlySpan
10192
batchMutex sync.Mutex
@@ -129,17 +120,19 @@ func NewBatchSpanProcessor(exporter SpanExporter, options ...BatchSpanProcessorO
129120
ExportTimeout: time.Duration(env.BatchSpanProcessorExportTimeout(DefaultExportTimeout)) * time.Millisecond,
130121
MaxQueueSize: maxQueueSize,
131122
MaxExportBatchSize: maxExportBatchSize,
123+
meterProvider: otel.GetMeterProvider(),
132124
}
133125
for _, opt := range options {
134126
opt(&o)
135127
}
136128
bsp := &batchSpanProcessor{
137-
e: exporter,
138-
o: o,
139-
batch: make([]ReadOnlySpan, 0, o.MaxExportBatchSize),
140-
timer: time.NewTimer(o.BatchTimeout),
141-
queue: make(chan ReadOnlySpan, o.MaxQueueSize),
142-
stopCh: make(chan struct{}),
129+
e: exporter,
130+
o: o,
131+
batch: make([]ReadOnlySpan, 0, o.MaxExportBatchSize),
132+
timer: time.NewTimer(o.BatchTimeout),
133+
queue: make(chan ReadOnlySpan, o.MaxQueueSize),
134+
stopCh: make(chan struct{}),
135+
componentNameAttr: semconv.OTelComponentName(fmt.Sprintf("%s/%d", otelconv.ComponentTypeBatchingSpanProcessor, nextProcessorID())),
143136
}
144137

145138
bsp.configureSelfObservability()
@@ -154,17 +147,17 @@ func NewBatchSpanProcessor(exporter SpanExporter, options ...BatchSpanProcessorO
154147
return bsp
155148
}
156149

157-
var processorID atomic.Uint64
150+
var processorID atomic.Int64
158151

159152
// nextProcessorID returns an identifier for this batch span processor,
160153
// starting with 0 and incrementing by 1 each time it is called.
161154
func nextProcessorID() int64 {
162-
return int64(processorID.Add(1) - 1)
155+
return processorID.Add(1) - 1
163156
}
164157

165158
// configureSelfObservability configures metrics for the batch span processor.
166159
func (bsp *batchSpanProcessor) configureSelfObservability() {
167-
mp := otel.GetMeterProvider()
160+
mp := bsp.o.meterProvider
168161
if !x.SelfObservability.Enabled() {
169162
mp = metric.MeterProvider(noop.NewMeterProvider())
170163
}
@@ -173,34 +166,28 @@ func (bsp *batchSpanProcessor) configureSelfObservability() {
173166
metric.WithInstrumentationVersion(version()),
174167
)
175168

176-
queueCapacityUpDownCounter, err := meter.Int64ObservableUpDownCounter(queueCapacityMetricName,
177-
metric.WithUnit(spanCountUnit),
178-
metric.WithDescription(queueCapacityMetricDescription),
169+
// TODO(#7029): switch to otelconv.NewSDKProcessorSpanQueueCapacity after asynchronous instruments are supported.
170+
queueCapacityUpDownCounter, err := meter.Int64ObservableUpDownCounter(otelconv.SDKProcessorSpanQueueCapacity{}.Name(),
171+
metric.WithUnit(otelconv.SDKProcessorSpanQueueCapacity{}.Unit()),
172+
metric.WithDescription(otelconv.SDKProcessorSpanQueueCapacity{}.Description()),
179173
)
180174
if err != nil {
181175
otel.Handle(err)
182176
}
183-
queueSizeUpDownCounter, err := meter.Int64ObservableUpDownCounter(queueSizeMetricName,
184-
metric.WithUnit(spanCountUnit),
185-
metric.WithDescription(queueSizeMetricDescription),
177+
// TODO(#7029): switch to otelconv.NewSDKProcessorSpanQueueSize after asynchronous instruments are supported.
178+
queueSizeUpDownCounter, err := meter.Int64ObservableUpDownCounter(otelconv.SDKProcessorSpanQueueSize{}.Name(),
179+
metric.WithUnit(otelconv.SDKProcessorSpanQueueSize{}.Unit()),
180+
metric.WithDescription(otelconv.SDKProcessorSpanQueueSize{}.Description()),
186181
)
187182
if err != nil {
188183
otel.Handle(err)
189184
}
190-
bsp.spansProcessedCounter, err = meter.Int64Counter(spansProcessedMetricName,
191-
metric.WithUnit(spanCountUnit),
192-
metric.WithDescription(spansProcessedMetricDescription),
193-
)
185+
bsp.spansProcessedCounter, err = otelconv.NewSDKProcessorSpanProcessed(meter)
194186
if err != nil {
195187
otel.Handle(err)
196188
}
197189

198-
componentTypeAttr := componentTypeKey.String("batching_span_processor")
199-
componentNameAttr := componentNameKey.String(fmt.Sprintf("batching_span_processor/%d", nextProcessorID()))
200-
bsp.successAttributes = metric.WithAttributes(componentNameAttr, componentTypeAttr, semconv.ErrorTypeKey.String(""))
201-
bsp.alreadyShutdownAttributes = metric.WithAttributes(componentNameAttr, componentTypeAttr, semconv.ErrorTypeKey.String("already_shutdown"))
202-
bsp.queueFullAttributes = metric.WithAttributes(componentNameAttr, componentTypeAttr, semconv.ErrorTypeKey.String("queue_full"))
203-
callabckAttributesOpt := metric.WithAttributes(componentNameAttr, componentTypeAttr)
190+
callabckAttributesOpt := metric.WithAttributes(bsp.componentNameAttr, semconv.OTelComponentTypeBatchingSpanProcessor)
204191
bsp.callbackRegistration, err = meter.RegisterCallback(
205192
func(ctx context.Context, o metric.Observer) error {
206193
o.ObserveInt64(queueSizeUpDownCounter, int64(len(bsp.queue)), callabckAttributesOpt)
@@ -218,10 +205,8 @@ func (bsp *batchSpanProcessor) OnStart(parent context.Context, s ReadWriteSpan)
218205

219206
// OnEnd method enqueues a ReadOnlySpan for later processing.
220207
func (bsp *batchSpanProcessor) OnEnd(s ReadOnlySpan) {
221-
ctx := context.Background()
222208
// Do not enqueue spans after Shutdown.
223209
if bsp.stopped.Load() {
224-
bsp.spansProcessedCounter.Add(ctx, 1, bsp.alreadyShutdownAttributes)
225210
return
226211
}
227212

@@ -377,7 +362,10 @@ func (bsp *batchSpanProcessor) exportSpans(ctx context.Context) error {
377362

378363
if l := len(bsp.batch); l > 0 {
379364
global.Debug("exporting spans", "count", len(bsp.batch), "total_dropped", atomic.LoadUint32(&bsp.dropped))
380-
bsp.spansProcessedCounter.Add(ctx, int64(len(bsp.batch)), bsp.successAttributes)
365+
bsp.spansProcessedCounter.Add(ctx, int64(l),
366+
bsp.componentNameAttr,
367+
bsp.spansProcessedCounter.AttrComponentType(otelconv.ComponentTypeBatchingSpanProcessor),
368+
bsp.spansProcessedCounter.AttrErrorType(noError))
381369
err := bsp.e.ExportSpans(ctx, bsp.batch)
382370

383371
// A new batch is always created after exporting, even if the batch failed to be exported.
@@ -486,7 +474,10 @@ func (bsp *batchSpanProcessor) enqueueBlockOnQueueFull(ctx context.Context, sd R
486474
case bsp.queue <- sd:
487475
return true
488476
case <-ctx.Done():
489-
bsp.spansProcessedCounter.Add(ctx, 1, bsp.queueFullAttributes)
477+
bsp.spansProcessedCounter.Add(ctx, 1,
478+
bsp.componentNameAttr,
479+
bsp.spansProcessedCounter.AttrComponentType(otelconv.ComponentTypeBatchingSpanProcessor),
480+
bsp.spansProcessedCounter.AttrErrorType(queueFull))
490481
return false
491482
}
492483
}
@@ -501,7 +492,10 @@ func (bsp *batchSpanProcessor) enqueueDrop(ctx context.Context, sd ReadOnlySpan)
501492
return true
502493
default:
503494
atomic.AddUint32(&bsp.dropped, 1)
504-
bsp.spansProcessedCounter.Add(ctx, 1, bsp.queueFullAttributes)
495+
bsp.spansProcessedCounter.Add(ctx, 1,
496+
bsp.componentNameAttr,
497+
bsp.spansProcessedCounter.AttrComponentType(otelconv.ComponentTypeBatchingSpanProcessor),
498+
bsp.spansProcessedCounter.AttrErrorType(queueFull))
505499
}
506500
return false
507501
}

0 commit comments

Comments
 (0)