Skip to content

Commit 3a7cb1b

Browse files
authored
[PLAT-108854] Add Thanos query bytes fetched in query frontend (#42)
2 parents b2f8333 + 927f88e commit 3a7cb1b

File tree

8 files changed

+758
-144
lines changed

8 files changed

+758
-144
lines changed

internal/cortex/querier/queryrange/query_range.go

Lines changed: 73 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,17 @@ func AnalyzesMerge(analysis ...*Analysis) *Analysis {
243243
return root
244244
}
245245

246+
func SeriesStatsCounterMerge(seriesStatsCounters ...*SeriesStatsCounter) *SeriesStatsCounter {
247+
result := SeriesStatsCounter{}
248+
for _, c := range seriesStatsCounters {
249+
result.Series += c.Series
250+
result.Chunks += c.Chunks
251+
result.Samples += c.Samples
252+
result.Bytes += c.Bytes
253+
}
254+
return &result
255+
}
256+
246257
func (prometheusCodec) MergeResponse(_ Request, responses ...Response) (Response, error) {
247258
if len(responses) == 0 {
248259
return NewEmptyPrometheusResponse(), nil
@@ -269,6 +280,15 @@ func (prometheusCodec) MergeResponse(_ Request, responses ...Response) (Response
269280
analyzes = append(analyzes, promResponses[i].Data.GetAnalysis())
270281
}
271282

283+
seriesStatsCounters := make([]*SeriesStatsCounter, 0, len(responses))
284+
for i := range promResponses {
285+
if promResponses[i].Data.GetSeriesStatsCounter() == nil {
286+
continue
287+
}
288+
289+
seriesStatsCounters = append(seriesStatsCounters, promResponses[i].Data.GetSeriesStatsCounter())
290+
}
291+
272292
response := PrometheusResponse{
273293
Status: StatusSuccess,
274294
Data: PrometheusData{
@@ -278,6 +298,9 @@ func (prometheusCodec) MergeResponse(_ Request, responses ...Response) (Response
278298
Analysis: AnalyzesMerge(analyzes...),
279299
},
280300
}
301+
if len(seriesStatsCounters) > 0 {
302+
response.Data.SeriesStatsCounter = SeriesStatsCounterMerge(seriesStatsCounters...)
303+
}
281304
response.Headers = QueryBytesFetchedPrometheusResponseHeaders(responses...)
282305
if len(resultsCacheGenNumberHeaderValues) != 0 {
283306
response.Headers = append(response.Headers, &PrometheusResponseHeader{
@@ -450,7 +473,11 @@ func (prometheusCodec) EncodeResponse(ctx context.Context, res Response) (*http.
450473
httpHeader := http.Header{
451474
"Content-Type": []string{"application/json"}}
452475
if queryBytesFetchedHttpHeaderValue := QueryBytesFetchedHttpHeaderValue(res); queryBytesFetchedHttpHeaderValue != nil {
476+
// M3 code path
453477
httpHeader[QueryBytesFetchedHeaderName] = queryBytesFetchedHttpHeaderValue
478+
} else if res.(*PrometheusResponse).Data.SeriesStatsCounter != nil {
479+
// Pantheon code path
480+
httpHeader[QueryBytesFetchedHeaderName] = []string{strconv.FormatInt(res.(*PrometheusResponse).Data.SeriesStatsCounter.Bytes, 10)}
454481
}
455482
resp := http.Response{
456483
Header: httpHeader,
@@ -572,10 +599,11 @@ func (s *StringSample) UnmarshalJSON(b []byte) error {
572599
// UnmarshalJSON implements json.Unmarshaler.
573600
func (s *PrometheusInstantQueryData) UnmarshalJSON(data []byte) error {
574601
var queryData struct {
575-
ResultType string `json:"resultType"`
576-
Result jsoniter.RawMessage `json:"result"`
577-
Stats *PrometheusResponseStats `json:"stats,omitempty"`
578-
Analysis *Analysis `json:"analysis,omitempty"`
602+
ResultType string `json:"resultType"`
603+
Result jsoniter.RawMessage `json:"result"`
604+
Stats *PrometheusResponseStats `json:"stats,omitempty"`
605+
Analysis *Analysis `json:"analysis,omitempty"`
606+
SeriesStatsCounter *SeriesStatsCounter `json:"seriesStatsCounter,omitempty"`
579607
}
580608

581609
if err := json.Unmarshal(data, &queryData); err != nil {
@@ -585,6 +613,7 @@ func (s *PrometheusInstantQueryData) UnmarshalJSON(data []byte) error {
585613
s.ResultType = queryData.ResultType
586614
s.Stats = queryData.Stats
587615
s.Analysis = queryData.Analysis
616+
s.SeriesStatsCounter = queryData.SeriesStatsCounter
588617
switch s.ResultType {
589618
case model.ValVector.String():
590619
var result struct {
@@ -644,54 +673,62 @@ func (s *PrometheusInstantQueryData) MarshalJSON() ([]byte, error) {
644673
switch s.ResultType {
645674
case model.ValVector.String():
646675
res := struct {
647-
ResultType string `json:"resultType"`
648-
Data []*Sample `json:"result"`
649-
Stats *PrometheusResponseStats `json:"stats,omitempty"`
650-
Analysis *Analysis `json:"analysis,omitempty"`
676+
ResultType string `json:"resultType"`
677+
Data []*Sample `json:"result"`
678+
Stats *PrometheusResponseStats `json:"stats,omitempty"`
679+
Analysis *Analysis `json:"analysis,omitempty"`
680+
SeriesStatsCounter *SeriesStatsCounter `json:"seriesStatsCounter,omitempty"`
651681
}{
652-
ResultType: s.ResultType,
653-
Data: s.Result.GetVector().Samples,
654-
Stats: s.Stats,
655-
Analysis: s.Analysis,
682+
ResultType: s.ResultType,
683+
Data: s.Result.GetVector().Samples,
684+
Stats: s.Stats,
685+
Analysis: s.Analysis,
686+
SeriesStatsCounter: s.SeriesStatsCounter,
656687
}
657688
return json.Marshal(res)
658689
case model.ValMatrix.String():
659690
res := struct {
660-
ResultType string `json:"resultType"`
661-
Data []*SampleStream `json:"result"`
662-
Stats *PrometheusResponseStats `json:"stats,omitempty"`
663-
Analysis *Analysis `json:"analysis,omitempty"`
691+
ResultType string `json:"resultType"`
692+
Data []*SampleStream `json:"result"`
693+
Stats *PrometheusResponseStats `json:"stats,omitempty"`
694+
Analysis *Analysis `json:"analysis,omitempty"`
695+
SeriesStatsCounter *SeriesStatsCounter `json:"seriesStatsCounter,omitempty"`
664696
}{
665-
ResultType: s.ResultType,
666-
Data: s.Result.GetMatrix().SampleStreams,
667-
Stats: s.Stats,
668-
Analysis: s.Analysis,
697+
ResultType: s.ResultType,
698+
Data: s.Result.GetMatrix().SampleStreams,
699+
Stats: s.Stats,
700+
Analysis: s.Analysis,
701+
SeriesStatsCounter: s.SeriesStatsCounter,
669702
}
670703
return json.Marshal(res)
671704
case model.ValScalar.String():
672705
res := struct {
673-
ResultType string `json:"resultType"`
674-
Data *cortexpb.Sample `json:"result"`
675-
Stats *PrometheusResponseStats `json:"stats,omitempty"`
676-
Analysis *Analysis `json:"analysis,omitempty"`
706+
ResultType string `json:"resultType"`
707+
Data *cortexpb.Sample `json:"result"`
708+
Stats *PrometheusResponseStats `json:"stats,omitempty"`
709+
Analysis *Analysis `json:"analysis,omitempty"`
710+
SeriesStatsCounter *SeriesStatsCounter `json:"seriesStatsCounter,omitempty"`
677711
}{
678-
ResultType: s.ResultType,
679-
Data: s.Result.GetScalar(),
680-
Stats: s.Stats,
681-
Analysis: s.Analysis,
712+
ResultType: s.ResultType,
713+
Data: s.Result.GetScalar(),
714+
Stats: s.Stats,
715+
Analysis: s.Analysis,
716+
SeriesStatsCounter: s.SeriesStatsCounter,
682717
}
683718
return json.Marshal(res)
684719
case model.ValString.String():
685720
res := struct {
686-
ResultType string `json:"resultType"`
687-
Data *StringSample `json:"result"`
688-
Stats *PrometheusResponseStats `json:"stats,omitempty"`
689-
Analysis *Analysis `json:"analysis,omitempty"`
721+
ResultType string `json:"resultType"`
722+
Data *StringSample `json:"result"`
723+
Stats *PrometheusResponseStats `json:"stats,omitempty"`
724+
Analysis *Analysis `json:"analysis,omitempty"`
725+
SeriesStatsCounter *SeriesStatsCounter `json:"seriesStatsCounter,omitempty"`
690726
}{
691-
ResultType: s.ResultType,
692-
Data: s.Result.GetStringSample(),
693-
Stats: s.Stats,
694-
Analysis: s.Analysis,
727+
ResultType: s.ResultType,
728+
Data: s.Result.GetStringSample(),
729+
Stats: s.Stats,
730+
Analysis: s.Analysis,
731+
SeriesStatsCounter: s.SeriesStatsCounter,
695732
}
696733
return json.Marshal(res)
697734
default:

internal/cortex/querier/queryrange/query_range_test.go

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -344,6 +344,72 @@ func TestMergeAPIResponses(t *testing.T) {
344344
},
345345
},
346346

347+
{
348+
name: "Basic merging of two responses with series stats counter.",
349+
input: []Response{
350+
&PrometheusResponse{
351+
Data: PrometheusData{
352+
ResultType: matrix,
353+
Analysis: &Analysis{
354+
Name: "foo",
355+
ExecutionTime: Duration(1 * time.Second),
356+
},
357+
Result: []SampleStream{
358+
{
359+
Labels: []cortexpb.LabelAdapter{},
360+
Samples: []cortexpb.Sample{
361+
{Value: 0, TimestampMs: 0},
362+
{Value: 1, TimestampMs: 1},
363+
},
364+
},
365+
},
366+
SeriesStatsCounter: &SeriesStatsCounter{Series: 2, Chunks: 16, Samples: 256, Bytes: 1024},
367+
},
368+
},
369+
&PrometheusResponse{
370+
Data: PrometheusData{
371+
ResultType: matrix,
372+
Analysis: &Analysis{
373+
Name: "foo",
374+
ExecutionTime: Duration(1 * time.Second),
375+
},
376+
Result: []SampleStream{
377+
{
378+
Labels: []cortexpb.LabelAdapter{},
379+
Samples: []cortexpb.Sample{
380+
{Value: 2, TimestampMs: 2},
381+
{Value: 3, TimestampMs: 3},
382+
},
383+
},
384+
},
385+
SeriesStatsCounter: &SeriesStatsCounter{Series: 2, Chunks: 16, Samples: 256, Bytes: 1024},
386+
},
387+
},
388+
},
389+
expected: &PrometheusResponse{
390+
Status: StatusSuccess,
391+
Data: PrometheusData{
392+
ResultType: matrix,
393+
Analysis: &Analysis{
394+
Name: "foo",
395+
ExecutionTime: Duration(2 * time.Second),
396+
},
397+
Result: []SampleStream{
398+
{
399+
Labels: []cortexpb.LabelAdapter{},
400+
Samples: []cortexpb.Sample{
401+
{Value: 0, TimestampMs: 0},
402+
{Value: 1, TimestampMs: 1},
403+
{Value: 2, TimestampMs: 2},
404+
{Value: 3, TimestampMs: 3},
405+
},
406+
},
407+
},
408+
SeriesStatsCounter: &SeriesStatsCounter{Series: 4, Chunks: 32, Samples: 512, Bytes: 2048},
409+
},
410+
},
411+
},
412+
347413
{
348414
name: "Basic merging of two responses with nested analysis trees.",
349415
input: []Response{

0 commit comments

Comments
 (0)