Commit ac9f641

allow logical plan fragment type for scheduler queue
Signed-off-by: rubywtl <[email protected]>
1 parent 52b9672 commit ac9f641

File tree

16 files changed: +1072 -106 lines changed

pkg/cortex/modules.go

Lines changed: 9 additions & 2 deletions

@@ -6,9 +6,11 @@ import (
 	"fmt"

 	"log/slog"
+	"net"
 	"net/http"
 	"runtime"
 	"runtime/debug"
+	"strconv"

 	"github.com/go-kit/log/level"
 	"github.com/opentracing-contrib/go-stdlib/nethttp"
@@ -414,7 +416,12 @@ func (t *Cortex) initQuerier() (serv services.Service, err error) {

 	t.Cfg.Worker.MaxConcurrentRequests = t.Cfg.Querier.MaxConcurrent
 	t.Cfg.Worker.TargetHeaders = t.Cfg.API.HTTPRequestHeadersToLog
-	return querier_worker.NewQuerierWorker(t.Cfg.Worker, httpgrpc_server.NewServer(internalQuerierRouter), util_log.Logger, prometheus.DefaultRegisterer)
+	ipAddr, err := ring.GetInstanceAddr(t.Cfg.Alertmanager.ShardingRing.InstanceAddr, t.Cfg.Alertmanager.ShardingRing.InstanceInterfaceNames, util_log.Logger)
+	if err != nil {
+		return nil, err
+	}
+	serverAddress := net.JoinHostPort(ipAddr, strconv.Itoa(t.Cfg.Server.GRPCListenPort))
+	return querier_worker.NewQuerierWorker(t.Cfg.Worker, httpgrpc_server.NewServer(internalQuerierRouter), util_log.Logger, prometheus.DefaultRegisterer, serverAddress)
 }

 func (t *Cortex) initStoreQueryables() (services.Service, error) {
@@ -813,7 +820,7 @@ func (t *Cortex) initQueryScheduler() (services.Service, error) {
 		tenant.WithDefaultResolver(tenantfederation.NewRegexValidator())
 	}

-	s, err := scheduler.NewScheduler(t.Cfg.QueryScheduler, t.Overrides, util_log.Logger, prometheus.DefaultRegisterer)
+	s, err := scheduler.NewScheduler(t.Cfg.QueryScheduler, t.Overrides, util_log.Logger, prometheus.DefaultRegisterer, t.Cfg.Querier.DistributedExecEnabled)
 	if err != nil {
 		return nil, errors.Wrap(err, "query-scheduler init")
 	}
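
The initQuerier change above derives the querier's own advertised address from the ring instance-address settings and the server's gRPC listen port before handing it to NewQuerierWorker. A minimal sketch of that address-construction pattern, using only the standard library and hypothetical values (not taken from the commit):

package main

import (
	"fmt"
	"net"
	"strconv"
)

func main() {
	ipAddr := "10.0.0.12" // hypothetical instance address resolved from the ring config
	grpcPort := 9095      // hypothetical gRPC listen port
	serverAddress := net.JoinHostPort(ipAddr, strconv.Itoa(grpcPort))
	fmt.Println(serverAddress) // 10.0.0.12:9095
}
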
Lines changed: 21 additions & 0 deletions (new file, package distributed_execution)

@@ -0,0 +1,21 @@
+package distributed_execution
+
+type FragmentKey struct {
+	queryID    uint64
+	fragmentID uint64
+}
+
+func MakeFragmentKey(queryID uint64, fragmentID uint64) *FragmentKey {
+	return &FragmentKey{
+		queryID:    queryID,
+		fragmentID: fragmentID,
+	}
+}
+
+func (f FragmentKey) GetQueryID() uint64 {
+	return f.queryID
+}
+
+func (f FragmentKey) GetFragmentID() uint64 {
+	return f.fragmentID
+}
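
FragmentKey is a small comparable struct, so it can be used directly as a Go map key; the FragmentTable added later in this commit relies on exactly that. A minimal sketch of assumed usage (IDs and the address are hypothetical):

package main

import (
	"fmt"

	"github.com/cortexproject/cortex/pkg/distributed_execution"
)

func main() {
	// FragmentKey values are comparable, so they work directly as map keys.
	key := distributed_execution.MakeFragmentKey(42, 7) // query 42, fragment 7
	addrs := map[distributed_execution.FragmentKey]string{}
	addrs[*key] = "localhost:9095" // hypothetical querier address

	fmt.Println(addrs[*key], key.GetQueryID(), key.GetFragmentID())
}
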

pkg/frontend/frontend_test.go

Lines changed: 1 addition & 1 deletion

@@ -291,7 +291,7 @@ func testFrontend(t *testing.T, config CombinedFrontendConfig, handler http.Hand
 	go grpcServer.Serve(grpcListen) //nolint:errcheck

 	var worker services.Service
-	worker, err = querier_worker.NewQuerierWorker(workerConfig, httpgrpc_server.NewServer(handler), logger, nil)
+	worker, err = querier_worker.NewQuerierWorker(workerConfig, httpgrpc_server.NewServer(handler), logger, nil, "")
 	require.NoError(t, err)
 	require.NoError(t, services.StartAndAwaitRunning(context.Background(), worker))

pkg/frontend/v1/frontend_test.go

Lines changed: 1 addition & 1 deletion

@@ -285,7 +285,7 @@ func testFrontend(t *testing.T, config Config, handler http.Handler, test func(a
 	go grpcServer.Serve(grpcListen) //nolint:errcheck

 	var worker services.Service
-	worker, err = querier_worker.NewQuerierWorker(workerConfig, httpgrpc_server.NewServer(handler), logger, nil)
+	worker, err = querier_worker.NewQuerierWorker(workerConfig, httpgrpc_server.NewServer(handler), logger, nil, "")
 	require.NoError(t, err)
 	require.NoError(t, services.StartAndAwaitRunning(context.Background(), worker))

pkg/querier/worker/scheduler_processor.go

Lines changed: 4 additions & 2 deletions

@@ -31,7 +31,7 @@ import (
 	"github.com/cortexproject/cortex/pkg/util/services"
 )

-func newSchedulerProcessor(cfg Config, handler RequestHandler, log log.Logger, reg prometheus.Registerer) (*schedulerProcessor, []services.Service) {
+func newSchedulerProcessor(cfg Config, handler RequestHandler, log log.Logger, reg prometheus.Registerer, querierAddress string) (*schedulerProcessor, []services.Service) {
 	p := &schedulerProcessor{
 		log:     log,
 		handler: handler,
@@ -47,6 +47,7 @@ func newSchedulerProcessor(cfg Config, handler RequestHandler, log log.Logger, r
 			Help:    "Time spend doing requests to frontend.",
 			Buckets: prometheus.ExponentialBuckets(0.001, 4, 6),
 		}, []string{"operation", "status_code"}),
+		querierAddress: querierAddress,
 	}

 	frontendClientsGauge := promauto.With(reg).NewGauge(prometheus.GaugeOpts{
@@ -71,6 +72,7 @@ type schedulerProcessor struct {
 	grpcConfig     grpcclient.Config
 	maxMessageSize int
 	querierID      string
+	querierAddress string

 	frontendPool                  *client.Pool
 	frontendClientRequestDuration *prometheus.HistogramVec
@@ -97,7 +99,7 @@ func (sp *schedulerProcessor) processQueriesOnSingleStream(ctx context.Context,
 	for backoff.Ongoing() {
 		c, err := schedulerClient.QuerierLoop(ctx)
 		if err == nil {
-			err = c.Send(&schedulerpb.QuerierToScheduler{QuerierID: sp.querierID})
+			err = c.Send(&schedulerpb.QuerierToScheduler{QuerierID: sp.querierID, QuerierAddress: sp.querierAddress})
 		}

 		if err != nil {
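
With this change the first message a querier sends on the scheduler loop carries its own gRPC address alongside its ID, so the scheduler can later tell other queriers where a fragment is running. A minimal sketch of that registration step (not part of the commit); the import path and the generated stream type name SchedulerForQuerier_QuerierLoopClient are assumed from the upstream repo layout, and the values are hypothetical:

package worker

import "github.com/cortexproject/cortex/pkg/scheduler/schedulerpb"

// registerQuerier sketches the first message a querier now sends on the
// scheduler loop: its ID plus its own gRPC address.
func registerQuerier(loop schedulerpb.SchedulerForQuerier_QuerierLoopClient) error {
	return loop.Send(&schedulerpb.QuerierToScheduler{
		QuerierID:      "querier-1",      // hypothetical querier ID
		QuerierAddress: "10.0.0.12:9095", // hypothetical querier gRPC address
	})
}
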

pkg/querier/worker/scheduler_processor_test.go

Lines changed: 1 addition & 1 deletion

@@ -144,7 +144,7 @@ func Test_ToShowNotPanic_RelatedIssue6599(t *testing.T) {
 		go stat.AddFetchedChunkBytes(10)
 	}).Return(&httpgrpc.HTTPResponse{}, nil)

-	sp, _ := newSchedulerProcessor(cfg, requestHandler, log.NewNopLogger(), nil)
+	sp, _ := newSchedulerProcessor(cfg, requestHandler, log.NewNopLogger(), nil, "")
 	schedulerClient := &mockSchedulerForQuerierClient{}
 	schedulerClient.On("QuerierLoop", mock.Anything, mock.Anything).Return(querierLoopClient, nil)

pkg/querier/worker/worker.go

Lines changed: 2 additions & 2 deletions

@@ -91,7 +91,7 @@ type querierWorker struct {
 	managers map[string]*processorManager
 }

-func NewQuerierWorker(cfg Config, handler RequestHandler, log log.Logger, reg prometheus.Registerer) (services.Service, error) {
+func NewQuerierWorker(cfg Config, handler RequestHandler, log log.Logger, reg prometheus.Registerer, querierAddr string) (services.Service, error) {
 	if cfg.QuerierID == "" {
 		hostname, err := os.Hostname()
 		if err != nil {
@@ -109,7 +109,7 @@ func NewQuerierWorker(cfg Config, handler RequestHandler, log log.Logger, reg pr
 		level.Info(log).Log("msg", "Starting querier worker connected to query-scheduler", "scheduler", cfg.SchedulerAddress)

 		address = cfg.SchedulerAddress
-		processor, servs = newSchedulerProcessor(cfg, handler, log, reg)
+		processor, servs = newSchedulerProcessor(cfg, handler, log, reg, querierAddr)

 	case cfg.FrontendAddress != "":
 		level.Info(log).Log("msg", "Starting querier worker connected to query-frontend", "frontend", cfg.FrontendAddress)
Lines changed: 70 additions & 0 deletions (new file, package plan_fragments)

@@ -0,0 +1,70 @@
+package plan_fragments
+
+import (
+	"sync"
+
+	"github.com/cortexproject/cortex/pkg/distributed_execution"
+)
+
+type FragmentTable struct {
+	mappings map[distributed_execution.FragmentKey]string
+	mu       sync.RWMutex
+}
+
+func NewFragmentTable() *FragmentTable {
+	return &FragmentTable{
+		mappings: make(map[distributed_execution.FragmentKey]string),
+	}
+}
+
+func (f *FragmentTable) AddMapping(queryID uint64, fragmentID uint64, addr string) {
+	f.mu.Lock()
+	defer f.mu.Unlock()
+
+	key := distributed_execution.MakeFragmentKey(queryID, fragmentID)
+	f.mappings[*key] = addr
+}
+
+func (f *FragmentTable) GetMappings(queryID uint64, fragmentIDs []uint64) ([]string, bool) {
+	f.mu.RLock()
+	defer f.mu.RUnlock()
+
+	addresses := make([]string, 0, len(fragmentIDs))
+
+	for _, fragmentID := range fragmentIDs {
+		key := distributed_execution.MakeFragmentKey(queryID, fragmentID)
+		if addr, ok := f.mappings[*key]; ok {
+			addresses = append(addresses, addr)
+		} else {
+			return nil, false
+		}
+	}
+	return addresses, true
+}
+
+func (f *FragmentTable) GetMapping(queryID uint64, fragmentID uint64) (string, bool) {
+	f.mu.RLock()
+	defer f.mu.RUnlock()
+
+	key := distributed_execution.MakeFragmentKey(queryID, fragmentID)
+	if addr, ok := f.mappings[*key]; ok {
+		return addr, true
+	}
+	return "", false
+}
+
+func (f *FragmentTable) ClearMappings(queryID uint64) {
+	f.mu.Lock()
+	defer f.mu.Unlock()
+
+	keysToDelete := make([]distributed_execution.FragmentKey, 0)
+	for key := range f.mappings {
+		if key.GetQueryID() == queryID {
+			keysToDelete = append(keysToDelete, key)
+		}
+	}

+	for _, key := range keysToDelete {
+		delete(f.mappings, key)
+	}
+}
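
FragmentTable is a mutex-guarded map from (query ID, fragment ID) to a querier address; GetMappings is all-or-nothing and reports false if any requested fragment has no recorded address. A minimal usage sketch (not part of the commit), written as if inside package plan_fragments since the file path is not shown in this extract; the addresses are hypothetical:

package plan_fragments

import "fmt"

// exampleFragmentTableUsage sketches the intended lifecycle of the table:
// record fragment locations, look them up, then clear them per query.
func exampleFragmentTableUsage() {
	table := NewFragmentTable()
	table.AddMapping(7, 1, "10.0.0.12:9095") // hypothetical querier addresses
	table.AddMapping(7, 2, "10.0.0.13:9095")

	if addrs, ok := table.GetMappings(7, []uint64{1, 2}); ok {
		fmt.Println(addrs) // [10.0.0.12:9095 10.0.0.13:9095]
	}
	if addr, ok := table.GetMapping(7, 2); ok {
		fmt.Println(addr) // 10.0.0.13:9095
	}

	table.ClearMappings(7) // drop all fragment locations once query 7 completes
}
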
Lines changed: 116 additions & 0 deletions (new test file, package plan_fragments)

@@ -0,0 +1,116 @@
+package plan_fragments
+
+import (
+	"fmt"
+	"sync"
+	"testing"
+
+	"github.com/stretchr/testify/require"
+)
+
+// TestSchedulerCoordination checks whether the hashtable for fragment-querier mapping gives the expected value
+// It also checks if it remains functional and accurate during a multi-thread/concurrent read & write situation
+func TestSchedulerCoordination(t *testing.T) {
+	t.Run("basic operations", func(t *testing.T) {
+		table := NewFragmentTable()
+		table.AddMapping(uint64(0), uint64(1), "localhost:8000")
+		table.AddMapping(uint64(0), uint64(2), "localhost:8001")
+
+		result, exist := table.GetMappings(uint64(0), []uint64{1, 2})
+		require.True(t, exist)
+		require.Equal(t, []string{"localhost:8000", "localhost:8001"}, result)
+
+		result, exist = table.GetMappings(uint64(0), []uint64{1, 3})
+		require.False(t, exist)
+		require.Empty(t, result)
+
+		result, exist = table.GetMappings(uint64(0), []uint64{1})
+		require.True(t, exist)
+		require.Equal(t, []string{"localhost:8000"}, result)
+
+		table.ClearMappings(uint64(0))
+		result, exist = table.GetMappings(uint64(0), []uint64{1})
+		require.False(t, exist)
+		require.Empty(t, result)
+	})
+
+	t.Run("concurrent operations", func(t *testing.T) {
+		table := NewFragmentTable()
+		const numGoroutines = 10
+		const numOperations = 100
+
+		var wg sync.WaitGroup
+		wg.Add(numGoroutines * 3)
+
+		// write
+		for i := 0; i < numGoroutines; i++ {
+			go func(routine int) {
+				defer wg.Done()
+				for j := 0; j < numOperations; j++ {
+					queryID := uint64(routine)
+					fragmentID := uint64(j)
+					addr := fmt.Sprintf("localhost:%d", j)
+					table.AddMapping(queryID, fragmentID, addr)
+				}
+			}(i)
+		}
+
+		// read
+		for i := 0; i < numGoroutines; i++ {
+			go func(routine int) {
+				defer wg.Done()
+				for j := 0; j < numOperations; j++ {
+					queryID := uint64(routine)
+					fragmentIDs := []uint64{uint64(j)}
+					table.GetMappings(queryID, fragmentIDs)
+				}
+			}(i)
+		}
+
+		// clear
+		for i := 0; i < numGoroutines; i++ {
+			go func(routine int) {
+				defer wg.Done()
+				for j := 0; j < numOperations; j++ {
+					queryID := uint64(routine)
+					table.ClearMappings(queryID)
+				}
+			}(i)
+		}
+
+		wg.Wait()
+	})
+
+	t.Run("edge cases", func(t *testing.T) {
+		table := NewFragmentTable()
+
+		// test empty fragment IDs
+		result, exist := table.GetMappings(0, []uint64{})
+		require.True(t, exist)
+		require.Empty(t, result)
+
+		// test clearing non-existent query
+		table.ClearMappings(999)
+		require.NotPanics(t, func() {
+			table.ClearMappings(999)
+		})
+
+		// test overwriting mapping
+		table.AddMapping(1, 1, "addr1")
+		table.AddMapping(1, 1, "addr2")
+		result, exist = table.GetMappings(1, []uint64{1})
+		require.True(t, exist)
+		require.Equal(t, []string{"addr2"}, result)
+
+		// test multiple queries
+		table.AddMapping(1, 1, "addr1")
+		table.AddMapping(2, 1, "addr2")
+		result, exist = table.GetMappings(1, []uint64{1})
+		require.True(t, exist)
+		require.Equal(t, []string{"addr1"}, result)
+
+		result, exist = table.GetMappings(2, []uint64{1})
+		require.True(t, exist)
+		require.Equal(t, []string{"addr2"}, result)
+	})
+}
Lines changed: 54 additions & 0 deletions (new file, package plan_fragments)

@@ -0,0 +1,54 @@
+package plan_fragments
+
+import "github.com/thanos-io/promql-engine/logicalplan"
+
+type Fragmenter interface {
+	Fragment(node logicalplan.Node) ([]Fragment, error)
+	getNewID() uint64
+}
+
+type DummyFragmenter struct {
+}
+
+func (f *DummyFragmenter) Fragment(node logicalplan.Node) ([]Fragment, error) {
+	// simple logic without distributed optimizer
+	return []Fragment{
+		{
+			Node:       node,
+			FragmentID: f.getNewID(),
+			ChildIDs:   []uint64{},
+			IsRoot:     true,
+		},
+	}, nil
+}
+
+func (f *DummyFragmenter) getNewID() uint64 {
+	return 1 // for dummy plan_fragments testing
+}
+
+type Fragment struct {
+	Node       logicalplan.Node
+	FragmentID uint64
+	ChildIDs   []uint64
+	IsRoot     bool
+}
+
+func (s *Fragment) IsEmpty() bool {
+	if s.Node != nil {
+		return false
+	}
+	if s.FragmentID != 0 {
+		return false
+	}
+	if s.IsRoot {
+		return false
+	}
+	if len(s.ChildIDs) != 0 {
+		return false
+	}
+	return true
+}
+
+func NewDummyFragmenter() Fragmenter {
+	return &DummyFragmenter{}
+}
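
DummyFragmenter wraps an entire logical plan into a single root fragment with no children; real fragmentation is left to a distributed optimizer. A minimal sketch of assumed caller usage (not part of the commit), written as if inside package plan_fragments, with plan standing for any logicalplan.Node built elsewhere by the engine:

package plan_fragments

import (
	"fmt"

	"github.com/thanos-io/promql-engine/logicalplan"
)

// exampleFragmentUsage sketches how a caller might fragment a plan.
func exampleFragmentUsage(plan logicalplan.Node) error {
	fragmenter := NewDummyFragmenter()
	fragments, err := fragmenter.Fragment(plan)
	if err != nil {
		return err
	}
	for _, frag := range fragments {
		// DummyFragmenter returns exactly one fragment: the whole plan, marked as root.
		fmt.Printf("fragment %d root=%t children=%d\n", frag.FragmentID, frag.IsRoot, len(frag.ChildIDs))
	}
	return nil
}
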
