Skip to content

Commit 126c5cf

Browse files
committed
use RWMutex for map access in the metrics SDK
1 parent b866e36 commit 126c5cf

File tree

11 files changed

+413
-195
lines changed

11 files changed

+413
-195
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
3131
- `WithInstrumentationAttributes` in `go.opentelemetry.io/otel/meter` synchronously de-duplicates the passed attributes instead of delegating it to the returned `MeterOption`. (#7266)
3232
- `WithInstrumentationAttributes` in `go.opentelemetry.io/otel/log` synchronously de-duplicates the passed attributes instead of delegating it to the returned `LoggerOption`. (#7266)
3333
- `Distinct` in `go.opentelemetry.io/otel/attribute` is no longer guaranteed to uniquely identify an attribute set. Collisions between `Distinct` values for different Sets are possible with extremely high cardinality (billions of series per instrument), but are highly unlikely. (#7175)
34+
- Improve performance of concurrent measurements in `go.opentelemetry.io/otel/sdk/metric`. (#7189)
3435

3536
<!-- Released section -->
3637
<!-- Don't change this section unless doing release -->
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
5+
6+
import (
7+
"math"
8+
"sync/atomic"
9+
)
10+
11+
// atomicSum is an efficient way of adding to a number which is either an
12+
// int64 or float64.
13+
type atomicSum[N int64 | float64] struct {
14+
// nFloatBits contains only the non-integer portion of the counter.
15+
nFloatBits atomic.Uint64
16+
// nInt contains only the integer portion of the counter.
17+
nInt atomic.Int64
18+
}
19+
20+
// load returns the float or integer value.
21+
func (n *atomicSum[N]) load() N {
22+
fval := math.Float64frombits(n.nFloatBits.Load())
23+
ival := n.nInt.Load()
24+
return N(fval + float64(ival))
25+
}
26+
27+
func (n *atomicSum[N]) add(value N) {
28+
ival := int64(value)
29+
// This case is where the value is an int, or if it is a whole-numbered float.
30+
if float64(ival) == float64(value) {
31+
n.nInt.Add(ival)
32+
return
33+
}
34+
35+
// Value must be a float below.
36+
for {
37+
oldBits := n.nFloatBits.Load()
38+
newBits := math.Float64bits(math.Float64frombits(oldBits) + float64(value))
39+
if n.nFloatBits.CompareAndSwap(oldBits, newBits) {
40+
return
41+
}
42+
}
43+
}
44+
45+
type atomicIntOrFloat[N int64 | float64] struct {
46+
// nFloatBits contains the float bits if N is float64.
47+
nFloatBits atomic.Uint64
48+
// nInt contains the int64 if N is int64
49+
nInt atomic.Int64
50+
}
51+
52+
func (n *atomicIntOrFloat[N]) store(value N) {
53+
switch v := any(value).(type) {
54+
case int64:
55+
n.nInt.Store(v)
56+
case float64:
57+
n.nFloatBits.Store(math.Float64bits(v))
58+
}
59+
}
60+
61+
func (n *atomicIntOrFloat[N]) load() (value N) {
62+
switch any(value).(type) {
63+
case int64:
64+
value = N(n.nInt.Load())
65+
case float64:
66+
value = N(math.Float64frombits(n.nFloatBits.Load()))
67+
}
68+
return
69+
}
70+
71+
func (n *atomicIntOrFloat[N]) compareAndSwap(old, new N) bool {
72+
switch any(old).(type) {
73+
case float64:
74+
return n.nFloatBits.CompareAndSwap(math.Float64bits(float64(old)), math.Float64bits(float64(new)))
75+
}
76+
return n.nInt.CompareAndSwap(int64(old), int64(new))
77+
}
78+
79+
type atomicMinMax[N int64 | float64] struct {
80+
min atomicIntOrFloat[N]
81+
max atomicIntOrFloat[N]
82+
isSet atomic.Bool
83+
}
84+
85+
func (n *atomicMinMax[N]) observe(value N) {
86+
for {
87+
minLoaded := n.min.load()
88+
if (!n.isSet.Load() || value < minLoaded) && !n.min.compareAndSwap(minLoaded, value) {
89+
// We got a new min value, but lost the race. Try again.
90+
continue
91+
}
92+
maxLoaded := n.max.load()
93+
if (!n.isSet.Load() || value > maxLoaded) && !n.max.compareAndSwap(maxLoaded, value) {
94+
// We got a new max value, but lost the race. Try again.
95+
continue
96+
}
97+
break
98+
}
99+
n.isSet.Store(true)
100+
}
101+
102+
func (n *atomicMinMax[N]) loadMin() (value N) {
103+
return n.min.load()
104+
}
105+
106+
func (n *atomicMinMax[N]) loadMax() (value N) {
107+
return n.max.load()
108+
}

0 commit comments

Comments
 (0)