Skip to content

Commit 9457877

Browse files
committed
use RWMutex for map access in the metrics SDK
1 parent b866e36 commit 9457877

File tree

11 files changed

+414
-200
lines changed

11 files changed

+414
-200
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
3131
- `WithInstrumentationAttributes` in `go.opentelemetry.io/otel/meter` synchronously de-duplicates the passed attributes instead of delegating it to the returned `MeterOption`. (#7266)
3232
- `WithInstrumentationAttributes` in `go.opentelemetry.io/otel/log` synchronously de-duplicates the passed attributes instead of delegating it to the returned `LoggerOption`. (#7266)
3333
- `Distinct` in `go.opentelemetry.io/otel/attribute` is no longer guaranteed to uniquely identify an attribute set. Collisions between `Distinct` values for different Sets are possible with extremely high cardinality (billions of series per instrument), but are highly unlikely. (#7175)
34+
- Improve performance of concurrent measurements in `go.opentelemetry.io/otel/sdk/metric`. (#7189)
3435

3536
<!-- Released section -->
3637
<!-- Don't change this section unless doing release -->
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
5+
6+
import (
7+
"math"
8+
"sync/atomic"
9+
)
10+
11+
// atomicSum is an efficient way of adding to a number which is either an
12+
// int64 or float64.
13+
type atomicSum[N int64 | float64] struct {
14+
// nFloatBits contains only the non-integer portion of the counter.
15+
nFloatBits atomic.Uint64
16+
// nInt contains only the integer portion of the counter.
17+
nInt atomic.Int64
18+
}
19+
20+
// load returns the float or integer value.
21+
func (n *atomicSum[N]) load() N {
22+
fval := math.Float64frombits(n.nFloatBits.Load())
23+
ival := n.nInt.Load()
24+
return N(fval + float64(ival))
25+
}
26+
27+
func (n *atomicSum[N]) add(value N) {
28+
ival := int64(value)
29+
// This case is where the value is an int, or if it is a whole-numbered float.
30+
if float64(ival) == float64(value) {
31+
n.nInt.Add(ival)
32+
return
33+
}
34+
35+
// Value must be a float below.
36+
for {
37+
oldBits := n.nFloatBits.Load()
38+
newBits := math.Float64bits(math.Float64frombits(oldBits) + float64(value))
39+
if n.nFloatBits.CompareAndSwap(oldBits, newBits) {
40+
return
41+
}
42+
}
43+
}
44+
45+
type atomicIntOrFloat[N int64 | float64] struct {
46+
// nFloatBits contains the float bits if N is float64.
47+
nFloatBits atomic.Uint64
48+
// nInt contains the int64 if N is int64
49+
nInt atomic.Int64
50+
}
51+
52+
func (n *atomicIntOrFloat[N]) store(value N) {
53+
switch v := any(value).(type) {
54+
case int64:
55+
n.nInt.Store(v)
56+
case float64:
57+
n.nFloatBits.Store(math.Float64bits(v))
58+
}
59+
}
60+
61+
func (n *atomicIntOrFloat[N]) load() (value N) {
62+
switch any(value).(type) {
63+
case int64:
64+
value = N(n.nInt.Load())
65+
case float64:
66+
value = N(math.Float64frombits(n.nFloatBits.Load()))
67+
}
68+
return
69+
}
70+
71+
func (n *atomicIntOrFloat[N]) compareAndSwap(oldVal, newVal N) bool {
72+
switch any(oldVal).(type) {
73+
case float64:
74+
return n.nFloatBits.CompareAndSwap(math.Float64bits(float64(oldVal)), math.Float64bits(float64(newVal)))
75+
default:
76+
return n.nInt.CompareAndSwap(int64(oldVal), int64(newVal))
77+
}
78+
}
79+
80+
type atomicMinMax[N int64 | float64] struct {
81+
min atomicIntOrFloat[N]
82+
max atomicIntOrFloat[N]
83+
isSet atomic.Bool
84+
}
85+
86+
func (n *atomicMinMax[N]) observe(value N) {
87+
for {
88+
minLoaded := n.min.load()
89+
if (!n.isSet.Load() || value < minLoaded) && !n.min.compareAndSwap(minLoaded, value) {
90+
// We got a new min value, but lost the race. Try again.
91+
continue
92+
}
93+
maxLoaded := n.max.load()
94+
if (!n.isSet.Load() || value > maxLoaded) && !n.max.compareAndSwap(maxLoaded, value) {
95+
// We got a new max value, but lost the race. Try again.
96+
continue
97+
}
98+
break
99+
}
100+
n.isSet.Store(true)
101+
}
102+
103+
func (n *atomicMinMax[N]) loadMin() (value N) {
104+
return n.min.load()
105+
}
106+
107+
func (n *atomicMinMax[N]) loadMax() (value N) {
108+
return n.max.load()
109+
}

0 commit comments

Comments
 (0)