14
14
package prometheus
15
15
16
16
import (
17
+ "bytes"
17
18
"fmt"
19
+ "io"
18
20
"math"
19
21
"runtime"
20
22
"sort"
@@ -58,12 +60,14 @@ const bucketLabel = "le"
58
60
// tailored to broadly measure the response time (in seconds) of a network
59
61
// service. Most likely, however, you will be required to define buckets
60
62
// customized to your use case.
61
- var (
62
- DefBuckets = []float64 {.005 , .01 , .025 , .05 , .1 , .25 , .5 , 1 , 2.5 , 5 , 10 }
63
+ var DefBuckets = []float64 {.005 , .01 , .025 , .05 , .1 , .25 , .5 , 1 , 2.5 , 5 , 10 }
63
64
64
- errBucketLabelNotAllowed = fmt .Errorf (
65
- "%q is not allowed as label name in histograms" , bucketLabel ,
66
- )
65
+ // DefSparseBucketsZeroThreshold is the default value for
66
+ // SparseBucketsZeroThreshold in the HistogramOpts.
67
+ var DefSparseBucketsZeroThreshold = 1e-128
68
+
69
+ var errBucketLabelNotAllowed = fmt .Errorf (
70
+ "%q is not allowed as label name in histograms" , bucketLabel ,
67
71
)
68
72
69
73
// LinearBuckets creates 'count' buckets, each 'width' wide, where the lowest
@@ -146,8 +150,32 @@ type HistogramOpts struct {
146
150
// element in the slice is the upper inclusive bound of a bucket. The
147
151
// values must be sorted in strictly increasing order. There is no need
148
152
// to add a highest bucket with +Inf bound, it will be added
149
- // implicitly. The default value is DefBuckets.
153
+ // implicitly. If Buckets is left as nil or set to a slice of length
154
+ // zero, it is replaced by default buckets. The default buckets are
155
+ // DefBuckets if no sparse buckets (see below) are used, otherwise the
156
+ // default is no buckets. (In other words, if you want to use both
157
+ // regular buckets and sparse buckets, you have to define the regular
158
+ // buckets here explicitly.)
150
159
Buckets []float64
160
+
161
+ // If SparseBucketsResolution is not zero, sparse buckets are used (in
162
+ // addition to the regular buckets, if defined above). Every power of
163
+ // ten is divided into the given number of exponential buckets. For
164
+ // example, if set to 3, the bucket boundaries are approximately […,
165
+ // 0.1, 0.215, 0.464, 1, 2.15, 4.64, 10, 21.5, 46.4, 100, …] Histograms
166
+ // can only be properly aggregated if they use the same
167
+ // resolution. Therefore, it is recommended to use 20 as a resolution,
168
+ // which is generally expected to be a good tradeoff between resource
169
+ // usage and accuracy (resulting in a maximum error of quantile values
170
+ // of about 6%).
171
+ SparseBucketsResolution uint8
172
+ // All observations with an absolute value of less or equal
173
+ // SparseBucketsZeroThreshold are accumulated into a “zero” bucket. For
174
+ // best results, this should be close to a bucket boundary. This is
175
+ // most easily accomplished by picking a power of ten. If
176
+ // SparseBucketsZeroThreshold is left at zero (or set to a negative
177
+ // value), DefSparseBucketsZeroThreshold is used as the threshold.
178
+ SparseBucketsZeroThreshold float64
151
179
}
152
180
153
181
// NewHistogram creates a new Histogram based on the provided HistogramOpts. It
@@ -184,16 +212,20 @@ func newHistogram(desc *Desc, opts HistogramOpts, labelValues ...string) Histogr
184
212
}
185
213
}
186
214
187
- if len (opts .Buckets ) == 0 {
188
- opts .Buckets = DefBuckets
189
- }
190
-
191
215
h := & histogram {
192
- desc : desc ,
193
- upperBounds : opts .Buckets ,
194
- labelPairs : makeLabelPairs (desc , labelValues ),
195
- counts : [2 ]* histogramCounts {{}, {}},
196
- now : time .Now ,
216
+ desc : desc ,
217
+ upperBounds : opts .Buckets ,
218
+ sparseResolution : opts .SparseBucketsResolution ,
219
+ sparseThreshold : opts .SparseBucketsZeroThreshold ,
220
+ labelPairs : makeLabelPairs (desc , labelValues ),
221
+ counts : [2 ]* histogramCounts {{}, {}},
222
+ now : time .Now ,
223
+ }
224
+ if len (h .upperBounds ) == 0 && opts .SparseBucketsResolution == 0 {
225
+ h .upperBounds = DefBuckets
226
+ }
227
+ if h .sparseThreshold <= 0 {
228
+ h .sparseThreshold = DefSparseBucketsZeroThreshold
197
229
}
198
230
for i , upperBound := range h .upperBounds {
199
231
if i < len (h .upperBounds )- 1 {
@@ -228,6 +260,67 @@ type histogramCounts struct {
228
260
sumBits uint64
229
261
count uint64
230
262
buckets []uint64
263
+ // sparse buckets are implemented with a sync.Map for this PoC. A
264
+ // dedicated data structure will likely be more efficient.
265
+ // There are separate maps for negative and positive observations.
266
+ // The map's value is a *uint64, counting observations in that bucket.
267
+ // The map's key is the logarithmic index of the bucket. Index 0 is for an
268
+ // upper bound of 1. Each increment/decrement by SparseBucketsResolution
269
+ // multiplies/divides the upper bound by 10. Indices in between are
270
+ // spaced exponentially as defined in spareBounds.
271
+ sparseBucketsPositive , sparseBucketsNegative sync.Map
272
+ // sparseZeroBucket counts all (positive and negative) observations in
273
+ // the zero bucket (with an absolute value less or equal
274
+ // SparseBucketsZeroThreshold).
275
+ sparseZeroBucket uint64
276
+ }
277
+
278
+ // observe manages the parts of observe that only affects
279
+ // histogramCounts. doSparse is true if spare buckets should be done,
280
+ // too. whichSparse is 0 for the sparseZeroBucket and +1 or -1 for
281
+ // sparseBucketsPositive or sparseBucketsNegative, respectively. sparseKey is
282
+ // the key of the sparse bucket to use.
283
+ func (hc * histogramCounts ) observe (v float64 , bucket int , doSparse bool , whichSparse int , sparseKey int ) {
284
+ if bucket < len (hc .buckets ) {
285
+ atomic .AddUint64 (& hc .buckets [bucket ], 1 )
286
+ }
287
+ for {
288
+ oldBits := atomic .LoadUint64 (& hc .sumBits )
289
+ newBits := math .Float64bits (math .Float64frombits (oldBits ) + v )
290
+ if atomic .CompareAndSwapUint64 (& hc .sumBits , oldBits , newBits ) {
291
+ break
292
+ }
293
+ }
294
+ if doSparse {
295
+ switch whichSparse {
296
+ case 0 :
297
+ atomic .AddUint64 (& hc .sparseZeroBucket , 1 )
298
+ case + 1 :
299
+ addToSparseBucket (& hc .sparseBucketsPositive , sparseKey , 1 )
300
+ case - 1 :
301
+ addToSparseBucket (& hc .sparseBucketsNegative , sparseKey , 1 )
302
+ default :
303
+ panic (fmt .Errorf ("invalid value for whichSparse: %d" , whichSparse ))
304
+ }
305
+ }
306
+ // Increment count last as we take it as a signal that the observation
307
+ // is complete.
308
+ atomic .AddUint64 (& hc .count , 1 )
309
+ }
310
+
311
+ func addToSparseBucket (buckets * sync.Map , key int , increment uint64 ) {
312
+ if existingBucket , ok := buckets .Load (key ); ok {
313
+ // Fast path without allocation.
314
+ atomic .AddUint64 (existingBucket .(* uint64 ), increment )
315
+ return
316
+ }
317
+ // Bucket doesn't exist yet. Slow path allocating new counter.
318
+ newBucket := increment // TODO(beorn7): Check if this is sufficient to not let increment escape.
319
+ if actualBucket , loaded := buckets .LoadOrStore (key , & newBucket ); loaded {
320
+ // The bucket was created concurrently in another goroutine.
321
+ // Have to increment after all.
322
+ atomic .AddUint64 (actualBucket .(* uint64 ), increment )
323
+ }
231
324
}
232
325
233
326
type histogram struct {
@@ -259,9 +352,11 @@ type histogram struct {
259
352
// http://golang.org/pkg/sync/atomic/#pkg-note-BUG.
260
353
counts [2 ]* histogramCounts
261
354
262
- upperBounds []float64
263
- labelPairs []* dto.LabelPair
264
- exemplars []atomic.Value // One more than buckets (to include +Inf), each a *dto.Exemplar.
355
+ upperBounds []float64
356
+ labelPairs []* dto.LabelPair
357
+ exemplars []atomic.Value // One more than buckets (to include +Inf), each a *dto.Exemplar.
358
+ sparseResolution uint8
359
+ sparseThreshold float64
265
360
266
361
now func () time.Time // To mock out time.Now() for testing.
267
362
}
@@ -309,6 +404,9 @@ func (h *histogram) Write(out *dto.Metric) error {
309
404
SampleCount : proto .Uint64 (count ),
310
405
SampleSum : proto .Float64 (math .Float64frombits (atomic .LoadUint64 (& coldCounts .sumBits ))),
311
406
}
407
+ out .Histogram = his
408
+ out .Label = h .labelPairs
409
+
312
410
var cumCount uint64
313
411
for i , upperBound := range h .upperBounds {
314
412
cumCount += atomic .LoadUint64 (& coldCounts .buckets [i ])
@@ -329,11 +427,7 @@ func (h *histogram) Write(out *dto.Metric) error {
329
427
}
330
428
his .Bucket = append (his .Bucket , b )
331
429
}
332
-
333
- out .Histogram = his
334
- out .Label = h .labelPairs
335
-
336
- // Finally add all the cold counts to the new hot counts and reset the cold counts.
430
+ // Add all the cold counts to the new hot counts and reset the cold counts.
337
431
atomic .AddUint64 (& hotCounts .count , count )
338
432
atomic .StoreUint64 (& coldCounts .count , 0 )
339
433
for {
@@ -348,9 +442,64 @@ func (h *histogram) Write(out *dto.Metric) error {
348
442
atomic .AddUint64 (& hotCounts .buckets [i ], atomic .LoadUint64 (& coldCounts .buckets [i ]))
349
443
atomic .StoreUint64 (& coldCounts .buckets [i ], 0 )
350
444
}
445
+ if h .sparseResolution != 0 {
446
+ zeroBucket := atomic .LoadUint64 (& coldCounts .sparseZeroBucket )
447
+
448
+ defer func () {
449
+ atomic .AddUint64 (& hotCounts .sparseZeroBucket , zeroBucket )
450
+ atomic .StoreUint64 (& coldCounts .sparseZeroBucket , 0 )
451
+ coldCounts .sparseBucketsPositive .Range (addAndReset (& hotCounts .sparseBucketsPositive ))
452
+ coldCounts .sparseBucketsNegative .Range (addAndReset (& hotCounts .sparseBucketsNegative ))
453
+ }()
454
+
455
+ var buf bytes.Buffer
456
+ // TODO(beorn7): encode zero bucket threshold and count.
457
+ fmt .Println ("Zero bucket:" , zeroBucket ) // DEBUG
458
+ fmt .Println ("Positive buckets:" ) // DEBUG
459
+ if _ , err := encodeSparseBuckets (& buf , & coldCounts .sparseBucketsPositive , zeroBucket ); err != nil {
460
+ return err
461
+ }
462
+ fmt .Println ("Negative buckets:" ) // DEBUG
463
+ if _ , err := encodeSparseBuckets (& buf , & coldCounts .sparseBucketsNegative , zeroBucket ); err != nil {
464
+ return err
465
+ }
466
+ }
351
467
return nil
352
468
}
353
469
470
+ func encodeSparseBuckets (w io.Writer , buckets * sync.Map , zeroBucket uint64 ) (n int , err error ) {
471
+ // TODO(beorn7): Add actual encoding of spare buckets.
472
+ var ii []int
473
+ buckets .Range (func (k , v interface {}) bool {
474
+ ii = append (ii , k .(int ))
475
+ return true
476
+ })
477
+ sort .Ints (ii )
478
+ fmt .Println (len (ii ), "buckets" )
479
+ var prev uint64
480
+ for _ , i := range ii {
481
+ v , _ := buckets .Load (i )
482
+ current := atomic .LoadUint64 (v .(* uint64 ))
483
+ fmt .Printf ("- %d: %d Δ=%d\n " , i , current , int (current )- int (prev ))
484
+ prev = current
485
+ }
486
+ return 0 , nil
487
+ }
488
+
489
+ // addAndReset returns a function to be used with sync.Map.Range of spare
490
+ // buckets in coldCounts. It increments the buckets in the provided hotBuckets
491
+ // according to the buckets ranged through. It then resets all buckets ranged
492
+ // through to 0 (but leaves them in place so that they don't need to get
493
+ // recreated on the next scrape).
494
+ func addAndReset (hotBuckets * sync.Map ) func (k , v interface {}) bool {
495
+ return func (k , v interface {}) bool {
496
+ bucket := v .(* uint64 )
497
+ addToSparseBucket (hotBuckets , k .(int ), atomic .LoadUint64 (bucket ))
498
+ atomic .StoreUint64 (bucket , 0 )
499
+ return true
500
+ }
501
+ }
502
+
354
503
// findBucket returns the index of the bucket for the provided value, or
355
504
// len(h.upperBounds) for the +Inf bucket.
356
505
func (h * histogram ) findBucket (v float64 ) int {
@@ -368,25 +517,22 @@ func (h *histogram) findBucket(v float64) int {
368
517
369
518
// observe is the implementation for Observe without the findBucket part.
370
519
func (h * histogram ) observe (v float64 , bucket int ) {
520
+ doSparse := h .sparseResolution != 0
521
+ var whichSparse , sparseKey int
522
+ if doSparse {
523
+ switch {
524
+ case v > h .sparseThreshold :
525
+ whichSparse = + 1
526
+ case v < - h .sparseThreshold :
527
+ whichSparse = - 1
528
+ }
529
+ sparseKey = int (math .Ceil (math .Log10 (math .Abs (v )) * float64 (h .sparseResolution )))
530
+ }
371
531
// We increment h.countAndHotIdx so that the counter in the lower
372
532
// 63 bits gets incremented. At the same time, we get the new value
373
533
// back, which we can use to find the currently-hot counts.
374
534
n := atomic .AddUint64 (& h .countAndHotIdx , 1 )
375
- hotCounts := h .counts [n >> 63 ]
376
-
377
- if bucket < len (h .upperBounds ) {
378
- atomic .AddUint64 (& hotCounts .buckets [bucket ], 1 )
379
- }
380
- for {
381
- oldBits := atomic .LoadUint64 (& hotCounts .sumBits )
382
- newBits := math .Float64bits (math .Float64frombits (oldBits ) + v )
383
- if atomic .CompareAndSwapUint64 (& hotCounts .sumBits , oldBits , newBits ) {
384
- break
385
- }
386
- }
387
- // Increment count last as we take it as a signal that the observation
388
- // is complete.
389
- atomic .AddUint64 (& hotCounts .count , 1 )
535
+ h .counts [n >> 63 ].observe (v , bucket , doSparse , whichSparse , sparseKey )
390
536
}
391
537
392
538
// updateExemplar replaces the exemplar for the provided bucket. With empty
0 commit comments