diff --git a/CHANGELOG.md b/CHANGELOG.md index b02ad434482..9ea58bd8178 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -40,6 +40,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - `WithInstrumentationAttributes` in `go.opentelemetry.io/otel/log` synchronously de-duplicates the passed attributes instead of delegating it to the returned `LoggerOption`. (#7266) - `Distinct` in `go.opentelemetry.io/otel/attribute` is no longer guaranteed to uniquely identify an attribute set. Collisions between `Distinct` values for different Sets are possible with extremely high cardinality (billions of series per instrument), but are highly unlikely. (#7175) - The default `TranslationStrategy` in `go.opentelemetry.io/exporters/prometheus` is changed from `otlptranslator.NoUTF8EscapingWithSuffixes` to `otlptranslator.UnderscoreEscapingWithSuffixes`. (#7421) +- Improve performance of concurrent measurements in `go.opentelemetry.io/otel/sdk/metric`. (#7189) diff --git a/sdk/metric/internal/aggregate/aggregate.go b/sdk/metric/internal/aggregate/aggregate.go index 0321da68150..8ab4bb24f8f 100644 --- a/sdk/metric/internal/aggregate/aggregate.go +++ b/sdk/metric/internal/aggregate/aggregate.go @@ -110,7 +110,7 @@ func (b Builder[N]) PrecomputedSum(monotonic bool) (Measure[N], ComputeAggregati // Sum returns a sum aggregate function input and output. func (b Builder[N]) Sum(monotonic bool) (Measure[N], ComputeAggregation) { - s := newSum[N](monotonic, b.AggregationLimit, b.resFunc()) + s := newSum[N](monotonic, b.Temporality, b.AggregationLimit, b.resFunc()) switch b.Temporality { case metricdata.DeltaTemporality: return b.filter(s.measure), s.delta diff --git a/sdk/metric/internal/aggregate/atomic.go b/sdk/metric/internal/aggregate/atomic.go new file mode 100644 index 00000000000..b85fc758369 --- /dev/null +++ b/sdk/metric/internal/aggregate/atomic.go @@ -0,0 +1,126 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate" + +import ( + "math" + "runtime" + "sync/atomic" +) + +// atomicCounter is an efficient way of adding to a number which is either an +// int64 or float64. It is designed to be efficient when adding whole +// numbers, regardless of whether N is an int64 or float64. +// +// Inspired by the Prometheus counter implementation: +// https://github.com/prometheus/client_golang/blob/14ccb93091c00f86b85af7753100aa372d63602b/prometheus/counter.go#L108 +type atomicCounter[N int64 | float64] struct { + // nFloatBits contains only the non-integer portion of the counter. + nFloatBits atomic.Uint64 + // nInt contains only the integer portion of the counter. + nInt atomic.Int64 +} + +// load returns the current value. The caller must ensure all calls to add have +// returned prior to calling load. +func (n *atomicCounter[N]) load() N { + fval := math.Float64frombits(n.nFloatBits.Load()) + ival := n.nInt.Load() + return N(fval + float64(ival)) +} + +func (n *atomicCounter[N]) add(value N) { + ival := int64(value) + // This case is where the value is an int, or if it is a whole-numbered float. + if float64(ival) == float64(value) { + n.nInt.Add(ival) + return + } + + // Value must be a float below. 
+ for { + oldBits := n.nFloatBits.Load() + newBits := math.Float64bits(math.Float64frombits(oldBits) + float64(value)) + if n.nFloatBits.CompareAndSwap(oldBits, newBits) { + return + } + } +} + +// hotColdWaitGroup is a synchronization primitive which enables lockless +// writes for concurrent writers and enables a reader to acquire exclusive +// access to a snapshot of state including only completed operations. +// Conceptually, it can be thought of as a "hot" wait group, +// and a "cold" wait group, with the ability for the reader to atomically swap +// the hot and cold wait groups, and wait for the now-cold wait group to +// complete. +// +// Inspired by the prometheus/client_golang histogram implementation: +// https://github.com/prometheus/client_golang/blob/a974e0d45e0aa54c65492559114894314d8a2447/prometheus/histogram.go#L725 +// +// Usage: +// +// var hcwg hotColdWaitGroup +// var data [2]any +// +// func write() { +// hotIdx := hcwg.start() +// defer hcwg.done(hotIdx) +// // modify data without locking +// data[hotIdx].update() +// } +// +// func read() { +// coldIdx := hcwg.swapHotAndWait() +// // read data now that all writes to the cold data have completed. +// data[coldIdx].read() +// } +type hotColdWaitGroup struct { + // startedCountAndHotIdx contains a 63-bit counter in the lower bits, + // and a 1 bit hot index to denote which of the two data-points new + // measurements to write to. These are contained together so that read() + // can atomically swap the hot bit, reset the started writes to zero, and + // read the number writes that were started prior to the hot bit being + // swapped. + startedCountAndHotIdx atomic.Uint64 + // endedCounts is the number of writes that have completed to each + // dataPoint. + endedCounts [2]atomic.Uint64 +} + +// start returns the hot index that the writer should write to. The returned +// hot index is 0 or 1. The caller must call done(hot index) after it finishes +// its operation. start() is safe to call concurrently with other methods. +func (l *hotColdWaitGroup) start() uint64 { + // We increment h.startedCountAndHotIdx so that the counter in the lower + // 63 bits gets incremented. At the same time, we get the new value + // back, which we can use to return the currently-hot index. + return l.startedCountAndHotIdx.Add(1) >> 63 +} + +// done signals to the reader that an operation has fully completed. +// done is safe to call concurrently. +func (l *hotColdWaitGroup) done(hotIdx uint64) { + l.endedCounts[hotIdx].Add(1) +} + +// swapHotAndWait swaps the hot bit, waits for all start() calls to be done(), +// and then returns the now-cold index for the reader to read from. The +// returned index is 0 or 1. swapHotAndWait must not be called concurrently. +func (l *hotColdWaitGroup) swapHotAndWait() uint64 { + n := l.startedCountAndHotIdx.Load() + coldIdx := (^n) >> 63 + // Swap the hot and cold index while resetting the started measurements + // count to zero. + n = l.startedCountAndHotIdx.Swap((coldIdx << 63)) + hotIdx := n >> 63 + startedCount := n & ((1 << 63) - 1) + // Wait for all measurements to the previously-hot map to finish. + for startedCount != l.endedCounts[hotIdx].Load() { + runtime.Gosched() // Let measurements complete. 
+ } + // reset the number of ended operations + l.endedCounts[hotIdx].Store(0) + return hotIdx +} diff --git a/sdk/metric/internal/aggregate/atomic_test.go b/sdk/metric/internal/aggregate/atomic_test.go new file mode 100644 index 00000000000..52f053248d7 --- /dev/null +++ b/sdk/metric/internal/aggregate/atomic_test.go @@ -0,0 +1,78 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate" + +import ( + "math" + "sync" + "sync/atomic" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestAtomicSumAddFloatConcurrentSafe(t *testing.T) { + var wg sync.WaitGroup + var aSum atomicCounter[float64] + for _, in := range []float64{ + 0.2, + 0.25, + 1.6, + 10.55, + 42.4, + } { + wg.Add(1) + go func() { + defer wg.Done() + aSum.add(in) + }() + } + wg.Wait() + assert.Equal(t, float64(55), math.Round(aSum.load())) +} + +func TestAtomicSumAddIntConcurrentSafe(t *testing.T) { + var wg sync.WaitGroup + var aSum atomicCounter[int64] + for _, in := range []int64{ + 1, + 2, + 3, + 4, + 5, + } { + wg.Add(1) + go func() { + defer wg.Done() + aSum.add(in) + }() + } + wg.Wait() + assert.Equal(t, int64(15), aSum.load()) +} + +func TestHotColdWaitGroupConcurrentSafe(t *testing.T) { + var wg sync.WaitGroup + hcwg := &hotColdWaitGroup{} + var data [2]uint64 + for range 5 { + wg.Add(1) + go func() { + defer wg.Done() + hotIdx := hcwg.start() + defer hcwg.done(hotIdx) + atomic.AddUint64(&data[hotIdx], 1) + }() + } + for range 2 { + readIdx := hcwg.swapHotAndWait() + assert.NotPanics(t, func() { + // reading without using atomics should not panic since we are + // reading from the cold element, and have waited for all writes to + // finish. + t.Logf("read value %+v", data[readIdx]) + }) + } + wg.Wait() +} diff --git a/sdk/metric/internal/aggregate/exponential_histogram.go b/sdk/metric/internal/aggregate/exponential_histogram.go index cf4e86acf98..5b3a19c067d 100644 --- a/sdk/metric/internal/aggregate/exponential_histogram.go +++ b/sdk/metric/internal/aggregate/exponential_histogram.go @@ -301,7 +301,7 @@ func newExponentialHistogram[N int64 | float64]( maxScale: maxScale, newRes: r, - limit: newLimiter[*expoHistogramDataPoint[N]](limit), + limit: newLimiter[expoHistogramDataPoint[N]](limit), values: make(map[attribute.Distinct]*expoHistogramDataPoint[N]), start: now(), @@ -317,7 +317,7 @@ type expoHistogram[N int64 | float64] struct { maxScale int32 newRes func(attribute.Set) FilteredExemplarReservoir[N] - limit limiter[*expoHistogramDataPoint[N]] + limit limiter[expoHistogramDataPoint[N]] values map[attribute.Distinct]*expoHistogramDataPoint[N] valuesMu sync.Mutex diff --git a/sdk/metric/internal/aggregate/filtered_reservoir.go b/sdk/metric/internal/aggregate/filtered_reservoir.go index b42dfd357ea..ba7d7fa7e70 100644 --- a/sdk/metric/internal/aggregate/filtered_reservoir.go +++ b/sdk/metric/internal/aggregate/filtered_reservoir.go @@ -29,6 +29,7 @@ type FilteredExemplarReservoir[N int64 | float64] interface { // filteredExemplarReservoir handles the pre-sampled exemplar of measurements made. 
type filteredExemplarReservoir[N int64 | float64] struct { + mu sync.Mutex filter exemplar.Filter reservoir exemplar.Reservoir // The exemplar.Reservoir is not required to be concurrent safe, but @@ -54,12 +55,12 @@ func NewFilteredExemplarReservoir[N int64 | float64]( func (f *filteredExemplarReservoir[N]) Offer(ctx context.Context, val N, attr []attribute.KeyValue) { if f.filter(ctx) { + // only record the current time if we are sampling this measurement. ts := time.Now() if !f.concurrentSafe { f.reservoirMux.Lock() defer f.reservoirMux.Unlock() } - // only record the current time if we are sampling this measurement. f.reservoir.Offer(ctx, ts, exemplar.NewValue(val), attr) } } diff --git a/sdk/metric/internal/aggregate/histogram.go b/sdk/metric/internal/aggregate/histogram.go index 639c4d002c9..a094519cf6d 100644 --- a/sdk/metric/internal/aggregate/histogram.go +++ b/sdk/metric/internal/aggregate/histogram.go @@ -52,7 +52,7 @@ type histValues[N int64 | float64] struct { bounds []float64 newRes func(attribute.Set) FilteredExemplarReservoir[N] - limit limiter[*buckets[N]] + limit limiter[buckets[N]] values map[attribute.Distinct]*buckets[N] valuesMu sync.Mutex } @@ -74,7 +74,7 @@ func newHistValues[N int64 | float64]( noSum: noSum, bounds: b, newRes: r, - limit: newLimiter[*buckets[N]](limit), + limit: newLimiter[buckets[N]](limit), values: make(map[attribute.Distinct]*buckets[N]), } } diff --git a/sdk/metric/internal/aggregate/lastvalue.go b/sdk/metric/internal/aggregate/lastvalue.go index 6faf4920c70..3e2ed741505 100644 --- a/sdk/metric/internal/aggregate/lastvalue.go +++ b/sdk/metric/internal/aggregate/lastvalue.go @@ -23,7 +23,7 @@ func newLastValue[N int64 | float64](limit int, r func(attribute.Set) FilteredEx return &lastValue[N]{ newRes: r, limit: newLimiter[datapoint[N]](limit), - values: make(map[attribute.Distinct]datapoint[N]), + values: make(map[attribute.Distinct]*datapoint[N]), start: now(), } } @@ -34,7 +34,7 @@ type lastValue[N int64 | float64] struct { newRes func(attribute.Set) FilteredExemplarReservoir[N] limit limiter[datapoint[N]] - values map[attribute.Distinct]datapoint[N] + values map[attribute.Distinct]*datapoint[N] start time.Time } @@ -45,9 +45,10 @@ func (s *lastValue[N]) measure(ctx context.Context, value N, fltrAttr attribute. d, ok := s.values[fltrAttr.Equivalent()] if !ok { fltrAttr = s.limit.Attributes(fltrAttr, s.values) - d = s.values[fltrAttr.Equivalent()] - d.res = s.newRes(fltrAttr) - d.attrs = fltrAttr + d = &datapoint[N]{ + res: s.newRes(fltrAttr), + attrs: fltrAttr, + } } d.value = value diff --git a/sdk/metric/internal/aggregate/limit.go b/sdk/metric/internal/aggregate/limit.go index 9ea0251edd7..c19a1aff68f 100644 --- a/sdk/metric/internal/aggregate/limit.go +++ b/sdk/metric/internal/aggregate/limit.go @@ -30,7 +30,7 @@ func newLimiter[V any](aggregation int) limiter[V] { // aggregation cardinality limit for the existing measurements. If it will, // overflowSet is returned. Otherwise, if it will not exceed the limit, or the // limit is not set (limit <= 0), attr is returned. 
-func (l limiter[V]) Attributes(attrs attribute.Set, measurements map[attribute.Distinct]V) attribute.Set { +func (l limiter[V]) Attributes(attrs attribute.Set, measurements map[attribute.Distinct]*V) attribute.Set { if l.aggLimit > 0 { _, exists := measurements[attrs.Equivalent()] if !exists && len(measurements) >= l.aggLimit-1 { diff --git a/sdk/metric/internal/aggregate/limit_test.go b/sdk/metric/internal/aggregate/limit_test.go index c61bae0e24f..236c1af43af 100644 --- a/sdk/metric/internal/aggregate/limit_test.go +++ b/sdk/metric/internal/aggregate/limit_test.go @@ -12,7 +12,8 @@ import ( ) func TestLimiterAttributes(t *testing.T) { - m := map[attribute.Distinct]struct{}{alice.Equivalent(): {}} + var val struct{} + m := map[attribute.Distinct]*struct{}{alice.Equivalent(): &val} t.Run("NoLimit", func(t *testing.T) { l := newLimiter[struct{}](0) assert.Equal(t, alice, l.Attributes(alice, m)) @@ -43,7 +44,8 @@ func TestLimiterAttributes(t *testing.T) { var limitedAttr attribute.Set func BenchmarkLimiterAttributes(b *testing.B) { - m := map[attribute.Distinct]struct{}{alice.Equivalent(): {}} + var val struct{} + m := map[attribute.Distinct]*struct{}{alice.Equivalent(): &val} l := newLimiter[struct{}](2) b.ReportAllocs() diff --git a/sdk/metric/internal/aggregate/map.go b/sdk/metric/internal/aggregate/map.go new file mode 100644 index 00000000000..24ff1982fa7 --- /dev/null +++ b/sdk/metric/internal/aggregate/map.go @@ -0,0 +1,317 @@ +package aggregate + +import ( + "sync" + "sync/atomic" + + "go.opentelemetry.io/otel/attribute" +) + +type cappedMap[N int64 | float64] struct { + mu sync.Mutex + + // read contains the portion of the map's contents that are safe for + // concurrent access (with or without mu held). + // + // The read field itself is always safe to load, but must only be stored + // with mu held. + // + // Entries stored in read may be updated concurrently without mu, but + // updating a previously-expunged entry requires that the entry be copied + // to the dirty map and unexpunged with mu held. + read atomic.Pointer[readOnly[N]] + + // dirty contains the portion of the map's contents that require mu to be + // held. To ensure that the dirty map can be promoted to the read map + // quickly, it also includes all of the non-expunged entries in the read + // map. + // + // Expunged entries are not stored in the dirty map. An expunged entry in + // the clean map must be unexpunged and added to the dirty map before a new + // value can be stored to it. + // + // If the dirty map is nil, the next write to the map will initialize it by + // making a shallow copy of the clean map, omitting stale entries. + dirty map[attribute.Distinct]*entry[N] + + // misses counts the number of loads since the read map was last updated + // that needed to lock mu to determine whether the key was present. + // + // Once enough misses have occurred to cover the cost of copying the dirty + // map, the dirty map will be promoted to the read map (in the unamended + // state) and the next store to the map will make a new dirty copy. + misses int + + // limit is the maximum number of entries allowed in the map. + limit int + // size is an atomic counter for the number of entries. + size int64 + // overflow is called when the map exceeds its limit. 
+ overflow func() *sumValue[N] + overflowed atomic.Bool +} + +func newCappedMap[N int64 | float64](limit int, newRes func(attribute.Set) FilteredExemplarReservoir[N]) *cappedMap[N] { + cm := &cappedMap[N]{ + dirty: make(map[attribute.Distinct]*entry[N]), + limit: limit, + } + cm.overflow = sync.OnceValue(func() *sumValue[N] { + cm.overflowed.Store(true) + return &sumValue[N]{attrs: overflowSet, res: newRes(overflowSet)} + }) + cm.read.Store(&readOnly[N]{m: make(map[attribute.Distinct]*entry[N])}) + return cm +} + +// readOnly is an immutable struct that contains the portion of the map that is +// safe for concurrent access without locking. +type readOnly[N int64 | float64] struct { + m map[attribute.Distinct]*entry[N] + amended bool // true if the dirty map contains keys not in m. +} + +// An entry is a slot in the map corresponding to a particular key. +type entry[N int64 | float64] struct { + p atomic.Pointer[sumValue[N]] +} + +func newEntry[N int64 | float64](i *sumValue[N]) *entry[N] { + e := &entry[N]{} + e.p.Store(i) + return e +} + +func (e *entry[N]) load() (*sumValue[N], bool) { + p := e.p.Load() + if p == nil { + return nil, false + } + return p, true +} + +func (m *cappedMap[N]) loadReadOnly() readOnly[N] { + if p := m.read.Load(); p != nil { + return *p + } + return readOnly[N]{} +} + +func (m *cappedMap[N]) Len() int { + n := int(atomic.LoadInt64(&m.size)) + if m.overflowed.Load() { + return n + 1 + } + return n +} + +// Load retrieves the value for a given key. +// It returns the value and a boolean indicating whether the key was found. +func (m *cappedMap[N]) Load(key attribute.Distinct) (value *sumValue[N], ok bool) { + read := m.loadReadOnly() + e, ok := read.m[key] + if !ok && read.amended { + m.mu.Lock() + + // Check read map again under lock to avoid reporting a miss if the key + // was added while we were waiting for the lock. + read = m.loadReadOnly() + e, ok = read.m[key] + if !ok && read.amended { + e, ok = m.dirty[key] + // Regardless of whether we find the key, record a miss. + m.missLocked() + } + m.mu.Unlock() + } + if !ok { + return nil, false + } + return e.load() +} + +// LoadOrStore returns the existing value for the key if present. +// Otherwise, it stores and returns the given value. +// The loaded result is true if the value was loaded, false if stored. +// +// If storing the given value would exceed the map's limit, the value is not +// stored and the overflow sumValue is returned instead. In this case, loaded +// will be true. +func (m *cappedMap[N]) LoadOrStore(key attribute.Distinct, value *sumValue[N]) (actual *sumValue[N], loaded bool) { + // Fast path - check read map first + read := m.loadReadOnly() + if e, ok := read.m[key]; ok { + return e.loadOrStore(value) + } + + // Slow path - lock and check again + + m.mu.Lock() + read = m.loadReadOnly() + if e, ok := read.m[key]; ok { + actual, loaded = e.loadOrStore(value) + } else if e, ok := m.dirty[key]; ok { + actual, loaded = e.loadOrStore(value) + m.missLocked() + } else if n := atomic.LoadInt64(&m.size); int(n) >= m.limit-1 { + actual, loaded = m.overflow(), true + } else { + if !read.amended { + // First new key to the dirty map. Make sure it is allocated and + // mark the read-only map as amended. + m.dirtyLocked() + m.read.Store(&readOnly[N]{m: read.m, amended: true}) + } + m.dirty[key] = newEntry(value) + atomic.AddInt64(&m.size, 1) + actual, loaded = value, false + } + m.mu.Unlock() + + return actual, loaded +} + +// loadOrStore atomically loads or stores a value. 
+func (e *entry[N]) loadOrStore(value *sumValue[N]) (actual *sumValue[N], loaded bool) { + p := e.p.Load() + if p != nil { + return p, true + } + + for { + if e.p.CompareAndSwap(nil, value) { + return value, false + } + p = e.p.Load() + if p != nil { + return p, true + } + } +} + +// dirtyLocked copies the read map to dirty - called with mutex held +func (m *cappedMap[N]) dirtyLocked() { + if m.dirty != nil { + return + } + + read := m.loadReadOnly() + m.dirty = make(map[attribute.Distinct]*entry[N], len(read.m)) + for k, v := range read.m { + m.dirty[k] = v + } +} + +// Range calls f sequentially for each key and value present in the map. +// If f returns false, range stops the iteration. +// +// Range does not necessarily correspond to any consistent snapshot of the Map's +// contents: no key will be visited more than once, but if the value for any key +// is stored or deleted concurrently (including by f), Range may reflect any +// mapping for that key from any point during the Range call. Range does not +// block other methods on the receiver; even f itself may call any method on m. +// +// Range may be O(N) with the number of elements in the map even if f returns +// false after a constant number of calls. +func (m *cappedMap[N]) Range(f func(key attribute.Distinct, value *sumValue[N]) bool) { + // We need to be able to iterate over all of the keys that were already + // present at the start of the call to Range. + // If read.amended is false, then read.m satisfies that property without + // requiring us to hold m.mu for a long time. + read := m.loadReadOnly() + if read.amended { + // m.dirty contains keys not in read.m. + m.mu.Lock() + read = m.loadReadOnly() + if read.amended { + read = readOnly[N]{m: m.dirty} + copyRead := read + m.read.Store(©Read) + m.dirty = nil + m.misses = 0 + } + m.mu.Unlock() + } + + for k, e := range read.m { + v, ok := e.load() + if !ok { + continue + } + if !f(k, v) { + break + } + } + if m.overflowed.Load() { + // If we have overflowed, we need to call f with the overflow + // value as well. + f(overflowSet.Equivalent(), m.overflow()) + } +} + +// Clear deletes all the entries, resulting in an empty Map. +func (m *cappedMap[N]) Clear() { + read := m.loadReadOnly() + if len(read.m) == 0 && !read.amended { + // Avoid allocation if already empty. + return + } + + m.mu.Lock() + defer m.mu.Unlock() + + read = m.loadReadOnly() + if len(read.m) > 0 || read.amended { + m.read.Store(&readOnly[N]{}) + } + + clear(m.dirty) + m.misses = 0 + atomic.StoreInt64(&m.size, 0) + m.overflowed.Store(false) +} + +// missLocked handles a miss in the read map - called with mutex held +func (m *cappedMap[N]) missLocked() { + m.misses++ + if m.misses < len(m.dirty) { + return + } + + // Promote dirty to read + m.read.Store(&readOnly[N]{m: m.dirty}) + m.dirty = nil + m.misses = 0 +} + +type atomicSyncMap[N int64 | float64] struct { + m sync.Map // map[attribute.Distinct]*sumValue[N] + size int64 // atomic counter for the number of entries. 
+} + +func newAtomicSyncMap[N int64 | float64]() atomicMap[N] { + return &atomicSyncMap[N]{} +} + +func (m *atomicSyncMap[N]) Len() int { + return int(atomic.LoadInt64(&m.size)) +} + +func (m *atomicSyncMap[N]) LoadOrStore(key attribute.Distinct, value *sumValue[N]) (actual *sumValue[N], loaded bool) { + v, loaded := m.m.LoadOrStore(key, value) + if !loaded { + atomic.AddInt64(&m.size, 1) + } + return v.(*sumValue[N]), loaded +} + +func (m *atomicSyncMap[N]) Range(f func(key attribute.Distinct, value *sumValue[N]) bool) { + m.m.Range(func(k, v any) bool { + return f(k.(attribute.Distinct), v.(*sumValue[N])) + }) +} + +func (m *atomicSyncMap[N]) Clear() { + m.m.Clear() + atomic.StoreInt64(&m.size, 0) +} diff --git a/sdk/metric/internal/aggregate/sum.go b/sdk/metric/internal/aggregate/sum.go index 164feb86797..88e1243f358 100644 --- a/sdk/metric/internal/aggregate/sum.go +++ b/sdk/metric/internal/aggregate/sum.go @@ -5,7 +5,6 @@ package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggreg import ( "context" - "sync" "time" "go.opentelemetry.io/otel/attribute" @@ -13,51 +12,89 @@ import ( ) type sumValue[N int64 | float64] struct { - n N + n atomicCounter[N] res FilteredExemplarReservoir[N] attrs attribute.Set } // valueMap is the storage for sums. type valueMap[N int64 | float64] struct { - sync.Mutex - newRes func(attribute.Set) FilteredExemplarReservoir[N] - limit limiter[sumValue[N]] - values map[attribute.Distinct]sumValue[N] + newRes func(attribute.Set) FilteredExemplarReservoir[N] + aggLimit int + + // cumulative sums do not reset values during collection, so in that case + // clearValuesOnCollection is false, hcwg is unused, and only values[0] + // and len[0] are used. All other aggregations reset on collection, so we + // use hcwg to swap between the hot and cold maps and len so measurements + // can continue without blocking on collection. + // + // see hotColdWaitGroup for how this works. + clearValuesOnCollection bool + hcwg hotColdWaitGroup + values [2]atomicMap[N] } -func newValueMap[N int64 | float64](limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *valueMap[N] { +type atomicMap[N int64 | float64] interface { + Len() int + LoadOrStore(key attribute.Distinct, value *sumValue[N]) (actual *sumValue[N], loaded bool) + Range(f func(key attribute.Distinct, value *sumValue[N]) bool) + Clear() +} + +func newValueMap[N int64 | float64]( + limit int, + r func(attribute.Set) FilteredExemplarReservoir[N], + clearValuesOnCollection bool, +) *valueMap[N] { + if limit < 0 { + limit = 0 + } + var values [2]atomicMap[N] + // Use sync.Map if there is no limit, otherwise use cappedMap. 
+ if limit == 0 { + values[0] = newAtomicSyncMap[N]() + values[1] = newAtomicSyncMap[N]() + } else { + values[0] = newCappedMap[N](limit, r) + values[1] = newCappedMap[N](limit, r) + } return &valueMap[N]{ - newRes: r, - limit: newLimiter[sumValue[N]](limit), - values: make(map[attribute.Distinct]sumValue[N]), + newRes: r, + aggLimit: limit, + clearValuesOnCollection: clearValuesOnCollection, + values: values, } } func (s *valueMap[N]) measure(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue) { - s.Lock() - defer s.Unlock() - - v, ok := s.values[fltrAttr.Equivalent()] - if !ok { - fltrAttr = s.limit.Attributes(fltrAttr, s.values) - v = s.values[fltrAttr.Equivalent()] - v.res = s.newRes(fltrAttr) - v.attrs = fltrAttr + hotIdx := uint64(0) + if s.clearValuesOnCollection { + hotIdx = s.hcwg.start() + defer s.hcwg.done(hotIdx) } - - v.n += value + v, _ := s.values[hotIdx].LoadOrStore(fltrAttr.Equivalent(), &sumValue[N]{ + res: s.newRes(fltrAttr), + attrs: fltrAttr, + }) + v.n.add(value) + // It is possible for collection to race with measurement and observe the + // exemplar in the batch of metrics after the add() for cumulative sums. + // This is an accepted tradeoff to avoid locking during measurement. v.res.Offer(ctx, value, droppedAttr) - - s.values[fltrAttr.Equivalent()] = v } // newSum returns an aggregator that summarizes a set of measurements as their // arithmetic sum. Each sum is scoped by attributes and the aggregation cycle // the measurements were made in. -func newSum[N int64 | float64](monotonic bool, limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *sum[N] { +func newSum[N int64 | float64]( + monotonic bool, + temporality metricdata.Temporality, + limit int, + r func(attribute.Set) FilteredExemplarReservoir[N], +) *sum[N] { + clearValuesOnCollection := temporality == metricdata.DeltaTemporality return &sum[N]{ - valueMap: newValueMap[N](limit, r), + valueMap: newValueMap[N](limit, r, clearValuesOnCollection), monotonic: monotonic, start: now(), } @@ -82,30 +119,31 @@ func (s *sum[N]) delta( sData.Temporality = metricdata.DeltaTemporality sData.IsMonotonic = s.monotonic - s.Lock() - defer s.Unlock() - - n := len(s.values) + // delta always clears values on collection + readIdx := s.hcwg.swapHotAndWait() + // The len will not change while we iterate over values, since we waited + // for all writes to finish to the cold values and len. + n := s.values[readIdx].Len() dPts := reset(sData.DataPoints, n, n) var i int - for _, val := range s.values { + s.values[readIdx].Range(func(_ attribute.Distinct, val *sumValue[N]) bool { + collectExemplars(&dPts[i].Exemplars, val.res.Collect) dPts[i].Attributes = val.attrs dPts[i].StartTime = s.start dPts[i].Time = t - dPts[i].Value = val.n - collectExemplars(&dPts[i].Exemplars, val.res.Collect) + dPts[i].Value = val.n.load() i++ - } - // Do not report stale values. - clear(s.values) + return true + }) + s.values[readIdx].Clear() // The delta collection cycle resets. s.start = t sData.DataPoints = dPts *dest = sData - return n + return i } func (s *sum[N]) cumulative( @@ -119,30 +157,33 @@ func (s *sum[N]) cumulative( sData.Temporality = metricdata.CumulativeTemporality sData.IsMonotonic = s.monotonic - s.Lock() - defer s.Unlock() - - n := len(s.values) - dPts := reset(sData.DataPoints, n, n) + readIdx := 0 + // Values are being concurrently written while we iterate, so only use the + // current length for capacity. 
+ dPts := reset(sData.DataPoints, 0, s.values[readIdx].Len()) var i int - for _, value := range s.values { - dPts[i].Attributes = value.attrs - dPts[i].StartTime = s.start - dPts[i].Time = t - dPts[i].Value = value.n - collectExemplars(&dPts[i].Exemplars, value.res.Collect) + s.values[readIdx].Range(func(_ attribute.Distinct, val *sumValue[N]) bool { + newPt := metricdata.DataPoint[N]{ + Attributes: val.attrs, + StartTime: s.start, + Time: t, + Value: val.n.load(), + } + collectExemplars(&newPt.Exemplars, val.res.Collect) + dPts = append(dPts, newPt) // TODO (#3006): This will use an unbounded amount of memory if there // are unbounded number of attribute sets being aggregated. Attribute // sets that become "stale" need to be forgotten so this will not // overload the system. i++ - } + return true + }) sData.DataPoints = dPts *dest = sData - return n + return i } // newPrecomputedSum returns an aggregator that summarizes a set of @@ -154,7 +195,7 @@ func newPrecomputedSum[N int64 | float64]( r func(attribute.Set) FilteredExemplarReservoir[N], ) *precomputedSum[N] { return &precomputedSum[N]{ - valueMap: newValueMap[N](limit, r), + valueMap: newValueMap[N](limit, r, true), monotonic: monotonic, start: now(), } @@ -167,14 +208,14 @@ type precomputedSum[N int64 | float64] struct { monotonic bool start time.Time - reported map[attribute.Distinct]N + reported map[any]N } func (s *precomputedSum[N]) delta( dest *metricdata.Aggregation, //nolint:gocritic // The pointer is needed for the ComputeAggregation interface ) int { t := now() - newReported := make(map[attribute.Distinct]N) + newReported := make(map[any]N) // If *dest is not a metricdata.Sum, memory reuse is missed. In that case, // use the zero-value sData and hope for better alignment next cycle. @@ -182,27 +223,28 @@ func (s *precomputedSum[N]) delta( sData.Temporality = metricdata.DeltaTemporality sData.IsMonotonic = s.monotonic - s.Lock() - defer s.Unlock() - - n := len(s.values) + // delta always clears values on collection + readIdx := s.hcwg.swapHotAndWait() + // The len will not change while we iterate over values, since we waited + // for all writes to finish to the cold values and len. + n := s.values[readIdx].Len() dPts := reset(sData.DataPoints, n, n) var i int - for key, value := range s.values { - delta := value.n - s.reported[key] + s.values[readIdx].Range(func(key attribute.Distinct, val *sumValue[N]) bool { + n := val.n.load() - dPts[i].Attributes = value.attrs + delta := n - s.reported[key] + collectExemplars(&dPts[i].Exemplars, val.res.Collect) + dPts[i].Attributes = val.attrs dPts[i].StartTime = s.start dPts[i].Time = t dPts[i].Value = delta - collectExemplars(&dPts[i].Exemplars, value.res.Collect) - - newReported[key] = value.n + newReported[key] = n i++ - } - // Unused attribute sets do not report. - clear(s.values) + return true + }) + s.values[readIdx].Clear() s.reported = newReported // The delta collection cycle resets. s.start = t @@ -210,7 +252,7 @@ func (s *precomputedSum[N]) delta( sData.DataPoints = dPts *dest = sData - return n + return i } func (s *precomputedSum[N]) cumulative( @@ -224,27 +266,27 @@ func (s *precomputedSum[N]) cumulative( sData.Temporality = metricdata.CumulativeTemporality sData.IsMonotonic = s.monotonic - s.Lock() - defer s.Unlock() - - n := len(s.values) + // cumulative precomputed always clears values on collection + readIdx := s.hcwg.swapHotAndWait() + // The len will not change while we iterate over values, since we waited + // for all writes to finish to the cold values and len. 
+ n := s.values[readIdx].Len() dPts := reset(sData.DataPoints, n, n) var i int - for _, val := range s.values { + s.values[readIdx].Range(func(_ attribute.Distinct, val *sumValue[N]) bool { + collectExemplars(&dPts[i].Exemplars, val.res.Collect) dPts[i].Attributes = val.attrs dPts[i].StartTime = s.start dPts[i].Time = t - dPts[i].Value = val.n - collectExemplars(&dPts[i].Exemplars, val.res.Collect) - + dPts[i].Value = val.n.load() i++ - } - // Unused attribute sets do not report. - clear(s.values) + return true + }) + s.values[readIdx].Clear() sData.DataPoints = dPts *dest = sData - return n + return i }
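
The split integer/float counter introduced in atomic.go is the core trick that lets sums be recorded without a mutex: whole-number measurements take a single atomic integer add, and only fractional floats fall back to a compare-and-swap loop. The standalone sketch below reproduces that idea outside the SDK so it can be run directly; the type name splitCounter and the values in main are illustrative assumptions, not part of this change.

// splitcounter_sketch.go: standalone illustration of the atomicCounter idea.
package main

import (
	"fmt"
	"math"
	"sync"
	"sync/atomic"
)

// splitCounter accumulates whole numbers in an atomic int64 and fractional
// remainders via a CAS loop over float64 bits, mirroring atomicCounter above.
type splitCounter struct {
	ints   atomic.Int64
	floats atomic.Uint64
}

func (c *splitCounter) add(v float64) {
	if i := int64(v); float64(i) == v {
		// Whole numbers (and every int64 measurement) take the cheap path.
		c.ints.Add(i)
		return
	}
	// Fractional values retry a compare-and-swap on the float64 bit pattern.
	for {
		oldBits := c.floats.Load()
		newBits := math.Float64bits(math.Float64frombits(oldBits) + v)
		if c.floats.CompareAndSwap(oldBits, newBits) {
			return
		}
	}
}

func (c *splitCounter) load() float64 {
	return float64(c.ints.Load()) + math.Float64frombits(c.floats.Load())
}

func main() {
	var c splitCounter
	var wg sync.WaitGroup
	for _, v := range []float64{1, 2, 3, 0.25, 0.25} {
		wg.Add(1)
		go func(v float64) {
			defer wg.Done()
			c.add(v)
		}(v)
	}
	wg.Wait()
	fmt.Println(c.load()) // 6.5
}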
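
The hotColdWaitGroup is what lets delta collection reset state without blocking writers: writers publish into whichever slot is currently "hot", and the collector atomically swaps the hot bit, waits for in-flight writes, and then reads the now-cold slot without further synchronization. The following self-contained sketch shows the same pattern under hypothetical names (hotCold, slots); it is not the SDK type, only an illustration of why the plain read at the end is safe.

// hotcold_sketch.go: standalone illustration of the hot/cold swap pattern.
package main

import (
	"fmt"
	"runtime"
	"sync"
	"sync/atomic"
)

type hotCold struct {
	startedAndHot atomic.Uint64 // low 63 bits: started writes; top bit: hot index
	ended         [2]atomic.Uint64
}

func (h *hotCold) start() uint64   { return h.startedAndHot.Add(1) >> 63 }
func (h *hotCold) done(idx uint64) { h.ended[idx].Add(1) }

// swapAndWait flips the hot bit, waits for writes started before the flip to
// finish, and returns the now-cold index for exclusive reading.
func (h *hotCold) swapAndWait() uint64 {
	cold := (^h.startedAndHot.Load()) >> 63
	prev := h.startedAndHot.Swap(cold << 63)
	hot, started := prev>>63, prev&((1<<63)-1)
	for started != h.ended[hot].Load() {
		runtime.Gosched()
	}
	h.ended[hot].Store(0)
	return hot
}

func main() {
	var hc hotCold
	var slots [2]int64
	var wg sync.WaitGroup
	for range 100 {
		wg.Add(1)
		go func() {
			defer wg.Done()
			idx := hc.start()
			defer hc.done(idx)
			atomic.AddInt64(&slots[idx], 1) // lock-free write to the hot slot
		}()
	}
	wg.Wait()
	cold := hc.swapAndWait()
	// The wait above makes this plain read of the cold slot safe.
	fmt.Println(slots[cold]) // 100
}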
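
Finally, newValueMap chooses between the sync.Map-backed store (no cardinality limit) and cappedMap (limit set), where the limit reserves one slot for a shared overflow series. The sketch below shows only that overflow behaviour, using a plain sync.Map plus a size counter; its check-then-store is racy, whereas the real cappedMap takes a mutex on its slow path, so treat this as a behavioural illustration under hypothetical names, not the SDK implementation.

// cappedmap_sketch.go: illustration of cardinality-limit overflow semantics.
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type cappedSyncMap struct {
	m        sync.Map // key -> *atomic.Int64
	size     atomic.Int64
	limit    int64
	overflow atomic.Int64 // shared series for keys beyond the limit
}

func (c *cappedSyncMap) counterFor(key string) *atomic.Int64 {
	if v, ok := c.m.Load(key); ok {
		return v.(*atomic.Int64)
	}
	// Reserve one slot for the overflow series, as the limiter does. Note:
	// this check-then-store is not atomic; the real cappedMap holds a mutex
	// for this slow path.
	if c.size.Load() >= c.limit-1 {
		return &c.overflow
	}
	v, loaded := c.m.LoadOrStore(key, new(atomic.Int64))
	if !loaded {
		c.size.Add(1)
	}
	return v.(*atomic.Int64)
}

func main() {
	c := &cappedSyncMap{limit: 3}
	for _, key := range []string{"a", "b", "c", "d", "a"} {
		c.counterFor(key).Add(1)
	}
	fmt.Println(c.overflow.Load()) // "c" and "d" overflowed: prints 2
}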