Skip to content

Commit cd8cba2

Browse files
authored
histograms: Add timer to reset ASAP after bucket limiting has happened (#1367)
Fixes #1248. See issue description for all the details. Signed-off-by: beorn7 <[email protected]>
1 parent c3e797e commit cd8cba2

File tree

2 files changed

+76
-16
lines changed

2 files changed

+76
-16
lines changed

prometheus/histogram.go

Lines changed: 51 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -475,6 +475,9 @@ type HistogramOpts struct {
475475

476476
// now is for testing purposes, by default it's time.Now.
477477
now func() time.Time
478+
479+
// afterFunc is for testing purposes, by default it's time.AfterFunc.
480+
afterFunc func(time.Duration, func()) *time.Timer
478481
}
479482

480483
// HistogramVecOpts bundles the options to create a HistogramVec metric.
@@ -526,7 +529,9 @@ func newHistogram(desc *Desc, opts HistogramOpts, labelValues ...string) Histogr
526529
if opts.now == nil {
527530
opts.now = time.Now
528531
}
529-
532+
if opts.afterFunc == nil {
533+
opts.afterFunc = time.AfterFunc
534+
}
530535
h := &histogram{
531536
desc: desc,
532537
upperBounds: opts.Buckets,
@@ -536,6 +541,7 @@ func newHistogram(desc *Desc, opts HistogramOpts, labelValues ...string) Histogr
536541
nativeHistogramMinResetDuration: opts.NativeHistogramMinResetDuration,
537542
lastResetTime: opts.now(),
538543
now: opts.now,
544+
afterFunc: opts.afterFunc,
539545
}
540546
if len(h.upperBounds) == 0 && opts.NativeHistogramBucketFactor <= 1 {
541547
h.upperBounds = DefBuckets
@@ -716,9 +722,16 @@ type histogram struct {
716722
nativeHistogramMinResetDuration time.Duration
717723
// lastResetTime is protected by mtx. It is also used as created timestamp.
718724
lastResetTime time.Time
725+
// resetScheduled is protected by mtx. It is true if a reset is
726+
// scheduled for a later time (when nativeHistogramMinResetDuration has
727+
// passed).
728+
resetScheduled bool
719729

720730
// now is for testing purposes, by default it's time.Now.
721731
now func() time.Time
732+
733+
// afterFunc is for testing purposes, by default it's time.AfterFunc.
734+
afterFunc func(time.Duration, func()) *time.Timer
722735
}
723736

724737
func (h *histogram) Desc() *Desc {
@@ -874,21 +887,31 @@ func (h *histogram) limitBuckets(counts *histogramCounts, value float64, bucket
874887
if h.maybeReset(hotCounts, coldCounts, coldIdx, value, bucket) {
875888
return
876889
}
890+
// One of the other strategies will happen. To undo what they will do as
891+
// soon as enough time has passed to satisfy
892+
// h.nativeHistogramMinResetDuration, schedule a reset at the right time
893+
// if we haven't done so already.
894+
if h.nativeHistogramMinResetDuration > 0 && !h.resetScheduled {
895+
h.resetScheduled = true
896+
h.afterFunc(h.nativeHistogramMinResetDuration-h.now().Sub(h.lastResetTime), h.reset)
897+
}
898+
877899
if h.maybeWidenZeroBucket(hotCounts, coldCounts) {
878900
return
879901
}
880902
h.doubleBucketWidth(hotCounts, coldCounts)
881903
}
882904

883-
// maybeReset resets the whole histogram if at least h.nativeHistogramMinResetDuration
884-
// has been passed. It returns true if the histogram has been reset. The caller
885-
// must have locked h.mtx.
905+
// maybeReset resets the whole histogram if at least
906+
// h.nativeHistogramMinResetDuration has passed. It returns true if the
907+
// histogram has been reset. The caller must have locked h.mtx.
886908
func (h *histogram) maybeReset(
887909
hot, cold *histogramCounts, coldIdx uint64, value float64, bucket int,
888910
) bool {
889911
// We are using the possibly mocked h.now() rather than
890912
// time.Since(h.lastResetTime) to enable testing.
891-
if h.nativeHistogramMinResetDuration == 0 ||
913+
if h.nativeHistogramMinResetDuration == 0 || // No reset configured.
914+
h.resetScheduled || // Do not interfere if a reset is already scheduled.
892915
h.now().Sub(h.lastResetTime) < h.nativeHistogramMinResetDuration {
893916
return false
894917
}
@@ -906,6 +929,29 @@ func (h *histogram) maybeReset(
906929
return true
907930
}
908931

932+
// reset resets the whole histogram. It locks h.mtx itself, i.e. it has to be
933+
// called without having locked h.mtx.
934+
func (h *histogram) reset() {
935+
// Hold h.mtx for the whole reset; the deferred Unlock releases it on
// return.
h.mtx.Lock()
936+
defer h.mtx.Unlock()
937+
938+
// The top bit (bit 63) of countAndHotIdx is the index of the hot counts
// in h.counts; the inverted top bit is the index of the cold counts.
n := atomic.LoadUint64(&h.countAndHotIdx)
939+
hotIdx := n >> 63
940+
coldIdx := (^n) >> 63
941+
hot := h.counts[hotIdx]
942+
cold := h.counts[coldIdx]
943+
// Completely reset coldCounts.
944+
h.resetCounts(cold)
945+
// Make coldCounts the new hot counts while resetting countAndHotIdx.
// The stored value is coldIdx<<63, i.e. the lower 63 bits become zero.
946+
n = atomic.SwapUint64(&h.countAndHotIdx, coldIdx<<63)
947+
// n now holds the value from before the swap; its lower 63 bits are
// extracted as count.
count := n & ((1 << 63) - 1)
948+
// NOTE(review): presumably waits until all count observations have been
// fully recorded in the formerly hot counts - confirm against
// waitForCooldown.
waitForCooldown(count, hot)
949+
// Finally, reset the formerly hot counts, too.
950+
h.resetCounts(hot)
951+
// Record the time of this reset and clear the flag. limitBuckets only
// schedules a new reset while resetScheduled is false, and maybeReset
// returns early while it is true.
h.lastResetTime = h.now()
952+
h.resetScheduled = false
953+
}
954+
909955
// maybeWidenZeroBucket widens the zero bucket until it includes the existing
910956
// buckets closest to the zero bucket (which could be two, if an equidistant
911957
// negative and a positive bucket exists, but usually it's only one bucket to be

prometheus/histogram_test.go

Lines changed: 25 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -925,16 +925,16 @@ func TestNativeHistogram(t *testing.T) {
925925
maxBuckets: 4,
926926
minResetDuration: 9 * time.Minute,
927927
want: &dto.Histogram{
928-
SampleCount: proto.Uint64(2),
929-
SampleSum: proto.Float64(7),
928+
SampleCount: proto.Uint64(3),
929+
SampleSum: proto.Float64(12.1),
930930
Schema: proto.Int32(2),
931931
ZeroThreshold: proto.Float64(2.938735877055719e-39),
932932
ZeroCount: proto.Uint64(0),
933933
PositiveSpan: []*dto.BucketSpan{
934-
{Offset: proto.Int32(7), Length: proto.Uint32(2)},
934+
{Offset: proto.Int32(7), Length: proto.Uint32(4)},
935935
},
936-
PositiveDelta: []int64{1, 0},
937-
CreatedTimestamp: timestamppb.New(now.Add(10 * time.Minute)), // We expect reset to happen after 9 minutes.
936+
PositiveDelta: []int64{1, 0, -1, 1},
937+
CreatedTimestamp: timestamppb.New(now.Add(9 * time.Minute)), // We expect reset to happen after 8 minutes.
938938
},
939939
},
940940
{
@@ -945,23 +945,27 @@ func TestNativeHistogram(t *testing.T) {
945945
maxZeroThreshold: 1.2,
946946
minResetDuration: 9 * time.Minute,
947947
want: &dto.Histogram{
948-
SampleCount: proto.Uint64(2),
949-
SampleSum: proto.Float64(7),
948+
SampleCount: proto.Uint64(3),
949+
SampleSum: proto.Float64(12.1),
950950
Schema: proto.Int32(2),
951951
ZeroThreshold: proto.Float64(2.938735877055719e-39),
952952
ZeroCount: proto.Uint64(0),
953953
PositiveSpan: []*dto.BucketSpan{
954-
{Offset: proto.Int32(7), Length: proto.Uint32(2)},
954+
{Offset: proto.Int32(7), Length: proto.Uint32(4)},
955955
},
956-
PositiveDelta: []int64{1, 0},
957-
CreatedTimestamp: timestamppb.New(now.Add(10 * time.Minute)), // We expect reset to happen after 9 minutes.
956+
PositiveDelta: []int64{1, 0, -1, 1},
957+
CreatedTimestamp: timestamppb.New(now.Add(9 * time.Minute)), // We expect reset to happen after 8 minutes.
958958
},
959959
},
960960
}
961961

962962
for _, s := range scenarios {
963963
t.Run(s.name, func(t *testing.T) {
964-
ts := now
964+
var (
965+
ts = now
966+
funcToCall func()
967+
whenToCall time.Duration
968+
)
965969

966970
his := NewHistogram(HistogramOpts{
967971
Name: "name",
@@ -972,12 +976,22 @@ func TestNativeHistogram(t *testing.T) {
972976
NativeHistogramMinResetDuration: s.minResetDuration,
973977
NativeHistogramMaxZeroThreshold: s.maxZeroThreshold,
974978
now: func() time.Time { return ts },
979+
afterFunc: func(d time.Duration, f func()) *time.Timer {
980+
funcToCall = f
981+
whenToCall = d
982+
return nil
983+
},
975984
})
976985

977986
ts = ts.Add(time.Minute)
978987
for _, o := range s.observations {
979988
his.Observe(o)
980989
ts = ts.Add(time.Minute)
990+
whenToCall -= time.Minute
991+
if funcToCall != nil && whenToCall <= 0 {
992+
funcToCall()
993+
funcToCall = nil
994+
}
981995
}
982996
m := &dto.Metric{}
983997
if err := his.Write(m); err != nil {

0 commit comments

Comments
 (0)