Skip to content

Commit 36719a9

Browse files
committed
fix querier serve streaming errors but next() currently takes too long
1 parent bef2d02 commit 36719a9

File tree

9 files changed

+238
-139
lines changed

9 files changed

+238
-139
lines changed

pkg/api/handlers.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,7 @@ func NewQuerierHandler(
169169
metadataQuerier querier.MetadataQuerier,
170170
reg prometheus.Registerer,
171171
logger log.Logger,
172+
distributedExecEnabled bool,
172173
) http.Handler {
173174
// Prometheus histograms for requests to the querier.
174175
querierRequestDuration := promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
@@ -282,7 +283,7 @@ func NewQuerierHandler(
282283
legacyPromRouter := route.New().WithPrefix(path.Join(legacyPrefix, "/api/v1"))
283284
api.Register(legacyPromRouter)
284285

285-
queryAPI := queryapi.NewQueryAPI(engine, queryResultCache, translateSampleAndChunkQueryable, statsRenderer, logger, codecs, corsOrigin)
286+
queryAPI := queryapi.NewQueryAPI(engine, queryResultCache, translateSampleAndChunkQueryable, statsRenderer, logger, codecs, corsOrigin, distributedExecEnabled)
286287

287288
// TODO(gotjosh): This custom handler is temporary until we're able to vendor the changes in:
288289
// https://github.com/prometheus/prometheus/pull/7125/files

pkg/api/handlers_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,7 @@ func TestBuildInfoAPI(t *testing.T) {
232232
version.Version = tc.version
233233
version.Branch = tc.branch
234234
version.Revision = tc.revision
235-
handler := NewQuerierHandler(cfg, nil, nil, nil, nil, nil, &FakeLogger{})
235+
handler := NewQuerierHandler(cfg, nil, nil, nil, nil, nil, nil, &FakeLogger{}, false)
236236
writer := httptest.NewRecorder()
237237
req := httptest.NewRequest("GET", "/api/v1/status/buildinfo", nil)
238238
req = req.WithContext(user.InjectOrgID(req.Context(), "test"))

pkg/api/queryapi/query_api.go

Lines changed: 48 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,15 @@ import (
2626
)
2727

2828
type QueryAPI struct {
29-
queryable storage.SampleAndChunkQueryable
30-
queryEngine engine.QueryEngine
31-
queryResultCache *distributed_execution.QueryResultCache
32-
now func() time.Time
33-
statsRenderer v1.StatsRenderer
34-
logger log.Logger
35-
codecs []v1.Codec
36-
CORSOrigin *regexp.Regexp
29+
queryable storage.SampleAndChunkQueryable
30+
queryEngine engine.QueryEngine
31+
queryResultCache *distributed_execution.QueryResultCache
32+
now func() time.Time
33+
statsRenderer v1.StatsRenderer
34+
logger log.Logger
35+
codecs []v1.Codec
36+
CORSOrigin *regexp.Regexp
37+
distributedExecEnabled bool
3738
}
3839

3940
func NewQueryAPI(
@@ -44,16 +45,18 @@ func NewQueryAPI(
4445
logger log.Logger,
4546
codecs []v1.Codec,
4647
CORSOrigin *regexp.Regexp,
48+
distributedExecEnabled bool,
4749
) *QueryAPI {
4850
return &QueryAPI{
49-
queryEngine: qe,
50-
queryResultCache: queryResultCache,
51-
queryable: q,
52-
statsRenderer: statsRenderer,
53-
logger: logger,
54-
codecs: codecs,
55-
CORSOrigin: CORSOrigin,
56-
now: time.Now,
51+
queryEngine: qe,
52+
queryResultCache: queryResultCache,
53+
queryable: q,
54+
statsRenderer: statsRenderer,
55+
logger: logger,
56+
codecs: codecs,
57+
CORSOrigin: CORSOrigin,
58+
now: time.Now,
59+
distributedExecEnabled: distributedExecEnabled,
5760
}
5861
}
5962

@@ -139,11 +142,12 @@ func (q *QueryAPI) RangeQueryHandler(r *http.Request) (result apiFuncResult) {
139142

140143
ctx = httputil.ContextFromRequest(ctx, r)
141144

142-
// TODO: if distributed exec enabled
143-
isRoot, queryID, fragmentID, _, _ := distributed_execution.ExtractFragmentMetaData(ctx)
144-
if !isRoot {
145-
key := distributed_execution.MakeFragmentKey(queryID, fragmentID)
146-
q.queryResultCache.InitWriting(*key)
145+
if q.distributedExecEnabled {
146+
isRoot, queryID, fragmentID, _, _ := distributed_execution.ExtractFragmentMetaData(ctx)
147+
if !isRoot {
148+
key := distributed_execution.MakeFragmentKey(queryID, fragmentID)
149+
q.queryResultCache.InitWriting(*key)
150+
}
147151
}
148152

149153
res := qry.Exec(ctx)
@@ -188,11 +192,14 @@ func (q *QueryAPI) InstantQueryHandler(r *http.Request) (result apiFuncResult) {
188192
ctx = engine.AddEngineTypeToContext(ctx, r)
189193
ctx = querier.AddBlockStoreTypeToContext(ctx, r.Header.Get(querier.BlockStoreTypeHeader))
190194

191-
// TODO: if distributed exec enabled
192-
isRoot, queryID, fragmentID, _, _ := distributed_execution.ExtractFragmentMetaData(ctx)
193-
if !isRoot {
194-
key := distributed_execution.MakeFragmentKey(queryID, fragmentID)
195-
q.queryResultCache.InitWriting(*key)
195+
var isRoot bool
196+
var queryID, fragmentID uint64
197+
if q.distributedExecEnabled {
198+
isRoot, queryID, fragmentID, _, _ = distributed_execution.ExtractFragmentMetaData(ctx)
199+
if !isRoot {
200+
key := distributed_execution.MakeFragmentKey(queryID, fragmentID)
201+
q.queryResultCache.InitWriting(*key)
202+
}
196203
}
197204

198205
var qry promql.Query
@@ -202,9 +209,11 @@ func (q *QueryAPI) InstantQueryHandler(r *http.Request) (result apiFuncResult) {
202209
if len(byteLP) != 0 {
203210
logicalPlan, err := distributed_execution.Unmarshal(byteLP)
204211
if err != nil {
205-
if !isRoot {
206-
key := distributed_execution.MakeFragmentKey(queryID, fragmentID)
207-
q.queryResultCache.SetError(*key)
212+
if q.distributedExecEnabled {
213+
if !isRoot {
214+
key := distributed_execution.MakeFragmentKey(queryID, fragmentID)
215+
q.queryResultCache.SetError(*key)
216+
}
208217
}
209218
return apiFuncResult{nil, &apiError{errorInternal, fmt.Errorf("invalid logical plan: %v", err)}, nil, nil}
210219
}
@@ -264,17 +273,18 @@ func (q *QueryAPI) Wrap(f apiFunc) http.HandlerFunc {
264273
}
265274

266275
if result.data != nil {
267-
// TODO: if distributed exec enabled
268276
ctx := httputil.ContextFromRequest(r.Context(), r)
269-
isRoot, queryID, fragmentID, _, _ := distributed_execution.ExtractFragmentMetaData(ctx)
270-
if !isRoot {
271-
key := distributed_execution.MakeFragmentKey(queryID, fragmentID)
272-
result := distributed_execution.FragmentResult{
273-
Data: result.data,
274-
Expiration: time.Now().Add(time.Hour),
277+
if q.distributedExecEnabled {
278+
isRoot, queryID, fragmentID, _, _ := distributed_execution.ExtractFragmentMetaData(ctx)
279+
if !isRoot {
280+
key := distributed_execution.MakeFragmentKey(queryID, fragmentID)
281+
result := distributed_execution.FragmentResult{
282+
Data: result.data,
283+
Expiration: time.Now().Add(time.Hour),
284+
}
285+
q.queryResultCache.SetComplete(*key, result)
286+
return
275287
}
276-
q.queryResultCache.SetComplete(*key, result)
277-
return
278288
}
279289
q.respond(w, r, result.data, result.warnings, r.FormValue("query"))
280290
return

pkg/api/queryapi/query_api_test.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"errors"
66
"fmt"
7+
"github.com/cortexproject/cortex/pkg/engine/distributed_execution"
78
"io"
89
"net/http"
910
"net/http/httptest"
@@ -183,7 +184,7 @@ func Test_CustomAPI(t *testing.T) {
183184

184185
for _, test := range tests {
185186
t.Run(test.name, func(t *testing.T) {
186-
c := NewQueryAPI(engine, mockQueryable, querier.StatsRenderer, log.NewNopLogger(), []v1.Codec{v1.JSONCodec{}}, regexp.MustCompile(".*"))
187+
c := NewQueryAPI(engine, nil, mockQueryable, querier.StatsRenderer, log.NewNopLogger(), []v1.Codec{v1.JSONCodec{}}, regexp.MustCompile(".*"), false)
187188

188189
router := mux.NewRouter()
189190
router.Path("/api/v1/query").Methods("POST").Handler(c.Wrap(c.InstantQueryHandler))
@@ -244,7 +245,7 @@ func Test_InvalidCodec(t *testing.T) {
244245
},
245246
}
246247

247-
queryAPI := NewQueryAPI(engine, mockQueryable, querier.StatsRenderer, log.NewNopLogger(), []v1.Codec{&mockCodec{}}, regexp.MustCompile(".*"))
248+
queryAPI := NewQueryAPI(engine, nil, mockQueryable, querier.StatsRenderer, log.NewNopLogger(), []v1.Codec{&mockCodec{}}, regexp.MustCompile(".*"), false)
248249
router := mux.NewRouter()
249250
router.Path("/api/v1/query").Methods("POST").Handler(queryAPI.Wrap(queryAPI.InstantQueryHandler))
250251

@@ -285,7 +286,7 @@ func Test_CustomAPI_StatsRenderer(t *testing.T) {
285286
},
286287
}
287288

288-
queryAPI := NewQueryAPI(engine, mockQueryable, querier.StatsRenderer, log.NewNopLogger(), []v1.Codec{v1.JSONCodec{}}, regexp.MustCompile(".*"))
289+
queryAPI := NewQueryAPI(engine, nil, mockQueryable, querier.StatsRenderer, log.NewNopLogger(), []v1.Codec{v1.JSONCodec{}}, regexp.MustCompile(".*"), false)
289290

290291
router := mux.NewRouter()
291292
router.Path("/api/v1/query_range").Methods("POST").Handler(queryAPI.Wrap(queryAPI.RangeQueryHandler))
@@ -441,7 +442,7 @@ func Test_Logicalplan_Requests(t *testing.T) {
441442

442443
for _, tt := range tests {
443444
t.Run(tt.name, func(t *testing.T) {
444-
c := NewQueryAPI(engine, mockQueryable, querier.StatsRenderer, log.NewNopLogger(), []v1.Codec{v1.JSONCodec{}}, regexp.MustCompile(".*"))
445+
c := NewQueryAPI(engine, &distributed_execution.QueryResultCache{}, mockQueryable, querier.StatsRenderer, log.NewNopLogger(), []v1.Codec{v1.JSONCodec{}}, regexp.MustCompile(".*"), false)
445446
router := mux.NewRouter()
446447
router.Path("/api/v1/query").Methods("POST").Handler(c.Wrap(c.InstantQueryHandler))
447448
router.Path("/api/v1/query_range").Methods("POST").Handler(c.Wrap(c.RangeQueryHandler))

pkg/cortex/modules.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -407,6 +407,7 @@ func (t *Cortex) initQuerier() (serv services.Service, err error) {
407407
t.MetadataQuerier,
408408
prometheus.DefaultRegisterer,
409409
util_log.Logger,
410+
t.Cfg.Querier.DistributedExecEnabled,
410411
)
411412

412413
// If the querier is running standalone without the query-frontend or query-scheduler, we must register its internal

pkg/engine/distributed_execution/querier_service_server.go

Lines changed: 58 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ func (s *QuerierServer) Series(req *querierpb.SeriesRequest, srv querierpb.Queri
3737
return fmt.Errorf("fragment not found: %v", key)
3838
}
3939

40-
batchSize := int(req.Batchsize)
40+
batchSize := int(req.Batchsize) // TODO: remove this
4141
if batchSize <= 0 {
4242
batchSize = BATCHSIZE
4343
}
@@ -47,7 +47,8 @@ func (s *QuerierServer) Series(req *querierpb.SeriesRequest, srv querierpb.Queri
4747
fragmentResult := result.Data.(FragmentResult)
4848
v1ResultData := fragmentResult.Data.(*v1.QueryData)
4949

50-
if v1ResultData.ResultType == parser.ValueTypeMatrix {
50+
switch v1ResultData.ResultType {
51+
case parser.ValueTypeMatrix:
5152
series := v1ResultData.Result.(promql.Matrix)
5253
for i := 0; i < len(series); i += batchSize {
5354
end := i + batchSize
@@ -72,7 +73,7 @@ func (s *QuerierServer) Series(req *querierpb.SeriesRequest, srv querierpb.Queri
7273
}
7374
return nil
7475

75-
} else if v1ResultData.ResultType == parser.ValueTypeVector {
76+
case parser.ValueTypeVector:
7677
series := v1ResultData.Result.(promql.Vector)
7778
for i := 0; i < len(series); i += batchSize {
7879
end := i + batchSize
@@ -110,61 +111,46 @@ func (s *QuerierServer) Series(req *querierpb.SeriesRequest, srv querierpb.Queri
110111
func (s *QuerierServer) Next(req *querierpb.NextRequest, srv querierpb.Querier_NextServer) error {
111112
key := MakeFragmentKey(req.QueryID, req.FragmentID)
112113

114+
batchSize := int(req.Batchsize)
115+
if batchSize <= 0 {
116+
batchSize = BATCHSIZE
117+
}
118+
113119
for {
114-
// TODO: maybe add a timeout thing here too
115120
result, ok := s.queryResultCache.Get(*key)
116121
if !ok {
117122
return fmt.Errorf("fragment not found: %v", key)
118123
}
119124

120125
switch result.Status {
121126
case StatusDone:
122-
batchSize := int(req.Batchsize)
123-
if batchSize <= 0 {
124-
batchSize = BATCHSIZE
125-
}
126-
127127
fragmentResult := result.Data.(FragmentResult)
128128
v1ResultData := fragmentResult.Data.(*v1.QueryData)
129129

130-
if v1ResultData.ResultType == parser.ValueTypeMatrix {
130+
switch v1ResultData.ResultType {
131+
case parser.ValueTypeMatrix:
131132
matrix := v1ResultData.Result.(promql.Matrix)
132133

133-
for i := 0; i < len(matrix); i += batchSize {
134-
end := i + batchSize
135-
if end > len(matrix) {
136-
end = len(matrix)
137-
}
138-
139-
batch := &querierpb.StepVectorBatch{
140-
StepVectors: make([]*querierpb.StepVector, 0, end-i),
141-
}
134+
numTimeSteps := matrix.TotalSamples()
142135

143-
for _, v := range (matrix)[i:end] {
144-
var floats []float64
145-
var histograms []*histogram.FloatHistogram
136+
for timeStep := 0; timeStep < numTimeSteps; timeStep++ {
146137

147-
for _, f := range v.Floats {
148-
floats = append(floats, f.F)
138+
for i, series := range matrix {
139+
batch := &querierpb.StepVectorBatch{
140+
StepVectors: make([]*querierpb.StepVector, 0, len(matrix)),
149141
}
150-
151-
for _, h := range v.Histograms {
152-
histograms = append(histograms, h.H)
142+
vector, err := s.createVectorForTimestep(&series, timeStep, uint64(i))
143+
if err != nil {
144+
return err
153145
}
154-
155-
protoVector := &querierpb.StepVector{
156-
T: 0,
157-
Samples: floats,
158-
Histograms: FloatHistogramsToFloatHistogramProto(histograms),
146+
batch.StepVectors = append(batch.StepVectors, vector)
147+
if err := srv.Send(batch); err != nil {
148+
return fmt.Errorf("error sending batch: %w", err)
159149
}
160-
batch.StepVectors = append(batch.StepVectors, protoVector)
161150
}
162-
if err := srv.Send(batch); err != nil {
163-
return err
164-
}
165-
}
166151

167-
} else if v1ResultData.ResultType == parser.ValueTypeVector {
152+
}
153+
case parser.ValueTypeVector:
168154
vector := v1ResultData.Result.(promql.Vector)
169155

170156
for i := 0; i < len(vector); i += batchSize {
@@ -190,23 +176,52 @@ func (s *QuerierServer) Next(req *querierpb.NextRequest, srv querierpb.Querier_N
190176
return err
191177
}
192178
}
193-
}
194-
195-
return nil
196179

180+
default:
181+
return fmt.Errorf("unsupported result type: %v", v1ResultData.ResultType)
182+
}
197183
case StatusError:
198184
return fmt.Errorf("fragment processing failed")
199-
200185
case StatusWriting:
201186
time.Sleep(WritingTimeout)
202187
continue
203188
}
204189
}
205190
}
206191

192+
func (s *QuerierServer) createVectorForTimestep(series *promql.Series, timeStep int, sampleID uint64) (*querierpb.StepVector, error) {
193+
var samples []float64
194+
var sampleIDs []uint64
195+
var histograms []*histogram.FloatHistogram
196+
var histogramIDs []uint64
197+
var timestamp int64
198+
199+
if timeStep < len(series.Floats) {
200+
point := series.Floats[timeStep]
201+
timestamp = point.T
202+
samples = append(samples, point.F)
203+
sampleIDs = append(sampleIDs, sampleID)
204+
}
205+
206+
if timeStep < len(series.Histograms) {
207+
point := series.Histograms[timeStep]
208+
timestamp = point.T
209+
histograms = append(histograms, point.H)
210+
histogramIDs = append(histogramIDs, uint64(timeStep))
211+
}
212+
213+
return &querierpb.StepVector{
214+
T: timestamp,
215+
Sample_IDs: sampleIDs,
216+
Samples: samples,
217+
Histogram_IDs: histogramIDs,
218+
Histograms: FloatHistogramsToFloatHistogramProto(histograms),
219+
}, nil
220+
}
221+
207222
func FloatHistogramsToFloatHistogramProto(histograms []*histogram.FloatHistogram) []querierpb.Histogram {
208223
if histograms == nil {
209-
return nil
224+
return []querierpb.Histogram{}
210225
}
211226

212227
protoHistograms := make([]querierpb.Histogram, 0, len(histograms))

0 commit comments

Comments
 (0)