Skip to content

Commit 1809feb

Browse files
committed
WIP
1 parent fd6f484 commit 1809feb

File tree

4 files changed

+166
-1
lines changed

4 files changed

+166
-1
lines changed

sdk/go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ require (
1111
github.com/stretchr/testify v1.10.0
1212
go.opentelemetry.io/otel v1.34.0
1313
go.opentelemetry.io/otel/metric v1.34.0
14+
go.opentelemetry.io/otel/sdk/metric v1.34.0
1415
go.opentelemetry.io/otel/trace v1.34.0
1516
go.uber.org/goleak v1.3.0
1617
golang.org/x/sys v0.30.0

sdk/go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOf
2121
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
2222
go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA=
2323
go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A=
24+
go.opentelemetry.io/otel/sdk/metric v1.34.0 h1:5CeK9ujjbFVL5c1PhLuStg1wxA7vQv7ce1EK0Gyvahk=
25+
go.opentelemetry.io/otel/sdk/metric v1.34.0/go.mod h1:jQ/r8Ze28zRKoNRdkjCZxfs6YvBTG1+YIqyFVFYec5w=
2426
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
2527
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
2628
golang.org/x/sys v0.30.0 h1:QjkSwP/36a20jFYWkSue1YwXzLmsV5Gfq7Eiy72C1uc=

sdk/trace/batch_span_processor.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,7 @@ func NewBatchSpanProcessor(exporter SpanExporter, options ...BatchSpanProcessorO
127127
ExportTimeout: time.Duration(env.BatchSpanProcessorExportTimeout(DefaultExportTimeout)) * time.Millisecond,
128128
MaxQueueSize: maxQueueSize,
129129
MaxExportBatchSize: maxExportBatchSize,
130+
meterProvider: otel.GetMeterProvider(),
130131
}
131132
for _, opt := range options {
132133
opt(&o)
@@ -162,7 +163,7 @@ func nextProcessorID() int64 {
162163

163164
// configureSelfObservability configures metrics for the batch span processor.
164165
func (bsp *batchSpanProcessor) configureSelfObservability() {
165-
mp := otel.GetMeterProvider()
166+
mp := bsp.o.meterProvider
166167
if !x.SelfObservability.Enabled() {
167168
mp = metric.MeterProvider(noop.NewMeterProvider())
168169
}

sdk/trace/selfobs_test.go

Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package trace
5+
6+
import (
7+
"context"
8+
"fmt"
9+
"testing"
10+
11+
"github.com/stretchr/testify/assert"
12+
"github.com/stretchr/testify/require"
13+
14+
"go.opentelemetry.io/otel/attribute"
15+
"go.opentelemetry.io/otel/sdk/instrumentation"
16+
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
17+
"go.opentelemetry.io/otel/sdk/metric/metricdata"
18+
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
19+
)
20+
21+
// func TestBatchSpanProcessorMetricsDisabled(t *testing.T) {
22+
// // TODO: test queueCapacityUpDownCounter
23+
// // TODO: test queueSizeUpDownCounter
24+
// // TODO: test spansProcessedCounter
25+
// }
26+
27+
// pausedExporter waits until shutdown to export spans.
28+
type pausedExporter struct {
29+
stop chan (struct{})
30+
}
31+
32+
func newPausedExporter(t *testing.T) pausedExporter {
33+
e := pausedExporter{stop: make(chan struct{})}
34+
return e
35+
}
36+
37+
func (e pausedExporter) Shutdown(context.Context) error {
38+
return nil
39+
}
40+
41+
func (e pausedExporter) ExportSpans(ctx context.Context, _ []ReadOnlySpan) error {
42+
<-e.stop
43+
return ctx.Err()
44+
}
45+
46+
func TestBatchSpanProcessorQueueMetrics(t *testing.T) {
47+
t.Setenv("OTEL_GO_X_SELF_OBSERVABILITY", "true")
48+
tp := basicTracerProvider(t)
49+
reader := sdkmetric.NewManualReader()
50+
meterProvider := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader))
51+
pe := newPausedExporter(t)
52+
bsp := NewBatchSpanProcessor(
53+
pe,
54+
WithBlocking(),
55+
WithMaxExportBatchSize(1),
56+
withMeterProvider(meterProvider),
57+
)
58+
tp.RegisterSpanProcessor(bsp)
59+
t.Cleanup(func() {
60+
tp.UnregisterSpanProcessor(bsp)
61+
})
62+
t.Cleanup(func() {
63+
close(pe.stop)
64+
})
65+
66+
tr := tp.Tracer("TestBatchSpanProcessorQueueMetrics")
67+
generateSpan(t, tr, testOption{genNumSpans: 1})
68+
gotMetrics := new(metricdata.ResourceMetrics)
69+
reader.Collect(context.Background(), gotMetrics)
70+
require.Len(t, gotMetrics.ScopeMetrics, 1)
71+
assertScopeMetrics(t, gotMetrics.ScopeMetrics[0], expectMetrics{queueCapacity: 2048, queueSize: 1})
72+
// TODO: test queueCapacityUpDownCounter
73+
// TODO: test queueSizeUpDownCounter
74+
}
75+
76+
// func TestBatchSpanProcessorDropOnQueueFullMetrics(t *testing.T) {
77+
// // TODO: test spansProcessedCounter with queueFullAttributes (non-blocking, queue full)
78+
// // TODO: test spansProcessedCounter with queueFullAttributes (blocking, ctx expires)
79+
// // TODO: test spansProcessedCounter with successAttributes
80+
// // TODO: test spansProcessedCounter with alreadyShutdownAttributes
81+
// }
82+
83+
type expectMetrics struct {
84+
queueCapacity int64
85+
queueSize int64
86+
successProcessed int64
87+
alreadyShutdownProcessed int64
88+
queueFullProcessed int64
89+
}
90+
91+
func assertScopeMetrics(t *testing.T, sm metricdata.ScopeMetrics, expectation expectMetrics) {
92+
assert.Equal(t, instrumentation.Scope{
93+
Name: "go.opentelemetry.io/otel/sdk/trace",
94+
Version: version(),
95+
}, sm.Scope)
96+
wantProcessedDataPoints := []metricdata.DataPoint[int64]{}
97+
if expectation.successProcessed > 0 {
98+
// TODO attrs
99+
wantProcessedDataPoints = append(wantProcessedDataPoints, metricdata.DataPoint[int64]{Value: expectation.successProcessed})
100+
}
101+
if expectation.alreadyShutdownProcessed > 0 {
102+
// TODO attrs
103+
wantProcessedDataPoints = append(wantProcessedDataPoints, metricdata.DataPoint[int64]{Value: expectation.alreadyShutdownProcessed})
104+
}
105+
if expectation.queueFullProcessed > 0 {
106+
// TODO attrs
107+
wantProcessedDataPoints = append(wantProcessedDataPoints, metricdata.DataPoint[int64]{Value: expectation.queueFullProcessed})
108+
}
109+
110+
if len(wantProcessedDataPoints) > 0 {
111+
require.Len(t, sm.Metrics, 3)
112+
} else {
113+
require.Len(t, sm.Metrics, 2)
114+
}
115+
116+
componentTypeAttr := componentTypeKey.String("batching_span_processor")
117+
componentNameAttr := componentNameKey.String(fmt.Sprintf("batching_span_processor/%d", int64(processorID.Load()-1)))
118+
119+
baseAttrs := attribute.NewSet(
120+
componentTypeAttr,
121+
componentNameAttr,
122+
)
123+
124+
want := metricdata.Metrics{
125+
Name: queueCapacityMetricName,
126+
Description: queueCapacityMetricDescription,
127+
Unit: spanCountUnit,
128+
Data: metricdata.Sum[int64]{
129+
DataPoints: []metricdata.DataPoint[int64]{{Attributes: baseAttrs, Value: expectation.queueCapacity}},
130+
Temporality: metricdata.CumulativeTemporality,
131+
IsMonotonic: false,
132+
},
133+
}
134+
metricdatatest.AssertEqual(t, want, sm.Metrics[0], metricdatatest.IgnoreTimestamp())
135+
136+
want = metricdata.Metrics{
137+
Name: queueSizeMetricName,
138+
Description: queueSizeMetricDescription,
139+
Unit: spanCountUnit,
140+
Data: metricdata.Sum[int64]{
141+
DataPoints: []metricdata.DataPoint[int64]{{Attributes: baseAttrs, Value: expectation.queueSize}},
142+
Temporality: metricdata.CumulativeTemporality,
143+
IsMonotonic: false,
144+
},
145+
}
146+
metricdatatest.AssertEqual(t, want, sm.Metrics[1], metricdatatest.IgnoreTimestamp())
147+
148+
if len(wantProcessedDataPoints) > 0 {
149+
want = metricdata.Metrics{
150+
Name: spansProcessedMetricName,
151+
Description: spansProcessedMetricDescription,
152+
Unit: spanCountUnit,
153+
Data: metricdata.Sum[int64]{
154+
DataPoints: wantProcessedDataPoints,
155+
Temporality: metricdata.CumulativeTemporality,
156+
IsMonotonic: true,
157+
},
158+
}
159+
metricdatatest.AssertEqual(t, want, sm.Metrics[2], metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
160+
}
161+
}

0 commit comments

Comments
 (0)