
Commit 22cfbce

Add concurrent safe tests for metric aggregations (#7379)
Prerequisite for #7189. Adds tests that try to catch race conditions in concurrent measurements.
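
These tests only surface data races when run under Go's race detector. A typical local invocation might look like the following (standard `go test` flags; the `-run` pattern simply matches the ConcurrentSafe test names added in this diff):

    # Run the new concurrency tests for the aggregate package with the race detector enabled.
    go test -race -run ConcurrentSafe ./sdk/metric/internal/aggregate/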
1 parent fc89784 commit 22cfbce

File tree

5 files changed: +315 additions, 0 deletions

sdk/metric/internal/aggregate/aggregate_test.go

Lines changed: 44 additions & 0 deletions
@@ -6,6 +6,7 @@ package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggreg
 import (
 	"context"
 	"strconv"
+	"sync"
 	"sync/atomic"
 	"testing"
 	"time"

@@ -138,6 +139,49 @@ func test[N int64 | float64](meas Measure[N], comp ComputeAggregation, steps []t
 	}
 }
 
+func testAggergationConcurrentSafe[N int64 | float64](
+	meas Measure[N],
+	comp ComputeAggregation,
+	validate func(t *testing.T, agg metricdata.Aggregation),
+) func(*testing.T) {
+	return func(t *testing.T) {
+		t.Helper()
+
+		got := new(metricdata.Aggregation)
+		ctx := t.Context()
+		var wg sync.WaitGroup
+		for _, args := range []arg[N]{
+			{ctx, 2, alice},
+			{ctx, 6, alice},
+			{ctx, 4, alice},
+			{ctx, 10, alice},
+			{ctx, 22, alice},
+			{ctx, -3, bob},
+			{ctx, -6, bob},
+			{ctx, 3, bob},
+			{ctx, 6, bob},
+		} {
+			wg.Add(1)
+			go func() {
+				defer wg.Done()
+				meas(args.ctx, args.value, args.attr)
+			}()
+		}
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			for range 2 {
+				comp(got)
+				// We do not check expected output for each step because
+				// computeAggregation is run concurrently with steps. Instead,
+				// we validate that the output is a valid possible output.
+				validate(t, *got)
+			}
+		}()
+		wg.Wait()
+	}
+}
+
 func benchmarkAggregate[N int64 | float64](factory func() (Measure[N], ComputeAggregation)) func(*testing.B) {
 	counts := []int{1, 10, 100}
 	return func(b *testing.B) {
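
The helper above reuses the arg[N] measurement type and the alice/bob attribute sets that already exist in this test package; they are not part of this diff. A minimal sketch of what the helper assumes about them, with field names inferred from the args.ctx, args.value, args.attr usage (the actual declarations and attribute values in the repository may differ):

import (
	"context"

	"go.opentelemetry.io/otel/attribute"
)

// Hypothetical shape of the pre-existing fixtures referenced by
// testAggergationConcurrentSafe; only the field layout matters here.
type arg[N int64 | float64] struct {
	ctx   context.Context
	value N
	attr  attribute.Set
}

var (
	// The validators below compare against filtered variants (fltrAlice,
	// fltrBob) produced by the attrFltr filter set in each Builder.
	alice = attribute.NewSet(attribute.String("user", "alice"))
	bob   = attribute.NewSet(attribute.String("user", "bob"))
)

Note that the helper relies on t.Context() and for range 2, so it assumes a recent Go toolchain (testing.T.Context was added in Go 1.24, integer range in Go 1.22).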

sdk/metric/internal/aggregate/exponential_histogram_test.go

Lines changed: 63 additions & 0 deletions
@@ -1040,6 +1040,69 @@ func testCumulativeExpoHist[N int64 | float64]() func(t *testing.T) {
 	})
 }
 
+func TestExponentialHistogramAggregationConcurrentSafe(t *testing.T) {
+	t.Run("Int64/Delta", testDeltaExpoHistConcurrentSafe[int64]())
+	t.Run("Float64/Delta", testDeltaExpoHistConcurrentSafe[float64]())
+	t.Run("Int64/Cumulative", testCumulativeExpoHistConcurrentSafe[int64]())
+	t.Run("Float64/Cumulative", testCumulativeExpoHistConcurrentSafe[float64]())
+}
+
+func testDeltaExpoHistConcurrentSafe[N int64 | float64]() func(t *testing.T) {
+	in, out := Builder[N]{
+		Temporality:      metricdata.DeltaTemporality,
+		Filter:           attrFltr,
+		AggregationLimit: 3,
+	}.ExponentialBucketHistogram(4, 20, false, false)
+	return testAggergationConcurrentSafe[N](in, out, validateExponentialHistogram[N])
+}
+
+func testCumulativeExpoHistConcurrentSafe[N int64 | float64]() func(t *testing.T) {
+	in, out := Builder[N]{
+		Temporality:      metricdata.CumulativeTemporality,
+		Filter:           attrFltr,
+		AggregationLimit: 3,
+	}.ExponentialBucketHistogram(4, 20, false, false)
+	return testAggergationConcurrentSafe[N](in, out, validateExponentialHistogram[N])
+}
+
+func validateExponentialHistogram[N int64 | float64](t *testing.T, got metricdata.Aggregation) {
+	s, ok := got.(metricdata.ExponentialHistogram[N])
+	if !ok {
+		t.Fatalf("wrong aggregation type: %+v", got)
+	}
+	for _, dp := range s.DataPoints {
+		assert.False(t,
+			dp.Time.Before(dp.StartTime),
+			"Timestamp %v must not be before start time %v", dp.Time, dp.StartTime,
+		)
+		switch dp.Attributes {
+		case fltrAlice:
+			// alice observations are always a multiple of 2
+			assert.Equal(t, int64(0), int64(dp.Sum)%2)
+		case fltrBob:
+			// bob observations are always a multiple of 3
+			assert.Equal(t, int64(0), int64(dp.Sum)%3)
+		default:
+			t.Fatalf("wrong attributes %+v", dp.Attributes)
+		}
+		avg := float64(dp.Sum) / float64(dp.Count)
+		if minVal, ok := dp.Min.Value(); ok {
+			assert.GreaterOrEqual(t, avg, float64(minVal))
+		}
+		if maxVal, ok := dp.Max.Value(); ok {
+			assert.LessOrEqual(t, avg, float64(maxVal))
+		}
+		var totalCount uint64
+		for _, bc := range dp.PositiveBucket.Counts {
+			totalCount += bc
+		}
+		for _, bc := range dp.NegativeBucket.Counts {
+			totalCount += bc
+		}
+		assert.Equal(t, totalCount, dp.Count)
+	}
+}
+
 func FuzzGetBin(f *testing.F) {
 	values := []float64{
 		2.0,

sdk/metric/internal/aggregate/histogram_test.go

Lines changed: 60 additions & 0 deletions
@@ -223,6 +223,66 @@ func testCumulativeHist[N int64 | float64](c conf[N]) func(t *testing.T) {
 	})
 }
 
+func TestHistogramConcurrentSafe(t *testing.T) {
+	t.Run("Int64/Delta", testDeltaHistConcurrentSafe[int64]())
+	t.Run("Float64/Delta", testDeltaHistConcurrentSafe[float64]())
+	t.Run("Int64/Cumulative", testCumulativeHistConcurrentSafe[int64]())
+	t.Run("Float64/Cumulative", testCumulativeHistConcurrentSafe[float64]())
+}
+
+func validateHistogram[N int64 | float64](t *testing.T, got metricdata.Aggregation) {
+	s, ok := got.(metricdata.Histogram[N])
+	if !ok {
+		t.Fatalf("wrong aggregation type: %+v", got)
+	}
+	for _, dp := range s.DataPoints {
+		assert.False(t,
+			dp.Time.Before(dp.StartTime),
+			"Timestamp %v must not be before start time %v", dp.Time, dp.StartTime,
+		)
+		switch dp.Attributes {
+		case fltrAlice:
+			// alice observations are always a multiple of 2
+			assert.Equal(t, int64(0), int64(dp.Sum)%2)
+		case fltrBob:
+			// bob observations are always a multiple of 3
+			assert.Equal(t, int64(0), int64(dp.Sum)%3)
+		default:
+			t.Fatalf("wrong attributes %+v", dp.Attributes)
+		}
+		avg := float64(dp.Sum) / float64(dp.Count)
+		if minVal, ok := dp.Min.Value(); ok {
+			assert.GreaterOrEqual(t, avg, float64(minVal))
+		}
+		if maxVal, ok := dp.Max.Value(); ok {
+			assert.LessOrEqual(t, avg, float64(maxVal))
+		}
+		var totalCount uint64
+		for _, bc := range dp.BucketCounts {
+			totalCount += bc
+		}
+		assert.Equal(t, totalCount, dp.Count)
+	}
+}
+
+func testDeltaHistConcurrentSafe[N int64 | float64]() func(t *testing.T) {
+	in, out := Builder[N]{
+		Temporality:      metricdata.DeltaTemporality,
+		Filter:           attrFltr,
+		AggregationLimit: 3,
+	}.ExplicitBucketHistogram(bounds, noMinMax, false)
+	return testAggergationConcurrentSafe[N](in, out, validateHistogram[N])
+}
+
+func testCumulativeHistConcurrentSafe[N int64 | float64]() func(t *testing.T) {
+	in, out := Builder[N]{
+		Temporality:      metricdata.CumulativeTemporality,
+		Filter:           attrFltr,
+		AggregationLimit: 3,
+	}.ExplicitBucketHistogram(bounds, noMinMax, false)
+	return testAggergationConcurrentSafe[N](in, out, validateHistogram[N])
+}
+
 // hPointSummed returns an HistogramDataPoint that started and ended now with
 // multi number of measurements values v. It includes a min and max (set to v).
 func hPointSummed[N int64 | float64](

sdk/metric/internal/aggregate/lastvalue_test.go

Lines changed: 72 additions & 0 deletions
@@ -7,6 +7,8 @@ import (
 	"context"
 	"testing"
 
+	"github.com/stretchr/testify/assert"
+
 	"go.opentelemetry.io/otel/sdk/metric/metricdata"
 )
 
@@ -468,6 +470,76 @@ func testCumulativePrecomputedLastValue[N int64 | float64]() func(*testing.T) {
 	})
 }
 
+func TestLastValueConcurrentSafe(t *testing.T) {
+	t.Run("Int64/DeltaLastValue", testDeltaLastValueConcurrentSafe[int64]())
+	t.Run("Float64/DeltaLastValue", testDeltaLastValueConcurrentSafe[float64]())
+	t.Run("Int64/CumulativeLastValue", testCumulativeLastValueConcurrentSafe[int64]())
+	t.Run("Float64/CumulativeLastValue", testCumulativeLastValueConcurrentSafe[float64]())
+	t.Run("Int64/DeltaPrecomputedLastValue", testDeltaPrecomputedLastValueConcurrentSafe[int64]())
+	t.Run("Float64/DeltaPrecomputedLastValue", testDeltaPrecomputedLastValueConcurrentSafe[float64]())
+	t.Run("Int64/CumulativePrecomputedLastValue", testCumulativePrecomputedLastValueConcurrentSafe[int64]())
+	t.Run("Float64/CumulativePrecomputedLastValue", testCumulativePrecomputedLastValueConcurrentSafe[float64]())
+}
+
+func validateGauge[N int64 | float64](t *testing.T, got metricdata.Aggregation) {
+	s, ok := got.(metricdata.Gauge[N])
+	if !ok {
+		t.Fatalf("wrong aggregation type: %+v", got)
+	}
+	for _, dp := range s.DataPoints {
+		assert.False(t,
+			dp.Time.Before(dp.StartTime),
+			"Timestamp %v must not be before start time %v", dp.Time, dp.StartTime,
+		)
+		switch dp.Attributes {
+		case fltrAlice:
+			// alice observations are always a multiple of 2
+			assert.Equal(t, int64(0), int64(dp.Value)%2)
+		case fltrBob:
+			// bob observations are always a multiple of 3
+			assert.Equal(t, int64(0), int64(dp.Value)%3)
+		default:
+			t.Fatalf("wrong attributes %+v", dp.Attributes)
+		}
+	}
+}
+
+func testCumulativeLastValueConcurrentSafe[N int64 | float64]() func(*testing.T) {
+	in, out := Builder[N]{
+		Temporality:      metricdata.CumulativeTemporality,
+		Filter:           attrFltr,
+		AggregationLimit: 3,
+	}.LastValue()
+	return testAggergationConcurrentSafe[N](in, out, validateGauge[N])
+}
+
+func testDeltaLastValueConcurrentSafe[N int64 | float64]() func(*testing.T) {
+	in, out := Builder[N]{
+		Temporality:      metricdata.DeltaTemporality,
+		Filter:           attrFltr,
+		AggregationLimit: 3,
+	}.LastValue()
+	return testAggergationConcurrentSafe[N](in, out, validateGauge[N])
+}
+
+func testDeltaPrecomputedLastValueConcurrentSafe[N int64 | float64]() func(*testing.T) {
+	in, out := Builder[N]{
+		Temporality:      metricdata.DeltaTemporality,
+		Filter:           attrFltr,
+		AggregationLimit: 3,
+	}.PrecomputedLastValue()
+	return testAggergationConcurrentSafe[N](in, out, validateGauge[N])
+}
+
+func testCumulativePrecomputedLastValueConcurrentSafe[N int64 | float64]() func(*testing.T) {
+	in, out := Builder[N]{
+		Temporality:      metricdata.CumulativeTemporality,
+		Filter:           attrFltr,
+		AggregationLimit: 3,
+	}.PrecomputedLastValue()
+	return testAggergationConcurrentSafe[N](in, out, validateGauge[N])
+}
+
 func BenchmarkLastValue(b *testing.B) {
 	b.Run("Int64", benchmarkAggregate(Builder[int64]{}.PrecomputedLastValue))
 	b.Run("Float64", benchmarkAggregate(Builder[float64]{}.PrecomputedLastValue))

sdk/metric/internal/aggregate/sum_test.go

Lines changed: 76 additions & 0 deletions
@@ -7,6 +7,8 @@ import (
 	"context"
 	"testing"
 
+	"github.com/stretchr/testify/assert"
+
 	"go.opentelemetry.io/otel/sdk/metric/metricdata"
 )
 
@@ -538,6 +540,80 @@ func testCumulativePrecomputedSum[N int64 | float64]() func(t *testing.T) {
 	})
 }
 
+func TestSumConcurrentSafe(t *testing.T) {
+	t.Run("Int64/DeltaSum", testDeltaSumConcurrentSafe[int64]())
+	t.Run("Float64/DeltaSum", testDeltaSumConcurrentSafe[float64]())
+	t.Run("Int64/CumulativeSum", testCumulativeSumConcurrentSafe[int64]())
+	t.Run("Float64/CumulativeSum", testCumulativeSumConcurrentSafe[float64]())
+	t.Run("Int64/DeltaPrecomputedSum", testDeltaPrecomputedSumConcurrentSafe[int64]())
+	t.Run("Float64/DeltaPrecomputedSum", testDeltaPrecomputedSumConcurrentSafe[float64]())
+	t.Run("Int64/CumulativePrecomputedSum", testCumulativePrecomputedSumConcurrentSafe[int64]())
+	t.Run("Float64/CumulativePrecomputedSum", testCumulativePrecomputedSumConcurrentSafe[float64]())
+}
+
+func validateSum[N int64 | float64](t *testing.T, got metricdata.Aggregation) {
+	s, ok := got.(metricdata.Sum[N])
+	if !ok {
+		t.Fatalf("wrong aggregation type: %+v", got)
+	}
+	for _, dp := range s.DataPoints {
+		assert.False(t,
+			dp.Time.Before(dp.StartTime),
+			"Timestamp %v must not be before start time %v", dp.Time, dp.StartTime,
+		)
+		switch dp.Attributes {
+		case fltrAlice:
+			// alice observations are always a multiple of 2
+			assert.Equal(t, int64(0), int64(dp.Value)%2)
+		case fltrBob:
+			// bob observations are always a multiple of 3
+			assert.Equal(t, int64(0), int64(dp.Value)%3)
+		default:
+			t.Fatalf("wrong attributes %+v", dp.Attributes)
+		}
+	}
+}
+
+func testDeltaSumConcurrentSafe[N int64 | float64]() func(t *testing.T) {
+	mono := false
+	in, out := Builder[N]{
+		Temporality:      metricdata.DeltaTemporality,
+		Filter:           attrFltr,
+		AggregationLimit: 3,
+	}.Sum(mono)
+	return testAggergationConcurrentSafe[N](in, out, validateSum[N])
+}
+
+func testCumulativeSumConcurrentSafe[N int64 | float64]() func(t *testing.T) {
+	mono := false
+	in, out := Builder[N]{
+		Temporality:      metricdata.CumulativeTemporality,
+		Filter:           attrFltr,
+		AggregationLimit: 3,
+	}.Sum(mono)
+	return testAggergationConcurrentSafe[N](in, out, validateSum[N])
+}
+
+func testDeltaPrecomputedSumConcurrentSafe[N int64 | float64]() func(t *testing.T) {
+	mono := false
+	in, out := Builder[N]{
+		Temporality:      metricdata.DeltaTemporality,
+		Filter:           attrFltr,
+		AggregationLimit: 3,
+	}.PrecomputedSum(mono)
+	return testAggergationConcurrentSafe[N](in, out, validateSum[N])
+}
+
+func testCumulativePrecomputedSumConcurrentSafe[N int64 | float64]() func(t *testing.T) {
+	mono := false
+	in, out := Builder[N]{
+		Temporality:      metricdata.CumulativeTemporality,
+		Filter:           attrFltr,
+		AggregationLimit: 3,
+	}.PrecomputedSum(mono)
+	return testAggergationConcurrentSafe[N](in, out, validateSum[N])
+}
+
 func BenchmarkSum(b *testing.B) {
 	// The monotonic argument is only used to annotate the Sum returned from
 	// the Aggregation method. It should not have an effect on operational

0 commit comments
