Skip to content

Commit 4f75333

Browse files
committed
allow logical plan fragment type for scheduler queue
Signed-off-by: rubywtl <[email protected]>
1 parent 52b9672 commit 4f75333

File tree

15 files changed

+1150
-106
lines changed

15 files changed

+1150
-106
lines changed

pkg/cortex/modules.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"context"
55
"flag"
66
"fmt"
7-
87
"log/slog"
98
"net/http"
109
"runtime"
@@ -414,6 +413,9 @@ func (t *Cortex) initQuerier() (serv services.Service, err error) {
414413

415414
t.Cfg.Worker.MaxConcurrentRequests = t.Cfg.Querier.MaxConcurrent
416415
t.Cfg.Worker.TargetHeaders = t.Cfg.API.HTTPRequestHeadersToLog
416+
417+
t.Cfg.Worker.QuerierRing.ListenPort = t.Cfg.Server.GRPCListenPort
418+
417419
return querier_worker.NewQuerierWorker(t.Cfg.Worker, httpgrpc_server.NewServer(internalQuerierRouter), util_log.Logger, prometheus.DefaultRegisterer)
418420
}
419421

@@ -813,7 +815,7 @@ func (t *Cortex) initQueryScheduler() (services.Service, error) {
813815
tenant.WithDefaultResolver(tenantfederation.NewRegexValidator())
814816
}
815817

