Skip to content

Commit 7d7acb4

Browse files
committed
use sync.Map and atomics to improve sum performance
1 parent 14d6372 commit 7d7acb4

File tree

11 files changed

+346
-91
lines changed

11 files changed

+346
-91
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
4444
- The default `TranslationStrategy` in `go.opentelemetry.io/exporters/prometheus` is changed from `otlptranslator.NoUTF8EscapingWithSuffixes` to `otlptranslator.UnderscoreEscapingWithSuffixes`. (#7421)
4545
- The `ErrorType` function in `go.opentelemetry.io/otel/semconv/v1.37.0` now handles custom error types.
4646
If an error implements an `ErrorType() string` method, the return value of that method will be used as the error type. (#7442)
47+
- Improve performance of concurrent measurements in `go.opentelemetry.io/otel/sdk/metric`. (#7189)
4748

4849
<!-- Released section -->
4950
<!-- Don't change this section unless doing release -->

sdk/metric/internal/aggregate/aggregate.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ func (b Builder[N]) PrecomputedSum(monotonic bool) (Measure[N], ComputeAggregati
110110

111111
// Sum returns a sum aggregate function input and output.
112112
func (b Builder[N]) Sum(monotonic bool) (Measure[N], ComputeAggregation) {
113-
s := newSum[N](monotonic, b.AggregationLimit, b.resFunc())
113+
s := newSum[N](monotonic, b.Temporality, b.AggregationLimit, b.resFunc())
114114
switch b.Temporality {
115115
case metricdata.DeltaTemporality:
116116
return b.filter(s.measure), s.delta
Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
5+
6+
import (
7+
"math"
8+
"runtime"
9+
"sync/atomic"
10+
)
11+
12+
// atomicCounter accumulates a sum of type N (int64 or float64) without
// locking. Whole-numbered additions go through a plain atomic integer add,
// which is cheap; only fractional float additions pay for a compare-and-swap
// loop. The two partial sums are combined on read.
//
// Inspired by the Prometheus counter implementation:
// https://github.com/prometheus/client_golang/blob/14ccb93091c00f86b85af7753100aa372d63602b/prometheus/counter.go#L108
type atomicCounter[N int64 | float64] struct {
	// nFloatBits contains only the non-integer portion of the counter.
	nFloatBits atomic.Uint64
	// nInt contains only the integer portion of the counter.
	nInt atomic.Int64
}

// load returns the current value. The caller must ensure all calls to add
// have returned prior to calling load.
func (n *atomicCounter[N]) load() N {
	whole := float64(n.nInt.Load())
	frac := math.Float64frombits(n.nFloatBits.Load())
	return N(frac + whole)
}

// add folds value into the counter. It is safe to call concurrently.
func (n *atomicCounter[N]) add(value N) {
	// Fast path: value is an int, or a whole-numbered float, so it can be
	// accumulated with a single atomic integer add.
	if whole := int64(value); float64(whole) == float64(value) {
		n.nInt.Add(whole)
		return
	}

	// Slow path: value has a fractional part. Fold it into the float
	// portion with a CAS loop on the raw bits.
	for {
		old := n.nFloatBits.Load()
		updated := math.Float64bits(math.Float64frombits(old) + float64(value))
		if n.nFloatBits.CompareAndSwap(old, updated) {
			return
		}
	}
}
50+
51+
// hotColdWaitGroup is a synchronization primitive which enables lockless
// writes for concurrent writers and enables a reader to acquire exclusive
// access to a snapshot of state including only completed operations.
// Conceptually, it can be thought of as a "hot" wait group,
// and a "cold" wait group, with the ability for the reader to atomically swap
// the hot and cold wait groups, and wait for the now-cold wait group to
// complete.
//
// Inspired by the prometheus/client_golang histogram implementation:
// https://github.com/prometheus/client_golang/blob/a974e0d45e0aa54c65492559114894314d8a2447/prometheus/histogram.go#L725
//
// Usage:
//
//	var hcwg hotColdWaitGroup
//	var data [2]any
//
//	func write() {
//	    hotIdx := hcwg.start()
//	    defer hcwg.done(hotIdx)
//	    // modify data without locking
//	    data[hotIdx].update()
//	}
//
//	func read() {
//	    coldIdx := hcwg.swapHotAndWait()
//	    // read data now that all writes to the cold data have completed.
//	    data[coldIdx].read()
//	}
type hotColdWaitGroup struct {
	// startedCountAndHotIdx packs two things into one word: the hot index
	// in the top bit (selecting which of the two data elements writers
	// target), and a 63-bit count of started writes in the remaining bits.
	// Packing them lets swapHotAndWait flip the hot bit, zero the started
	// count, and observe the pre-swap count in a single atomic swap.
	startedCountAndHotIdx atomic.Uint64
	// endedCounts tracks, per data element, how many writes have completed.
	endedCounts [2]atomic.Uint64
}

// start returns the hot index (0 or 1) that the writer should write to. The
// caller must call done with the same index when its operation finishes.
// start is safe to call concurrently with other methods.
func (h *hotColdWaitGroup) start() uint64 {
	// Adding 1 bumps the started counter in the low 63 bits; the returned
	// value's top bit is the currently-hot index.
	return h.startedCountAndHotIdx.Add(1) >> 63
}

// done signals to the reader that an operation has fully completed.
// done is safe to call concurrently.
func (h *hotColdWaitGroup) done(hotIdx uint64) {
	h.endedCounts[hotIdx].Add(1)
}

// swapHotAndWait flips the hot bit, waits for every start() on the
// previously-hot index to be matched by a done(), and returns that now-cold
// index (0 or 1) for exclusive reading. It must not be called concurrently
// with itself.
func (h *hotColdWaitGroup) swapHotAndWait() uint64 {
	cold := (^h.startedCountAndHotIdx.Load()) >> 63
	// Atomically: make the old cold index hot, reset the started count to
	// zero, and capture the pre-swap state.
	prev := h.startedCountAndHotIdx.Swap(cold << 63)
	nowCold := prev >> 63
	started := prev & ((1 << 63) - 1)
	// Spin until every write started against the previously-hot element
	// has completed.
	for h.endedCounts[nowCold].Load() != started {
		runtime.Gosched() // Let measurements complete.
	}
	// Reset the completed-operations count for the next cycle.
	h.endedCounts[nowCold].Store(0)
	return nowCold
}
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
5+
6+
import (
7+
"math"
8+
"sync"
9+
"sync/atomic"
10+
"testing"
11+
12+
"github.com/stretchr/testify/assert"
13+
)
14+
15+
func TestAtomicSumAddFloatConcurrentSafe(t *testing.T) {
16+
var wg sync.WaitGroup
17+
var aSum atomicCounter[float64]
18+
for _, in := range []float64{
19+
0.2,
20+
0.25,
21+
1.6,
22+
10.55,
23+
42.4,
24+
} {
25+
wg.Add(1)
26+
go func() {
27+
defer wg.Done()
28+
aSum.add(in)
29+
}()
30+
}
31+
wg.Wait()
32+
assert.Equal(t, float64(55), math.Round(aSum.load()))
33+
}
34+
35+
func TestAtomicSumAddIntConcurrentSafe(t *testing.T) {
36+
var wg sync.WaitGroup
37+
var aSum atomicCounter[int64]
38+
for _, in := range []int64{
39+
1,
40+
2,
41+
3,
42+
4,
43+
5,
44+
} {
45+
wg.Add(1)
46+
go func() {
47+
defer wg.Done()
48+
aSum.add(in)
49+
}()
50+
}
51+
wg.Wait()
52+
assert.Equal(t, int64(15), aSum.load())
53+
}
54+
55+
func TestHotColdWaitGroupConcurrentSafe(t *testing.T) {
56+
var wg sync.WaitGroup
57+
hcwg := &hotColdWaitGroup{}
58+
var data [2]uint64
59+
for range 5 {
60+
wg.Add(1)
61+
go func() {
62+
defer wg.Done()
63+
hotIdx := hcwg.start()
64+
defer hcwg.done(hotIdx)
65+
atomic.AddUint64(&data[hotIdx], 1)
66+
}()
67+
}
68+
for range 2 {
69+
readIdx := hcwg.swapHotAndWait()
70+
assert.NotPanics(t, func() {
71+
// reading without using atomics should not panic since we are
72+
// reading from the cold element, and have waited for all writes to
73+
// finish.
74+
t.Logf("read value %+v", data[readIdx])
75+
})
76+
}
77+
wg.Wait()
78+
}

sdk/metric/internal/aggregate/exponential_histogram.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -301,7 +301,7 @@ func newExponentialHistogram[N int64 | float64](
301301
maxScale: maxScale,
302302

303303
newRes: r,
304-
limit: newLimiter[*expoHistogramDataPoint[N]](limit),
304+
limit: newLimiter[expoHistogramDataPoint[N]](limit),
305305
values: make(map[attribute.Distinct]*expoHistogramDataPoint[N]),
306306

307307
start: now(),
@@ -317,7 +317,7 @@ type expoHistogram[N int64 | float64] struct {
317317
maxScale int32
318318

319319
newRes func(attribute.Set) FilteredExemplarReservoir[N]
320-
limit limiter[*expoHistogramDataPoint[N]]
320+
limit limiter[expoHistogramDataPoint[N]]
321321
values map[attribute.Distinct]*expoHistogramDataPoint[N]
322322
valuesMu sync.Mutex
323323

sdk/metric/internal/aggregate/filtered_reservoir.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ type FilteredExemplarReservoir[N int64 | float64] interface {
2929

3030
// filteredExemplarReservoir handles the pre-sampled exemplar of measurements made.
3131
type filteredExemplarReservoir[N int64 | float64] struct {
32+
mu sync.Mutex
3233
filter exemplar.Filter
3334
reservoir exemplar.Reservoir
3435
// The exemplar.Reservoir is not required to be concurrent safe, but
@@ -54,12 +55,12 @@ func NewFilteredExemplarReservoir[N int64 | float64](
5455

5556
func (f *filteredExemplarReservoir[N]) Offer(ctx context.Context, val N, attr []attribute.KeyValue) {
5657
if f.filter(ctx) {
58+
// only record the current time if we are sampling this measurement.
5759
ts := time.Now()
5860
if !f.concurrentSafe {
5961
f.reservoirMux.Lock()
6062
defer f.reservoirMux.Unlock()
6163
}
62-
// only record the current time if we are sampling this measurement.
6364
f.reservoir.Offer(ctx, ts, exemplar.NewValue(val), attr)
6465
}
6566
}

sdk/metric/internal/aggregate/histogram.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ type histValues[N int64 | float64] struct {
5252
bounds []float64
5353

5454
newRes func(attribute.Set) FilteredExemplarReservoir[N]
55-
limit limiter[*buckets[N]]
55+
limit limiter[buckets[N]]
5656
values map[attribute.Distinct]*buckets[N]
5757
valuesMu sync.Mutex
5858
}
@@ -74,7 +74,7 @@ func newHistValues[N int64 | float64](
7474
noSum: noSum,
7575
bounds: b,
7676
newRes: r,
77-
limit: newLimiter[*buckets[N]](limit),
77+
limit: newLimiter[buckets[N]](limit),
7878
values: make(map[attribute.Distinct]*buckets[N]),
7979
}
8080
}

sdk/metric/internal/aggregate/lastvalue.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ func newLastValue[N int64 | float64](limit int, r func(attribute.Set) FilteredEx
2323
return &lastValue[N]{
2424
newRes: r,
2525
limit: newLimiter[datapoint[N]](limit),
26-
values: make(map[attribute.Distinct]datapoint[N]),
26+
values: make(map[attribute.Distinct]*datapoint[N]),
2727
start: now(),
2828
}
2929
}
@@ -34,7 +34,7 @@ type lastValue[N int64 | float64] struct {
3434

3535
newRes func(attribute.Set) FilteredExemplarReservoir[N]
3636
limit limiter[datapoint[N]]
37-
values map[attribute.Distinct]datapoint[N]
37+
values map[attribute.Distinct]*datapoint[N]
3838
start time.Time
3939
}
4040

@@ -45,9 +45,10 @@ func (s *lastValue[N]) measure(ctx context.Context, value N, fltrAttr attribute.
4545
d, ok := s.values[fltrAttr.Equivalent()]
4646
if !ok {
4747
fltrAttr = s.limit.Attributes(fltrAttr, s.values)
48-
d = s.values[fltrAttr.Equivalent()]
49-
d.res = s.newRes(fltrAttr)
50-
d.attrs = fltrAttr
48+
d = &datapoint[N]{
49+
res: s.newRes(fltrAttr),
50+
attrs: fltrAttr,
51+
}
5152
}
5253

5354
d.value = value

sdk/metric/internal/aggregate/limit.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ func newLimiter[V any](aggregation int) limiter[V] {
3030
// aggregation cardinality limit for the existing measurements. If it will,
3131
// overflowSet is returned. Otherwise, if it will not exceed the limit, or the
3232
// limit is not set (limit <= 0), attr is returned.
33-
func (l limiter[V]) Attributes(attrs attribute.Set, measurements map[attribute.Distinct]V) attribute.Set {
33+
func (l limiter[V]) Attributes(attrs attribute.Set, measurements map[attribute.Distinct]*V) attribute.Set {
3434
if l.aggLimit > 0 {
3535
_, exists := measurements[attrs.Equivalent()]
3636
if !exists && len(measurements) >= l.aggLimit-1 {

sdk/metric/internal/aggregate/limit_test.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,8 @@ import (
1212
)
1313

1414
func TestLimiterAttributes(t *testing.T) {
15-
m := map[attribute.Distinct]struct{}{alice.Equivalent(): {}}
15+
var val struct{}
16+
m := map[attribute.Distinct]*struct{}{alice.Equivalent(): &val}
1617
t.Run("NoLimit", func(t *testing.T) {
1718
l := newLimiter[struct{}](0)
1819
assert.Equal(t, alice, l.Attributes(alice, m))
@@ -43,7 +44,8 @@ func TestLimiterAttributes(t *testing.T) {
4344
var limitedAttr attribute.Set
4445

4546
func BenchmarkLimiterAttributes(b *testing.B) {
46-
m := map[attribute.Distinct]struct{}{alice.Equivalent(): {}}
47+
var val struct{}
48+
m := map[attribute.Distinct]*struct{}{alice.Equivalent(): &val}
4749
l := newLimiter[struct{}](2)
4850

4951
b.ReportAllocs()

0 commit comments

Comments
 (0)