Skip to content

Commit bef2d02

Browse files
committed
able to pick up fragments and execute them on individual queriers
1 parent 6ed74f6 commit bef2d02

File tree

19 files changed

+648
-246
lines changed

19 files changed

+648
-246
lines changed

pkg/api/queryapi/query_api.go

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"context"
55
"fmt"
66
"github.com/cortexproject/cortex/pkg/engine/distributed_execution"
7-
"github.com/cortexproject/cortex/pkg/scheduler/plan_fragments"
87
"net/http"
98
"strconv"
109
"time"
@@ -141,10 +140,10 @@ func (q *QueryAPI) RangeQueryHandler(r *http.Request) (result apiFuncResult) {
141140
ctx = httputil.ContextFromRequest(ctx, r)
142141

143142
// TODO: if distributed exec enabled
144-
isRoot, queryID, fragmentID, _ := plan_fragments.ExtractFragmentMetaData(ctx)
143+
isRoot, queryID, fragmentID, _, _ := distributed_execution.ExtractFragmentMetaData(ctx)
145144
if !isRoot {
146145
key := distributed_execution.MakeFragmentKey(queryID, fragmentID)
147-
q.queryResultCache.InitWriting(key)
146+
q.queryResultCache.InitWriting(*key)
148147
}
149148

150149
res := qry.Exec(ctx)
@@ -190,10 +189,10 @@ func (q *QueryAPI) InstantQueryHandler(r *http.Request) (result apiFuncResult) {
190189
ctx = querier.AddBlockStoreTypeToContext(ctx, r.Header.Get(querier.BlockStoreTypeHeader))
191190

192191
// TODO: if distributed exec enabled
193-
isRoot, queryID, fragmentID, _ := plan_fragments.ExtractFragmentMetaData(ctx)
192+
isRoot, queryID, fragmentID, _, _ := distributed_execution.ExtractFragmentMetaData(ctx)
194193
if !isRoot {
195194
key := distributed_execution.MakeFragmentKey(queryID, fragmentID)
196-
q.queryResultCache.InitWriting(key)
195+
q.queryResultCache.InitWriting(*key)
197196
}
198197

199198
var qry promql.Query
@@ -205,15 +204,15 @@ func (q *QueryAPI) InstantQueryHandler(r *http.Request) (result apiFuncResult) {
205204
if err != nil {
206205
if !isRoot {
207206
key := distributed_execution.MakeFragmentKey(queryID, fragmentID)
208-
q.queryResultCache.SetError(key)
207+
q.queryResultCache.SetError(*key)
209208
}
210209
return apiFuncResult{nil, &apiError{errorInternal, fmt.Errorf("invalid logical plan: %v", err)}, nil, nil}
211210
}
212211
qry, err = q.queryEngine.MakeInstantQueryFromPlan(ctx, q.queryable, opts, logicalPlan, tsTime, r.FormValue("query"))
213212
if err != nil {
214213
if !isRoot {
215214
key := distributed_execution.MakeFragmentKey(queryID, fragmentID)
216-
q.queryResultCache.SetError(key)
215+
q.queryResultCache.SetError(*key)
217216
}
218217
return apiFuncResult{nil, &apiError{errorInternal, fmt.Errorf("failed to create instant query from logical plan: %v", err)}, nil, nil}
219218
}
@@ -267,14 +266,14 @@ func (q *QueryAPI) Wrap(f apiFunc) http.HandlerFunc {
267266
if result.data != nil {
268267
// TODO: if distributed exec enabled
269268
ctx := httputil.ContextFromRequest(r.Context(), r)
270-
isRoot, queryID, fragmentID, _ := plan_fragments.ExtractFragmentMetaData(ctx)
269+
isRoot, queryID, fragmentID, _, _ := distributed_execution.ExtractFragmentMetaData(ctx)
271270
if !isRoot {
272271
key := distributed_execution.MakeFragmentKey(queryID, fragmentID)
273272
result := distributed_execution.FragmentResult{
274273
Data: result.data,
275274
Expiration: time.Now().Add(time.Hour),
276275
}
277-
q.queryResultCache.SetComplete(key, result)
276+
q.queryResultCache.SetComplete(*key, result)
278277
return
279278
}
280279
q.respond(w, r, result.data, result.warnings, r.FormValue("query"))

pkg/cortex/modules.go

Lines changed: 17 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,13 @@ import (
66
"fmt"
77
"github.com/cortexproject/cortex/pkg/engine/distributed_execution"
88
"github.com/cortexproject/cortex/pkg/ring/client"
9-
"github.com/prometheus/client_golang/prometheus/promauto"
109
"log/slog"
1110
"net"
1211
"net/http"
1312
"runtime"
1413
"runtime/debug"
1514
"strconv"
15+
"strings"
1616
"time"
1717

1818
"github.com/go-kit/log/level"
@@ -366,6 +366,15 @@ func (t *Cortex) initTenantFederation() (serv services.Service, err error) {
366366
// │ │
367367
// └──────────────────┘
368368
func (t *Cortex) initQuerier() (serv services.Service, err error) {
369+
370+
t.Cfg.Worker.MaxConcurrentRequests = t.Cfg.Querier.MaxConcurrent
371+
t.Cfg.Worker.TargetHeaders = t.Cfg.API.HTTPRequestHeadersToLog
372+
ipAddr, err := ring.GetInstanceAddr(t.Cfg.Alertmanager.ShardingRing.InstanceAddr, t.Cfg.Alertmanager.ShardingRing.InstanceInterfaceNames, util_log.Logger)
373+
if err != nil {
374+
return nil, err
375+
}
376+
serverAddress := net.JoinHostPort(ipAddr, strconv.Itoa(t.Cfg.Server.GRPCListenPort))
377+
369378
// Create new map for caching partial results during distributed execution
370379
var queryResultCache *distributed_execution.QueryResultCache
371380
var queryServer *distributed_execution.QuerierServer
@@ -438,37 +447,22 @@ func (t *Cortex) initQuerier() (serv services.Service, err error) {
438447
return nil, nil
439448
}
440449

441-
t.Cfg.Worker.MaxConcurrentRequests = t.Cfg.Querier.MaxConcurrent
442-
t.Cfg.Worker.TargetHeaders = t.Cfg.API.HTTPRequestHeadersToLog
443-
ipAddr, err := ring.GetInstanceAddr(t.Cfg.Alertmanager.ShardingRing.InstanceAddr, t.Cfg.Alertmanager.ShardingRing.InstanceInterfaceNames, util_log.Logger)
444-
if err != nil {
445-
return nil, err
446-
}
447-
serverAddress := net.JoinHostPort(ipAddr, strconv.Itoa(t.Cfg.Server.HTTPListenPort))
448-
449450
if t.Cfg.Querier.DistributedExecEnabled {
450-
poolConfig := client.PoolConfig{
451-
CheckInterval: 5 * time.Second,
452-
HealthCheckEnabled: true,
453-
HealthCheckTimeout: 1 * time.Second,
454-
}
455-
456-
querierClientsGauge := promauto.With(prometheus.DefaultRegisterer).NewGauge(prometheus.GaugeOpts{
457-
Name: "cortex_querier_service_clients",
458-
Help: "The current number of clients connected to this querier",
459-
})
460-
factory := func(addr string) (client.PoolClient, error) {
461-
return distributed_execution.CreateQuerierClient(t.Cfg.QueryScheduler.GRPCClientConfig, addr)
462-
}
463-
querierPool := client.NewPool("querier", poolConfig, nil, factory, querierClientsGauge, util_log.Logger)
451+
querierPool := distributed_execution.NewQuerierPool(t.Cfg.QueryScheduler.GRPCClientConfig, prometheus.DefaultRegisterer, util_log.Logger)
464452
internalQuerierRouter = injectPool(internalQuerierRouter, querierPool)
453+
//go watchQuerierRingAndUpdatePool(context.Background(), t.Ring, querierPool)
465454
}
466455

467456
return querier_worker.NewQuerierWorker(t.Cfg.Worker, httpgrpc_server.NewServer(internalQuerierRouter), util_log.Logger, prometheus.DefaultRegisterer, serverAddress, t.Cfg.Querier.DistributedExecEnabled, queryResultCache)
468457
}
469458

470459
func injectPool(next http.Handler, pool *client.Pool) http.Handler {
471460
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
461+
if r.ProtoMajor == 2 && strings.Contains(r.Header.Get("Content-Type"), "application/grpc") {
462+
ctx := distributed_execution.ContextWithPool(r.Context(), pool)
463+
next.ServeHTTP(w, r.WithContext(ctx))
464+
return
465+
}
472466
ctx := distributed_execution.ContextWithPool(r.Context(), pool)
473467
next.ServeHTTP(w, r.WithContext(ctx))
474468
})
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package distributed_execution
2+
3+
import (
4+
"context"
5+
)
6+
7+
// fragmentMetadataKey is the context key under which fragmentMetadata is
// stored. Being an unexported struct type, it cannot collide with context keys
// defined by other packages.
type fragmentMetadataKey struct{}

// fragmentMetadata is the value stored in a context by InjectFragmentMetaData
// and read back by ExtractFragmentMetaData.
type fragmentMetadata struct {
	queryID    uint64
	fragmentID uint64
	// childIDToAddr pairs each child fragment ID with the address that was
	// supplied at the same position at injection time.
	// NOTE(review): presumably the address of the querier executing that
	// child fragment — confirm against the caller.
	childIDToAddr map[uint64]string
	isRoot        bool
}

// InjectFragmentMetaData returns a context derived from ctx that carries the
// given fragment metadata.
//
// childIDs and childAddr are parallel slices: childAddr[i] is recorded as the
// address of childIDs[i]. If childAddr is shorter than childIDs, only the
// child IDs that have a matching address are recorded; the remaining IDs are
// left out of the map instead of causing an index-out-of-range panic. Surplus
// entries in childAddr are ignored. The stored map is never nil.
func InjectFragmentMetaData(ctx context.Context, fragmentID uint64, queryID uint64, isRoot bool, childIDs []uint64, childAddr []string) context.Context {
	childIDToAddr := make(map[uint64]string, len(childIDs))
	for i, childID := range childIDs {
		if i >= len(childAddr) {
			// No address was supplied for this child or any later one.
			break
		}
		childIDToAddr[childID] = childAddr[i]
	}

	return context.WithValue(ctx, fragmentMetadataKey{}, fragmentMetadata{
		queryID:       queryID,
		fragmentID:    fragmentID,
		childIDToAddr: childIDToAddr,
		isRoot:        isRoot,
	})
}

// ExtractFragmentMetaData reads the metadata stored by InjectFragmentMetaData.
//
// ok is false when ctx carries no fragment metadata; in that case every other
// result is its zero value (isRoot is false and childAddrs is nil). The
// returned map is the one stored in the context, not a copy.
func ExtractFragmentMetaData(ctx context.Context) (isRoot bool, queryID uint64, fragmentID uint64, childAddrs map[uint64]string, ok bool) {
	metadata, ok := ctx.Value(fragmentMetadataKey{}).(fragmentMetadata)
	if !ok {
		return false, 0, 0, nil, false
	}
	return metadata.isRoot, metadata.queryID, metadata.fragmentID, metadata.childIDToAddr, true
}

pkg/scheduler/plan_fragments/id_test.go renamed to pkg/engine/distributed_execution/id_test.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
1-
package plan_fragments
1+
package distributed_execution
22

33
import (
44
"context"
5+
"github.com/cortexproject/cortex/pkg/scheduler/plan_fragments"
56
"github.com/stretchr/testify/require"
67
"testing"
78
)
@@ -13,13 +14,13 @@ func TestFragmentMetadata(t *testing.T) {
1314
t.Run("basic injection and extraction", func(t *testing.T) {
1415

1516
ctx := context.Background()
16-
fragment := Fragment{
17+
fragment := plan_fragments.Fragment{
1718
FragmentID: 123,
1819
IsRoot: true,
1920
}
2021
queryID := uint64(456)
2122

22-
ctx = InjectFragmentMetaData(ctx, fragment, queryID)
23+
ctx = InjectFragmentMetaData(ctx, fragment.FragmentID, queryID, fragment.IsRoot, fragment.ChildIDs)
2324

2425
isRoot, qID, fID, ok := ExtractFragmentMetaData(ctx)
2526
require.True(t, ok)
@@ -40,8 +41,8 @@ func TestFragmentMetadata(t *testing.T) {
4041

4142
t.Run("zero values", func(t *testing.T) {
4243
ctx := context.Background()
43-
fragment := Fragment{}
44-
ctx = InjectFragmentMetaData(ctx, fragment, 0)
44+
fragment := plan_fragments.Fragment{}
45+
ctx = InjectFragmentMetaData(ctx, fragment.FragmentID, 0, fragment.IsRoot, fragment.ChildIDs)
4546

4647
isRoot, queryID, fragmentID, ok := ExtractFragmentMetaData(ctx)
4748
require.True(t, ok)

pkg/engine/distributed_execution/logicalplan.go

Lines changed: 41 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,9 @@ package distributed_execution
33
import (
44
"bytes"
55
"encoding/json"
6-
"math"
7-
86
"github.com/prometheus/prometheus/model/labels"
97
"github.com/thanos-io/promql-engine/logicalplan"
8+
"math"
109
)
1110

1211
type jsonNode struct {
@@ -21,6 +20,46 @@ const (
2120
negInfVal = `"-Inf"`
2221
)
2322

23+
func Marshal(node Node) ([]byte, error) {
24+
clone := node.Clone()
25+
return marshalNode(clone)
26+
}
27+
28+
func marshalNode(node Node) ([]byte, error) {
29+
children := make([]json.RawMessage, 0, len(node.Children()))
30+
for _, c := range node.Children() {
31+
childData, err := marshalNode(*c)
32+
if err != nil {
33+
return nil, err
34+
}
35+
children = append(children, childData)
36+
}
37+
var data json.RawMessage = nil
38+
if n, ok := node.(*logicalplan.NumberLiteral); ok {
39+
if math.IsInf(n.Val, 1) {
40+
data = json.RawMessage(infVal)
41+
}
42+
if math.IsInf(n.Val, -1) {
43+
data = json.RawMessage(negInfVal)
44+
}
45+
if math.IsNaN(n.Val) {
46+
data = json.RawMessage(nanVal)
47+
}
48+
}
49+
if data == nil {
50+
var err error
51+
data, err = json.Marshal(node)
52+
if err != nil {
53+
return nil, err
54+
}
55+
}
56+
return json.Marshal(jsonNode{
57+
Type: node.Type(),
58+
Data: data,
59+
Children: children,
60+
})
61+
}
62+
2463
func Unmarshal(data []byte) (logicalplan.Node, error) {
2564
return unmarshalNode(data)
2665
}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
package distributed_execution

pkg/engine/distributed_execution/querier_service_client.go

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,17 @@ import (
44
"github.com/cortexproject/cortex/pkg/engine/distributed_execution/querierpb"
55
"github.com/cortexproject/cortex/pkg/ring/client"
66
"github.com/cortexproject/cortex/pkg/util/grpcclient"
7+
cortexmiddleware "github.com/cortexproject/cortex/pkg/util/middleware"
8+
"github.com/go-kit/log"
79
otgrpc "github.com/opentracing-contrib/go-grpc"
810
"github.com/opentracing/opentracing-go"
11+
"github.com/prometheus/client_golang/prometheus"
12+
"github.com/prometheus/client_golang/prometheus/promauto"
913
"github.com/prometheus/prometheus/model/histogram"
1014
"github.com/weaveworks/common/middleware"
1115
"google.golang.org/grpc"
1216
"google.golang.org/grpc/health/grpc_health_v1"
17+
"time"
1318
)
1419

1520
func CreateQuerierClient(grpcConfig grpcclient.Config, addr string) (client.PoolClient, error) {
@@ -44,6 +49,60 @@ func (qc *querierClient) Close() error {
4449
return qc.conn.Close()
4550
}
4651

52+
func NewQuerierPool(cfg grpcclient.Config, reg prometheus.Registerer, log log.Logger) *client.Pool {
53+
requestDuration := promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
54+
Name: "cortex_querier_query_request_duration_seconds",
55+
Help: "Time spent doing requests to querier.",
56+
Buckets: prometheus.ExponentialBuckets(0.001, 4, 6),
57+
}, []string{"operation", "status_code"})
58+
59+
clientsGauge := promauto.With(reg).NewGauge(prometheus.GaugeOpts{
60+
Name: "cortex_querier_query_clients",
61+
Help: "The current number of clients connected to querier.",
62+
})
63+
64+
poolConfig := client.PoolConfig{
65+
CheckInterval: time.Minute,
66+
HealthCheckEnabled: true,
67+
HealthCheckTimeout: 10 * time.Second,
68+
}
69+
70+
q := &querierPool{
71+
grpcConfig: cfg,
72+
requestDuration: requestDuration,
73+
}
74+
75+
return client.NewPool("querier", poolConfig, nil, q.createQuerierClient, clientsGauge, log)
76+
}
77+
78+
type querierPool struct {
79+
grpcConfig grpcclient.Config
80+
requestDuration *prometheus.HistogramVec
81+
}
82+
83+
func (q *querierPool) createQuerierClient(addr string) (client.PoolClient, error) {
84+
opts, err := q.grpcConfig.DialOption([]grpc.UnaryClientInterceptor{
85+
otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer()),
86+
middleware.ClientUserHeaderInterceptor,
87+
cortexmiddleware.PrometheusGRPCUnaryInstrumentation(q.requestDuration),
88+
}, nil)
89+
90+
if err != nil {
91+
return nil, err
92+
}
93+
94+
conn, err := grpc.NewClient(addr, opts...)
95+
if err != nil {
96+
return nil, err
97+
}
98+
99+
return &querierClient{
100+
QuerierClient: querierpb.NewQuerierClient(conn),
101+
HealthClient: grpc_health_v1.NewHealthClient(conn),
102+
conn: conn,
103+
}, nil
104+
}
105+
47106
func FloatHistogramProtoToFloatHistograms(hps []querierpb.Histogram) []*histogram.FloatHistogram {
48107
floatHistograms := make([]*histogram.FloatHistogram, len(hps))
49108
for _, hp := range hps {

0 commit comments

Comments
 (0)