Skip to content

Commit b2f8333

Browse files
Query bytes fetched in query API (#39)
2 parents 72a3cb8 + c8b83d2 commit b2f8333

File tree

5 files changed

+63
-49
lines changed

5 files changed

+63
-49
lines changed

pkg/api/query/v1.go

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -295,8 +295,9 @@ type queryData struct {
295295
Result parser.Value `json:"result"`
296296
Stats stats.QueryStats `json:"stats,omitempty"`
297297
// Additional Thanos Response field.
298-
QueryAnalysis queryTelemetry `json:"analysis,omitempty"`
299-
Warnings []error `json:"warnings,omitempty"`
298+
QueryAnalysis queryTelemetry `json:"analysis,omitempty"`
299+
Warnings []error `json:"warnings,omitempty"`
300+
SeriesStatsCounter storepb.SeriesStatsCounter `json:"seriesStatsCounter,omitempty"`
300301
}
301302

302303
type queryTelemetry struct {
@@ -705,6 +706,7 @@ func (qapi *QueryAPI) query(r *http.Request) (interface{}, []error, *api.ApiErro
705706
for i := range seriesStats {
706707
aggregator.Aggregate(seriesStats[i])
707708
}
709+
seriesStatsCounter := aggregator.GetSeriesStatsCounter()
708710
aggregator.Observe(time.Since(beforeRange).Seconds())
709711

710712
// Optional stats field in response if parameter "stats" is not empty.
@@ -713,10 +715,11 @@ func (qapi *QueryAPI) query(r *http.Request) (interface{}, []error, *api.ApiErro
713715
qs = stats.NewQueryStats(qry.Stats())
714716
}
715717
return &queryData{
716-
ResultType: res.Value.Type(),
717-
Result: res.Value,
718-
Stats: qs,
719-
QueryAnalysis: analysis,
718+
ResultType: res.Value.Type(),
719+
Result: res.Value,
720+
Stats: qs,
721+
QueryAnalysis: analysis,
722+
SeriesStatsCounter: seriesStatsCounter,
720723
}, res.Warnings.AsErrors(), nil, qry.Close
721724
}
722725

@@ -1004,6 +1007,7 @@ func (qapi *QueryAPI) queryRange(r *http.Request) (interface{}, []error, *api.Ap
10041007
for i := range seriesStats {
10051008
aggregator.Aggregate(seriesStats[i])
10061009
}
1010+
seriesStatsCounter := aggregator.GetSeriesStatsCounter()
10071011
aggregator.Observe(time.Since(beforeRange).Seconds())
10081012

10091013
// Optional stats field in response if parameter "stats" is not empty.
@@ -1012,10 +1016,11 @@ func (qapi *QueryAPI) queryRange(r *http.Request) (interface{}, []error, *api.Ap
10121016
qs = stats.NewQueryStats(qry.Stats())
10131017
}
10141018
return &queryData{
1015-
ResultType: res.Value.Type(),
1016-
Result: res.Value,
1017-
Stats: qs,
1018-
QueryAnalysis: analysis,
1019+
ResultType: res.Value.Type(),
1020+
Result: res.Value,
1021+
Stats: qs,
1022+
QueryAnalysis: analysis,
1023+
SeriesStatsCounter: seriesStatsCounter,
10191024
}, res.Warnings.AsErrors(), nil, qry.Close
10201025
}
10211026

pkg/query/querier.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -276,7 +276,7 @@ func (s *seriesServer) Send(r *storepb.SeriesResponse) error {
276276

277277
if r.GetSeries() != nil {
278278
s.seriesSet = append(s.seriesSet, *r.GetSeries())
279-
s.seriesSetStats.Count(r.GetSeries())
279+
s.seriesSetStats.Count(r)
280280
return nil
281281
}
282282

pkg/store/proxy_heap.go

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -431,14 +431,13 @@ func newLazyRespSet(
431431
}
432432

433433
go func(st string, l *lazyRespSet) {
434-
bytesProcessed := 0
435434
seriesStats := &storepb.SeriesStatsCounter{}
436435

437436
defer func() {
438437
l.span.SetTag("processed.series", seriesStats.Series)
439438
l.span.SetTag("processed.chunks", seriesStats.Chunks)
440439
l.span.SetTag("processed.samples", seriesStats.Samples)
441-
l.span.SetTag("processed.bytes", bytesProcessed)
440+
l.span.SetTag("processed.bytes", seriesStats.Bytes)
442441
l.span.Finish()
443442
}()
444443

@@ -498,15 +497,12 @@ func newLazyRespSet(
498497
}
499498

500499
numResponses++
501-
bytesProcessed += resp.Size()
502500

503501
if resp.GetSeries() != nil && applySharding && !shardMatcher.MatchesZLabels(resp.GetSeries().Labels) {
504502
return true
505503
}
506504

507-
if resp.GetSeries() != nil {
508-
seriesStats.Count(resp.GetSeries())
509-
}
505+
seriesStats.Count(resp)
510506

511507
l.bufferedResponsesMtx.Lock()
512508
l.bufferedResponses = append(l.bufferedResponses, resp)
@@ -714,13 +710,12 @@ func newEagerRespSet(
714710
// Start a goroutine and immediately buffer everything.
715711
go func(l *eagerRespSet) {
716712
seriesStats := &storepb.SeriesStatsCounter{}
717-
bytesProcessed := 0
718713

719714
defer func() {
720715
l.span.SetTag("processed.series", seriesStats.Series)
721716
l.span.SetTag("processed.chunks", seriesStats.Chunks)
722717
l.span.SetTag("processed.samples", seriesStats.Samples)
723-
l.span.SetTag("processed.bytes", bytesProcessed)
718+
l.span.SetTag("processed.bytes", seriesStats.Bytes)
724719
l.span.Finish()
725720
ret.wg.Done()
726721
}()
@@ -767,15 +762,12 @@ func newEagerRespSet(
767762
}
768763

769764
numResponses++
770-
bytesProcessed += resp.Size()
771765

772766
if resp.GetSeries() != nil && applySharding && !shardMatcher.MatchesZLabels(resp.GetSeries().Labels) {
773767
return true
774768
}
775769

776-
if resp.GetSeries() != nil {
777-
seriesStats.Count(resp.GetSeries())
778-
}
770+
seriesStats.Count(resp)
779771

780772
l.bufferedResponses = append(l.bufferedResponses, resp)
781773
return true

pkg/store/storepb/custom.go

Lines changed: 33 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -484,6 +484,7 @@ type SeriesStatsCounter struct {
484484
Series int
485485
Chunks int
486486
Samples int
487+
Bytes uint64
487488
}
488489

489490
func (c *SeriesStatsCounter) CountSeries(seriesLabels []labelpb.ZLabel) {
@@ -494,39 +495,45 @@ func (c *SeriesStatsCounter) CountSeries(seriesLabels []labelpb.ZLabel) {
494495
}
495496
}
496497

497-
func (c *SeriesStatsCounter) Count(series *Series) {
498-
c.CountSeries(series.Labels)
499-
for _, chk := range series.Chunks {
500-
if chk.Raw != nil {
501-
c.Chunks++
502-
c.Samples += chk.Raw.XORNumSamples()
503-
}
498+
func (c *SeriesStatsCounter) Count(r *SeriesResponse) {
499+
if r.GetSeries() != nil {
500+
series := r.GetSeries()
501+
c.CountSeries(series.Labels)
502+
for _, chk := range series.Chunks {
503+
if chk.Raw != nil {
504+
c.Chunks++
505+
c.Samples += chk.Raw.XORNumSamples()
506+
}
504507

505-
if chk.Count != nil {
506-
c.Chunks++
507-
c.Samples += chk.Count.XORNumSamples()
508-
}
508+
if chk.Count != nil {
509+
c.Chunks++
510+
c.Samples += chk.Count.XORNumSamples()
511+
}
509512

510-
if chk.Counter != nil {
511-
c.Chunks++
512-
c.Samples += chk.Counter.XORNumSamples()
513-
}
513+
if chk.Counter != nil {
514+
c.Chunks++
515+
c.Samples += chk.Counter.XORNumSamples()
516+
}
514517

515-
if chk.Max != nil {
516-
c.Chunks++
517-
c.Samples += chk.Max.XORNumSamples()
518-
}
518+
if chk.Max != nil {
519+
c.Chunks++
520+
c.Samples += chk.Max.XORNumSamples()
521+
}
519522

520-
if chk.Min != nil {
521-
c.Chunks++
522-
c.Samples += chk.Min.XORNumSamples()
523-
}
523+
if chk.Min != nil {
524+
c.Chunks++
525+
c.Samples += chk.Min.XORNumSamples()
526+
}
524527

525-
if chk.Sum != nil {
526-
c.Chunks++
527-
c.Samples += chk.Sum.XORNumSamples()
528+
if chk.Sum != nil {
529+
c.Chunks++
530+
c.Samples += chk.Sum.XORNumSamples()
531+
}
528532
}
529533
}
534+
535+
//aggregate # of bytes fetched
536+
c.Bytes += uint64(r.Size())
530537
}
531538

532539
func (m *SeriesRequest) ToPromQL() string {

pkg/store/telemetry.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,10 @@ type seriesStatsAggregator struct {
2424
seriesStats storepb.SeriesStatsCounter
2525
}
2626

27+
func (s *seriesStatsAggregator) GetSeriesStatsCounter() storepb.SeriesStatsCounter {
28+
return s.seriesStats
29+
}
30+
2731
type seriesStatsAggregatorFactory struct {
2832
queryDuration *prometheus.HistogramVec
2933
seriesLeBuckets []float64
@@ -81,6 +85,7 @@ func (s *seriesStatsAggregator) Aggregate(stats storepb.SeriesStatsCounter) {
8185
s.seriesStats.Series += stats.Series
8286
s.seriesStats.Samples += stats.Samples
8387
s.seriesStats.Chunks += stats.Chunks
88+
s.seriesStats.Bytes += stats.Bytes
8489
}
8590

8691
// Observe commits the aggregated SeriesStatsCounter as an observation.
@@ -124,11 +129,16 @@ type SeriesQueryPerformanceMetricsAggregatorFactory interface {
124129
type SeriesQueryPerformanceMetricsAggregator interface {
125130
Aggregate(seriesStats storepb.SeriesStatsCounter)
126131
Observe(duration float64)
132+
GetSeriesStatsCounter() storepb.SeriesStatsCounter
127133
}
128134

129135
// NoopSeriesStatsAggregator is a query performance series aggregator that does nothing.
130136
type NoopSeriesStatsAggregator struct{}
131137

138+
func (s *NoopSeriesStatsAggregator) GetSeriesStatsCounter() storepb.SeriesStatsCounter {
139+
return storepb.SeriesStatsCounter{}
140+
}
141+
132142
func (s *NoopSeriesStatsAggregator) Aggregate(_ storepb.SeriesStatsCounter) {}
133143

134144
func (s *NoopSeriesStatsAggregator) Observe(_ float64) {}

0 commit comments

Comments
 (0)