Commit 0fc9ed0

removing unused code/types from ingester (#5969)
* removing unused code/types from ingester
* lint
* fix test
* Fix test
* rebasing from master
* return SeriesSet error on MatrixFromSeriesSet
* cleaning up streamingSelect method
* checking it.Err() on MatrixFromSeriesSet
* fix TestNewDistributorsCanPushToOldIngestersWithReplication

Signed-off-by: alanprot <[email protected]>
1 parent 0d0f2ad commit 0fc9ed0

14 files changed: +470 lines, -706 lines

integration/backward_compatibility_test.go

Lines changed: 8 additions & 2 deletions
@@ -21,8 +21,14 @@ var (
 	// If you change the image tag, remember to update it in the preloading done
 	// by GitHub Actions too (see .github/workflows/test-build-deploy.yml).
 	previousVersionImages = map[string]func(map[string]string) map[string]string{
-		"quay.io/cortexproject/cortex:v1.13.1": nil,
-		"quay.io/cortexproject/cortex:v1.13.2": nil,
+		"quay.io/cortexproject/cortex:v1.13.1": func(m map[string]string) map[string]string {
+			m["-ingester.stream-chunks-when-using-blocks"] = "true"
+			return m
+		},
+		"quay.io/cortexproject/cortex:v1.13.2": func(m map[string]string) map[string]string {
+			m["-ingester.stream-chunks-when-using-blocks"] = "true"
+			return m
+		},
 		"quay.io/cortexproject/cortex:v1.14.0": nil,
 		"quay.io/cortexproject/cortex:v1.14.1": nil,
 		"quay.io/cortexproject/cortex:v1.15.0": nil,

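Note: the override functions above pin -ingester.stream-chunks-when-using-blocks=true on the v1.13.x images, presumably because (per the rest of this commit) the distributor no longer merges sample-based Timeseries responses, so older ingesters have to stream chunks. Below is a minimal sketch of how a per-image flag-override map like previousVersionImages could be consumed when assembling flags for a previous-version container; buildFlags and the default flags are illustrative, not part of the Cortex test framework.

```go
package main

import "fmt"

// buildFlags copies the default flags and applies an optional per-image
// override, e.g. forcing chunk streaming on pre-1.14 images.
// Hypothetical helper: it only illustrates the map-of-override-funcs pattern.
func buildFlags(defaults map[string]string, override func(map[string]string) map[string]string) map[string]string {
	flags := make(map[string]string, len(defaults))
	for k, v := range defaults {
		flags[k] = v
	}
	if override != nil {
		flags = override(flags)
	}
	return flags
}

func main() {
	override := func(m map[string]string) map[string]string {
		m["-ingester.stream-chunks-when-using-blocks"] = "true"
		return m
	}
	fmt.Println(buildFlags(map[string]string{"-target": "ingester"}, override))
}
```
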
pkg/distributor/query.go

Lines changed: 4 additions & 27 deletions
@@ -70,7 +70,7 @@ func (d *Distributor) QueryStream(ctx context.Context, from, to model.Time, matc
 		}
 
 		if s := opentracing.SpanFromContext(ctx); s != nil {
-			s.LogKV("chunk-series", len(result.GetChunkseries()), "time-series", len(result.GetTimeseries()))
+			s.LogKV("chunk-series", len(result.GetChunkseries()))
 		}
 		return nil
 	})
@@ -253,15 +253,11 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSet ri
 				return nil, validation.LimitError(chunkLimitErr.Error())
 			}
 
-			s := make([][]cortexpb.LabelAdapter, 0, len(resp.Chunkseries)+len(resp.Timeseries))
+			s := make([][]cortexpb.LabelAdapter, 0, len(resp.Chunkseries))
 			for _, series := range resp.Chunkseries {
 				s = append(s, series.Labels)
 			}
 
-			for _, series := range resp.Timeseries {
-				s = append(s, series.Labels)
-			}
-
 			if limitErr := queryLimiter.AddSeries(s...); limitErr != nil {
 				return nil, validation.LimitError(limitErr.Error())
 			}
@@ -275,7 +271,6 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSet ri
 			}
 
 			result.Chunkseries = append(result.Chunkseries, resp.Chunkseries...)
-			result.Timeseries = append(result.Timeseries, resp.Timeseries...)
 		}
 		return result, nil
 	})
@@ -286,7 +281,6 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSet ri
 	span, _ := opentracing.StartSpanFromContext(ctx, "Distributor.MergeIngesterStreams")
 	defer span.Finish()
 	hashToChunkseries := map[string]ingester_client.TimeSeriesChunk{}
-	hashToTimeSeries := map[string]cortexpb.TimeSeries{}
 
 	for _, result := range results {
 		response := result.(*ingester_client.QueryStreamResponse)
@@ -299,40 +293,23 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSet ri
 			existing.Chunks = append(existing.Chunks, series.Chunks...)
 			hashToChunkseries[key] = existing
 		}
-
-		// Parse any time series
-		for _, series := range response.Timeseries {
-			key := ingester_client.LabelsToKeyString(cortexpb.FromLabelAdaptersToLabels(series.Labels))
-			existing := hashToTimeSeries[key]
-			existing.Labels = series.Labels
-			if existing.Samples == nil {
-				existing.Samples = series.Samples
-			} else {
-				existing.Samples = mergeSamples(existing.Samples, series.Samples)
-			}
-			hashToTimeSeries[key] = existing
-		}
 	}
 
 	resp := &ingester_client.QueryStreamResponse{
 		Chunkseries: make([]ingester_client.TimeSeriesChunk, 0, len(hashToChunkseries)),
-		Timeseries:  make([]cortexpb.TimeSeries, 0, len(hashToTimeSeries)),
 	}
 	for _, series := range hashToChunkseries {
 		resp.Chunkseries = append(resp.Chunkseries, series)
 	}
-	for _, series := range hashToTimeSeries {
-		resp.Timeseries = append(resp.Timeseries, series)
-	}
 
 	respSize := resp.Size()
 	chksSize := resp.ChunksSize()
 	chksCount := resp.ChunksCount()
-	span.SetTag("fetched_series", len(resp.Chunkseries)+len(resp.Timeseries))
+	span.SetTag("fetched_series", len(resp.Chunkseries))
 	span.SetTag("fetched_chunks", chksCount)
 	span.SetTag("fetched_data_bytes", respSize)
 	span.SetTag("fetched_chunks_bytes", chksSize)
-	reqStats.AddFetchedSeries(uint64(len(resp.Chunkseries) + len(resp.Timeseries)))
+	reqStats.AddFetchedSeries(uint64(len(resp.Chunkseries)))
 	reqStats.AddFetchedChunkBytes(uint64(chksSize))
 	reqStats.AddFetchedDataBytes(uint64(respSize))
 	reqStats.AddFetchedChunks(uint64(chksCount))
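
With the sample-based Timeseries path gone, the merge step in queryIngesterStream only has to deduplicate chunk series across the replication set: each series is keyed by its label set, and chunks from different ingesters are appended under the same key. A standalone sketch of that remaining merge follows; TimeSeriesChunk and labelsToKey below are simplified stand-ins, not the actual Cortex types.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// TimeSeriesChunk is a simplified stand-in for ingester_client.TimeSeriesChunk.
type TimeSeriesChunk struct {
	Labels map[string]string
	Chunks []string // stand-in for encoded chunk data
}

// labelsToKey builds a deterministic key from a label set, in the spirit of
// ingester_client.LabelsToKeyString.
func labelsToKey(lbls map[string]string) string {
	parts := make([]string, 0, len(lbls))
	for k, v := range lbls {
		parts = append(parts, k+"="+v)
	}
	sort.Strings(parts)
	return strings.Join(parts, ",")
}

// mergeChunkSeries deduplicates series by label key, concatenating chunks
// from all ingester responses under a single entry per series.
func mergeChunkSeries(responses [][]TimeSeriesChunk) []TimeSeriesChunk {
	byKey := map[string]TimeSeriesChunk{}
	for _, resp := range responses {
		for _, s := range resp {
			key := labelsToKey(s.Labels)
			existing := byKey[key]
			existing.Labels = s.Labels
			existing.Chunks = append(existing.Chunks, s.Chunks...)
			byKey[key] = existing
		}
	}
	out := make([]TimeSeriesChunk, 0, len(byKey))
	for _, s := range byKey {
		out = append(out, s)
	}
	return out
}

func main() {
	a := []TimeSeriesChunk{{Labels: map[string]string{"__name__": "up"}, Chunks: []string{"c1"}}}
	b := []TimeSeriesChunk{{Labels: map[string]string{"__name__": "up"}, Chunks: []string{"c2"}}}
	fmt.Println(mergeChunkSeries([][]TimeSeriesChunk{a, b})) // one series with chunks c1, c2
}
```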

pkg/ingester/client/compat.go

Lines changed: 56 additions & 9 deletions
@@ -5,6 +5,8 @@ import (
 
 	"github.com/prometheus/common/model"
 	"github.com/prometheus/prometheus/model/labels"
+	"github.com/prometheus/prometheus/storage"
+	"github.com/prometheus/prometheus/tsdb/chunkenc"
 
 	"github.com/cortexproject/cortex/pkg/cortexpb"
 )
@@ -66,6 +68,32 @@ func FromExemplarQueryRequest(req *ExemplarQueryRequest) (int64, int64, [][]*lab
 	return req.StartTimestampMs, req.EndTimestampMs, result, nil
 }
 
+// MatrixFromSeriesSet unpacks a SeriesSet to a model.Matrix.
+func MatrixFromSeriesSet(set storage.SeriesSet) (model.Matrix, error) {
+	m := make(model.Matrix, 0)
+	for set.Next() {
+		s := set.At()
+		var ss model.SampleStream
+		ss.Metric = cortexpb.FromLabelAdaptersToMetric(cortexpb.FromLabelsToLabelAdapters(s.Labels()))
+		ss.Values = make([]model.SamplePair, 0)
+		it := s.Iterator(nil)
+		for it.Next() != chunkenc.ValNone {
+			t, v := it.At()
+			ss.Values = append(ss.Values, model.SamplePair{
+				Value:     model.SampleValue(v),
+				Timestamp: model.Time(t),
+			})
+		}
+		if it.Err() != nil {
+			return nil, it.Err()
+		}
+
+		m = append(m, &ss)
+	}
+
+	return m, set.Err()
+}
+
 // ToQueryResponse builds a QueryResponse proto.
 func ToQueryResponse(matrix model.Matrix) *QueryResponse {
 	resp := &QueryResponse{}
@@ -118,6 +146,34 @@ func ToMetricsForLabelMatchersRequest(from, to model.Time, matchers []*labels.Ma
 	}, nil
 }
 
+// SeriesSetToQueryResponse builds a QueryResponse proto
+func SeriesSetToQueryResponse(s storage.SeriesSet) (*QueryResponse, error) {
+	result := &QueryResponse{}
+
+	var it chunkenc.Iterator
+	for s.Next() {
+		series := s.At()
+		samples := []cortexpb.Sample{}
+		it = series.Iterator(it)
+		for it.Next() != chunkenc.ValNone {
+			t, v := it.At()
+			samples = append(samples, cortexpb.Sample{
+				TimestampMs: t,
+				Value:       v,
+			})
+		}
+		if err := it.Err(); err != nil {
+			return nil, err
+		}
+		result.Timeseries = append(result.Timeseries, cortexpb.TimeSeries{
+			Labels:  cortexpb.FromLabelsToLabelAdapters(series.Labels()),
+			Samples: samples,
+		})
+	}
+
+	return result, s.Err()
+}
+
 // FromMetricsForLabelMatchersRequest unpacks a MetricsForLabelMatchersRequest proto
 func FromMetricsForLabelMatchersRequest(req *MetricsForLabelMatchersRequest) (model.Time, model.Time, [][]*labels.Matcher, error) {
 	matchersSet := make([][]*labels.Matcher, 0, len(req.MatchersSet))
@@ -133,15 +189,6 @@ func FromMetricsForLabelMatchersRequest(req *MetricsForLabelMatchersRequest) (mo
 	return from, to, matchersSet, nil
 }
 
-// FromMetricsForLabelMatchersResponse unpacks a MetricsForLabelMatchersResponse proto
-func FromMetricsForLabelMatchersResponse(resp *MetricsForLabelMatchersResponse) []model.Metric {
-	metrics := []model.Metric{}
-	for _, m := range resp.Metric {
-		metrics = append(metrics, cortexpb.FromLabelAdaptersToMetric(m.Labels))
-	}
-	return metrics
-}
-
 // ToLabelValuesRequest builds a LabelValuesRequest proto
 func ToLabelValuesRequest(labelName model.LabelName, from, to model.Time, matchers []*labels.Matcher) (*LabelValuesRequest, error) {
 	ms, err := toLabelMatchers(matchers)
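
The two new helpers convert a Prometheus storage.SeriesSet into either a model.Matrix or a QueryResponse, draining each series iterator and surfacing both the per-iterator error and the set-level Err(). A hedged usage sketch, based only on the signatures added in this commit; the empty series set stands in for what an ingester would get from a TSDB querier's Select call.

```go
package main

import (
	"fmt"

	"github.com/prometheus/prometheus/storage"

	client "github.com/cortexproject/cortex/pkg/ingester/client"
)

func main() {
	// Placeholder input: in real code this would come from querier.Select(...).
	set := storage.EmptySeriesSet()

	// Unpack the series set into a model.Matrix.
	matrix, err := client.MatrixFromSeriesSet(set)
	if err != nil {
		panic(err)
	}
	fmt.Println("series in matrix:", len(matrix))

	// Or build a QueryResponse proto directly from another series set.
	resp, err := client.SeriesSetToQueryResponse(storage.EmptySeriesSet())
	if err != nil {
		panic(err)
	}
	fmt.Println("timeseries in response:", len(resp.Timeseries))
}
```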

pkg/ingester/client/custom.go

Lines changed: 0 additions & 3 deletions
@@ -35,9 +35,6 @@ func (m *QueryStreamResponse) ChunksSize() int {
 }
 
 func (m *QueryStreamResponse) SamplesCount() (count int) {
-	for _, ts := range m.Timeseries {
-		count += len(ts.Samples)
-	}
 	for _, cs := range m.Chunkseries {
 		for _, c := range cs.Chunks {
 			if c.Encoding == int32(encoding.PrometheusXorChunk) {
