1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -44,6 +44,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- The default `TranslationStrategy` in `go.opentelemetry.io/exporters/prometheus` is changed from `otlptranslator.NoUTF8EscapingWithSuffixes` to `otlptranslator.UnderscoreEscapingWithSuffixes`. (#7421)
- The `ErrorType` function in `go.opentelemetry.io/otel/semconv/v1.37.0` now handles custom error types.
If an error implements an `ErrorType() string` method, the return value of that method will be used as the error type. (#7442)
- Improve performance of concurrent measurements in `go.opentelemetry.io/otel/sdk/metric`. (#7427)

<!-- Released section -->
<!-- Don't change this section unless doing release -->
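For the `ErrorType` entry above, a minimal hypothetical sketch of a custom error type that the function would now pick up; `semconv.ErrorType(err)` returning an `attribute.KeyValue` is an assumption here, not something shown in this diff:

package main

import (
	"fmt"

	semconv "go.opentelemetry.io/otel/semconv/v1.37.0"
)

// timeoutError is a hypothetical custom error that reports its own error type.
type timeoutError struct{ op string }

func (e *timeoutError) Error() string { return e.op + " timed out" }

// ErrorType is the method the changelog entry refers to; its return value is
// used as the error type instead of the Go type name.
func (e *timeoutError) ErrorType() string { return "timeout" }

func main() {
	var err error = &timeoutError{op: "fetch"}
	// Assumption: semconv.ErrorType returns an attribute.KeyValue for error.type.
	fmt.Println(semconv.ErrorType(err))
}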
4 changes: 4 additions & 0 deletions sdk/metric/exemplar/fixed_size_reservoir.go
@@ -7,6 +7,7 @@ import (
"context"
"math"
"math/rand/v2"
"sync"
"time"

"go.opentelemetry.io/otel/attribute"
@@ -37,6 +38,7 @@ var _ Reservoir = &FixedSizeReservoir{}
type FixedSizeReservoir struct {
reservoir.ConcurrentSafe
*storage
mu sync.Mutex

// count is the number of measurements seen.
count int64
@@ -192,6 +194,8 @@ func (r *FixedSizeReservoir) advance() {
//
// The Reservoir state is preserved after this call.
func (r *FixedSizeReservoir) Collect(dest *[]Exemplar) {
r.mu.Lock()
defer r.mu.Unlock()
r.storage.Collect(dest)
// Call reset here even though it will reset r.count and restart the random
// number series. This will persist any old exemplars as long as no new
11 changes: 11 additions & 0 deletions sdk/metric/exemplar/histogram_reservoir.go
@@ -7,6 +7,7 @@ import (
"context"
"slices"
"sort"
"sync"
"time"

"go.opentelemetry.io/otel/attribute"
@@ -42,6 +43,7 @@ var _ Reservoir = &HistogramReservoir{}
type HistogramReservoir struct {
reservoir.ConcurrentSafe
*storage
mu sync.Mutex

// bounds are bucket bounds in ascending order.
bounds []float64
@@ -76,3 +78,12 @@ func (r *HistogramReservoir) Offer(ctx context.Context, t time.Time, v Value, a
defer r.mu.Unlock()
r.store(idx, m)
}

// Collect returns all the held exemplars.
//
// The Reservoir state is preserved after this call.
func (r *HistogramReservoir) Collect(dest *[]Exemplar) {
r.mu.Lock()
defer r.mu.Unlock()
r.storage.Collect(dest)
}
4 changes: 0 additions & 4 deletions sdk/metric/exemplar/storage.go
@@ -5,7 +5,6 @@ package exemplar // import "go.opentelemetry.io/otel/sdk/metric/exemplar"

import (
"context"
"sync"
"time"

"go.opentelemetry.io/otel/attribute"
@@ -14,7 +13,6 @@ import (

// storage is an exemplar storage for [Reservoir] implementations.
type storage struct {
mu sync.Mutex
// measurements are the measurements sampled.
//
// This does not use []metricdata.Exemplar because it potentially would
@@ -34,8 +32,6 @@ func (r *storage) store(idx int, m measurement) {
//
// The Reservoir state is preserved after this call.
func (r *storage) Collect(dest *[]Exemplar) {
r.mu.Lock()
defer r.mu.Unlock()
*dest = reset(*dest, len(r.measurements), len(r.measurements))
var n int
for _, m := range r.measurements {
3 changes: 2 additions & 1 deletion sdk/metric/internal/aggregate/aggregate.go
@@ -110,11 +110,12 @@ func (b Builder[N]) PrecomputedSum(monotonic bool) (Measure[N], ComputeAggregati

// Sum returns a sum aggregate function input and output.
func (b Builder[N]) Sum(monotonic bool) (Measure[N], ComputeAggregation) {
s := newSum[N](monotonic, b.AggregationLimit, b.resFunc())
switch b.Temporality {
case metricdata.DeltaTemporality:
s := newDeltaSum[N](monotonic, b.AggregationLimit, b.resFunc())
return b.filter(s.measure), s.delta
default:
s := newCumulativeSum[N](monotonic, b.AggregationLimit, b.resFunc())
return b.filter(s.measure), s.cumulative
}
}
184 changes: 184 additions & 0 deletions sdk/metric/internal/aggregate/atomic.go
@@ -0,0 +1,184 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"

import (
"math"
"runtime"
"sync"
"sync/atomic"

"go.opentelemetry.io/otel/attribute"
)

// atomicCounter is an efficient way of adding to a number which is either an
// int64 or float64. It is designed to be efficient when adding whole
// numbers, regardless of whether N is an int64 or float64.
//
// Inspired by the Prometheus counter implementation:
// https://github.com/prometheus/client_golang/blob/14ccb93091c00f86b85af7753100aa372d63602b/prometheus/counter.go#L108
type atomicCounter[N int64 | float64] struct {
// nFloatBits contains only the non-integer portion of the counter.
nFloatBits atomic.Uint64
// nInt contains only the integer portion of the counter.
nInt atomic.Int64
}

// load returns the current value. The caller must ensure all calls to add have
// returned prior to calling load.
func (n *atomicCounter[N]) load() N {
fval := math.Float64frombits(n.nFloatBits.Load())
ival := n.nInt.Load()
return N(fval + float64(ival))
}

func (n *atomicCounter[N]) add(value N) {
ival := int64(value)
// Fast path: the value is an integer or a whole-numbered float.
if float64(ival) == float64(value) {
n.nInt.Add(ival)
return
}

// Otherwise the value has a fractional part; add it to the float bits below.
for {
oldBits := n.nFloatBits.Load()
newBits := math.Float64bits(math.Float64frombits(oldBits) + float64(value))
if n.nFloatBits.CompareAndSwap(oldBits, newBits) {
return
}
}
}

// hotColdWaitGroup is a synchronization primitive which enables lockless
// writes for concurrent writers and enables a reader to acquire exclusive
// access to a snapshot of state including only completed operations.
// Conceptually, it can be thought of as a "hot" wait group,
// and a "cold" wait group, with the ability for the reader to atomically swap
// the hot and cold wait groups, and wait for the now-cold wait group to
// complete.
//
// Inspired by the prometheus/client_golang histogram implementation:
// https://github.com/prometheus/client_golang/blob/a974e0d45e0aa54c65492559114894314d8a2447/prometheus/histogram.go#L725
//
// Usage:
//
// var hcwg hotColdWaitGroup
// var data [2]any
//
// func write() {
// hotIdx := hcwg.start()
// defer hcwg.done(hotIdx)
// // modify data without locking
// data[hotIdx].update()
// }
//
// func read() {
// coldIdx := hcwg.swapHotAndWait()
// // read data now that all writes to the cold data have completed.
// data[coldIdx].read()
// }
type hotColdWaitGroup struct {
// startedCountAndHotIdx contains a 63-bit counter in the lower bits,
// and a 1-bit hot index denoting which of the two data points new
// measurements are written to. These are packed together so that the
// reader can atomically swap the hot bit, reset the started-writes count
// to zero, and read the number of writes that were started prior to the
// hot bit being swapped.
startedCountAndHotIdx atomic.Uint64
// endedCounts is the number of writes that have completed to each
// dataPoint.
endedCounts [2]atomic.Uint64
}

// start returns the hot index that the writer should write to. The returned
// hot index is 0 or 1. The caller must call done(hot index) after it finishes
// its operation. start() is safe to call concurrently with other methods.
func (l *hotColdWaitGroup) start() uint64 {
// We increment l.startedCountAndHotIdx so that the counter in the lower
// 63 bits gets incremented. At the same time, we get the new value
// back, which we can use to return the currently-hot index.
return l.startedCountAndHotIdx.Add(1) >> 63
}

// done signals to the reader that an operation has fully completed.
// done is safe to call concurrently.
func (l *hotColdWaitGroup) done(hotIdx uint64) {
l.endedCounts[hotIdx].Add(1)
}

// swapHotAndWait swaps the hot bit, waits for all start() calls to be done(),
// and then returns the now-cold index for the reader to read from. The
// returned index is 0 or 1. swapHotAndWait must not be called concurrently.
func (l *hotColdWaitGroup) swapHotAndWait() uint64 {
n := l.startedCountAndHotIdx.Load()
coldIdx := (^n) >> 63
// Swap the hot and cold index while resetting the started measurements
// count to zero.
n = l.startedCountAndHotIdx.Swap((coldIdx << 63))
hotIdx := n >> 63
startedCount := n & ((1 << 63) - 1)
// Wait for all writes to the previously-hot index to finish.
for startedCount != l.endedCounts[hotIdx].Load() {
runtime.Gosched() // Let measurements complete.
}
// reset the number of ended operations
l.endedCounts[hotIdx].Store(0)
return hotIdx
}

// limitedSyncMap is a sync.Map which enforces the aggregation limit on
// attribute sets and provides a Len() function.
type limitedSyncMap struct {
sync.Map
aggLimit int
len int
lenMux sync.Mutex
}

func (m *limitedSyncMap) LoadOrStoreAttr(fltrAttr attribute.Set, newValue func(attribute.Set) any) any {
actual, loaded := m.Load(fltrAttr.Equivalent())
if loaded {
return actual
}
// If the overflow set exists, assume we have already overflowed and don't
// bother with the slow path below.
actual, loaded = m.Load(overflowSet.Equivalent())
if loaded {
return actual
}
// Slow path: add a new attribute set.
m.lenMux.Lock()
defer m.lenMux.Unlock()

// re-fetch now that we hold the lock to ensure we don't use the overflow
// set unless we are sure the attribute set isn't being written
// concurrently.
actual, loaded = m.Load(fltrAttr.Equivalent())
if loaded {
return actual
}

if m.aggLimit > 0 && m.len >= m.aggLimit-1 {
fltrAttr = overflowSet
}
actual, loaded = m.LoadOrStore(fltrAttr.Equivalent(), newValue(fltrAttr))
if !loaded {
m.len++
}
return actual
}

func (m *limitedSyncMap) Clear() {
m.lenMux.Lock()
defer m.lenMux.Unlock()
m.len = 0
m.Map.Clear()
}

func (m *limitedSyncMap) Len() int {
m.lenMux.Lock()
defer m.lenMux.Unlock()
return m.len
}
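
A hypothetical sketch of how the two primitives above could be combined into a lock-free delta sum; this is illustrative only, not the SDK's aggregator, and it is assumed to live in the same aggregate package so it can reach the unexported counter fields:

type lockFreeDeltaSum[N int64 | float64] struct {
	hcwg hotColdWaitGroup
	sums [2]atomicCounter[N]
}

// add records a measurement without taking a lock.
func (s *lockFreeDeltaSum[N]) add(v N) {
	hotIdx := s.hcwg.start()
	defer s.hcwg.done(hotIdx)
	s.sums[hotIdx].add(v)
}

// collect swaps the hot and cold sums, waits for in-flight adds to the
// now-cold sum to complete, and returns that sum for the interval.
func (s *lockFreeDeltaSum[N]) collect() N {
	coldIdx := s.hcwg.swapHotAndWait()
	v := s.sums[coldIdx].load()
	// Reset the cold counter so the next interval starts from zero.
	s.sums[coldIdx].nInt.Store(0)
	s.sums[coldIdx].nFloatBits.Store(0)
	return v
}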
78 changes: 78 additions & 0 deletions sdk/metric/internal/aggregate/atomic_test.go
@@ -0,0 +1,78 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"

import (
"math"
"sync"
"sync/atomic"
"testing"

"github.com/stretchr/testify/assert"
)

func TestAtomicSumAddFloatConcurrentSafe(t *testing.T) {
var wg sync.WaitGroup
var aSum atomicCounter[float64]
for _, in := range []float64{
0.2,
0.25,
1.6,
10.55,
42.4,
} {
wg.Add(1)
go func() {
defer wg.Done()
aSum.add(in)
}()
}
wg.Wait()
assert.Equal(t, float64(55), math.Round(aSum.load()))
}

func TestAtomicSumAddIntConcurrentSafe(t *testing.T) {
var wg sync.WaitGroup
var aSum atomicCounter[int64]
for _, in := range []int64{
1,
2,
3,
4,
5,
} {
wg.Add(1)
go func() {
defer wg.Done()
aSum.add(in)
}()
}
wg.Wait()
assert.Equal(t, int64(15), aSum.load())
}

func TestHotColdWaitGroupConcurrentSafe(t *testing.T) {
var wg sync.WaitGroup
hcwg := &hotColdWaitGroup{}
var data [2]uint64
for range 5 {
wg.Add(1)
go func() {
defer wg.Done()
hotIdx := hcwg.start()
defer hcwg.done(hotIdx)
atomic.AddUint64(&data[hotIdx], 1)
}()
}
for range 2 {
readIdx := hcwg.swapHotAndWait()
assert.NotPanics(t, func() {
// reading without using atomics should not panic since we are
// reading from the cold element, and have waited for all writes to
// finish.
t.Logf("read value %+v", data[readIdx])
})
}
wg.Wait()
}
4 changes: 2 additions & 2 deletions sdk/metric/internal/aggregate/exponential_histogram.go
@@ -301,7 +301,7 @@ func newExponentialHistogram[N int64 | float64](
maxScale: maxScale,

newRes: r,
limit: newLimiter[*expoHistogramDataPoint[N]](limit),
limit: newLimiter[expoHistogramDataPoint[N]](limit),
values: make(map[attribute.Distinct]*expoHistogramDataPoint[N]),

start: now(),
@@ -317,7 +317,7 @@ type expoHistogram[N int64 | float64] struct {
maxScale int32

newRes func(attribute.Set) FilteredExemplarReservoir[N]
limit limiter[*expoHistogramDataPoint[N]]
limit limiter[expoHistogramDataPoint[N]]
values map[attribute.Distinct]*expoHistogramDataPoint[N]
valuesMu sync.Mutex

2 changes: 1 addition & 1 deletion sdk/metric/internal/aggregate/filtered_reservoir.go
@@ -54,12 +54,12 @@ func NewFilteredExemplarReservoir[N int64 | float64](

func (f *filteredExemplarReservoir[N]) Offer(ctx context.Context, val N, attr []attribute.KeyValue) {
if f.filter(ctx) {
// only record the current time if we are sampling this measurement.
ts := time.Now()
if !f.concurrentSafe {
f.reservoirMux.Lock()
defer f.reservoirMux.Unlock()
}
// only record the current time if we are sampling this measurement.
f.reservoir.Offer(ctx, ts, exemplar.NewValue(val), attr)
}
}