diff --git a/CHANGELOG.md b/CHANGELOG.md index 3a46d2fa49b..8331b016c94 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -31,6 +31,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - `WithInstrumentationAttributes` in `go.opentelemetry.io/otel/meter` synchronously de-duplicates the passed attributes instead of delegating it to the returned `MeterOption`. (#7266) - `WithInstrumentationAttributes` in `go.opentelemetry.io/otel/log` synchronously de-duplicates the passed attributes instead of delegating it to the returned `LoggerOption`. (#7266) - `Distinct` in `go.opentelemetry.io/otel/attribute` is no longer guaranteed to uniquely identify an attribute set. Collisions between `Distinct` values for different Sets are possible with extremely high cardinality (billions of series per instrument), but are highly unlikely. (#7175) +- Improve performance of concurrent measurements in `go.opentelemetry.io/otel/sdk/metric`. (#7189) diff --git a/sdk/metric/internal/aggregate/atomic.go b/sdk/metric/internal/aggregate/atomic.go new file mode 100644 index 00000000000..5c0050650cf --- /dev/null +++ b/sdk/metric/internal/aggregate/atomic.go @@ -0,0 +1,109 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate" + +import ( + "math" + "sync/atomic" +) + +// atomicSum is an efficient way of adding to a number which is either an +// int64 or float64. +type atomicSum[N int64 | float64] struct { + // nFloatBits contains only the non-integer portion of the counter. + nFloatBits atomic.Uint64 + // nInt contains only the integer portion of the counter. + nInt atomic.Int64 +} + +// load returns the float or integer value. +func (n *atomicSum[N]) load() N { + fval := math.Float64frombits(n.nFloatBits.Load()) + ival := n.nInt.Load() + return N(fval + float64(ival)) +} + +func (n *atomicSum[N]) add(value N) { + ival := int64(value) + // This case is where the value is an int, or if it is a whole-numbered float. + if float64(ival) == float64(value) { + n.nInt.Add(ival) + return + } + + // Value must be a float below. + for { + oldBits := n.nFloatBits.Load() + newBits := math.Float64bits(math.Float64frombits(oldBits) + float64(value)) + if n.nFloatBits.CompareAndSwap(oldBits, newBits) { + return + } + } +} + +type atomicIntOrFloat[N int64 | float64] struct { + // nFloatBits contains the float bits if N is float64. + nFloatBits atomic.Uint64 + // nInt contains the int64 if N is int64 + nInt atomic.Int64 +} + +func (n *atomicIntOrFloat[N]) store(value N) { + switch v := any(value).(type) { + case int64: + n.nInt.Store(v) + case float64: + n.nFloatBits.Store(math.Float64bits(v)) + } +} + +func (n *atomicIntOrFloat[N]) load() (value N) { + switch any(value).(type) { + case int64: + value = N(n.nInt.Load()) + case float64: + value = N(math.Float64frombits(n.nFloatBits.Load())) + } + return +} + +func (n *atomicIntOrFloat[N]) compareAndSwap(oldVal, newVal N) bool { + switch any(oldVal).(type) { + case float64: + return n.nFloatBits.CompareAndSwap(math.Float64bits(float64(oldVal)), math.Float64bits(float64(newVal))) + default: + return n.nInt.CompareAndSwap(int64(oldVal), int64(newVal)) + } +} + +type atomicMinMax[N int64 | float64] struct { + min atomicIntOrFloat[N] + max atomicIntOrFloat[N] + isSet atomic.Bool +} + +func (n *atomicMinMax[N]) observe(value N) { + for { + minLoaded := n.min.load() + if (!n.isSet.Load() || value < minLoaded) && !n.min.compareAndSwap(minLoaded, value) { + // We got a new min value, but lost the race. Try again. + continue + } + maxLoaded := n.max.load() + if (!n.isSet.Load() || value > maxLoaded) && !n.max.compareAndSwap(maxLoaded, value) { + // We got a new max value, but lost the race. Try again. + continue + } + break + } + n.isSet.Store(true) +} + +func (n *atomicMinMax[N]) loadMin() (value N) { + return n.min.load() +} + +func (n *atomicMinMax[N]) loadMax() (value N) { + return n.max.load() +} diff --git a/sdk/metric/internal/aggregate/exponential_histogram.go b/sdk/metric/internal/aggregate/exponential_histogram.go index 857eddf305f..f19b7849394 100644 --- a/sdk/metric/internal/aggregate/exponential_histogram.go +++ b/sdk/metric/internal/aggregate/exponential_histogram.go @@ -8,6 +8,7 @@ import ( "errors" "math" "sync" + "sync/atomic" "time" "go.opentelemetry.io/otel" @@ -20,32 +21,27 @@ const ( expoMinScale = -10 smallestNonZeroNormalFloat64 = 0x1p-1022 - - // These redefine the Math constants with a type, so the compiler won't coerce - // them into an int on 32 bit platforms. - maxInt64 int64 = math.MaxInt64 - minInt64 int64 = math.MinInt64 ) // expoHistogramDataPoint is a single data point in an exponential histogram. type expoHistogramDataPoint[N int64 | float64] struct { - attrs attribute.Set - res FilteredExemplarReservoir[N] - - count uint64 - min N - max N - sum N + count uint64 + zeroCount uint64 + sum atomicSum[N] + minMax atomicMinMax[N] maxSize int noMinMax bool noSum bool - scale int32 + scaleMux sync.RWMutex + scale int32 posBuckets expoBuckets negBuckets expoBuckets - zeroCount uint64 + + attrs attribute.Set + res FilteredExemplarReservoir[N] } func newExpoHistogramDataPoint[N int64 | float64]( @@ -54,17 +50,8 @@ func newExpoHistogramDataPoint[N int64 | float64]( maxScale int32, noMinMax, noSum bool, ) *expoHistogramDataPoint[N] { // nolint:revive // we need this control flag - f := math.MaxFloat64 - ma := N(f) // if N is int64, max will overflow to -9223372036854775808 - mi := N(-f) - if N(maxInt64) > N(f) { - ma = N(maxInt64) - mi = N(minInt64) - } return &expoHistogramDataPoint[N]{ attrs: attrs, - min: ma, - max: mi, maxSize: maxSize, noMinMax: noMinMax, noSum: noSum, @@ -72,29 +59,30 @@ func newExpoHistogramDataPoint[N int64 | float64]( } } +func (p *expoHistogramDataPoint[N]) measure(ctx context.Context, v N, droppedAttr []attribute.KeyValue) { + p.record(v) + p.res.Offer(ctx, v, droppedAttr) +} + // record adds a new measurement to the histogram. It will rescale the buckets if needed. func (p *expoHistogramDataPoint[N]) record(v N) { - p.count++ + atomic.AddUint64(&p.count, 1) if !p.noMinMax { - if v < p.min { - p.min = v - } - if v > p.max { - p.max = v - } + p.minMax.observe(v) } if !p.noSum { - p.sum += v + p.sum.add(v) } absV := math.Abs(float64(v)) if float64(absV) == 0.0 { - p.zeroCount++ + atomic.AddUint64(&p.zeroCount, 1) return } + p.scaleMux.RLock() bin := p.getBin(absV) bucket := &p.posBuckets @@ -104,22 +92,34 @@ func (p *expoHistogramDataPoint[N]) record(v N) { // If the new bin would make the counts larger than maxScale, we need to // downscale current measurements. - if scaleDelta := p.scaleChange(bin, bucket.startBin, len(bucket.counts)); scaleDelta > 0 { - if p.scale-scaleDelta < expoMinScale { - // With a scale of -10 there is only two buckets for the whole range of float64 values. - // This can only happen if there is a max size of 1. - otel.Handle(errors.New("exponential histogram scale underflow")) - return - } - // Downscale + scaleDelta := p.scaleChange(bin, bucket.startBin, len(bucket.counts)) + if scaleDelta <= 0 && !bucket.needsResize(bin) { + // Fast path without requiring the full lock + bucket.recordWithoutResize(bin) + p.scaleMux.RUnlock() + return + } + if p.scale-scaleDelta < expoMinScale { + // With a scale of -10 there is only two buckets for the whole range of float64 values. + // This can only happen if there is a max size of 1. + otel.Handle(errors.New("exponential histogram scale underflow")) + p.scaleMux.RUnlock() + return + } + // Switch to a full Lock for downscaling or resizing + p.scaleMux.RUnlock() + p.scaleMux.Lock() + defer p.scaleMux.Unlock() + // recompute the scaleDelta now that we hold the full lock + scaleDelta = p.scaleChange(bin, bucket.startBin, len(bucket.counts)) + // Downscale + if scaleDelta > 0 { p.scale -= scaleDelta p.posBuckets.downscale(scaleDelta) p.negBuckets.downscale(scaleDelta) - - bin = p.getBin(absV) } - bucket.record(bin) + bucket.record(p.getBin(absV)) } // getBin returns the bin v should be recorded into. @@ -199,6 +199,15 @@ type expoBuckets struct { counts []uint64 } +func (b *expoBuckets) needsResize(bin int32) bool { + endBin := int(b.startBin) + len(b.counts) - 1 + return len(b.counts) == 0 || bin < b.startBin || int(bin) > endBin +} + +func (b *expoBuckets) recordWithoutResize(bin int32) { + atomic.AddUint64(&b.counts[bin-b.startBin], 1) +} + // record increments the count for the given bin, and expands the buckets if needed. // Size changes must be done before calling this function. func (b *expoBuckets) record(bin int32) { @@ -212,6 +221,7 @@ func (b *expoBuckets) record(bin int32) { // if the new bin is inside the current range if bin >= b.startBin && int(bin) <= endBin { + // No need to use atomics since we hold the RW lock. b.counts[bin-b.startBin]++ return } @@ -301,7 +311,7 @@ func newExponentialHistogram[N int64 | float64]( maxScale: maxScale, newRes: r, - limit: newLimiter[*expoHistogramDataPoint[N]](limit), + limit: newLimiter[expoHistogramDataPoint[N]](limit), values: make(map[attribute.Distinct]*expoHistogramDataPoint[N]), start: now(), @@ -316,10 +326,10 @@ type expoHistogram[N int64 | float64] struct { maxSize int maxScale int32 - newRes func(attribute.Set) FilteredExemplarReservoir[N] - limit limiter[*expoHistogramDataPoint[N]] - values map[attribute.Distinct]*expoHistogramDataPoint[N] - valuesMu sync.Mutex + newRes func(attribute.Set) FilteredExemplarReservoir[N] + limit limiter[expoHistogramDataPoint[N]] + values map[attribute.Distinct]*expoHistogramDataPoint[N] + sync.RWMutex start time.Time } @@ -335,19 +345,30 @@ func (e *expoHistogram[N]) measure( return } - e.valuesMu.Lock() - defer e.valuesMu.Unlock() - + // Hold the RLock even after we are done reading from the values map to + // ensure we don't race with collection. + e.RLock() attr := e.limit.Attributes(fltrAttr, e.values) v, ok := e.values[attr.Equivalent()] - if !ok { - v = newExpoHistogramDataPoint[N](attr, e.maxSize, e.maxScale, e.noMinMax, e.noSum) - v.res = e.newRes(attr) - - e.values[attr.Equivalent()] = v + if ok { + v.measure(ctx, value, droppedAttr) + e.RUnlock() + return + } + e.RUnlock() + // Switch to a full lock to add a new element to the map. + e.Lock() + defer e.Unlock() + // Check that the element wasn't added since we last checked. + v, ok = e.values[attr.Equivalent()] + if ok { + v.measure(ctx, value, droppedAttr) + return } - v.record(value) - v.res.Offer(ctx, value, droppedAttr) + v = newExpoHistogramDataPoint[N](attr, e.maxSize, e.maxScale, e.noMinMax, e.noSum) + v.res = e.newRes(attr) + v.measure(ctx, value, droppedAttr) + e.values[attr.Equivalent()] = v } func (e *expoHistogram[N]) delta( @@ -360,8 +381,8 @@ func (e *expoHistogram[N]) delta( h, _ := (*dest).(metricdata.ExponentialHistogram[N]) h.Temporality = metricdata.DeltaTemporality - e.valuesMu.Lock() - defer e.valuesMu.Unlock() + e.Lock() + defer e.Unlock() n := len(e.values) hDPts := reset(h.DataPoints, n, n) @@ -393,11 +414,11 @@ func (e *expoHistogram[N]) delta( copy(hDPts[i].NegativeBucket.Counts, val.negBuckets.counts) if !e.noSum { - hDPts[i].Sum = val.sum + hDPts[i].Sum = val.sum.load() } if !e.noMinMax { - hDPts[i].Min = metricdata.NewExtrema(val.min) - hDPts[i].Max = metricdata.NewExtrema(val.max) + hDPts[i].Min = metricdata.NewExtrema(val.minMax.loadMin()) + hDPts[i].Max = metricdata.NewExtrema(val.minMax.loadMax()) } collectExemplars(&hDPts[i].Exemplars, val.res.Collect) @@ -423,8 +444,8 @@ func (e *expoHistogram[N]) cumulative( h, _ := (*dest).(metricdata.ExponentialHistogram[N]) h.Temporality = metricdata.CumulativeTemporality - e.valuesMu.Lock() - defer e.valuesMu.Unlock() + e.Lock() + defer e.Unlock() n := len(e.values) hDPts := reset(h.DataPoints, n, n) @@ -456,11 +477,11 @@ func (e *expoHistogram[N]) cumulative( copy(hDPts[i].NegativeBucket.Counts, val.negBuckets.counts) if !e.noSum { - hDPts[i].Sum = val.sum + hDPts[i].Sum = val.sum.load() } if !e.noMinMax { - hDPts[i].Min = metricdata.NewExtrema(val.min) - hDPts[i].Max = metricdata.NewExtrema(val.max) + hDPts[i].Min = metricdata.NewExtrema(val.minMax.loadMin()) + hDPts[i].Max = metricdata.NewExtrema(val.minMax.loadMax()) } collectExemplars(&hDPts[i].Exemplars, val.res.Collect) diff --git a/sdk/metric/internal/aggregate/exponential_histogram_test.go b/sdk/metric/internal/aggregate/exponential_histogram_test.go index c0ad6d53e0f..e498fceeeeb 100644 --- a/sdk/metric/internal/aggregate/exponential_histogram_test.go +++ b/sdk/metric/internal/aggregate/exponential_histogram_test.go @@ -178,9 +178,9 @@ func testExpoHistogramMinMaxSumInt64(t *testing.T) { } dp := h.values[alice.Equivalent()] - assert.Equal(t, tt.expected.max, dp.max) - assert.Equal(t, tt.expected.min, dp.min) - assert.InDelta(t, tt.expected.sum, dp.sum, 0.01) + assert.Equal(t, tt.expected.max, dp.minMax.loadMax()) + assert.Equal(t, tt.expected.min, dp.minMax.loadMin()) + assert.InDelta(t, tt.expected.sum, dp.sum.load(), 0.01) }) } } @@ -220,9 +220,9 @@ func testExpoHistogramMinMaxSumFloat64(t *testing.T) { } dp := h.values[alice.Equivalent()] - assert.Equal(t, tt.expected.max, dp.max) - assert.Equal(t, tt.expected.min, dp.min) - assert.InDelta(t, tt.expected.sum, dp.sum, 0.01) + assert.Equal(t, tt.expected.max, dp.minMax.loadMax()) + assert.Equal(t, tt.expected.min, dp.minMax.loadMin()) + assert.InDelta(t, tt.expected.sum, dp.sum.load(), 0.01) }) } } @@ -703,19 +703,9 @@ func BenchmarkExponentialHistogram(b *testing.B) { } func TestSubNormal(t *testing.T) { - want := &expoHistogramDataPoint[float64]{ - attrs: alice, - maxSize: 4, - count: 3, - min: math.SmallestNonzeroFloat64, - max: math.SmallestNonzeroFloat64, - sum: 3 * math.SmallestNonzeroFloat64, - - scale: 20, - posBuckets: expoBuckets{ - startBin: -1126170625, - counts: []uint64{3}, - }, + wantBuckets := expoBuckets{ + startBin: -1126170625, + counts: []uint64{3}, } ehdp := newExpoHistogramDataPoint[float64](alice, 4, 20, false, false) @@ -723,7 +713,14 @@ func TestSubNormal(t *testing.T) { ehdp.record(math.SmallestNonzeroFloat64) ehdp.record(math.SmallestNonzeroFloat64) - assert.Equal(t, want, ehdp) + assert.Equal(t, alice, ehdp.attrs) + assert.Equal(t, 4, ehdp.maxSize) + assert.Equal(t, uint64(3), ehdp.count) + assert.Equal(t, math.SmallestNonzeroFloat64, ehdp.minMax.loadMin()) + assert.Equal(t, math.SmallestNonzeroFloat64, ehdp.minMax.loadMax()) + assert.Equal(t, 3*math.SmallestNonzeroFloat64, ehdp.sum.load()) + assert.Equal(t, int32(20), ehdp.scale) + assert.Equal(t, wantBuckets, ehdp.posBuckets) } func TestExponentialHistogramAggregation(t *testing.T) { diff --git a/sdk/metric/internal/aggregate/filtered_reservoir.go b/sdk/metric/internal/aggregate/filtered_reservoir.go index d4c41642d79..590cef22373 100644 --- a/sdk/metric/internal/aggregate/filtered_reservoir.go +++ b/sdk/metric/internal/aggregate/filtered_reservoir.go @@ -5,6 +5,7 @@ package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggreg import ( "context" + "sync" "time" "go.opentelemetry.io/otel/attribute" @@ -27,6 +28,7 @@ type FilteredExemplarReservoir[N int64 | float64] interface { // filteredExemplarReservoir handles the pre-sampled exemplar of measurements made. type filteredExemplarReservoir[N int64 | float64] struct { + mu sync.Mutex filter exemplar.Filter reservoir exemplar.Reservoir } @@ -45,9 +47,17 @@ func NewFilteredExemplarReservoir[N int64 | float64]( func (f *filteredExemplarReservoir[N]) Offer(ctx context.Context, val N, attr []attribute.KeyValue) { if f.filter(ctx) { + // We need to lock here because the individual aggregation only holds a + // read lock. + f.mu.Lock() + defer f.mu.Unlock() // only record the current time if we are sampling this measurement. f.reservoir.Offer(ctx, time.Now(), exemplar.NewValue(val), attr) } } -func (f *filteredExemplarReservoir[N]) Collect(dest *[]exemplar.Exemplar) { f.reservoir.Collect(dest) } +func (f *filteredExemplarReservoir[N]) Collect(dest *[]exemplar.Exemplar) { + // No need to lock here because the individual aggregation already holds + // the RW lock. + f.reservoir.Collect(dest) +} diff --git a/sdk/metric/internal/aggregate/histogram.go b/sdk/metric/internal/aggregate/histogram.go index 22d6c67d162..7c376b05111 100644 --- a/sdk/metric/internal/aggregate/histogram.go +++ b/sdk/metric/internal/aggregate/histogram.go @@ -8,6 +8,7 @@ import ( "slices" "sort" "sync" + "sync/atomic" "time" "go.opentelemetry.io/otel/attribute" @@ -15,51 +16,51 @@ import ( ) type buckets[N int64 | float64] struct { - attrs attribute.Set - res FilteredExemplarReservoir[N] - - counts []uint64 count uint64 - total N - min, max N -} - -// newBuckets returns buckets with n bins. -func newBuckets[N int64 | float64](attrs attribute.Set, n int) *buckets[N] { - return &buckets[N]{attrs: attrs, counts: make([]uint64, n)} -} - -func (b *buckets[N]) sum(value N) { b.total += value } + counts []uint64 + minMax atomicMinMax[N] + total atomicSum[N] + noSum bool + noMinMax bool -func (b *buckets[N]) bin(idx int) { - b.counts[idx]++ - b.count++ + attrs attribute.Set + res FilteredExemplarReservoir[N] } -func (b *buckets[N]) minMax(value N) { - if value < b.min { - b.min = value - } else if value > b.max { - b.max = value +func (b *buckets[N]) measure( + ctx context.Context, + value N, + idx int, + droppedAttr []attribute.KeyValue, +) { + atomic.AddUint64(&b.counts[idx], 1) + atomic.AddUint64(&b.count, 1) + if !b.noMinMax { + b.minMax.observe(value) + } + if !b.noSum { + b.total.add(value) } + b.res.Offer(ctx, value, droppedAttr) } // histValues summarizes a set of measurements as an histValues with // explicitly defined buckets. type histValues[N int64 | float64] struct { - noMinMax bool noSum bool + noMinMax bool bounds []float64 - newRes func(attribute.Set) FilteredExemplarReservoir[N] - limit limiter[*buckets[N]] - values map[attribute.Distinct]*buckets[N] - valuesMu sync.Mutex + newRes func(attribute.Set) FilteredExemplarReservoir[N] + limit limiter[buckets[N]] + values map[attribute.Distinct]*buckets[N] + sync.RWMutex } func newHistValues[N int64 | float64]( bounds []float64, - noMinMax, noSum bool, + noSum bool, + noMinMax bool, limit int, r func(attribute.Set) FilteredExemplarReservoir[N], ) *histValues[N] { @@ -70,11 +71,11 @@ func newHistValues[N int64 | float64]( b := slices.Clone(bounds) slices.Sort(b) return &histValues[N]{ - noMinMax: noMinMax, noSum: noSum, + noMinMax: noMinMax, bounds: b, newRes: r, - limit: newLimiter[*buckets[N]](limit), + limit: newLimiter[buckets[N]](limit), values: make(map[attribute.Distinct]*buckets[N]), } } @@ -94,12 +95,28 @@ func (s *histValues[N]) measure( // (s.bounds[len(s.bounds)-1], +∞). idx := sort.SearchFloat64s(s.bounds, float64(value)) - s.valuesMu.Lock() - defer s.valuesMu.Unlock() - + // Hold the RLock even after we are done reading from the values map to + // ensure we don't race with collection. + s.RLock() attr := s.limit.Attributes(fltrAttr, s.values) b, ok := s.values[attr.Equivalent()] - if !ok { + if ok { + b.measure(ctx, value, idx, droppedAttr) + s.RUnlock() + return + } + s.RUnlock() + // Switch to a full lock to add a new element to the map. + s.Lock() + defer s.Unlock() + // Check that the element wasn't added since we last checked. + b, ok = s.values[attr.Equivalent()] + if ok { + b.measure(ctx, value, idx, droppedAttr) + return + } + b = &buckets[N]{ + attrs: attr, // N+1 buckets. For example: // // bounds = [0, 5, 10] @@ -107,21 +124,13 @@ func (s *histValues[N]) measure( // Then, // // buckets = (-∞, 0], (0, 5.0], (5.0, 10.0], (10.0, +∞) - b = newBuckets[N](attr, len(s.bounds)+1) - b.res = s.newRes(attr) - - // Ensure min and max are recorded values (not zero), for new buckets. - b.min, b.max = value, value - s.values[attr.Equivalent()] = b + counts: make([]uint64, len(s.bounds)+1), + res: s.newRes(attr), + noSum: s.noSum, + noMinMax: s.noMinMax, } - b.bin(idx) - if !s.noMinMax { - b.minMax(value) - } - if !s.noSum { - b.sum(value) - } - b.res.Offer(ctx, value, droppedAttr) + b.measure(ctx, value, idx, droppedAttr) + s.values[attr.Equivalent()] = b } // newHistogram returns an Aggregator that summarizes a set of measurements as @@ -133,7 +142,7 @@ func newHistogram[N int64 | float64]( r func(attribute.Set) FilteredExemplarReservoir[N], ) *histogram[N] { return &histogram[N]{ - histValues: newHistValues[N](boundaries, noMinMax, noSum, limit, r), + histValues: newHistValues[N](boundaries, noSum, noMinMax, limit, r), start: now(), } } @@ -156,8 +165,11 @@ func (s *histogram[N]) delta( h, _ := (*dest).(metricdata.Histogram[N]) h.Temporality = metricdata.DeltaTemporality - s.valuesMu.Lock() - defer s.valuesMu.Unlock() + // Acquire a full lock to ensure there are no concurrent measure() calls. + // If we only used a RLock, we could observe "partial" measurements, such + // as a histogram count increment without a histogram total increment. + s.Lock() + defer s.Unlock() // Do not allow modification of our copy of bounds. bounds := slices.Clone(s.bounds) @@ -175,12 +187,12 @@ func (s *histogram[N]) delta( hDPts[i].BucketCounts = val.counts if !s.noSum { - hDPts[i].Sum = val.total + hDPts[i].Sum = val.total.load() } if !s.noMinMax { - hDPts[i].Min = metricdata.NewExtrema(val.min) - hDPts[i].Max = metricdata.NewExtrema(val.max) + hDPts[i].Min = metricdata.NewExtrema(val.minMax.loadMin()) + hDPts[i].Max = metricdata.NewExtrema(val.minMax.loadMax()) } collectExemplars(&hDPts[i].Exemplars, val.res.Collect) @@ -208,8 +220,11 @@ func (s *histogram[N]) cumulative( h, _ := (*dest).(metricdata.Histogram[N]) h.Temporality = metricdata.CumulativeTemporality - s.valuesMu.Lock() - defer s.valuesMu.Unlock() + // Acquire a full lock to ensure there are no concurrent measure() calls. + // If we only used a RLock, we could observe "partial" measurements, such + // as a histogram count increment without a histogram total increment. + s.Lock() + defer s.Unlock() // Do not allow modification of our copy of bounds. bounds := slices.Clone(s.bounds) @@ -233,12 +248,12 @@ func (s *histogram[N]) cumulative( hDPts[i].BucketCounts = slices.Clone(val.counts) if !s.noSum { - hDPts[i].Sum = val.total + hDPts[i].Sum = val.total.load() } if !s.noMinMax { - hDPts[i].Min = metricdata.NewExtrema(val.min) - hDPts[i].Max = metricdata.NewExtrema(val.max) + hDPts[i].Min = metricdata.NewExtrema(val.minMax.loadMin()) + hDPts[i].Max = metricdata.NewExtrema(val.minMax.loadMax()) } collectExemplars(&hDPts[i].Exemplars, val.res.Collect) diff --git a/sdk/metric/internal/aggregate/histogram_test.go b/sdk/metric/internal/aggregate/histogram_test.go index d184b46f287..9fe30856ff6 100644 --- a/sdk/metric/internal/aggregate/histogram_test.go +++ b/sdk/metric/internal/aggregate/histogram_test.go @@ -277,21 +277,22 @@ func TestBucketsBin(t *testing.T) { func testBucketsBin[N int64 | float64]() func(t *testing.T) { return func(t *testing.T) { - b := newBuckets[N](alice, 3) + b := &buckets[N]{ + attrs: alice, + counts: make([]uint64, 3), + res: dropExemplars[N](alice), + } assertB := func(counts []uint64, count uint64, mi, ma N) { t.Helper() assert.Equal(t, counts, b.counts) assert.Equal(t, count, b.count) - assert.Equal(t, mi, b.min) - assert.Equal(t, ma, b.max) + assert.Equal(t, mi, b.minMax.loadMin()) + assert.Equal(t, ma, b.minMax.loadMax()) } - assertB([]uint64{0, 0, 0}, 0, 0, 0) - b.bin(1) - b.minMax(2) - assertB([]uint64{0, 1, 0}, 1, 0, 2) - b.bin(0) - b.minMax(-1) + b.measure(context.Background(), 2, 1, nil) + assertB([]uint64{0, 1, 0}, 1, 2, 2) + b.measure(context.Background(), -1, 0, nil) assertB([]uint64{1, 1, 0}, 2, -1, 2) } } @@ -303,18 +304,21 @@ func TestBucketsSum(t *testing.T) { func testBucketsSum[N int64 | float64]() func(t *testing.T) { return func(t *testing.T) { - b := newBuckets[N](alice, 3) + b := &buckets[N]{ + attrs: alice, + counts: make([]uint64, 3), + } var want N - assert.Equal(t, want, b.total) + assert.Equal(t, want, b.total.load()) - b.sum(2) + b.total.add(2) want = 2 - assert.Equal(t, want, b.total) + assert.Equal(t, want, b.total.load()) - b.sum(-1) + b.total.add(-1) want = 1 - assert.Equal(t, want, b.total) + assert.Equal(t, want, b.total.load()) } } diff --git a/sdk/metric/internal/aggregate/lastvalue.go b/sdk/metric/internal/aggregate/lastvalue.go index 4bbe624c77c..f0c6e118e25 100644 --- a/sdk/metric/internal/aggregate/lastvalue.go +++ b/sdk/metric/internal/aggregate/lastvalue.go @@ -14,44 +14,61 @@ import ( // datapoint is timestamped measurement data. type datapoint[N int64 | float64] struct { + value atomicIntOrFloat[N] attrs attribute.Set - value N res FilteredExemplarReservoir[N] } +func (d *datapoint[N]) measure(ctx context.Context, value N, droppedAttr []attribute.KeyValue) { + d.res.Offer(ctx, value, droppedAttr) + d.value.store(value) +} + func newLastValue[N int64 | float64](limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *lastValue[N] { return &lastValue[N]{ newRes: r, limit: newLimiter[datapoint[N]](limit), - values: make(map[attribute.Distinct]datapoint[N]), + values: make(map[attribute.Distinct]*datapoint[N]), start: now(), } } // lastValue summarizes a set of measurements as the last one made. type lastValue[N int64 | float64] struct { - sync.Mutex + sync.RWMutex newRes func(attribute.Set) FilteredExemplarReservoir[N] limit limiter[datapoint[N]] - values map[attribute.Distinct]datapoint[N] + values map[attribute.Distinct]*datapoint[N] start time.Time } func (s *lastValue[N]) measure(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue) { - s.Lock() - defer s.Unlock() - + // Hold the RLock even after we are done reading from the values map to + // ensure we don't race with collection. + s.RLock() attr := s.limit.Attributes(fltrAttr, s.values) d, ok := s.values[attr.Equivalent()] - if !ok { - d.res = s.newRes(attr) + if ok { + d.measure(ctx, value, droppedAttr) + s.RUnlock() + return } - - d.attrs = attr - d.value = value - d.res.Offer(ctx, value, droppedAttr) - + s.RUnlock() + // Switch to a full lock to add a new element to the map. + s.Lock() + defer s.Unlock() + // Check that the element wasn't added since we last checked. + d, ok = s.values[attr.Equivalent()] + if ok { + d.measure(ctx, value, droppedAttr) + return + } + d = &datapoint[N]{ + res: s.newRes(attr), + attrs: attr, + } + d.measure(ctx, value, droppedAttr) s.values[attr.Equivalent()] = d } @@ -85,6 +102,9 @@ func (s *lastValue[N]) cumulative( // the DataPoints is missed (better luck next time). gData, _ := (*dest).(metricdata.Gauge[N]) + // Acquire a full lock to ensure there are no concurrent measure() calls. + // If we only used a RLock, we could observe "partial" measurements, such + // as an exemplar without a last value. s.Lock() defer s.Unlock() @@ -109,7 +129,7 @@ func (s *lastValue[N]) copyDpts(dest *[]metricdata.DataPoint[N], t time.Time) in (*dest)[i].Attributes = v.attrs (*dest)[i].StartTime = s.start (*dest)[i].Time = t - (*dest)[i].Value = v.value + (*dest)[i].Value = v.value.load() collectExemplars(&(*dest)[i].Exemplars, v.res.Collect) i++ } @@ -138,6 +158,9 @@ func (s *precomputedLastValue[N]) delta( // the DataPoints is missed (better luck next time). gData, _ := (*dest).(metricdata.Gauge[N]) + // Acquire a full lock to ensure there are no concurrent measure() calls. + // If we only used a RLock, we could observe "partial" measurements, such + // as an exemplar without a last value. s.Lock() defer s.Unlock() @@ -160,6 +183,9 @@ func (s *precomputedLastValue[N]) cumulative( // the DataPoints is missed (better luck next time). gData, _ := (*dest).(metricdata.Gauge[N]) + // Acquire a full lock to ensure there are no concurrent measure() calls. + // If we only used a RLock, we could observe "partial" measurements, such + // as an exemplar without a last value. s.Lock() defer s.Unlock() diff --git a/sdk/metric/internal/aggregate/limit.go b/sdk/metric/internal/aggregate/limit.go index 9ea0251edd7..c19a1aff68f 100644 --- a/sdk/metric/internal/aggregate/limit.go +++ b/sdk/metric/internal/aggregate/limit.go @@ -30,7 +30,7 @@ func newLimiter[V any](aggregation int) limiter[V] { // aggregation cardinality limit for the existing measurements. If it will, // overflowSet is returned. Otherwise, if it will not exceed the limit, or the // limit is not set (limit <= 0), attr is returned. -func (l limiter[V]) Attributes(attrs attribute.Set, measurements map[attribute.Distinct]V) attribute.Set { +func (l limiter[V]) Attributes(attrs attribute.Set, measurements map[attribute.Distinct]*V) attribute.Set { if l.aggLimit > 0 { _, exists := measurements[attrs.Equivalent()] if !exists && len(measurements) >= l.aggLimit-1 { diff --git a/sdk/metric/internal/aggregate/limit_test.go b/sdk/metric/internal/aggregate/limit_test.go index c61bae0e24f..236c1af43af 100644 --- a/sdk/metric/internal/aggregate/limit_test.go +++ b/sdk/metric/internal/aggregate/limit_test.go @@ -12,7 +12,8 @@ import ( ) func TestLimiterAttributes(t *testing.T) { - m := map[attribute.Distinct]struct{}{alice.Equivalent(): {}} + var val struct{} + m := map[attribute.Distinct]*struct{}{alice.Equivalent(): &val} t.Run("NoLimit", func(t *testing.T) { l := newLimiter[struct{}](0) assert.Equal(t, alice, l.Attributes(alice, m)) @@ -43,7 +44,8 @@ func TestLimiterAttributes(t *testing.T) { var limitedAttr attribute.Set func BenchmarkLimiterAttributes(b *testing.B) { - m := map[attribute.Distinct]struct{}{alice.Equivalent(): {}} + var val struct{} + m := map[attribute.Distinct]*struct{}{alice.Equivalent(): &val} l := newLimiter[struct{}](2) b.ReportAllocs() diff --git a/sdk/metric/internal/aggregate/sum.go b/sdk/metric/internal/aggregate/sum.go index 1b4b2304c0b..b471080a9a2 100644 --- a/sdk/metric/internal/aggregate/sum.go +++ b/sdk/metric/internal/aggregate/sum.go @@ -13,41 +13,58 @@ import ( ) type sumValue[N int64 | float64] struct { - n N + n atomicSum[N] res FilteredExemplarReservoir[N] attrs attribute.Set } +func (s *sumValue[N]) measure(ctx context.Context, value N, droppedAttr []attribute.KeyValue) { + s.res.Offer(ctx, value, droppedAttr) + s.n.add(value) +} + // valueMap is the storage for sums. type valueMap[N int64 | float64] struct { - sync.Mutex + sync.RWMutex newRes func(attribute.Set) FilteredExemplarReservoir[N] limit limiter[sumValue[N]] - values map[attribute.Distinct]sumValue[N] + values map[attribute.Distinct]*sumValue[N] } func newValueMap[N int64 | float64](limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *valueMap[N] { return &valueMap[N]{ newRes: r, limit: newLimiter[sumValue[N]](limit), - values: make(map[attribute.Distinct]sumValue[N]), + values: make(map[attribute.Distinct]*sumValue[N]), } } func (s *valueMap[N]) measure(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue) { - s.Lock() - defer s.Unlock() - + // Hold the RLock even after we are done reading from the values map to + // ensure we don't race with collection. + s.RLock() attr := s.limit.Attributes(fltrAttr, s.values) v, ok := s.values[attr.Equivalent()] - if !ok { - v.res = s.newRes(attr) + if ok { + v.measure(ctx, value, droppedAttr) + s.RUnlock() + return } - - v.attrs = attr - v.n += value - v.res.Offer(ctx, value, droppedAttr) - + s.RUnlock() + // Switch to a full lock to add a new element to the map. + s.Lock() + defer s.Unlock() + // Check that the element wasn't added since we last checked. + v, ok = s.values[attr.Equivalent()] + if ok { + v.measure(ctx, value, droppedAttr) + return + } + v = &sumValue[N]{ + res: s.newRes(attr), + attrs: attr, + } + v.measure(ctx, value, droppedAttr) s.values[attr.Equivalent()] = v } @@ -81,6 +98,9 @@ func (s *sum[N]) delta( sData.Temporality = metricdata.DeltaTemporality sData.IsMonotonic = s.monotonic + // Acquire a full lock to ensure there are no concurrent measure() calls. + // If we only used a RLock, we could observe "partial" measurements, such + // as an exemplar without a sum value. s.Lock() defer s.Unlock() @@ -92,7 +112,7 @@ func (s *sum[N]) delta( dPts[i].Attributes = val.attrs dPts[i].StartTime = s.start dPts[i].Time = t - dPts[i].Value = val.n + dPts[i].Value = val.n.load() collectExemplars(&dPts[i].Exemplars, val.res.Collect) i++ } @@ -118,6 +138,9 @@ func (s *sum[N]) cumulative( sData.Temporality = metricdata.CumulativeTemporality sData.IsMonotonic = s.monotonic + // Acquire a full lock to ensure there are no concurrent measure() calls. + // If we only used a RLock, we could observe "partial" measurements, such + // as an exemplar without a sum value. s.Lock() defer s.Unlock() @@ -129,7 +152,7 @@ func (s *sum[N]) cumulative( dPts[i].Attributes = value.attrs dPts[i].StartTime = s.start dPts[i].Time = t - dPts[i].Value = value.n + dPts[i].Value = value.n.load() collectExemplars(&dPts[i].Exemplars, value.res.Collect) // TODO (#3006): This will use an unbounded amount of memory if there // are unbounded number of attribute sets being aggregated. Attribute @@ -181,6 +204,9 @@ func (s *precomputedSum[N]) delta( sData.Temporality = metricdata.DeltaTemporality sData.IsMonotonic = s.monotonic + // Acquire a full lock to ensure there are no concurrent measure() calls. + // If we only used a RLock, we could observe "partial" measurements, such + // as an exemplar without a sum value. s.Lock() defer s.Unlock() @@ -189,7 +215,7 @@ func (s *precomputedSum[N]) delta( var i int for key, value := range s.values { - delta := value.n - s.reported[key] + delta := value.n.load() - s.reported[key] dPts[i].Attributes = value.attrs dPts[i].StartTime = s.start @@ -197,7 +223,7 @@ func (s *precomputedSum[N]) delta( dPts[i].Value = delta collectExemplars(&dPts[i].Exemplars, value.res.Collect) - newReported[key] = value.n + newReported[key] = value.n.load() i++ } // Unused attribute sets do not report. @@ -223,6 +249,9 @@ func (s *precomputedSum[N]) cumulative( sData.Temporality = metricdata.CumulativeTemporality sData.IsMonotonic = s.monotonic + // Acquire a full lock to ensure there are no concurrent measure() calls. + // If we only used a RLock, we could observe "partial" measurements, such + // as an exemplar without a sum value. s.Lock() defer s.Unlock() @@ -234,7 +263,7 @@ func (s *precomputedSum[N]) cumulative( dPts[i].Attributes = val.attrs dPts[i].StartTime = s.start dPts[i].Time = t - dPts[i].Value = val.n + dPts[i].Value = val.n.load() collectExemplars(&dPts[i].Exemplars, val.res.Collect) i++