816-
s, err := scheduler.NewScheduler(t.Cfg.QueryScheduler, t.Overrides, util_log.Logger, prometheus.DefaultRegisterer)
818+
s, err := scheduler.NewScheduler(t.Cfg.QueryScheduler, t.Overrides, util_log.Logger, prometheus.DefaultRegisterer, t.Cfg.Querier.DistributedExecEnabled)
817819
if err != nil {
818820
return nil, errors.Wrap(err, "query-scheduler init")
819821
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package distributed_execution
2+
3+
// FragmentKey uniquely identifies one fragment of a distributed logical
// query plan. It pairs a queryID (identifying the overall query) with a
// fragmentID (identifying the specific fragment within that query).
type FragmentKey struct {
	// queryID identifies the distributed query this fragment belongs to.
	queryID uint64

	// fragmentID identifies this specific fragment within the query.
	fragmentID uint64
}

// MakeFragmentKey builds a FragmentKey from the given queryID and
// fragmentID. It is used to track and identify fragments during
// distributed query execution.
func MakeFragmentKey(queryID uint64, fragmentID uint64) FragmentKey {
	return FragmentKey{queryID: queryID, fragmentID: fragmentID}
}

// GetQueryID returns the query ID of this key. The ID is shared by every
// fragment of the same distributed query.
func (f FragmentKey) GetQueryID() uint64 {
	return f.queryID
}

// GetFragmentID returns the ID of this specific fragment within its
// parent query.
func (f FragmentKey) GetFragmentID() uint64 {
	return f.fragmentID
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package plan_fragments
2+
3+
import "github.com/thanos-io/promql-engine/logicalplan"
4+
5+
type Fragmenter interface {
6+
Fragment(node logicalplan.Node) ([]Fragment, error)
7+
}
8+
9+
type DummyFragmenter struct {
10+
}
11+
12+
func (f *DummyFragmenter) Fragment(node logicalplan.Node) ([]Fragment, error) {
13+
// simple logic without distributed optimizer
14+
return []Fragment{
15+
{
16+
Node: node,
17+
FragmentID: uint64(1),
18+
ChildIDs: []uint64{},
19+
IsRoot: true,
20+
},
21+
}, nil
22+
}
23+
24+
type Fragment struct {
25+
Node logicalplan.Node
26+
FragmentID uint64
27+
ChildIDs []uint64
28+
IsRoot bool
29+
}
30+
31+
func (s *Fragment) IsEmpty() bool {
32+
if s.Node != nil {
33+
return false
34+
}
35+
if s.FragmentID != 0 {
36+
return false
37+
}
38+
if s.IsRoot {
39+
return false
40+
}
41+
if len(s.ChildIDs) != 0 {
42+
return false
43+
}
44+
return true
45+
}
46+
47+
func NewDummyFragmenter() Fragmenter {
48+
return &DummyFragmenter{}
49+
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package plan_fragments
2+
3+
import (
4+
"testing"
5+
"time"
6+
7+
"github.com/stretchr/testify/require"
8+
9+
"github.com/cortexproject/cortex/pkg/util/logical_plan"
10+
)
11+
12+
func TestFragmenter(t *testing.T) {
13+
type testCase struct {
14+
name string
15+
query string
16+
start time.Time
17+
end time.Time
18+
expectedFragments int
19+
}
20+
21+
now := time.Now()
22+
23+
// more tests will be added when distributed optimizer and fragmenter are implemented
24+
tests := []testCase{
25+
{
26+
name: "simple logical query plan - no fragmentation",
27+
query: "up",
28+
start: now,
29+
end: now,
30+
expectedFragments: 1,
31+
},
32+
}
33+
34+
for _, tc := range tests {
35+
t.Run(tc.name, func(t *testing.T) {
36+
lp, err := logical_plan.CreateTestLogicalPlan(tc.query, tc.start, tc.end, 0)
37+
require.NoError(t, err)
38+
39+
fragmenter := NewDummyFragmenter()
40+
res, err := fragmenter.Fragment((*lp).Root())
41+
42+
require.NoError(t, err)
43+
require.Equal(t, tc.expectedFragments, len(res))
44+
})
45+
}
46+
}

pkg/querier/querier.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,8 @@ type Config struct {
9696
ParquetQueryableShardCacheSize int `yaml:"parquet_queryable_shard_cache_size"`
9797
ParquetQueryableDefaultBlockStore string `yaml:"parquet_queryable_default_block_store"`
9898
ParquetQueryableFallbackDisabled bool `yaml:"parquet_queryable_fallback_disabled"`
99-
DistributedExecEnabled bool `yaml:"distributed_exec_enabled" doc:"hidden"`
99+
100+
DistributedExecEnabled bool `yaml:"distributed_exec_enabled" doc:"hidden"`
100101
}
101102

102103
var (

pkg/querier/worker/scheduler_processor.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ import (
3131
"github.com/cortexproject/cortex/pkg/util/services"
3232
)
3333

34-
func newSchedulerProcessor(cfg Config, handler RequestHandler, log log.Logger, reg prometheus.Registerer) (*schedulerProcessor, []services.Service) {
34+
func newSchedulerProcessor(cfg Config, handler RequestHandler, log log.Logger, reg prometheus.Registerer, querierAddress string) (*schedulerProcessor, []services.Service) {
3535
p := &schedulerProcessor{
3636
log: log,
3737
handler: handler,
@@ -47,6 +47,7 @@ func newSchedulerProcessor(cfg Config, handler RequestHandler, log log.Logger, r
4747
Help: "Time spend doing requests to frontend.",
4848
Buckets: prometheus.ExponentialBuckets(0.001, 4, 6),
4949
}, []string{"operation", "status_code"}),
50+
querierAddress: querierAddress,
5051
}
5152

5253
frontendClientsGauge := promauto.With(reg).NewGauge(prometheus.GaugeOpts{
@@ -71,6 +72,7 @@ type schedulerProcessor struct {
7172
grpcConfig grpcclient.Config
7273
maxMessageSize int
7374
querierID string
75+
querierAddress string
7476

7577
frontendPool *client.Pool
7678
frontendClientRequestDuration *prometheus.HistogramVec
@@ -97,7 +99,7 @@ func (sp *schedulerProcessor) processQueriesOnSingleStream(ctx context.Context,
9799
for backoff.Ongoing() {
98100
c, err := schedulerClient.QuerierLoop(ctx)
99101
if err == nil {
100-
err = c.Send(&schedulerpb.QuerierToScheduler{QuerierID: sp.querierID})
102+
err = c.Send(&schedulerpb.QuerierToScheduler{QuerierID: sp.querierID, QuerierAddress: sp.querierAddress})
101103
}
102104

103105
if err != nil {

pkg/querier/worker/scheduler_processor_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ func Test_ToShowNotPanic_RelatedIssue6599(t *testing.T) {
144144
go stat.AddFetchedChunkBytes(10)
145145
}).Return(&httpgrpc.HTTPResponse{}, nil)
146146

147-
sp, _ := newSchedulerProcessor(cfg, requestHandler, log.NewNopLogger(), nil)
147+
sp, _ := newSchedulerProcessor(cfg, requestHandler, log.NewNopLogger(), nil, "")
148148
schedulerClient := &mockSchedulerForQuerierClient{}
149149
schedulerClient.On("QuerierLoop", mock.Anything, mock.Anything).Return(querierLoopClient, nil)
150150

pkg/querier/worker/worker.go

Lines changed: 76 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,10 @@ package worker
33
import (
44
"context"
55
"flag"
6+
"fmt"
7+
"net"
68
"os"
9+
"strconv"
710
"sync"
811
"time"
912

@@ -14,7 +17,10 @@ import (
1417
"github.com/weaveworks/common/httpgrpc"
1518
"google.golang.org/grpc"
1619

20+
"github.com/cortexproject/cortex/pkg/ring"
21+
"github.com/cortexproject/cortex/pkg/ring/kv"
1722
"github.com/cortexproject/cortex/pkg/util"
23+
"github.com/cortexproject/cortex/pkg/util/flagext"
1824
"github.com/cortexproject/cortex/pkg/util/grpcclient"
1925
"github.com/cortexproject/cortex/pkg/util/services"
2026
)
@@ -33,9 +39,71 @@ type Config struct {
3339
GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config"`
3440

3541
TargetHeaders []string `yaml:"-"` // Propagated by config.
42+
43+
QuerierRing RingConfig `yaml:"ring"`
44+
}
45+
46+
type RingConfig struct {
47+
KVStore kv.Config `yaml:"kvstore" doc:"description=The key-value store used to share the hash ring across multiple instances."`
48+
HeartbeatPeriod time.Duration `yaml:"heartbeat_period"`
49+
HeartbeatTimeout time.Duration `yaml:"heartbeat_timeout"`
50+
ReplicationFactor int `yaml:"replication_factor"`
51+
ZoneAwarenessEnabled bool `yaml:"zone_awareness_enabled"`
52+
TokensFilePath string `yaml:"tokens_file_path"`
53+
DetailedMetricsEnabled bool `yaml:"detailed_metrics_enabled"`
54+
55+
FinalSleep time.Duration `yaml:"final_sleep"`
56+
WaitInstanceStateTimeout time.Duration `yaml:"wait_instance_state_timeout"`
57+
KeepInstanceInTheRingOnShutdown bool `yaml:"keep_instance_in_the_ring_on_shutdown"`
58+
59+
// Instance details
60+
InstanceID string `yaml:"instance_id" doc:"hidden"`
61+
InstanceInterfaceNames []string `yaml:"instance_interface_names"`
62+
InstancePort int `yaml:"instance_port" doc:"hidden"`
63+
InstanceAddr string `yaml:"instance_addr" doc:"hidden"`
64+
InstanceZone string `yaml:"instance_availability_zone"`
65+
66+
NumTokens int `yaml:"num_tokens"`
67+
68+
// Injected internally
69+
ListenPort int `yaml:"-"`
70+
RingCheckPeriod time.Duration `yaml:"-"`
71+
72+
// Used for testing
73+
SkipUnregister bool `yaml:"-"`
74+
}
75+
76+
// RegisterFlags adds the flags required to config this to the given FlagSet
77+
func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet) {
78+
hostname, err := os.Hostname()
79+
if err != nil {
80+
panic(fmt.Errorf("failed to get hostname, %w", err))
81+
}
82+
83+
// Ring flags
84+
cfg.KVStore.RegisterFlagsWithPrefix("querier.ring.", "querier/", f)
85+
f.DurationVar(&cfg.HeartbeatPeriod, "querier.ring.heartbeat-period", 5*time.Second, "Period at which to heartbeat to the ring. 0 = disabled.")
86+
f.DurationVar(&cfg.HeartbeatTimeout, "querier.ring.heartbeat-timeout", time.Minute, "The heartbeat timeout after which rulers are considered unhealthy within the ring. 0 = never (timeout disabled).")
87+
f.DurationVar(&cfg.FinalSleep, "querier.ring.final-sleep", 0*time.Second, "The sleep seconds when ruler is shutting down. Need to be close to or larger than KV Store information propagation delay")
88+
f.IntVar(&cfg.ReplicationFactor, "querier.ring.replication-factor", 1, "EXPERIMENTAL: The replication factor to use when loading rule groups for API HA.")
89+
f.BoolVar(&cfg.ZoneAwarenessEnabled, "querier.ring.zone-awareness-enabled", false, "EXPERIMENTAL: True to enable zone-awareness and load rule groups across different availability zones for API HA.")
90+
f.StringVar(&cfg.TokensFilePath, "querier.ring.tokens-file-path", "", "EXPERIMENTAL: File path where tokens are stored. If empty, tokens are not stored at shutdown and restored at startup.")
91+
f.BoolVar(&cfg.DetailedMetricsEnabled, "querier.ring.detailed-metrics-enabled", true, "Set to true to enable ring detailed metrics. These metrics provide detailed information, such as token count and ownership per tenant. Disabling them can significantly decrease the number of metrics emitted.")
92+
93+
// Instance flags
94+
cfg.InstanceInterfaceNames = []string{"eth0", "en0"}
95+
f.Var((*flagext.StringSlice)(&cfg.InstanceInterfaceNames), "querier.ring.instance-interface-names", "Name of network interface to read address from.")
96+
f.StringVar(&cfg.InstanceAddr, "querier.ring.instance-addr", "", "IP address to advertise in the ring.")
97+
f.IntVar(&cfg.InstancePort, "querier.ring.instance-port", 0, "Port to advertise in the ring (defaults to server.grpc-listen-port).")
98+
f.StringVar(&cfg.InstanceID, "querier.ring.instance-id", hostname, "Instance ID to register in the ring.")
99+
f.StringVar(&cfg.InstanceZone, "querier.ring.instance-availability-zone", "", "The availability zone where this instance is running. Required if zone-awareness is enabled.")
100+
f.IntVar(&cfg.NumTokens, "querier.ring.num-tokens", 128, "Number of tokens for each ruler.")
101+
f.BoolVar(&cfg.KeepInstanceInTheRingOnShutdown, "querier.ring.keep-instance-in-the-ring-on-shutdown", false, "Keep instance in the ring on shut down.")
36102
}
37103

38104
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
105+
cfg.QuerierRing.RegisterFlags(f)
106+
39107
f.StringVar(&cfg.SchedulerAddress, "querier.scheduler-address", "", "Hostname (and port) of scheduler that querier will periodically resolve, connect to and receive queries from. Only one of -querier.frontend-address or -querier.scheduler-address can be set. If neither is set, queries are only received via HTTP endpoint.")
40108
f.StringVar(&cfg.FrontendAddress, "querier.frontend-address", "", "Address of query frontend service, in host:port format. If -querier.scheduler-address is set as well, querier will use scheduler instead. Only one of -querier.frontend-address or -querier.scheduler-address can be set. If neither is set, queries are only received via HTTP endpoint.")
41109

@@ -109,7 +177,14 @@ func NewQuerierWorker(cfg Config, handler RequestHandler, log log.Logger, reg pr
109177
level.Info(log).Log("msg", "Starting querier worker connected to query-scheduler", "scheduler", cfg.SchedulerAddress)
110178

111179
address = cfg.SchedulerAddress
112-
processor, servs = newSchedulerProcessor(cfg, handler, log, reg)
180+
181+
ipAddr, err := ring.GetInstanceAddr(cfg.QuerierRing.InstanceAddr, cfg.QuerierRing.InstanceInterfaceNames, log)
182+
if err != nil {
183+
return nil, err
184+
}
185+
querierAddr := net.JoinHostPort(ipAddr, strconv.Itoa(cfg.QuerierRing.ListenPort))
186+
187+
processor, servs = newSchedulerProcessor(cfg, handler, log, reg, querierAddr)
113188

114189
case cfg.FrontendAddress != "":
115190
level.Info(log).Log("msg", "Starting querier worker connected to query-frontend", "frontend", cfg.FrontendAddress)
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
package fragment_table
2+
3+
import (
4+
"sync"
5+
6+
"github.com/cortexproject/cortex/pkg/distributed_execution"
7+
)
8+
9+
type FragmentTable struct {
10+
mappings map[distributed_execution.FragmentKey]string
11+
mu sync.RWMutex
12+
}
13+
14+
func NewFragmentTable() *FragmentTable {
15+
return &FragmentTable{
16+
mappings: make(map[distributed_execution.FragmentKey]string),
17+
}
18+
}
19+
20+
func (f *FragmentTable) AddMapping(queryID uint64, fragmentID uint64, addr string) {
21+
f.mu.Lock()
22+
defer f.mu.Unlock()
23+
24+
key := distributed_execution.MakeFragmentKey(queryID, fragmentID)
25+
f.mappings[key] = addr
26+
}
27+
28+
func (f *FragmentTable) GetAllChildAddresses(queryID uint64, fragmentIDs []uint64) ([]string, bool) {
29+
f.mu.RLock()
30+
defer f.mu.RUnlock()
31+
32+
addresses := make([]string, 0, len(fragmentIDs))
33+
34+
for _, fragmentID := range fragmentIDs {
35+
key := distributed_execution.MakeFragmentKey(queryID, fragmentID)
36+
if addr, ok := f.mappings[key]; ok {
37+
addresses = append(addresses, addr)
38+
} else {
39+
return nil, false
40+
}
41+
}
42+
return addresses, true
43+
}
44+
45+
func (f *FragmentTable) GetChildAddr(queryID uint64, fragmentID uint64) (string, bool) {
46+
f.mu.RLock()
47+
defer f.mu.RUnlock()
48+
49+
key := distributed_execution.MakeFragmentKey(queryID, fragmentID)
50+
if addr, ok := f.mappings[key]; ok {
51+
return addr, true
52+
}
53+
return "", false
54+
}
55+
56+
func (f *FragmentTable) ClearMappings(queryID uint64) {
57+
f.mu.Lock()
58+
defer f.mu.Unlock()
59+
60+
keysToDelete := make([]distributed_execution.FragmentKey, 0)
61+
for key := range f.mappings {
62+
if key.GetQueryID() == queryID {
63+
keysToDelete = append(keysToDelete, key)
64+
}
65+
}
66+
67+
for _, key := range keysToDelete {
68+
delete(f.mappings, key)
69+
}
70+
}

0 commit comments

Comments
 (0)