1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -44,6 +44,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- The default `TranslationStrategy` in `go.opentelemetry.io/exporters/prometheus` is changed from `otlptranslator.NoUTF8EscapingWithSuffixes` to `otlptranslator.UnderscoreEscapingWithSuffixes`. (#7421)
- The `ErrorType` function in `go.opentelemetry.io/otel/semconv/v1.37.0` now handles custom error types.
If an error implements an `ErrorType() string` method, the return value of that method will be used as the error type. (#7442)
- Improve performance of concurrent measurements in `go.opentelemetry.io/otel/sdk/metric`. (#7427)

<!-- Released section -->
<!-- Don't change this section unless doing release -->
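Editor's note on the second changelog entry above: a minimal sketch of a custom error type that the updated function would pick up, assuming ErrorType returns an attribute.KeyValue as in current semconv packages (the timeoutError type is illustrative only, not part of this change):

package main

import (
	"errors"
	"fmt"

	semconv "go.opentelemetry.io/otel/semconv/v1.37.0"
)

// timeoutError is a hypothetical error type that advertises its own error
// type through the ErrorType() string method described in the changelog.
type timeoutError struct{}

func (timeoutError) Error() string     { return "operation timed out" }
func (timeoutError) ErrorType() string { return "timeout" }

func main() {
	// With the change, the error.type attribute uses the custom value "timeout".
	fmt.Println(semconv.ErrorType(timeoutError{}))
	// Errors without an ErrorType() method keep the previous default behavior.
	fmt.Println(semconv.ErrorType(errors.New("boom")))
}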
4 changes: 4 additions & 0 deletions sdk/metric/exemplar/fixed_size_reservoir.go
@@ -7,6 +7,7 @@ import (
"context"
"math"
"math/rand/v2"
"sync"
"time"

"go.opentelemetry.io/otel/attribute"
@@ -37,6 +38,7 @@ var _ Reservoir = &FixedSizeReservoir{}
type FixedSizeReservoir struct {
reservoir.ConcurrentSafe
*storage
mu sync.Mutex

// count is the number of measurements seen.
count int64
@@ -192,6 +194,8 @@ func (r *FixedSizeReservoir) advance() {
//
// The Reservoir state is preserved after this call.
func (r *FixedSizeReservoir) Collect(dest *[]Exemplar) {
r.mu.Lock()
defer r.mu.Unlock()
r.storage.Collect(dest)
// Call reset here even though it will reset r.count and restart the random
// number series. This will persist any old exemplars as long as no new
11 changes: 11 additions & 0 deletions sdk/metric/exemplar/histogram_reservoir.go
@@ -7,6 +7,7 @@ import (
"context"
"slices"
"sort"
"sync"
"time"

"go.opentelemetry.io/otel/attribute"
@@ -42,6 +43,7 @@ var _ Reservoir = &HistogramReservoir{}
type HistogramReservoir struct {
reservoir.ConcurrentSafe
*storage
mu sync.Mutex

// bounds are bucket bounds in ascending order.
bounds []float64
@@ -76,3 +78,12 @@ func (r *HistogramReservoir) Offer(ctx context.Context, t time.Time, v Value, a
defer r.mu.Unlock()
r.store(idx, m)
}

// Collect returns all the held exemplars.
//
// The Reservoir state is preserved after this call.
func (r *HistogramReservoir) Collect(dest *[]Exemplar) {
r.mu.Lock()
defer r.mu.Unlock()
r.storage.Collect(dest)
}
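Editor's note: the two reservoir changes above follow one pattern, where a reservoir that declares itself concurrent-safe guards both Offer and Collect with its own mutex instead of the storage-level lock this PR removes. A rough, purely illustrative sketch of that locking pattern applied to a hypothetical last-value reservoir; the type name and fields are assumptions, it cannot embed the SDK-internal ConcurrentSafe marker, and the SDK would still take its own lock around reservoirs not marked concurrent-safe (see filtered_reservoir.go below):

package example

import (
	"context"
	"sync"
	"time"

	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/sdk/metric/exemplar"
)

// lastValueReservoir keeps only the most recent measurement. Offer and
// Collect share one mutex, mirroring the locking added to the SDK reservoirs.
type lastValueReservoir struct {
	mu   sync.Mutex
	last exemplar.Exemplar
	set  bool
}

func (r *lastValueReservoir) Offer(_ context.Context, t time.Time, v exemplar.Value, a []attribute.KeyValue) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.last = exemplar.Exemplar{FilteredAttributes: a, Time: t, Value: v}
	r.set = true
}

func (r *lastValueReservoir) Collect(dest *[]exemplar.Exemplar) {
	r.mu.Lock()
	defer r.mu.Unlock()
	*dest = (*dest)[:0]
	if r.set {
		*dest = append(*dest, r.last)
	}
}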
4 changes: 0 additions & 4 deletions sdk/metric/exemplar/storage.go
@@ -5,7 +5,6 @@ package exemplar // import "go.opentelemetry.io/otel/sdk/metric/exemplar"

import (
"context"
"sync"
"time"

"go.opentelemetry.io/otel/attribute"
@@ -14,7 +13,6 @@ import (

// storage is an exemplar storage for [Reservoir] implementations.
type storage struct {
mu sync.Mutex
// measurements are the measurements sampled.
//
// This does not use []metricdata.Exemplar because it potentially would
@@ -34,8 +32,6 @@ func (r *storage) store(idx int, m measurement) {
//
// The Reservoir state is preserved after this call.
func (r *storage) Collect(dest *[]Exemplar) {
r.mu.Lock()
defer r.mu.Unlock()
*dest = reset(*dest, len(r.measurements), len(r.measurements))
var n int
for _, m := range r.measurements {
7 changes: 4 additions & 3 deletions sdk/metric/internal/aggregate/aggregate.go
Expand Up @@ -110,12 +110,13 @@ func (b Builder[N]) PrecomputedSum(monotonic bool) (Measure[N], ComputeAggregati

// Sum returns a sum aggregate function input and output.
func (b Builder[N]) Sum(monotonic bool) (Measure[N], ComputeAggregation) {
s := newSum[N](monotonic, b.AggregationLimit, b.resFunc())
switch b.Temporality {
case metricdata.DeltaTemporality:
return b.filter(s.measure), s.delta
s := newDeltaSum[N](monotonic, b.AggregationLimit, b.resFunc())
return b.filter(s.measure), s.collect
default:
return b.filter(s.measure), s.cumulative
s := newCumulativeSum[N](monotonic, b.AggregationLimit, b.resFunc())
return b.filter(s.measure), s.collect
}
}

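Editor's note on the Sum change above: which of the two paths is taken depends on the temporality the reader reports for the instrument kind. A usage sketch (not part of this change) configuring a reader for delta temporality, which routes sums through the new delta aggregator:

package example

import (
	"go.opentelemetry.io/otel/sdk/metric"
	"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

// newDeltaProvider returns a MeterProvider whose reader requests delta
// temporality for every instrument kind, so Builder.Sum takes the delta path.
func newDeltaProvider() *metric.MeterProvider {
	reader := metric.NewManualReader(
		metric.WithTemporalitySelector(func(metric.InstrumentKind) metricdata.Temporality {
			return metricdata.DeltaTemporality
		}),
	)
	return metric.NewMeterProvider(metric.WithReader(reader))
}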
184 changes: 184 additions & 0 deletions sdk/metric/internal/aggregate/atomic.go
@@ -0,0 +1,184 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"

import (
"math"
"runtime"
"sync"
"sync/atomic"

"go.opentelemetry.io/otel/attribute"
)

// atomicCounter is an efficient way of concurrently adding to a number that
// is either an int64 or a float64. It is optimized for adding whole numbers,
// regardless of whether N is an int64 or a float64.
//
// Inspired by the Prometheus counter implementation:
// https://github.com/prometheus/client_golang/blob/14ccb93091c00f86b85af7753100aa372d63602b/prometheus/counter.go#L108
type atomicCounter[N int64 | float64] struct {
// nFloatBits contains only the non-integer portion of the counter.
nFloatBits atomic.Uint64
// nInt contains only the integer portion of the counter.
nInt atomic.Int64
}

// load returns the current value. The caller must ensure all calls to add have
// returned prior to calling load.
func (n *atomicCounter[N]) load() N {
fval := math.Float64frombits(n.nFloatBits.Load())
ival := n.nInt.Load()
return N(fval + float64(ival))
}

func (n *atomicCounter[N]) add(value N) {
ival := int64(value)
// Fast path: the value is an int, or a float with no fractional part.
if float64(ival) == float64(value) {
n.nInt.Add(ival)
return
}

// Below this point the value must be a float with a fractional part.
for {
oldBits := n.nFloatBits.Load()
newBits := math.Float64bits(math.Float64frombits(oldBits) + float64(value))
if n.nFloatBits.CompareAndSwap(oldBits, newBits) {
return
}
}
}

// hotColdWaitGroup is a synchronization primitive which enables lockless
// writes for concurrent writers and enables a reader to acquire exclusive
// access to a snapshot of state including only completed operations.
// Conceptually, it can be thought of as a "hot" wait group,
// and a "cold" wait group, with the ability for the reader to atomically swap
// the hot and cold wait groups, and wait for the now-cold wait group to
// complete.
//
// Inspired by the prometheus/client_golang histogram implementation:
// https://github.com/prometheus/client_golang/blob/a974e0d45e0aa54c65492559114894314d8a2447/prometheus/histogram.go#L725
//
// Usage:
//
// var hcwg hotColdWaitGroup
// var data [2]any
//
// func write() {
// hotIdx := hcwg.start()
// defer hcwg.done(hotIdx)
// // modify data without locking
// data[hotIdx].update()
// }
//
// func read() {
// coldIdx := hcwg.swapHotAndWait()
// // read data now that all writes to the cold data have completed.
// data[coldIdx].read()
// }
type hotColdWaitGroup struct {
// startedCountAndHotIdx contains a 63-bit counter in the lower bits,
// and a 1-bit hot index in the highest bit that denotes which of the two
// data points new measurements should be written to. They are packed
// together so that swapHotAndWait() can atomically swap the hot bit, reset
// the started-writes count to zero, and read the number of writes that were
// started prior to the hot bit being swapped.
startedCountAndHotIdx atomic.Uint64
// endedCounts is the number of writes that have completed to each
// dataPoint.
endedCounts [2]atomic.Uint64
}

// start returns the hot index that the writer should write to. The returned
// hot index is 0 or 1. The caller must call done with the returned hot index
// after it finishes its operation. start() is safe to call concurrently with
// other methods.
func (l *hotColdWaitGroup) start() uint64 {
// We increment l.startedCountAndHotIdx so that the counter in the lower
// 63 bits gets incremented. At the same time, we get the new value
// back, which we can use to return the currently-hot index.
return l.startedCountAndHotIdx.Add(1) >> 63
}

// done signals to the reader that an operation has fully completed.
// done is safe to call concurrently.
func (l *hotColdWaitGroup) done(hotIdx uint64) {
l.endedCounts[hotIdx].Add(1)
}

// swapHotAndWait swaps the hot bit, waits for all start() calls to be done(),
// and then returns the now-cold index for the reader to read from. The
// returned index is 0 or 1. swapHotAndWait must not be called concurrently.
func (l *hotColdWaitGroup) swapHotAndWait() uint64 {
n := l.startedCountAndHotIdx.Load()
coldIdx := (^n) >> 63
// Swap the hot and cold index while resetting the started measurements
// count to zero.
n = l.startedCountAndHotIdx.Swap((coldIdx << 63))
hotIdx := n >> 63
startedCount := n & ((1 << 63) - 1)
// Wait for all writes to the previously-hot data to finish.
for startedCount != l.endedCounts[hotIdx].Load() {
runtime.Gosched() // Let measurements complete.
}
// reset the number of ended operations
l.endedCounts[hotIdx].Store(0)
return hotIdx
}

// limitedSyncMap is a sync.Map which enforces the aggregation limit on
// attribute sets and provides a Len() function.
type limitedSyncMap struct {
sync.Map
aggLimit int
len int
lenMux sync.Mutex
}

func (m *limitedSyncMap) LoadOrStoreAttr(fltrAttr attribute.Set, newValue func(attribute.Set) any) any {
actual, loaded := m.Load(fltrAttr.Equivalent())
if loaded {
return actual
}
// If the overflow set exists, assume we have already overflowed and don't
// bother with the slow path below.
actual, loaded = m.Load(overflowSet.Equivalent())
if loaded {
return actual
}
// Slow path: add a new attribute set.
m.lenMux.Lock()
defer m.lenMux.Unlock()

// re-fetch now that we hold the lock to ensure we don't use the overflow
// set unless we are sure the attribute set isn't being written
// concurrently.
actual, loaded = m.Load(fltrAttr.Equivalent())
if loaded {
return actual
}

if m.aggLimit > 0 && m.len >= m.aggLimit-1 {
fltrAttr = overflowSet
}
actual, loaded = m.LoadOrStore(fltrAttr.Equivalent(), newValue(fltrAttr))
if !loaded {
m.len++
}
return actual
}

func (m *limitedSyncMap) Clear() {
m.lenMux.Lock()
defer m.lenMux.Unlock()
m.len = 0
m.Map.Clear()
}

func (m *limitedSyncMap) Len() int {
m.lenMux.Lock()
defer m.lenMux.Unlock()
return m.len
}
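Editor's note on startedCountAndHotIdx: the top bit stores the hot index and the low 63 bits count started writes, so a single atomic Add or Swap updates both at once. A small self-contained sketch (standard library only) of that layout:

package main

import (
	"fmt"
	"sync/atomic"
)

func main() {
	var state atomic.Uint64

	// Two writers start: the low 63 bits count to 2, the hot bit stays 0.
	fmt.Println(state.Add(1) >> 63) // 0
	fmt.Println(state.Add(1) >> 63) // 0

	// The reader flips the hot bit and zeroes the counter in one Swap.
	coldIdx := (^state.Load()) >> 63  // the index writers were NOT using
	prev := state.Swap(coldIdx << 63) // new state: hot index 1, count 0
	started := prev & ((1 << 63) - 1) // writes started before the swap
	fmt.Println(coldIdx, prev>>63, started) // 1 0 2

	// New writers now land on the other index.
	fmt.Println(state.Add(1) >> 63) // 1
}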
78 changes: 78 additions & 0 deletions sdk/metric/internal/aggregate/atomic_test.go
@@ -0,0 +1,78 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"

import (
"math"
"sync"
"sync/atomic"
"testing"

"github.com/stretchr/testify/assert"
)

func TestAtomicSumAddFloatConcurrentSafe(t *testing.T) {
var wg sync.WaitGroup
var aSum atomicCounter[float64]
for _, in := range []float64{
0.2,
0.25,
1.6,
10.55,
42.4,
} {
wg.Add(1)
go func() {
defer wg.Done()
aSum.add(in)
}()
}
wg.Wait()
assert.Equal(t, float64(55), math.Round(aSum.load()))
}

func TestAtomicSumAddIntConcurrentSafe(t *testing.T) {
var wg sync.WaitGroup
var aSum atomicCounter[int64]
for _, in := range []int64{
1,
2,
3,
4,
5,
} {
wg.Add(1)
go func() {
defer wg.Done()
aSum.add(in)
}()
}
wg.Wait()
assert.Equal(t, int64(15), aSum.load())
}

func TestHotColdWaitGroupConcurrentSafe(t *testing.T) {
var wg sync.WaitGroup
hcwg := &hotColdWaitGroup{}
var data [2]uint64
for range 5 {
wg.Add(1)
go func() {
defer wg.Done()
hotIdx := hcwg.start()
defer hcwg.done(hotIdx)
atomic.AddUint64(&data[hotIdx], 1)
}()
}
for range 2 {
readIdx := hcwg.swapHotAndWait()
assert.NotPanics(t, func() {
// reading without using atomics should not panic since we are
// reading from the cold element, and have waited for all writes to
// finish.
t.Logf("read value %+v", data[readIdx])
})
}
wg.Wait()
}
4 changes: 2 additions & 2 deletions sdk/metric/internal/aggregate/exponential_histogram.go
@@ -301,7 +301,7 @@ func newExponentialHistogram[N int64 | float64](
maxScale: maxScale,

newRes: r,
limit: newLimiter[*expoHistogramDataPoint[N]](limit),
limit: newLimiter[expoHistogramDataPoint[N]](limit),
values: make(map[attribute.Distinct]*expoHistogramDataPoint[N]),

start: now(),
@@ -317,7 +317,7 @@ type expoHistogram[N int64 | float64] struct {
maxScale int32

newRes func(attribute.Set) FilteredExemplarReservoir[N]
limit limiter[*expoHistogramDataPoint[N]]
limit limiter[expoHistogramDataPoint[N]]
values map[attribute.Distinct]*expoHistogramDataPoint[N]
valuesMu sync.Mutex

2 changes: 1 addition & 1 deletion sdk/metric/internal/aggregate/filtered_reservoir.go
@@ -54,12 +54,12 @@ func NewFilteredExemplarReservoir[N int64 | float64](

func (f *filteredExemplarReservoir[N]) Offer(ctx context.Context, val N, attr []attribute.KeyValue) {
if f.filter(ctx) {
// only record the current time if we are sampling this measurement.
ts := time.Now()
if !f.concurrentSafe {
f.reservoirMux.Lock()
defer f.reservoirMux.Unlock()
}
// only record the current time if we are sampling this measurement.
f.reservoir.Offer(ctx, ts, exemplar.NewValue(val), attr)
}
}