Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- `WithInstrumentationAttributes` in `go.opentelemetry.io/otel/log` synchronously de-duplicates the passed attributes instead of delegating it to the returned `LoggerOption`. (#7266)
- `Distinct` in `go.opentelemetry.io/otel/attribute` is no longer guaranteed to uniquely identify an attribute set. Collisions between `Distinct` values for different Sets are possible with extremely high cardinality (billions of series per instrument), but are highly unlikely. (#7175)
- The default `TranslationStrategy` in `go.opentelemetry.io/exporters/prometheus` is changed from `otlptranslator.NoUTF8EscapingWithSuffixes` to `otlptranslator.UnderscoreEscapingWithSuffixes`. (#7421)
- Improve performance of concurrent measurements in `go.opentelemetry.io/otel/sdk/metric`. (#7189)

<!-- Released section -->
<!-- Don't change this section unless doing release -->
Expand Down
2 changes: 1 addition & 1 deletion sdk/metric/internal/aggregate/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func (b Builder[N]) PrecomputedSum(monotonic bool) (Measure[N], ComputeAggregati

// Sum returns a sum aggregate function input and output.
func (b Builder[N]) Sum(monotonic bool) (Measure[N], ComputeAggregation) {
s := newSum[N](monotonic, b.AggregationLimit, b.resFunc())
s := newSum[N](monotonic, b.Temporality, b.AggregationLimit, b.resFunc())
switch b.Temporality {
case metricdata.DeltaTemporality:
return b.filter(s.measure), s.delta
Expand Down
126 changes: 126 additions & 0 deletions sdk/metric/internal/aggregate/atomic.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"

import (
"math"
"runtime"
"sync/atomic"
)

// atomicCounter is an efficient way of adding to a number which is either an
// int64 or float64. It is designed to be efficient when adding whole
// numbers, regardless of whether N is an int64 or float64.
//
// Inspired by the Prometheus counter implementation:
// https://github.com/prometheus/client_golang/blob/14ccb93091c00f86b85af7753100aa372d63602b/prometheus/counter.go#L108
type atomicCounter[N int64 | float64] struct {
// nFloatBits contains only the non-integer portion of the counter.
nFloatBits atomic.Uint64
// nInt contains only the integer portion of the counter.
nInt atomic.Int64
}

// load returns the current value. The caller must ensure all calls to add have
// returned prior to calling load.
func (n *atomicCounter[N]) load() N {
fval := math.Float64frombits(n.nFloatBits.Load())
ival := n.nInt.Load()
return N(fval + float64(ival))
}

func (n *atomicCounter[N]) add(value N) {
ival := int64(value)
// This case is where the value is an int, or if it is a whole-numbered float.
if float64(ival) == float64(value) {
n.nInt.Add(ival)
return
}

// Value must be a float below.
for {
oldBits := n.nFloatBits.Load()
newBits := math.Float64bits(math.Float64frombits(oldBits) + float64(value))
if n.nFloatBits.CompareAndSwap(oldBits, newBits) {
return
}
}
}

// hotColdWaitGroup is a synchronization primitive which enables lockless
// writes for concurrent writers and enables a reader to acquire exclusive
// access to a snapshot of state including only completed operations.
// Conceptually, it can be thought of as a "hot" wait group,
// and a "cold" wait group, with the ability for the reader to atomically swap
// the hot and cold wait groups, and wait for the now-cold wait group to
// complete.
//
// Inspired by the prometheus/client_golang histogram implementation:
// https://github.com/prometheus/client_golang/blob/a974e0d45e0aa54c65492559114894314d8a2447/prometheus/histogram.go#L725
//
// Usage:
//
// var hcwg hotColdWaitGroup
// var data [2]any
//
// func write() {
// hotIdx := hcwg.start()
// defer hcwg.done(hotIdx)
// // modify data without locking
// data[hotIdx].update()
// }
//
// func read() {
// coldIdx := hcwg.swapHotAndWait()
// // read data now that all writes to the cold data have completed.
// data[coldIdx].read()
// }
type hotColdWaitGroup struct {
// startedCountAndHotIdx contains a 63-bit counter in the lower bits,
// and a 1 bit hot index to denote which of the two data-points new
// measurements to write to. These are contained together so that read()
// can atomically swap the hot bit, reset the started writes to zero, and
// read the number writes that were started prior to the hot bit being
// swapped.
startedCountAndHotIdx atomic.Uint64
// endedCounts is the number of writes that have completed to each
// dataPoint.
endedCounts [2]atomic.Uint64
}

// start returns the hot index that the writer should write to. The returned
// hot index is 0 or 1. The caller must call done(hot index) after it finishes
// its operation. start() is safe to call concurrently with other methods.
func (l *hotColdWaitGroup) start() uint64 {
// We increment h.startedCountAndHotIdx so that the counter in the lower
// 63 bits gets incremented. At the same time, we get the new value
// back, which we can use to return the currently-hot index.
return l.startedCountAndHotIdx.Add(1) >> 63
}

// done signals to the reader that an operation has fully completed.
// done is safe to call concurrently.
func (l *hotColdWaitGroup) done(hotIdx uint64) {
l.endedCounts[hotIdx].Add(1)
}

// swapHotAndWait swaps the hot bit, waits for all start() calls to be done(),
// and then returns the now-cold index for the reader to read from. The
// returned index is 0 or 1. swapHotAndWait must not be called concurrently.
func (l *hotColdWaitGroup) swapHotAndWait() uint64 {
n := l.startedCountAndHotIdx.Load()
coldIdx := (^n) >> 63
// Swap the hot and cold index while resetting the started measurements
// count to zero.
n = l.startedCountAndHotIdx.Swap((coldIdx << 63))
hotIdx := n >> 63
startedCount := n & ((1 << 63) - 1)
// Wait for all measurements to the previously-hot map to finish.
for startedCount != l.endedCounts[hotIdx].Load() {
runtime.Gosched() // Let measurements complete.
}
// reset the number of ended operations
l.endedCounts[hotIdx].Store(0)
return hotIdx
}
78 changes: 78 additions & 0 deletions sdk/metric/internal/aggregate/atomic_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"

import (
"math"
"sync"
"sync/atomic"
"testing"

"github.com/stretchr/testify/assert"
)

func TestAtomicSumAddFloatConcurrentSafe(t *testing.T) {
var wg sync.WaitGroup
var aSum atomicCounter[float64]
for _, in := range []float64{
0.2,
0.25,
1.6,
10.55,
42.4,
} {
wg.Add(1)
go func() {
defer wg.Done()
aSum.add(in)
}()
}
wg.Wait()
assert.Equal(t, float64(55), math.Round(aSum.load()))
}

func TestAtomicSumAddIntConcurrentSafe(t *testing.T) {
var wg sync.WaitGroup
var aSum atomicCounter[int64]
for _, in := range []int64{
1,
2,
3,
4,
5,
} {
wg.Add(1)
go func() {
defer wg.Done()
aSum.add(in)
}()
}
wg.Wait()
assert.Equal(t, int64(15), aSum.load())
}

func TestHotColdWaitGroupConcurrentSafe(t *testing.T) {
var wg sync.WaitGroup
hcwg := &hotColdWaitGroup{}
var data [2]uint64
for range 5 {
wg.Add(1)
go func() {
defer wg.Done()
hotIdx := hcwg.start()
defer hcwg.done(hotIdx)
atomic.AddUint64(&data[hotIdx], 1)
}()
}
for range 2 {
readIdx := hcwg.swapHotAndWait()
assert.NotPanics(t, func() {
// reading without using atomics should not panic since we are
// reading from the cold element, and have waited for all writes to
// finish.
t.Logf("read value %+v", data[readIdx])
})
}
wg.Wait()
}
4 changes: 2 additions & 2 deletions sdk/metric/internal/aggregate/exponential_histogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ func newExponentialHistogram[N int64 | float64](
maxScale: maxScale,

newRes: r,
limit: newLimiter[*expoHistogramDataPoint[N]](limit),
limit: newLimiter[expoHistogramDataPoint[N]](limit),
values: make(map[attribute.Distinct]*expoHistogramDataPoint[N]),

start: now(),
Expand All @@ -317,7 +317,7 @@ type expoHistogram[N int64 | float64] struct {
maxScale int32

newRes func(attribute.Set) FilteredExemplarReservoir[N]
limit limiter[*expoHistogramDataPoint[N]]
limit limiter[expoHistogramDataPoint[N]]
values map[attribute.Distinct]*expoHistogramDataPoint[N]
valuesMu sync.Mutex

Expand Down
3 changes: 2 additions & 1 deletion sdk/metric/internal/aggregate/filtered_reservoir.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type FilteredExemplarReservoir[N int64 | float64] interface {

// filteredExemplarReservoir handles the pre-sampled exemplar of measurements made.
type filteredExemplarReservoir[N int64 | float64] struct {
mu sync.Mutex
filter exemplar.Filter
reservoir exemplar.Reservoir
// The exemplar.Reservoir is not required to be concurrent safe, but
Expand All @@ -54,12 +55,12 @@ func NewFilteredExemplarReservoir[N int64 | float64](

func (f *filteredExemplarReservoir[N]) Offer(ctx context.Context, val N, attr []attribute.KeyValue) {
if f.filter(ctx) {
// only record the current time if we are sampling this measurement.
ts := time.Now()
if !f.concurrentSafe {
f.reservoirMux.Lock()
defer f.reservoirMux.Unlock()
}
// only record the current time if we are sampling this measurement.
f.reservoir.Offer(ctx, ts, exemplar.NewValue(val), attr)
}
}
Expand Down
4 changes: 2 additions & 2 deletions sdk/metric/internal/aggregate/histogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ type histValues[N int64 | float64] struct {
bounds []float64

newRes func(attribute.Set) FilteredExemplarReservoir[N]
limit limiter[*buckets[N]]
limit limiter[buckets[N]]
values map[attribute.Distinct]*buckets[N]
valuesMu sync.Mutex
}
Expand All @@ -74,7 +74,7 @@ func newHistValues[N int64 | float64](
noSum: noSum,
bounds: b,
newRes: r,
limit: newLimiter[*buckets[N]](limit),
limit: newLimiter[buckets[N]](limit),
values: make(map[attribute.Distinct]*buckets[N]),
}
}
Expand Down
11 changes: 6 additions & 5 deletions sdk/metric/internal/aggregate/lastvalue.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func newLastValue[N int64 | float64](limit int, r func(attribute.Set) FilteredEx
return &lastValue[N]{
newRes: r,
limit: newLimiter[datapoint[N]](limit),
values: make(map[attribute.Distinct]datapoint[N]),
values: make(map[attribute.Distinct]*datapoint[N]),
start: now(),
}
}
Expand All @@ -34,7 +34,7 @@ type lastValue[N int64 | float64] struct {

newRes func(attribute.Set) FilteredExemplarReservoir[N]
limit limiter[datapoint[N]]
values map[attribute.Distinct]datapoint[N]
values map[attribute.Distinct]*datapoint[N]
start time.Time
}

Expand All @@ -45,9 +45,10 @@ func (s *lastValue[N]) measure(ctx context.Context, value N, fltrAttr attribute.
d, ok := s.values[fltrAttr.Equivalent()]
if !ok {
fltrAttr = s.limit.Attributes(fltrAttr, s.values)
d = s.values[fltrAttr.Equivalent()]
d.res = s.newRes(fltrAttr)
d.attrs = fltrAttr
d = &datapoint[N]{
res: s.newRes(fltrAttr),
attrs: fltrAttr,
}
}

d.value = value
Expand Down
2 changes: 1 addition & 1 deletion sdk/metric/internal/aggregate/limit.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func newLimiter[V any](aggregation int) limiter[V] {
// aggregation cardinality limit for the existing measurements. If it will,
// overflowSet is returned. Otherwise, if it will not exceed the limit, or the
// limit is not set (limit <= 0), attr is returned.
func (l limiter[V]) Attributes(attrs attribute.Set, measurements map[attribute.Distinct]V) attribute.Set {
func (l limiter[V]) Attributes(attrs attribute.Set, measurements map[attribute.Distinct]*V) attribute.Set {
if l.aggLimit > 0 {
_, exists := measurements[attrs.Equivalent()]
if !exists && len(measurements) >= l.aggLimit-1 {
Expand Down
6 changes: 4 additions & 2 deletions sdk/metric/internal/aggregate/limit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ import (
)

func TestLimiterAttributes(t *testing.T) {
m := map[attribute.Distinct]struct{}{alice.Equivalent(): {}}
var val struct{}
m := map[attribute.Distinct]*struct{}{alice.Equivalent(): &val}
t.Run("NoLimit", func(t *testing.T) {
l := newLimiter[struct{}](0)
assert.Equal(t, alice, l.Attributes(alice, m))
Expand Down Expand Up @@ -43,7 +44,8 @@ func TestLimiterAttributes(t *testing.T) {
var limitedAttr attribute.Set

func BenchmarkLimiterAttributes(b *testing.B) {
m := map[attribute.Distinct]struct{}{alice.Equivalent(): {}}
var val struct{}
m := map[attribute.Distinct]*struct{}{alice.Equivalent(): &val}
l := newLimiter[struct{}](2)

b.ReportAllocs()
Expand Down
Loading