|
14 | 14 | package prometheus
|
15 | 15 |
|
16 | 16 | import (
|
17 |
| - "bytes" |
18 | 17 | "fmt"
|
19 |
| - "io" |
20 | 18 | "math"
|
21 | 19 | "runtime"
|
22 | 20 | "sort"
|
@@ -215,7 +213,7 @@ func newHistogram(desc *Desc, opts HistogramOpts, labelValues ...string) Histogr
|
215 | 213 | h := &histogram{
|
216 | 214 | desc: desc,
|
217 | 215 | upperBounds: opts.Buckets,
|
218 |
| - sparseResolution: opts.SparseBucketsResolution, |
| 216 | + sparseResolution: uint32(opts.SparseBucketsResolution), |
219 | 217 | sparseThreshold: opts.SparseBucketsZeroThreshold,
|
220 | 218 | labelPairs: makeLabelPairs(desc, labelValues),
|
221 | 219 | counts: [2]*histogramCounts{{}, {}},
|
@@ -355,7 +353,7 @@ type histogram struct {
|
355 | 353 | upperBounds []float64
|
356 | 354 | labelPairs []*dto.LabelPair
|
357 | 355 | exemplars []atomic.Value // One more than buckets (to include +Inf), each a *dto.Exemplar.
|
358 |
| - sparseResolution uint8 |
| 356 | + sparseResolution uint32 // Instead of uint8 to be ready for protobuf encoding. |
359 | 357 | sparseThreshold float64
|
360 | 358 |
|
361 | 359 | now func() time.Time // To mock out time.Now() for testing.
|
@@ -400,9 +398,11 @@ func (h *histogram) Write(out *dto.Metric) error {
|
400 | 398 | }
|
401 | 399 |
|
402 | 400 | his := &dto.Histogram{
|
403 |
| - Bucket: make([]*dto.Bucket, len(h.upperBounds)), |
404 |
| - SampleCount: proto.Uint64(count), |
405 |
| - SampleSum: proto.Float64(math.Float64frombits(atomic.LoadUint64(&coldCounts.sumBits))), |
| 401 | + Bucket: make([]*dto.Bucket, len(h.upperBounds)), |
| 402 | + SampleCount: proto.Uint64(count), |
| 403 | + SampleSum: proto.Float64(math.Float64frombits(atomic.LoadUint64(&coldCounts.sumBits))), |
| 404 | + SbResolution: &h.sparseResolution, |
| 405 | + SbZeroThreshold: &h.sparseThreshold, |
406 | 406 | }
|
407 | 407 | out.Histogram = his
|
408 | 408 | out.Label = h.labelPairs
|
@@ -452,38 +452,43 @@ func (h *histogram) Write(out *dto.Metric) error {
|
452 | 452 | coldCounts.sparseBucketsNegative.Range(addAndReset(&hotCounts.sparseBucketsNegative))
|
453 | 453 | }()
|
454 | 454 |
|
455 |
| - var buf bytes.Buffer |
456 |
| - // TODO(beorn7): encode zero bucket threshold and count. |
457 |
| - fmt.Println("Zero bucket:", zeroBucket) // DEBUG |
458 |
| - fmt.Println("Positive buckets:") // DEBUG |
459 |
| - if _, err := encodeSparseBuckets(&buf, &coldCounts.sparseBucketsPositive, zeroBucket); err != nil { |
460 |
| - return err |
461 |
| - } |
462 |
| - fmt.Println("Negative buckets:") // DEBUG |
463 |
| - if _, err := encodeSparseBuckets(&buf, &coldCounts.sparseBucketsNegative, zeroBucket); err != nil { |
464 |
| - return err |
465 |
| - } |
| 455 | + his.SbZeroCount = proto.Uint64(zeroBucket) |
| 456 | + his.SbNegative = makeSparseBuckets(&coldCounts.sparseBucketsNegative) |
| 457 | + his.SbPositive = makeSparseBuckets(&coldCounts.sparseBucketsPositive) |
466 | 458 | }
|
467 | 459 | return nil
|
468 | 460 | }
|
469 | 461 |
|
470 |
| -func encodeSparseBuckets(w io.Writer, buckets *sync.Map, zeroBucket uint64) (n int, err error) { |
471 |
| - // TODO(beorn7): Add actual encoding of spare buckets. |
| 462 | +func makeSparseBuckets(buckets *sync.Map) *dto.SparseBuckets { |
472 | 463 | var ii []int
|
473 | 464 | buckets.Range(func(k, v interface{}) bool {
|
474 | 465 | ii = append(ii, k.(int))
|
475 | 466 | return true
|
476 | 467 | })
|
477 | 468 | sort.Ints(ii)
|
478 |
| - fmt.Println(len(ii), "buckets") |
479 |
| - var prev uint64 |
480 |
| - for _, i := range ii { |
| 469 | + |
| 470 | + if len(ii) == 0 { |
| 471 | + return nil |
| 472 | + } |
| 473 | + |
| 474 | + sbs := dto.SparseBuckets{} |
| 475 | + var prevCount uint64 |
| 476 | + var prevI int |
| 477 | + for n, i := range ii { |
481 | 478 | v, _ := buckets.Load(i)
|
482 |
| - current := atomic.LoadUint64(v.(*uint64)) |
483 |
| - fmt.Printf("- %d: %d Δ=%d\n", i, current, int(current)-int(prev)) |
484 |
| - prev = current |
| 479 | + count := atomic.LoadUint64(v.(*uint64)) |
| 480 | + if n == 0 || i-prevI != 1 { |
| 481 | + sbs.Span = append(sbs.Span, &dto.SparseBuckets_Span{ |
| 482 | + Offset: proto.Int(i - prevI), |
| 483 | + Length: proto.Uint32(1), |
| 484 | + }) |
| 485 | + } else { |
| 486 | + *sbs.Span[len(sbs.Span)-1].Length++ |
| 487 | + } |
| 488 | + sbs.Delta = append(sbs.Delta, int64(count)-int64(prevCount)) // TODO(beorn7): Do proper overflow handling. |
| 489 | + prevI, prevCount = i, count |
485 | 490 | }
|
486 |
| - return 0, nil |
| 491 | + return &sbs |
487 | 492 | }
|
488 | 493 |
|
489 | 494 | // addAndReset returns a function to be used with sync.Map.Range of spare
|
|
0 commit comments