@@ -10,6 +10,7 @@ import (
1010 "time"
1111
1212 "go.opentelemetry.io/otel/attribute"
13+ "go.opentelemetry.io/otel/sdk/metric/internal/reservoir"
1314)
1415
1516// FixedSizeReservoirProvider returns a provider of [FixedSizeReservoir].
@@ -34,6 +35,7 @@ var _ Reservoir = &FixedSizeReservoir{}
3435// If there are more than k, the Reservoir will then randomly sample all
3536// additional measurement with a decreasing probability.
3637type FixedSizeReservoir struct {
38+ reservoir.ConcurrentSafe
3739 * storage
3840
3941 // count is the number of measurement seen.
@@ -123,12 +125,12 @@ func (r *FixedSizeReservoir) Offer(ctx context.Context, t time.Time, n Value, a
123125 // https://github.com/MrAlias/reservoir-sampling for a performance
124126 // comparison of reservoir sampling algorithms.
125127
126- if int (r .count ) < cap (r .store ) {
127- r .store [ r .count ] = newMeasurement (ctx , t , n , a )
128+ if int (r .count ) < cap (r .measurements ) {
129+ r .store ( int ( r .count ), newMeasurement (ctx , t , n , a ) )
128130 } else if r .count == r .next {
129131 // Overwrite a random existing measurement with the one offered.
130- idx := int (rand .Int64N (int64 (cap (r .store ))))
131- r .store [ idx ] = newMeasurement (ctx , t , n , a )
132+ idx := int (rand .Int64N (int64 (cap (r .measurements ))))
133+ r .store ( idx , newMeasurement (ctx , t , n , a ) )
132134 r .advance ()
133135 }
134136 r .count ++
@@ -139,7 +141,7 @@ func (r *FixedSizeReservoir) reset() {
139141 // This resets the number of exemplars known.
140142 r .count = 0
141143 // Random index inserts should only happen after the storage is full.
142- r .next = int64 (cap (r .store ))
144+ r .next = int64 (cap (r .measurements ))
143145
144146 // Initial random number in the series used to generate r.next.
145147 //
@@ -150,7 +152,7 @@ func (r *FixedSizeReservoir) reset() {
150152 // This maps the uniform random number in (0,1) to a geometric distribution
151153 // over the same interval. The mean of the distribution is inversely
152154 // proportional to the storage capacity.
153- r .w = math .Exp (math .Log (r .randomFloat64 ()) / float64 (cap (r .store )))
155+ r .w = math .Exp (math .Log (r .randomFloat64 ()) / float64 (cap (r .measurements )))
154156
155157 r .advance ()
156158}
@@ -170,7 +172,7 @@ func (r *FixedSizeReservoir) advance() {
170172 // therefore the next r.w will be based on the same distribution (i.e.
171173 // `max(u_1,u_2,...,u_k)`). Therefore, we can sample the next r.w by
172174 // computing the next random number `u` and take r.w as `w * u^(1/k)`.
173- r .w *= math .Exp (math .Log (r .randomFloat64 ()) / float64 (cap (r .store )))
175+ r .w *= math .Exp (math .Log (r .randomFloat64 ()) / float64 (cap (r .measurements )))
174176 // Use the new random number in the series to calculate the count of the
175177 // next measurement that will be stored.
176178 //
0 commit comments