Skip to content

Commit 55aa811

Browse files
committed
Adopt to new remote read client interface
Signed-off-by: György Krajcsovits <[email protected]>
1 parent 77516e3 commit 55aa811

File tree

6 files changed

+205
-185
lines changed

6 files changed

+205
-185
lines changed

pkg/distributor/otel.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -429,7 +429,7 @@ func otelMetricsToTimeseries(ctx context.Context, tenantID string, addSuffixes b
429429
// Old, less efficient, version of otelMetricsToTimeseries.
430430
func otelMetricsToTimeseriesOld(ctx context.Context, tenantID string, addSuffixes bool, discardedDueToOtelParseError *prometheus.CounterVec, logger log.Logger, md pmetric.Metrics) ([]mimirpb.PreallocTimeseries, error) {
431431
converter := prometheusremotewrite.NewPrometheusConverter()
432-
errs := converter.FromMetrics(ctx, md, prometheusremotewrite.Settings{
432+
_, errs := converter.FromMetrics(ctx, md, prometheusremotewrite.Settings{
433433
AddMetricSuffixes: addSuffixes,
434434
})
435435
promTS := converter.TimeSeries()

pkg/mimirtool/backfill/backfill.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"github.com/go-kit/log"
1919
"github.com/pkg/errors"
2020
"github.com/prometheus/common/model"
21+
"github.com/prometheus/prometheus/model/histogram"
2122
"github.com/prometheus/prometheus/model/labels"
2223
"github.com/prometheus/prometheus/tsdb"
2324
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
@@ -66,7 +67,7 @@ func getFormattedBytes(bytes int64, humanReadable bool) string {
6667

6768
type Iterator interface {
6869
Next() error
69-
Sample() (ts int64, v float64)
70+
Sample() (ts int64, v float64, h *histogram.Histogram, fh *histogram.FloatHistogram)
7071
Labels() (l labels.Labels)
7172
}
7273

@@ -117,13 +118,18 @@ func CreateBlocks(input IteratorCreator, mint, maxt int64, maxSamplesInAppender
117118
return errors.Wrap(err, "input")
118119
}
119120

120-
ts, v := i.Sample()
121+
ts, v, h, fh := i.Sample()
121122
if ts < t || ts >= tsUpper {
122123
continue
123124
}
124125

125126
l := i.Labels()
126-
if _, err := app.Append(0, i.Labels(), ts, v); err != nil {
127+
if h != nil || fh != nil {
128+
_, err = app.AppendHistogram(0, i.Labels(), ts, h, fh)
129+
} else {
130+
_, err = app.Append(0, i.Labels(), ts, v)
131+
}
132+
if err != nil {
127133
return errors.Wrap(err, fmt.Sprintf("add sample for metric=%s ts=%s value=%f", l, model.Time(ts).Time().Format(time.RFC3339Nano), v))
128134
}
129135

pkg/mimirtool/commands/remote_read.go

Lines changed: 112 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,15 @@ import (
2121
"time"
2222

2323
"github.com/alecthomas/kingpin/v2"
24-
"github.com/pkg/errors"
2524
config_util "github.com/prometheus/common/config"
2625
"github.com/prometheus/common/model"
26+
"github.com/prometheus/prometheus/model/histogram"
2727
"github.com/prometheus/prometheus/model/labels"
2828
"github.com/prometheus/prometheus/model/value"
29-
"github.com/prometheus/prometheus/prompb"
3029
"github.com/prometheus/prometheus/promql/parser"
30+
"github.com/prometheus/prometheus/storage"
3131
"github.com/prometheus/prometheus/storage/remote"
32+
"github.com/prometheus/prometheus/tsdb/chunkenc"
3233
log "github.com/sirupsen/logrus"
3334

3435
"github.com/grafana/mimir/pkg/mimirtool/backfill"
@@ -105,66 +106,59 @@ func (s *setTenantIDTransport) RoundTrip(req *http.Request) (*http.Response, err
105106
}
106107

107108
type timeSeriesIterator struct {
108-
posSeries int
109-
posSample int
110-
ts []*prompb.TimeSeries
111-
112-
// labels slice is reused across samples within a series
113-
labels labels.Labels
114-
labelsSeriesPos int
109+
ss storage.SeriesSet
110+
it chunkenc.Iterator
111+
ts int64
112+
v float64
113+
h *histogram.Histogram
114+
fh *histogram.FloatHistogram
115115
}
116116

117-
func newTimeSeriesIterator(ts []*prompb.TimeSeries) *timeSeriesIterator {
117+
func newTimeSeriesIterator(seriesSet storage.SeriesSet) *timeSeriesIterator {
118118
return &timeSeriesIterator{
119-
posSeries: 0,
120-
posSample: -1,
121-
122-
// ensure we are not pointing to a valid slice position
123-
labelsSeriesPos: -1,
124-
125-
ts: ts,
119+
ss: seriesSet,
120+
it: chunkenc.NewNopIterator(),
126121
}
127-
128122
}
129123

130124
func (i *timeSeriesIterator) Next() error {
131-
if i.posSeries >= len(i.ts) {
132-
return io.EOF
133-
}
134-
135-
i.posSample++
136-
137-
if i.posSample >= len(i.ts[i.posSeries].Samples) {
138-
i.posSample = -1
139-
i.posSeries++
140-
return i.Next()
125+
// Find non empty series iterator.
126+
var vt chunkenc.ValueType
127+
for vt = i.it.Next(); vt == chunkenc.ValNone; vt = i.it.Next() {
128+
if !i.ss.Next() {
129+
err := i.ss.Err()
130+
if err != nil {
131+
return err
132+
}
133+
return io.EOF
134+
}
135+
i.it = i.ss.At().Iterator(i.it)
136+
}
137+
switch vt {
138+
case chunkenc.ValFloat:
139+
i.ts, i.v = i.it.At()
140+
i.h = nil
141+
i.fh = nil
142+
case chunkenc.ValHistogram:
143+
i.ts, i.h = i.it.AtHistogram(nil)
144+
i.v = i.h.Sum
145+
i.fh = nil
146+
case chunkenc.ValFloatHistogram:
147+
i.ts, i.fh = i.it.AtFloatHistogram(nil)
148+
i.v = i.fh.Sum
149+
i.h = nil
150+
default:
151+
panic("unreachable")
141152
}
142-
143153
return nil
144154
}
145155

146156
func (i *timeSeriesIterator) Labels() (l labels.Labels) {
147-
// if it's the same label as previously return it
148-
if i.posSeries == i.labelsSeriesPos {
149-
return i.labels
150-
}
151-
152-
series := i.ts[i.posSeries]
153-
builder := labels.NewScratchBuilder(len(series.Labels))
154-
for posLabel := range series.Labels {
155-
builder.Add(series.Labels[posLabel].Name, series.Labels[posLabel].Value)
156-
}
157-
i.labels = builder.Labels()
158-
i.labelsSeriesPos = i.posSeries
159-
return i.labels
157+
return i.ss.At().Labels()
160158
}
161159

162-
func (i *timeSeriesIterator) Sample() (ts int64, v float64) {
163-
series := i.ts[i.posSeries]
164-
sample := series.Samples[i.posSample]
165-
166-
//sample.GetValue()
167-
return sample.GetTimestamp(), sample.GetValue()
160+
func (i *timeSeriesIterator) Sample() (ts int64, v float64, h *histogram.Histogram, fh *histogram.FloatHistogram) {
161+
return i.ts, i.v, i.h, i.fh
168162
}
169163

170164
// this is adapted from Go 1.15 for older versions
@@ -231,7 +225,7 @@ func (c *RemoteReadCommand) readClient() (remote.ReadClient, error) {
231225
}
232226

233227
// prepare() validates the input and prepares the client to query remote read endpoints
234-
func (c *RemoteReadCommand) prepare() (query func(context.Context) ([]*prompb.TimeSeries, error), from, to time.Time, err error) {
228+
func (c *RemoteReadCommand) prepare() (query func(context.Context) (storage.SeriesSet, error), from, to time.Time, err error) {
235229
from, err = time.Parse(time.RFC3339, c.from)
236230
if err != nil {
237231
return nil, time.Time{}, time.Time{}, fmt.Errorf("error parsing from: '%s' value: %w", c.from, err)
@@ -262,14 +256,14 @@ func (c *RemoteReadCommand) prepare() (query func(context.Context) ([]*prompb.Ti
262256
return nil, time.Time{}, time.Time{}, err
263257
}
264258

265-
return func(ctx context.Context) ([]*prompb.TimeSeries, error) {
259+
return func(ctx context.Context) (storage.SeriesSet, error) {
266260
log.Infof("Querying time from=%s to=%s with selector=%s", from.Format(time.RFC3339), to.Format(time.RFC3339), c.selector)
267-
resp, err := readClient.Read(ctx, pbQuery)
261+
resp, err := readClient.Read(ctx, pbQuery, false)
268262
if err != nil {
269263
return nil, err
270264
}
271265

272-
return resp.Timeseries, nil
266+
return resp, nil
273267

274268
}, from, to, nil
275269
}
@@ -285,23 +279,39 @@ func (c *RemoteReadCommand) dump(_ *kingpin.ParseContext) error {
285279
return err
286280
}
287281

288-
iterator := newTimeSeriesIterator(timeseries)
289-
for {
290-
err := iterator.Next()
291-
if err != nil {
292-
if errors.Is(err, io.EOF) {
293-
break
282+
var it chunkenc.Iterator
283+
for timeseries.Next() {
284+
s := timeseries.At()
285+
286+
l := s.Labels().String()
287+
it := s.Iterator(it)
288+
for vt := it.Next(); vt != chunkenc.ValNone; vt = it.Next() {
289+
switch vt {
290+
case chunkenc.ValFloat:
291+
ts, v := it.At()
292+
comment := ""
293+
if value.IsStaleNaN(v) {
294+
comment = " # StaleNaN"
295+
}
296+
fmt.Printf("%s %g %d%s\n", l, v, ts, comment)
297+
case chunkenc.ValHistogram:
298+
ts, h := it.AtHistogram(nil)
299+
comment := ""
300+
if value.IsStaleNaN(h.Sum) {
301+
comment = " # StaleNaN"
302+
}
303+
fmt.Printf("%s %s %d%s\n", l, h.String(), ts, comment)
304+
case chunkenc.ValFloatHistogram:
305+
ts, h := it.AtFloatHistogram(nil)
306+
comment := ""
307+
if value.IsStaleNaN(h.Sum) {
308+
comment = " # StaleNaN"
309+
}
310+
fmt.Printf("%s %s %d%s\n", l, h.String(), ts, comment)
311+
default:
312+
panic("unreachable")
294313
}
295-
return err
296-
}
297-
298-
l := iterator.Labels()
299-
ts, v := iterator.Sample()
300-
comment := ""
301-
if value.IsStaleNaN(v) {
302-
comment = " # StaleNaN"
303314
}
304-
fmt.Printf("%s %g %d%s\n", l, v, ts, comment)
305315
}
306316

307317
return nil
@@ -331,32 +341,44 @@ func (c *RemoteReadCommand) stats(_ *kingpin.ParseContext) error {
331341
Series: make(map[string]struct{}),
332342
}
333343

334-
iterator := newTimeSeriesIterator(timeseries)
335-
for {
336-
err := iterator.Next()
337-
if err != nil {
338-
if errors.Is(err, io.EOF) {
339-
break
344+
var it chunkenc.Iterator
345+
for timeseries.Next() {
346+
s := timeseries.At()
347+
it := s.Iterator(it)
348+
for vt := it.Next(); vt != chunkenc.ValNone; vt = it.Next() {
349+
num.Samples++
350+
num.Series[s.Labels().String()] = struct{}{}
351+
352+
var ts int64
353+
var v float64
354+
switch vt {
355+
case chunkenc.ValFloat:
356+
ts, v = it.At()
357+
case chunkenc.ValHistogram:
358+
var h *histogram.Histogram
359+
ts, h = it.AtHistogram(nil)
360+
v = h.Sum
361+
case chunkenc.ValFloatHistogram:
362+
var h *histogram.FloatHistogram
363+
ts, h = it.AtFloatHistogram(nil)
364+
v = h.Sum
365+
default:
366+
panic("unreachable")
340367
}
341-
return err
342-
}
343-
num.Samples++
344-
num.Series[iterator.Labels().String()] = struct{}{}
345-
346-
ts, v := iterator.Sample()
347368

348-
if int64(num.MaxT) < ts {
349-
num.MaxT = model.Time(ts)
350-
}
351-
if num.MinT == 0 || int64(num.MinT) > ts {
352-
num.MinT = model.Time(ts)
353-
}
369+
if int64(num.MaxT) < ts {
370+
num.MaxT = model.Time(ts)
371+
}
372+
if num.MinT == 0 || int64(num.MinT) > ts {
373+
num.MinT = model.Time(ts)
374+
}
354375

355-
if math.IsNaN(v) {
356-
num.NaNValues++
357-
}
358-
if value.IsStaleNaN(v) {
359-
num.StaleNaNValues++
376+
if math.IsNaN(v) {
377+
num.NaNValues++
378+
}
379+
if value.IsStaleNaN(v) {
380+
num.StaleNaNValues++
381+
}
360382
}
361383
}
362384

0 commit comments

Comments
 (0)