Skip to content

Commit d83f7b1

Browse files
authored
Support ingesting native histograms in Ruler appender (#6029)
* support ingesting native histogram samples in Ruler pusher Signed-off-by: Ben Ye <[email protected]> * handle evaluation delay for native histograms Signed-off-by: Ben Ye <[email protected]> * fix unit test Signed-off-by: Ben Ye <[email protected]> --------- Signed-off-by: Ben Ye <[email protected]>
1 parent a19b867 commit d83f7b1

File tree

5 files changed

+405
-56
lines changed

5 files changed

+405
-56
lines changed

pkg/cortexpb/compat.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,15 @@ func ToWriteRequest(lbls []labels.Labels, samples []Sample, metadata []*MetricMe
4444
return req
4545
}
4646

47+
func (w *WriteRequest) AddHistogramTimeSeries(lbls []labels.Labels, histograms []Histogram) {
48+
for i := 0; i < len(lbls); i++ {
49+
ts := TimeseriesFromPool()
50+
ts.Labels = append(ts.Labels, FromLabelsToLabelAdapters(lbls[i])...)
51+
ts.Histograms = append(ts.Histograms, histograms[i])
52+
w.Timeseries = append(w.Timeseries, PreallocTimeseries{TimeSeries: ts})
53+
}
54+
}
55+
4756
// FromLabelAdaptersToLabels casts []LabelAdapter to labels.Labels.
4857
// It uses unsafe, but as LabelAdapter == labels.Label this should be safe.
4958
// This allows us to use labels.Labels directly in protos.

pkg/cortexpb/timeseries.go

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,11 @@ import (
1212
)
1313

1414
var (
15-
expectedTimeseries = 100
16-
expectedLabels = 20
17-
expectedSamplesPerSeries = 10
18-
expectedExemplarsPerSeries = 1
15+
expectedTimeseries = 100
16+
expectedLabels = 20
17+
expectedSamplesPerSeries = 10
18+
expectedExemplarsPerSeries = 1
19+
expectedHistogramsPerSeries = 1
1920

2021
/*
2122
We cannot pool these as pointer-to-slice because the place we use them is in WriteRequest which is generated from Protobuf
@@ -31,9 +32,10 @@ var (
3132
timeSeriesPool = sync.Pool{
3233
New: func() interface{} {
3334
return &TimeSeries{
34-
Labels: make([]LabelAdapter, 0, expectedLabels),
35-
Samples: make([]Sample, 0, expectedSamplesPerSeries),
36-
Exemplars: make([]Exemplar, 0, expectedExemplarsPerSeries),
35+
Labels: make([]LabelAdapter, 0, expectedLabels),
36+
Samples: make([]Sample, 0, expectedSamplesPerSeries),
37+
Exemplars: make([]Exemplar, 0, expectedExemplarsPerSeries),
38+
Histograms: make([]Histogram, 0, expectedHistogramsPerSeries),
3739
}
3840
},
3941
}

pkg/ruler/compat.go

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,12 +43,34 @@ type PusherAppender struct {
4343
pusher Pusher
4444
labels []labels.Labels
4545
samples []cortexpb.Sample
46+
histogramLabels []labels.Labels
47+
histograms []cortexpb.Histogram
4648
userID string
4749
evaluationDelay time.Duration
4850
}
4951

50-
func (a *PusherAppender) AppendHistogram(storage.SeriesRef, labels.Labels, int64, *histogram.Histogram, *histogram.FloatHistogram) (storage.SeriesRef, error) {
51-
return 0, errors.New("querying native histograms is not supported")
52+
func (a *PusherAppender) AppendHistogram(_ storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
53+
if h == nil && fh == nil {
54+
return 0, errors.New("no histogram")
55+
}
56+
57+
if h != nil {
58+
// A histogram sample is considered stale if its sum is set to NaN.
59+
// https://github.com/prometheus/prometheus/blob/b6ef745016fa9472fdd0ae20f75a9682e01d1e5c/tsdb/head_append.go#L339-L346
60+
if a.evaluationDelay > 0 && (value.IsStaleNaN(h.Sum)) {
61+
t -= a.evaluationDelay.Milliseconds()
62+
}
63+
a.histograms = append(a.histograms, cortexpb.HistogramToHistogramProto(t, h))
64+
} else {
65+
// A histogram sample is considered stale if its sum is set to NaN.
66+
// https://github.com/prometheus/prometheus/blob/b6ef745016fa9472fdd0ae20f75a9682e01d1e5c/tsdb/head_append.go#L339-L346
67+
if a.evaluationDelay > 0 && (value.IsStaleNaN(fh.Sum)) {
68+
t -= a.evaluationDelay.Milliseconds()
69+
}
70+
a.histograms = append(a.histograms, cortexpb.FloatHistogramToHistogramProto(t, fh))
71+
}
72+
a.histogramLabels = append(a.histogramLabels, l)
73+
return 0, nil
5274
}
5375

5476
func (a *PusherAppender) Append(_ storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
@@ -85,10 +107,11 @@ func (a *PusherAppender) AppendExemplar(_ storage.SeriesRef, _ labels.Labels, _
85107
func (a *PusherAppender) Commit() error {
86108
a.totalWrites.Inc()
87109

110+
req := cortexpb.ToWriteRequest(a.labels, a.samples, nil, nil, cortexpb.RULE)
111+
req.AddHistogramTimeSeries(a.histogramLabels, a.histograms)
88112
// Since a.pusher is distributor, client.ReuseSlice will be called in a.pusher.Push.
89113
// We shouldn't call client.ReuseSlice here.
90-
_, err := a.pusher.Push(user.InjectOrgID(a.ctx, a.userID), cortexpb.ToWriteRequest(a.labels, a.samples, nil, nil, cortexpb.RULE))
91-
114+
_, err := a.pusher.Push(user.InjectOrgID(a.ctx, a.userID), req)
92115
if err != nil {
93116
// Don't report errors that ended with 4xx HTTP status code (series limits, duplicate samples, out of order, etc.)
94117
if resp, ok := httpgrpc.HTTPResponseFromError(err); !ok || resp.Code/100 != 4 {
@@ -98,6 +121,8 @@ func (a *PusherAppender) Commit() error {
98121

99122
a.labels = nil
100123
a.samples = nil
124+
a.histogramLabels = nil
125+
a.histograms = nil
101126
return err
102127
}
103128

@@ -108,6 +133,8 @@ func (a *PusherAppender) UpdateMetadata(_ storage.SeriesRef, _ labels.Labels, _
108133
func (a *PusherAppender) Rollback() error {
109134
a.labels = nil
110135
a.samples = nil
136+
a.histogramLabels = nil
137+
a.histograms = nil
111138
return nil
112139
}
113140

0 commit comments

Comments
 (0)