
Commit e636fe3

add limitedSyncMap for enforcing limit
1 parent 6ab1a4d · commit e636fe3

2 files changed: +86 additions, −52 deletions


sdk/metric/internal/aggregate/atomic.go

Lines changed: 58 additions & 0 deletions
@@ -6,7 +6,10 @@ package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"

 import (
 	"math"
 	"runtime"
+	"sync"
 	"sync/atomic"
+
+	"go.opentelemetry.io/otel/attribute"
 )

 // atomicCounter is an efficient way of adding to a number which is either an
@@ -124,3 +127,58 @@ func (l *hotColdWaitGroup) swapHotAndWait() uint64 {
 	l.endedCounts[hotIdx].Store(0)
 	return hotIdx
 }
+
+// limitedSyncMap is a sync.Map which enforces the aggregation limit on
+// attribute sets and provides a Len() function.
+type limitedSyncMap struct {
+	sync.Map
+	aggLimit int
+	len      int
+	lenMux   sync.Mutex
+}
+
+func (m *limitedSyncMap) LoadOrStoreAttr(fltrAttr attribute.Set, newValue func(attribute.Set) any) any {
+	actual, loaded := m.Load(fltrAttr.Equivalent())
+	if loaded {
+		return actual
+	}
+	// If the overflow set exists, assume we have already overflowed and don't
+	// bother with the slow path below.
+	actual, loaded = m.Load(overflowSet.Equivalent())
+	if loaded {
+		return actual
+	}
+	// Slow path: add a new attribute set.
+	m.lenMux.Lock()
+	defer m.lenMux.Unlock()
+
+	// re-fetch now that we hold the lock to ensure we don't use the overflow
+	// set unless we are sure the attribute set isn't being written
+	// concurrently.
+	actual, loaded = m.Load(fltrAttr.Equivalent())
+	if loaded {
+		return actual
+	}
+
+	if m.aggLimit > 0 && m.len >= m.aggLimit-1 {
+		fltrAttr = overflowSet
+	}
+	actual, loaded = m.Map.LoadOrStore(fltrAttr.Equivalent(), newValue(fltrAttr))
+	if !loaded {
+		m.len++
+	}
+	return actual
+}
+
+func (m *limitedSyncMap) Clear() {
+	m.lenMux.Lock()
+	defer m.lenMux.Unlock()
+	m.len = 0
+	m.Map.Clear()
+}
+
+func (m *limitedSyncMap) Len() int {
+	m.lenMux.Lock()
+	defer m.lenMux.Unlock()
+	return m.len
+}
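
For readers skimming the diff, here is a minimal usage sketch of the new limitedSyncMap, written as a hypothetical test in this package. The record type, the limit of 3, and the attribute key are illustrative assumptions, not part of this commit; only LoadOrStoreAttr, Len, and the overflow behaviour come from the code above.

// limited_sync_map_sketch_test.go (hypothetical, for illustration only)
package aggregate

import (
	"testing"

	"go.opentelemetry.io/otel/attribute"
)

func TestLimitedSyncMapSketch(t *testing.T) {
	// aggLimit of 3 is an assumed example value.
	m := limitedSyncMap{aggLimit: 3}

	type record struct{ attrs attribute.Set }

	for i := 0; i < 10; i++ {
		set := attribute.NewSet(attribute.Int("i", i))
		// newValue is only invoked when a new entry is stored; once the limit
		// is reached, the entry stored under overflowSet is returned instead.
		_ = m.LoadOrStoreAttr(set, func(attr attribute.Set) any {
			return &record{attrs: attr}
		}).(*record)
	}

	// Two distinct sets plus the overflow set: Len() never exceeds aggLimit.
	if got := m.Len(); got > 3 {
		t.Errorf("Len() = %d, want at most 3", got)
	}
}

Compared with the previous lock-free check in sum.go, the lenMux slow path trades a mutex on the first insertion of each attribute set for a hard guarantee that the limit is never exceeded.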

sdk/metric/internal/aggregate/sum.go

Lines changed: 28 additions & 52 deletions
@@ -5,8 +5,6 @@ package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"

 import (
 	"context"
-	"sync"
-	"sync/atomic"
 	"time"

 	"go.opentelemetry.io/otel/attribute"
@@ -20,36 +18,22 @@ type sumValue[N int64 | float64] struct {
 }

 type valueMap[N int64 | float64] struct {
-	values sync.Map
-	len    atomic.Int64
+	values limitedSyncMap
+	newRes func(attribute.Set) FilteredExemplarReservoir[N]
 }

 func (s *valueMap[N]) measure(
 	ctx context.Context,
 	value N,
 	fltrAttr attribute.Set,
 	droppedAttr []attribute.KeyValue,
-	newRes func(attribute.Set) FilteredExemplarReservoir[N],
-	aggLimit int,
 ) {
-	v, ok := s.values.Load(fltrAttr.Equivalent())
-	if !ok {
-		// It is possible to exceed the attribute limit if it races with other
-		// new attribute sets. This is an accepted tradeoff to avoid locking
-		// for writes.
-		if aggLimit > 0 && s.len.Load() >= int64(aggLimit-1) {
-			fltrAttr = overflowSet
+	sv := s.values.LoadOrStoreAttr(fltrAttr, func(attr attribute.Set) any {
+		return &sumValue[N]{
+			res:   s.newRes(attr),
+			attrs: attr,
 		}
-		var loaded bool
-		v, loaded = s.values.LoadOrStore(fltrAttr.Equivalent(), &sumValue[N]{
-			res:   newRes(fltrAttr),
-			attrs: fltrAttr,
-		})
-		if !loaded {
-			s.len.Add(1)
-		}
-	}
-	sv := v.(*sumValue[N])
+	}).(*sumValue[N])
 	sv.n.add(value)
 	// It is possible for collection to race with measurement and observe the
 	// exemplar in the batch of metrics after the add() for cumulative sums.
@@ -66,18 +50,23 @@ func newDeltaSum[N int64 | float64](
 	r func(attribute.Set) FilteredExemplarReservoir[N],
 ) *deltaSum[N] {
 	return &deltaSum[N]{
-		newRes:    r,
-		aggLimit:  limit,
 		monotonic: monotonic,
 		start:     now(),
+		hotColdValMap: [2]valueMap[N]{
+			{
+				values: limitedSyncMap{aggLimit: limit},
+				newRes: r,
+			},
+			{
+				values: limitedSyncMap{aggLimit: limit},
+				newRes: r,
+			},
+		},
 	}
 }

 // deltaSum is the storage for sums which resets every collection interval.
 type deltaSum[N int64 | float64] struct {
-	newRes   func(attribute.Set) FilteredExemplarReservoir[N]
-	aggLimit int
-
 	monotonic bool
 	start     time.Time

@@ -88,7 +77,7 @@ type deltaSum[N int64 | float64] struct {
 func (s *deltaSum[N]) measure(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue) {
 	hotIdx := s.hcwg.start()
 	defer s.hcwg.done(hotIdx)
-	s.hotColdValMap[hotIdx].measure(ctx, value, fltrAttr, droppedAttr, s.newRes, s.aggLimit)
+	s.hotColdValMap[hotIdx].measure(ctx, value, fltrAttr, droppedAttr)
 }

 func (s *deltaSum[N]) delta(
@@ -106,7 +95,7 @@ func (s *deltaSum[N]) delta(
 	readIdx := s.hcwg.swapHotAndWait()
 	// The len will not change while we iterate over values, since we waited
 	// for all writes to finish to the cold values and len.
-	n := int(s.hotColdValMap[readIdx].len.Load())
+	n := s.hotColdValMap[readIdx].values.Len()
 	dPts := reset(sData.DataPoints, n, n)

 	var i int
@@ -121,7 +110,6 @@ func (s *deltaSum[N]) delta(
 		return true
 	})
 	s.hotColdValMap[readIdx].values.Clear()
-	s.hotColdValMap[readIdx].len.Store(0)
 	// The delta collection cycle resets.
 	s.start = t

@@ -140,31 +128,21 @@ func newCumulativeSum[N int64 | float64](
 	r func(attribute.Set) FilteredExemplarReservoir[N],
 ) *cumulativeSum[N] {
 	return &cumulativeSum[N]{
-		newRes:    r,
-		aggLimit:  limit,
 		monotonic: monotonic,
 		start:     now(),
+		valueMap: valueMap[N]{
+			values: limitedSyncMap{aggLimit: limit},
+			newRes: r,
+		},
 	}
 }

 // deltaSum is the storage for sums which never reset.
 type cumulativeSum[N int64 | float64] struct {
-	newRes   func(attribute.Set) FilteredExemplarReservoir[N]
-	aggLimit int
-
 	monotonic bool
 	start     time.Time

-	valMap valueMap[N]
-}
-
-func (s *cumulativeSum[N]) measure(
-	ctx context.Context,
-	value N,
-	fltrAttr attribute.Set,
-	droppedAttr []attribute.KeyValue,
-) {
-	s.valMap.measure(ctx, value, fltrAttr, droppedAttr, s.newRes, s.aggLimit)
+	valueMap[N]
 }

 func (s *cumulativeSum[N]) cumulative(
@@ -180,10 +158,10 @@ func (s *cumulativeSum[N]) cumulative(

 	// Values are being concurrently written while we iterate, so only use the
 	// current length for capacity.
-	dPts := reset(sData.DataPoints, 0, int(s.valMap.len.Load()))
+	dPts := reset(sData.DataPoints, 0, int(s.valueMap.values.Len()))

 	var i int
-	s.valMap.values.Range(func(_, value any) bool {
+	s.valueMap.values.Range(func(_, value any) bool {
 		val := value.(*sumValue[N])
 		newPt := metricdata.DataPoint[N]{
 			Attributes: val.attrs,
@@ -243,7 +221,7 @@ func (s *precomputedSum[N]) delta(
 	readIdx := s.hcwg.swapHotAndWait()
 	// The len will not change while we iterate over values, since we waited
 	// for all writes to finish to the cold values and len.
-	n := int(s.hotColdValMap[readIdx].len.Load())
+	n := s.hotColdValMap[readIdx].values.Len()
 	dPts := reset(sData.DataPoints, n, n)

 	var i int
@@ -262,7 +240,6 @@ func (s *precomputedSum[N]) delta(
 		return true
 	})
 	s.hotColdValMap[readIdx].values.Clear()
-	s.hotColdValMap[readIdx].len.Store(0)
 	s.reported = newReported
 	// The delta collection cycle resets.
 	s.start = t
@@ -288,7 +265,7 @@ func (s *precomputedSum[N]) cumulative(
 	readIdx := s.hcwg.swapHotAndWait()
 	// The len will not change while we iterate over values, since we waited
 	// for all writes to finish to the cold values and len.
-	n := int(s.hotColdValMap[readIdx].len.Load())
+	n := s.hotColdValMap[readIdx].values.Len()
 	dPts := reset(sData.DataPoints, n, n)

 	var i int
@@ -303,7 +280,6 @@ func (s *precomputedSum[N]) cumulative(
 		return true
 	})
 	s.hotColdValMap[readIdx].values.Clear()
-	s.hotColdValMap[readIdx].len.Store(0)

 	sData.DataPoints = dPts
 	*dest = sData
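
As a rough sanity check of the refactor above, here is a sketch of driving the new valueMap directly, again as a hypothetical test in this package. dropReservoir is assumed to be the package's existing no-op reservoir constructor; the limit of 2 and the attribute values are made up for illustration.

// value_map_sketch_test.go (hypothetical, for illustration only)
package aggregate

import (
	"context"
	"testing"

	"go.opentelemetry.io/otel/attribute"
)

func TestValueMapSketch(t *testing.T) {
	// The reservoir constructor and aggLimit of 2 are assumed example values.
	vm := valueMap[int64]{
		values: limitedSyncMap{aggLimit: 2},
		newRes: dropReservoir[int64],
	}

	ctx := context.Background()
	vm.measure(ctx, 1, attribute.NewSet(attribute.String("k", "a")), nil)
	// The second and third distinct sets exceed the limit and land on the
	// overflow set inside limitedSyncMap.
	vm.measure(ctx, 1, attribute.NewSet(attribute.String("k", "b")), nil)
	vm.measure(ctx, 1, attribute.NewSet(attribute.String("k", "c")), nil)

	if got := vm.values.Len(); got > 2 {
		t.Errorf("Len() = %d, want at most 2", got)
	}
}

Because cumulativeSum now embeds valueMap, this same measure call is what instrument writes reach directly, with the limit enforced inside limitedSyncMap rather than re-implemented in each aggregator.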
