Skip to content

Commit 942674c

Browse files
committed
allow logical plan fragment type for scheduler queue
Signed-off-by: rubywtl <[email protected]>
1 parent 52b9672 commit 942674c

File tree

16 files changed

+1265
-106
lines changed

16 files changed

+1265
-106
lines changed

docs/configuration/config-file-reference.md

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2940,6 +2940,7 @@ The `consul_config` configures the consul client. The supported CLI flags `<pref
29402940
- `distributor.ha-tracker`
29412941
- `distributor.ring`
29422942
- `parquet-converter.ring`
2943+
- `querier.ring`
29432944
- `ruler.ring`
29442945
- `store-gateway.sharding-ring`
29452946

@@ -3277,6 +3278,7 @@ The `etcd_config` configures the etcd client. The supported CLI flags `<prefix>`
32773278
- `distributor.ha-tracker`
32783279
- `distributor.ring`
32793280
- `parquet-converter.ring`
3281+
- `querier.ring`
32803282
- `ruler.ring`
32813283
- `store-gateway.sharding-ring`
32823284

@@ -3480,6 +3482,120 @@ grpc_client_config:
34803482
# using default gRPC client connect timeout 20s.
34813483
# CLI flag: -querier.frontend-client.connect-timeout
34823484
[connect_timeout: <duration> | default = 5s]
3485+
3486+
ring:
3487+
# The key-value store used to share the hash ring across multiple instances.
3488+
kvstore:
3489+
# Backend storage to use for the ring. Supported values are: consul, etcd,
3490+
# inmemory, memberlist, multi.
3491+
# CLI flag: -querier.ring.store
3492+
[store: <string> | default = "consul"]
3493+
3494+
# The prefix for the keys in the store. Should end with a /.
3495+
# CLI flag: -querier.ring.prefix
3496+
[prefix: <string> | default = "querier/"]
3497+
3498+
dynamodb:
3499+
# Region to access dynamodb.
3500+
# CLI flag: -querier.ring.dynamodb.region
3501+
[region: <string> | default = ""]
3502+
3503+
# Table name to use on dynamodb.
3504+
# CLI flag: -querier.ring.dynamodb.table-name
3505+
[table_name: <string> | default = ""]
3506+
3507+
# Time to expire items on dynamodb.
3508+
# CLI flag: -querier.ring.dynamodb.ttl-time
3509+
[ttl: <duration> | default = 0s]
3510+
3511+
# Time to refresh local ring with information on dynamodb.
3512+
# CLI flag: -querier.ring.dynamodb.puller-sync-time
3513+
[puller_sync_time: <duration> | default = 1m]
3514+
3515+
# Maximum number of retries for DDB KV CAS.
3516+
# CLI flag: -querier.ring.dynamodb.max-cas-retries
3517+
[max_cas_retries: <int> | default = 10]
3518+
3519+
# Timeout of dynamoDbClient requests. Default is 2m.
3520+
# CLI flag: -querier.ring.dynamodb.timeout
3521+
[timeout: <duration> | default = 2m]
3522+
3523+
# The consul_config configures the consul client.
3524+
# The CLI flags prefix for this block config is: querier.ring
3525+
[consul: <consul_config>]
3526+
3527+
# The etcd_config configures the etcd client.
3528+
# The CLI flags prefix for this block config is: querier.ring
3529+
[etcd: <etcd_config>]
3530+
3531+
multi:
3532+
# Primary backend storage used by multi-client.
3533+
# CLI flag: -querier.ring.multi.primary
3534+
[primary: <string> | default = ""]
3535+
3536+
# Secondary backend storage used by multi-client.
3537+
# CLI flag: -querier.ring.multi.secondary
3538+
[secondary: <string> | default = ""]
3539+
3540+
# Mirror writes to secondary store.
3541+
# CLI flag: -querier.ring.multi.mirror-enabled
3542+
[mirror_enabled: <boolean> | default = false]
3543+
3544+
# Timeout for storing value to secondary store.
3545+
# CLI flag: -querier.ring.multi.mirror-timeout
3546+
[mirror_timeout: <duration> | default = 2s]
3547+
3548+
# Period at which to heartbeat to the ring. 0 = disabled.
3549+
# CLI flag: -querier.ring.heartbeat-period
3550+
[heartbeat_period: <duration> | default = 5s]
3551+
3552+
# The heartbeat timeout after which rulers are considered unhealthy within the
3553+
# ring. 0 = never (timeout disabled).
3554+
# CLI flag: -querier.ring.heartbeat-timeout
3555+
[heartbeat_timeout: <duration> | default = 1m]
3556+
3557+
# EXPERIMENTAL: The replication factor to use when loading rule groups for API
3558+
# HA.
3559+
# CLI flag: -querier.ring.replication-factor
3560+
[replication_factor: <int> | default = 1]
3561+
3562+
# EXPERIMENTAL: True to enable zone-awareness and load rule groups across
3563+
# different availability zones for API HA.
3564+
# CLI flag: -querier.ring.zone-awareness-enabled
3565+
[zone_awareness_enabled: <boolean> | default = false]
3566+
3567+
# EXPERIMENTAL: File path where tokens are stored. If empty, tokens are not
3568+
# stored at shutdown and restored at startup.
3569+
# CLI flag: -querier.ring.tokens-file-path
3570+
[tokens_file_path: <string> | default = ""]
3571+
3572+
# Set to true to enable ring detailed metrics. These metrics provide detailed
3573+
# information, such as token count and ownership per tenant. Disabling them
3574+
# can significantly decrease the number of metrics emitted.
3575+
# CLI flag: -querier.ring.detailed-metrics-enabled
3576+
[detailed_metrics_enabled: <boolean> | default = true]
3577+
3578+
# The sleep seconds when ruler is shutting down. Need to be close to or larger
3579+
# than KV Store information propagation delay
3580+
# CLI flag: -querier.ring.final-sleep
3581+
[final_sleep: <duration> | default = 0s]
3582+
3583+
# Keep instance in the ring on shut down.
3584+
# CLI flag: -querier.ring.keep-instance-in-the-ring-on-shutdown
3585+
[keep_instance_in_the_ring_on_shutdown: <boolean> | default = false]
3586+
3587+
# Name of network interface to read address from.
3588+
# CLI flag: -querier.ring.instance-interface-names
3589+
[instance_interface_names: <list of string> | default = [eth0 en0]]
3590+
3591+
# The availability zone where this instance is running. Required if
3592+
# zone-awareness is enabled.
3593+
# CLI flag: -querier.ring.instance-availability-zone
3594+
[instance_availability_zone: <string> | default = ""]
3595+
3596+
# Number of tokens for each ruler.
3597+
# CLI flag: -querier.ring.num-tokens
3598+
[num_tokens: <int> | default = 128]
34833599
```
34843600

