Skip to content

Commit 6ed74f6

Browse files
committed
initial implementation
1 parent 91b32f1 commit 6ed74f6

File tree

23 files changed

+1576
-317
lines changed

23 files changed

+1576
-317
lines changed

pkg/api/api.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ package api
33
import (
44
"context"
55
"flag"
6+
"github.com/cortexproject/cortex/pkg/engine/distributed_execution"
7+
"github.com/cortexproject/cortex/pkg/engine/distributed_execution/querierpb"
68
"net/http"
79
"path"
810
"strings"
@@ -480,6 +482,10 @@ func (a *API) RegisterQueryScheduler(f *scheduler.Scheduler) {
480482
schedulerpb.RegisterSchedulerForQuerierServer(a.server.GRPC, f)
481483
}
482484

485+
func (a *API) RegisterQuerierServer(q *distributed_execution.QuerierServer) {
486+
querierpb.RegisterQuerierServer(a.server.GRPC, q)
487+
}
488+
483489
// RegisterServiceMapHandler registers the Cortex structs service handler
484490
// TODO: Refactor this code to be accomplished using the services.ServiceManager
485491
// or a future module manager #2291

pkg/api/handlers.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package api
33
import (
44
"context"
55
"encoding/json"
6+
"github.com/cortexproject/cortex/pkg/engine/distributed_execution"
67
"html/template"
78
"net/http"
89
"path"
@@ -164,6 +165,7 @@ func NewQuerierHandler(
164165
queryable storage.SampleAndChunkQueryable,
165166
exemplarQueryable storage.ExemplarQueryable,
166167
engine engine.QueryEngine,
168+
queryResultCache *distributed_execution.QueryResultCache,
167169
metadataQuerier querier.MetadataQuerier,
168170
reg prometheus.Registerer,
169171
logger log.Logger,
@@ -280,7 +282,7 @@ func NewQuerierHandler(
280282
legacyPromRouter := route.New().WithPrefix(path.Join(legacyPrefix, "/api/v1"))
281283
api.Register(legacyPromRouter)
282284

283-
queryAPI := queryapi.NewQueryAPI(engine, translateSampleAndChunkQueryable, statsRenderer, logger, codecs, corsOrigin)
285+
queryAPI := queryapi.NewQueryAPI(engine, queryResultCache, translateSampleAndChunkQueryable, statsRenderer, logger, codecs, corsOrigin)
284286

285287
// TODO(gotjosh): This custom handler is temporary until we're able to vendor the changes in:
286288
// https://github.com/prometheus/prometheus/pull/7125/files

pkg/api/queryapi/query_api.go

Lines changed: 52 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"fmt"
66
"github.com/cortexproject/cortex/pkg/engine/distributed_execution"
7+
"github.com/cortexproject/cortex/pkg/scheduler/plan_fragments"
78
"net/http"
89
"strconv"
910
"time"
@@ -26,31 +27,34 @@ import (
2627
)
2728

2829
type QueryAPI struct {
29-
queryable storage.SampleAndChunkQueryable
30-
queryEngine engine.QueryEngine
31-
now func() time.Time
32-
statsRenderer v1.StatsRenderer
33-
logger log.Logger
34-
codecs []v1.Codec
35-
CORSOrigin *regexp.Regexp
30+
queryable storage.SampleAndChunkQueryable
31+
queryEngine engine.QueryEngine
32+
queryResultCache *distributed_execution.QueryResultCache
33+
now func() time.Time
34+
statsRenderer v1.StatsRenderer
35+
logger log.Logger
36+
codecs []v1.Codec
37+
CORSOrigin *regexp.Regexp
3638
}
3739

3840
func NewQueryAPI(
3941
qe engine.QueryEngine,
42+
queryResultCache *distributed_execution.QueryResultCache,
4043
q storage.SampleAndChunkQueryable,
4144
statsRenderer v1.StatsRenderer,
4245
logger log.Logger,
4346
codecs []v1.Codec,
4447
CORSOrigin *regexp.Regexp,
4548
) *QueryAPI {
4649
return &QueryAPI{
47-
queryEngine: qe,
48-
queryable: q,
49-
statsRenderer: statsRenderer,
50-
logger: logger,
51-
codecs: codecs,
52-
CORSOrigin: CORSOrigin,
53-
now: time.Now,
50+
queryEngine: qe,
51+
queryResultCache: queryResultCache,
52+
queryable: q,
53+
statsRenderer: statsRenderer,
54+
logger: logger,
55+
codecs: codecs,
56+
CORSOrigin: CORSOrigin,
57+
now: time.Now,
5458
}
5559
}
5660

@@ -136,6 +140,13 @@ func (q *QueryAPI) RangeQueryHandler(r *http.Request) (result apiFuncResult) {
136140

137141
ctx = httputil.ContextFromRequest(ctx, r)
138142

143+
// TODO: if distributed exec enabled
144+
isRoot, queryID, fragmentID, _ := plan_fragments.ExtractFragmentMetaData(ctx)
145+
if !isRoot {
146+
key := distributed_execution.MakeFragmentKey(queryID, fragmentID)
147+
q.queryResultCache.InitWriting(key)
148+
}
149+
139150
res := qry.Exec(ctx)
140151
if res.Err != nil {
141152
return apiFuncResult{nil, returnAPIError(res.Err), res.Warnings, qry.Close}
@@ -178,17 +189,32 @@ func (q *QueryAPI) InstantQueryHandler(r *http.Request) (result apiFuncResult) {
178189
ctx = engine.AddEngineTypeToContext(ctx, r)
179190
ctx = querier.AddBlockStoreTypeToContext(ctx, r.Header.Get(querier.BlockStoreTypeHeader))
180191

192+
// TODO: if distributed exec enabled
193+
isRoot, queryID, fragmentID, _ := plan_fragments.ExtractFragmentMetaData(ctx)
194+
if !isRoot {
195+
key := distributed_execution.MakeFragmentKey(queryID, fragmentID)
196+
q.queryResultCache.InitWriting(key)
197+
}
198+
181199
var qry promql.Query
182200
tsTime := convertMsToTime(ts)
183201

184202
byteLP := []byte(r.PostFormValue("plan"))
185203
if len(byteLP) != 0 {
186204
logicalPlan, err := distributed_execution.Unmarshal(byteLP)
187205
if err != nil {
206+
if !isRoot {
207+
key := distributed_execution.MakeFragmentKey(queryID, fragmentID)
208+
q.queryResultCache.SetError(key)
209+
}
188210
return apiFuncResult{nil, &apiError{errorInternal, fmt.Errorf("invalid logical plan: %v", err)}, nil, nil}
189211
}
190212
qry, err = q.queryEngine.MakeInstantQueryFromPlan(ctx, q.queryable, opts, logicalPlan, tsTime, r.FormValue("query"))
191213
if err != nil {
214+
if !isRoot {
215+
key := distributed_execution.MakeFragmentKey(queryID, fragmentID)
216+
q.queryResultCache.SetError(key)
217+
}
192218
return apiFuncResult{nil, &apiError{errorInternal, fmt.Errorf("failed to create instant query from logical plan: %v", err)}, nil, nil}
193219
}
194220
} else { // if there is logical plan field is empty, fall back
@@ -239,6 +265,18 @@ func (q *QueryAPI) Wrap(f apiFunc) http.HandlerFunc {
239265
}
240266

241267
if result.data != nil {
268+
// TODO: if distributed exec enabled
269+
ctx := httputil.ContextFromRequest(r.Context(), r)
270+
isRoot, queryID, fragmentID, _ := plan_fragments.ExtractFragmentMetaData(ctx)
271+
if !isRoot {
272+
key := distributed_execution.MakeFragmentKey(queryID, fragmentID)
273+
result := distributed_execution.FragmentResult{
274+
Data: result.data,
275+
Expiration: time.Now().Add(time.Hour),
276+
}
277+
q.queryResultCache.SetComplete(key, result)
278+
return
279+
}
242280
q.respond(w, r, result.data, result.warnings, r.FormValue("query"))
243281
return
244282
}

pkg/cortex/modules.go

Lines changed: 53 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,16 @@ import (
44
"context"
55
"flag"
66
"fmt"
7-
7+
"github.com/cortexproject/cortex/pkg/engine/distributed_execution"
8+
"github.com/cortexproject/cortex/pkg/ring/client"
9+
"github.com/prometheus/client_golang/prometheus/promauto"
810
"log/slog"
911
"net"
1012
"net/http"
1113
"runtime"
1214
"runtime/debug"
1315
"strconv"
16+
"time"
1417

1518
"github.com/go-kit/log/level"
1619
"github.com/opentracing-contrib/go-stdlib/nethttp"
@@ -363,13 +366,35 @@ func (t *Cortex) initTenantFederation() (serv services.Service, err error) {
363366
// │ │
364367
// └──────────────────┘
365368
func (t *Cortex) initQuerier() (serv services.Service, err error) {
369+
// Create new map for caching partial results during distributed execution
370+
var queryResultCache *distributed_execution.QueryResultCache
371+
var queryServer *distributed_execution.QuerierServer
372+
373+
if t.Cfg.Querier.DistributedExecEnabled {
374+
// set up querier server service and register it
375+
queryResultCache = distributed_execution.NewQueryResultCache()
376+
queryServer = distributed_execution.NewQuerierServer(queryResultCache)
377+
378+
go func() {
379+
// TODO: make expire time a config var
380+
ticker := time.NewTicker(5 * time.Minute)
381+
defer ticker.Stop()
382+
383+
for range ticker.C {
384+
queryResultCache.CleanExpired()
385+
}
386+
}()
387+
t.API.RegisterQuerierServer(queryServer)
388+
}
389+
366390
// Create a internal HTTP handler that is configured with the Prometheus API routes and points
367391
// to a Prometheus API struct instantiated with the Cortex Queryable.
368392
internalQuerierRouter := api.NewQuerierHandler(
369393
t.Cfg.API,
370394
t.QuerierQueryable,
371395
t.ExemplarQueryable,
372396
t.QuerierEngine,
397+
queryResultCache,
373398
t.MetadataQuerier,
374399
prometheus.DefaultRegisterer,
375400
util_log.Logger,
@@ -420,7 +445,33 @@ func (t *Cortex) initQuerier() (serv services.Service, err error) {
420445
return nil, err
421446
}
422447
serverAddress := net.JoinHostPort(ipAddr, strconv.Itoa(t.Cfg.Server.HTTPListenPort))
423-
return querier_worker.NewQuerierWorker(t.Cfg.Worker, httpgrpc_server.NewServer(internalQuerierRouter), util_log.Logger, prometheus.DefaultRegisterer, serverAddress)
448+
449+
if t.Cfg.Querier.DistributedExecEnabled {
450+
poolConfig := client.PoolConfig{
451+
CheckInterval: 5 * time.Second,
452+
HealthCheckEnabled: true,
453+
HealthCheckTimeout: 1 * time.Second,
454+
}
455+
456+
querierClientsGauge := promauto.With(prometheus.DefaultRegisterer).NewGauge(prometheus.GaugeOpts{
457+
Name: "cortex_querier_service_clients",
458+
Help: "The current number of clients connected to this querier",
459+
})
460+
factory := func(addr string) (client.PoolClient, error) {
461+
return distributed_execution.CreateQuerierClient(t.Cfg.QueryScheduler.GRPCClientConfig, addr)
462+
}
463+
querierPool := client.NewPool("querier", poolConfig, nil, factory, querierClientsGauge, util_log.Logger)
464+
internalQuerierRouter = injectPool(internalQuerierRouter, querierPool)
465+
}
466+
467+
return querier_worker.NewQuerierWorker(t.Cfg.Worker, httpgrpc_server.NewServer(internalQuerierRouter), util_log.Logger, prometheus.DefaultRegisterer, serverAddress, t.Cfg.Querier.DistributedExecEnabled, queryResultCache)
468+
}
469+
470+
func injectPool(next http.Handler, pool *client.Pool) http.Handler {
471+
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
472+
ctx := distributed_execution.ContextWithPool(r.Context(), pool)
473+
next.ServeHTTP(w, r.WithContext(ctx))
474+
})
424475
}
425476

426477
func (t *Cortex) initStoreQueryables() (services.Service, error) {
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
package distributed_execution
2+
3+
import (
4+
"github.com/cortexproject/cortex/pkg/engine/distributed_execution/querierpb"
5+
"github.com/cortexproject/cortex/pkg/ring/client"
6+
"github.com/cortexproject/cortex/pkg/util/grpcclient"
7+
otgrpc "github.com/opentracing-contrib/go-grpc"
8+
"github.com/opentracing/opentracing-go"
9+
"github.com/prometheus/prometheus/model/histogram"
10+
"github.com/weaveworks/common/middleware"
11+
"google.golang.org/grpc"
12+
"google.golang.org/grpc/health/grpc_health_v1"
13+
)
14+
15+
func CreateQuerierClient(grpcConfig grpcclient.Config, addr string) (client.PoolClient, error) {
16+
opts, err := grpcConfig.DialOption([]grpc.UnaryClientInterceptor{
17+
otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer()),
18+
middleware.ClientUserHeaderInterceptor,
19+
}, nil)
20+
21+
if err != nil {
22+
return nil, err
23+
}
24+
25+
conn, err := grpc.NewClient(addr, opts...)
26+
if err != nil {
27+
return nil, err
28+
}
29+
30+
return &querierClient{
31+
QuerierClient: querierpb.NewQuerierClient(conn),
32+
HealthClient: grpc_health_v1.NewHealthClient(conn),
33+
conn: conn,
34+
}, nil
35+
}
36+
37+
type querierClient struct {
38+
querierpb.QuerierClient
39+
grpc_health_v1.HealthClient
40+
conn *grpc.ClientConn
41+
}
42+
43+
func (qc *querierClient) Close() error {
44+
return qc.conn.Close()
45+
}
46+
47+
func FloatHistogramProtoToFloatHistograms(hps []querierpb.Histogram) []*histogram.FloatHistogram {
48+
floatHistograms := make([]*histogram.FloatHistogram, len(hps))
49+
for _, hp := range hps {
50+
newHist := FloatHistogramProtoToFloatHistogram(hp)
51+
floatHistograms = append(floatHistograms, newHist)
52+
}
53+
return floatHistograms
54+
}
55+
56+
func FloatHistogramProtoToFloatHistogram(hp querierpb.Histogram) *histogram.FloatHistogram {
57+
_, IsFloatHist := hp.GetCount().(*querierpb.Histogram_CountFloat)
58+
if !IsFloatHist {
59+
panic("FloatHistogramProtoToFloatHistogram called with an integer histogram")
60+
}
61+
return &histogram.FloatHistogram{
62+
CounterResetHint: histogram.CounterResetHint(hp.ResetHint),
63+
Schema: hp.Schema,
64+
ZeroThreshold: hp.ZeroThreshold,
65+
ZeroCount: hp.GetZeroCountFloat(),
66+
Count: hp.GetCountFloat(),
67+
Sum: hp.Sum,
68+
PositiveSpans: spansProtoToSpans(hp.GetPositiveSpans()),
69+
PositiveBuckets: hp.GetPositiveCounts(),
70+
NegativeSpans: spansProtoToSpans(hp.GetNegativeSpans()),
71+
NegativeBuckets: hp.GetNegativeCounts(),
72+
}
73+
}
74+
75+
func spansProtoToSpans(s []querierpb.BucketSpan) []histogram.Span {
76+
spans := make([]histogram.Span, len(s))
77+
for i := 0; i < len(s); i++ {
78+
spans[i] = histogram.Span{Offset: s[i].Offset, Length: s[i].Length}
79+
}
80+
81+
return spans
82+
}

0 commit comments

Comments
 (0)