34853601
### `ingester_config`

pkg/cortex/modules.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"context"
55
"flag"
66
"fmt"
7-
87
"log/slog"
98
"net/http"
109
"runtime"
@@ -414,6 +413,9 @@ func (t *Cortex) initQuerier() (serv services.Service, err error) {
414413

415414
t.Cfg.Worker.MaxConcurrentRequests = t.Cfg.Querier.MaxConcurrent
416415
t.Cfg.Worker.TargetHeaders = t.Cfg.API.HTTPRequestHeadersToLog
416+
417+
t.Cfg.Worker.QuerierRing.ListenPort = t.Cfg.Server.GRPCListenPort
418+
417419
return querier_worker.NewQuerierWorker(t.Cfg.Worker, httpgrpc_server.NewServer(internalQuerierRouter), util_log.Logger, prometheus.DefaultRegisterer)
418420
}
419421

@@ -813,7 +815,7 @@ func (t *Cortex) initQueryScheduler() (services.Service, error) {
813815
tenant.WithDefaultResolver(tenantfederation.NewRegexValidator())
814816
}
815817

816-
s, err := scheduler.NewScheduler(t.Cfg.QueryScheduler, t.Overrides, util_log.Logger, prometheus.DefaultRegisterer)
818+
s, err := scheduler.NewScheduler(t.Cfg.QueryScheduler, t.Overrides, util_log.Logger, prometheus.DefaultRegisterer, t.Cfg.Querier.DistributedExecEnabled)
817819
if err != nil {
818820
return nil, errors.Wrap(err, "query-scheduler init")
819821
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package distributed_execution
2+
3+
// FragmentKey uniquely identifies a fragment of a distributed logical query plan.
4+
// It combines a queryID (to identify the overall query) and a fragmentID
5+
// (to identify the specific fragment within that query).
6+
type FragmentKey struct {
7+
// QueryID identifies the distributed query this fragment belongs to
8+
queryID uint64
9+
10+
// FragmentID identifies this specific fragment within the query
11+
fragmentID uint64
12+
}
13+
14+
// MakeFragmentKey creates a new FragmentKey with the given queryID and fragmentID.
15+
// It's used to track and identify fragments during distributed query execution.
16+
func MakeFragmentKey(queryID uint64, fragmentID uint64) FragmentKey {
17+
return FragmentKey{
18+
queryID: queryID,
19+
fragmentID: fragmentID,
20+
}
21+
}
22+
23+
// GetQueryID returns the queryID for the current key
24+
// This ID is shared across all fragments of the same distributed query.
25+
func (f FragmentKey) GetQueryID() uint64 {
26+
return f.queryID
27+
}
28+
29+
// GetFragmentID returns the ID for this specific fragment
30+
// within its parent query.
31+
func (f FragmentKey) GetFragmentID() uint64 {
32+
return f.fragmentID
33+
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package plan_fragments
2+
3+
import "github.com/thanos-io/promql-engine/logicalplan"
4+
5+
type Fragmenter interface {
6+
Fragment(node logicalplan.Node) ([]Fragment, error)
7+
}
8+
9+
type DummyFragmenter struct {
10+
}
11+
12+
func (f *DummyFragmenter) Fragment(node logicalplan.Node) ([]Fragment, error) {
13+
// simple logic without distributed optimizer
14+
return []Fragment{
15+
{
16+
Node: node,
17+
FragmentID: uint64(1),
18+
ChildIDs: []uint64{},
19+
IsRoot: true,
20+
},
21+
}, nil
22+
}
23+
24+
type Fragment struct {
25+
Node logicalplan.Node
26+
FragmentID uint64
27+
ChildIDs []uint64
28+
IsRoot bool
29+
}
30+
31+
func (s *Fragment) IsEmpty() bool {
32+
if s.Node != nil {
33+
return false
34+
}
35+
if s.FragmentID != 0 {
36+
return false
37+
}
38+
if s.IsRoot {
39+
return false
40+
}
41+
if len(s.ChildIDs) != 0 {
42+
return false
43+
}
44+
return true
45+
}
46+
47+
func NewDummyFragmenter() Fragmenter {
48+
return &DummyFragmenter{}
49+
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package plan_fragments
2+
3+
import (
4+
"testing"
5+
"time"
6+
7+
"github.com/stretchr/testify/require"
8+
9+
"github.com/cortexproject/cortex/pkg/util/logical_plan"
10+
)
11+
12+
func TestFragmenter(t *testing.T) {
13+
type testCase struct {
14+
name string
15+
query string
16+
start time.Time
17+
end time.Time
18+
expectedFragments int
19+
}
20+
21+
now := time.Now()
22+
23+
// more tests will be added when distributed optimizer and fragmenter are implemented
24+
tests := []testCase{
25+
{
26+
name: "simple logical query plan - no fragmentation",
27+
query: "up",
28+
start: now,
29+
end: now,
30+
expectedFragments: 1,
31+
},
32+
}
33+
34+
for _, tc := range tests {
35+
t.Run(tc.name, func(t *testing.T) {
36+
lp, err := logical_plan.CreateTestLogicalPlan(tc.query, tc.start, tc.end, 0)
37+
require.NoError(t, err)
38+
39+
fragmenter := NewDummyFragmenter()
40+
res, err := fragmenter.Fragment((*lp).Root())
41+
42+
require.NoError(t, err)
43+
require.Equal(t, tc.expectedFragments, len(res))
44+
})
45+
}
46+
}

pkg/querier/querier.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,8 @@ type Config struct {
9696
ParquetQueryableShardCacheSize int `yaml:"parquet_queryable_shard_cache_size"`
9797
ParquetQueryableDefaultBlockStore string `yaml:"parquet_queryable_default_block_store"`
9898
ParquetQueryableFallbackDisabled bool `yaml:"parquet_queryable_fallback_disabled"`
99-
DistributedExecEnabled bool `yaml:"distributed_exec_enabled" doc:"hidden"`
99+
100+
DistributedExecEnabled bool `yaml:"distributed_exec_enabled" doc:"hidden"`
100101
}
101102

102103
var (

pkg/querier/worker/scheduler_processor.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ import (
3131
"github.com/cortexproject/cortex/pkg/util/services"
3232
)
3333

34-
func newSchedulerProcessor(cfg Config, handler RequestHandler, log log.Logger, reg prometheus.Registerer) (*schedulerProcessor, []services.Service) {
34+
func newSchedulerProcessor(cfg Config, handler RequestHandler, log log.Logger, reg prometheus.Registerer, querierAddress string) (*schedulerProcessor, []services.Service) {
3535
p := &schedulerProcessor{
3636
log: log,
3737
handler: handler,
@@ -47,6 +47,7 @@ func newSchedulerProcessor(cfg Config, handler RequestHandler, log log.Logger, r
4747
Help: "Time spend doing requests to frontend.",
4848
Buckets: prometheus.ExponentialBuckets(0.001, 4, 6),
4949
}, []string{"operation", "status_code"}),
50+
querierAddress: querierAddress,
5051
}
5152

5253
frontendClientsGauge := promauto.With(reg).NewGauge(prometheus.GaugeOpts{
@@ -71,6 +72,7 @@ type schedulerProcessor struct {
7172
grpcConfig grpcclient.Config
7273
maxMessageSize int
7374
querierID string
75+
querierAddress string
7476

7577
frontendPool *client.Pool
7678
frontendClientRequestDuration *prometheus.HistogramVec
@@ -97,7 +99,7 @@ func (sp *schedulerProcessor) processQueriesOnSingleStream(ctx context.Context,
9799
for backoff.Ongoing() {
98100
c, err := schedulerClient.QuerierLoop(ctx)
99101
if err == nil {
100-
err = c.Send(&schedulerpb.QuerierToScheduler{QuerierID: sp.querierID})
102+
err = c.Send(&schedulerpb.QuerierToScheduler{QuerierID: sp.querierID, QuerierAddress: sp.querierAddress})
101103
}
102104

103105
if err != nil {

pkg/querier/worker/scheduler_processor_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ func Test_ToShowNotPanic_RelatedIssue6599(t *testing.T) {
144144
go stat.AddFetchedChunkBytes(10)
145145
}).Return(&httpgrpc.HTTPResponse{}, nil)
146146

147-
sp, _ := newSchedulerProcessor(cfg, requestHandler, log.NewNopLogger(), nil)
147+
sp, _ := newSchedulerProcessor(cfg, requestHandler, log.NewNopLogger(), nil, "")
148148
schedulerClient := &mockSchedulerForQuerierClient{}
149149
schedulerClient.On("QuerierLoop", mock.Anything, mock.Anything).Return(querierLoopClient, nil)
150150

0 commit comments

Comments
 (0)