4 changes: 4 additions & 0 deletions docs/configuration/config-file-reference.md
@@ -3485,6 +3485,10 @@ grpc_client_config:
# using default gRPC client connect timeout 20s.
# CLI flag: -querier.frontend-client.connect-timeout
[connect_timeout: <duration> | default = 5s]
# Name of network interface to read address from.
# CLI flag: -querier.instance-interface-names
[instance_interface_names: <list of string> | default = [eth0 en0]]
```

### `ingester_config`
6 changes: 4 additions & 2 deletions pkg/cortex/modules.go
@@ -4,7 +4,6 @@ import (
"context"
"flag"
"fmt"

"log/slog"
"net/http"
"runtime"
@@ -414,6 +413,9 @@ func (t *Cortex) initQuerier() (serv services.Service, err error) {

t.Cfg.Worker.MaxConcurrentRequests = t.Cfg.Querier.MaxConcurrent
t.Cfg.Worker.TargetHeaders = t.Cfg.API.HTTPRequestHeadersToLog

t.Cfg.Worker.ListenPort = t.Cfg.Server.GRPCListenPort

return querier_worker.NewQuerierWorker(t.Cfg.Worker, httpgrpc_server.NewServer(internalQuerierRouter), util_log.Logger, prometheus.DefaultRegisterer)
}

@@ -815,7 +817,7 @@ func (t *Cortex) initQueryScheduler() (services.Service, error) {
tenant.WithDefaultResolver(tenantfederation.NewRegexValidator())
}

s, err := scheduler.NewScheduler(t.Cfg.QueryScheduler, t.Overrides, util_log.Logger, prometheus.DefaultRegisterer)
s, err := scheduler.NewScheduler(t.Cfg.QueryScheduler, t.Overrides, util_log.Logger, prometheus.DefaultRegisterer, t.Cfg.Querier.DistributedExecEnabled)
if err != nil {
return nil, errors.Wrap(err, "query-scheduler init")
}
17 changes: 14 additions & 3 deletions pkg/distributed_execution/fragment_key.go
@@ -1,21 +1,32 @@
package distributed_execution

// FragmentKey uniquely identifies a fragment of a distributed logical query plan.
// It combines a queryID (to identify the overall query) and a fragmentID
// (to identify the specific fragment within that query).
type FragmentKey struct {
queryID uint64
// queryID identifies the distributed query this fragment belongs to
queryID uint64
// fragmentID identifies this specific fragment within the query
fragmentID uint64
}

func MakeFragmentKey(queryID uint64, fragmentID uint64) *FragmentKey {
return &FragmentKey{
// MakeFragmentKey creates a new FragmentKey with the given queryID and fragmentID.
// It's used to track and identify fragments during distributed query execution.
func MakeFragmentKey(queryID uint64, fragmentID uint64) FragmentKey {
return FragmentKey{
queryID: queryID,
fragmentID: fragmentID,
}
}

// GetQueryID returns the ID of the distributed query this fragment belongs to.
// This ID is shared across all fragments of the same distributed query.
func (f FragmentKey) GetQueryID() uint64 {
return f.queryID
}

// GetFragmentID returns the ID for this specific fragment
// within its parent query.
func (f FragmentKey) GetFragmentID() uint64 {
return f.fragmentID
}
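Editor's note: a minimal, hypothetical sketch of why returning FragmentKey by value matters; value keys are comparable and can index a map directly, which is how the scheduler's fragment table uses them later in this PR. The IDs and address below are placeholders.

```go
package main

import (
	"fmt"

	"github.com/cortexproject/cortex/pkg/distributed_execution"
)

func main() {
	// MakeFragmentKey now returns a value, so the key is comparable and
	// can be used directly as a map key without dereferencing a pointer.
	key := distributed_execution.MakeFragmentKey(42, 1) // placeholder IDs

	addrs := map[distributed_execution.FragmentKey]string{
		key: "10.0.0.7:9095", // placeholder querier address
	}

	fmt.Printf("query %d, fragment %d -> %s\n",
		key.GetQueryID(), key.GetFragmentID(), addrs[key])
}
```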
52 changes: 52 additions & 0 deletions pkg/distributed_execution/plan_fragments/fragmenter.go
@@ -0,0 +1,52 @@
package plan_fragments

import "github.com/thanos-io/promql-engine/logicalplan"

// Fragmenter splits a logical query plan into independently schedulable fragments.
type Fragmenter interface {
// Fragment breaks the logical query plan into fragments and always returns them in child-to-root order;
// in other words, the order of the fragments in the returned slice is the order in which they will be scheduled.
Fragment(node logicalplan.Node) ([]Fragment, error)
}

// DummyFragmenter is a placeholder Fragmenter that returns the entire plan as a single root fragment.
type DummyFragmenter struct {
}

func (f *DummyFragmenter) Fragment(node logicalplan.Node) ([]Fragment, error) {
// simple logic without distributed optimizer
return []Fragment{
{
Node: node,
FragmentID: uint64(1),
ChildIDs: []uint64{},
IsRoot: true,
},
}, nil
}

// Fragment is one unit of a fragmented logical query plan: the sub-plan node,
// its own ID, the IDs of its child fragments, and whether it is the root fragment.
type Fragment struct {
Node logicalplan.Node
FragmentID uint64
ChildIDs []uint64
IsRoot bool
}

// IsEmpty reports whether the Fragment carries no data, i.e. all of its fields are zero values.
func (s *Fragment) IsEmpty() bool {
if s.Node != nil {
return false
}
if s.FragmentID != 0 {
return false
}
if s.IsRoot {
return false
}
if len(s.ChildIDs) != 0 {
return false
}
return true
}

// NewDummyFragmenter returns a Fragmenter that performs no real fragmentation.
func NewDummyFragmenter() Fragmenter {
return &DummyFragmenter{}
}
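A short sketch of how a caller could rely on the child-to-root ordering contract documented on Fragment; the dispatch callback is hypothetical and not part of this PR.

```go
package example

import (
	"github.com/thanos-io/promql-engine/logicalplan"

	"github.com/cortexproject/cortex/pkg/distributed_execution/plan_fragments"
)

// scheduleInOrder dispatches fragments in the order returned by Fragment.
// Because fragments arrive child-to-root, every child fragment is handed to
// dispatch before the fragment that depends on it.
func scheduleInOrder(root logicalplan.Node, dispatch func(plan_fragments.Fragment) error) error {
	fragments, err := plan_fragments.NewDummyFragmenter().Fragment(root)
	if err != nil {
		return err
	}
	for _, frag := range fragments {
		if err := dispatch(frag); err != nil {
			return err
		}
	}
	return nil
}
```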
46 changes: 46 additions & 0 deletions pkg/distributed_execution/plan_fragments/fragmenter_test.go
@@ -0,0 +1,46 @@
package plan_fragments

import (
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/cortexproject/cortex/pkg/util/logical_plan"
)

func TestFragmenter(t *testing.T) {
type testCase struct {
name string
query string
start time.Time
end time.Time
expectedFragments int
}

now := time.Now()

// more tests will be added when distributed optimizer and fragmenter are implemented
tests := []testCase{
{
name: "simple logical query plan - no fragmentation",
query: "up",
start: now,
end: now,
expectedFragments: 1,
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
lp, err := logical_plan.CreateTestLogicalPlan(tc.query, tc.start, tc.end, 0)
require.NoError(t, err)

fragmenter := NewDummyFragmenter()
res, err := fragmenter.Fragment((*lp).Root())

require.NoError(t, err)
require.Equal(t, tc.expectedFragments, len(res))
})
}
}
2 changes: 1 addition & 1 deletion pkg/distributed_execution/remote_node.go
@@ -65,7 +65,7 @@ func (r *Remote) UnmarshalJSON(data []byte) error {
return err
}

r.FragmentKey = *MakeFragmentKey(re.QueryID, re.FragmentID)
r.FragmentKey = MakeFragmentKey(re.QueryID, re.FragmentID)
r.FragmentAddr = re.FragmentAddr
return nil
}
3 changes: 2 additions & 1 deletion pkg/querier/querier.go
@@ -96,7 +96,8 @@ type Config struct {
ParquetQueryableShardCacheSize int `yaml:"parquet_queryable_shard_cache_size"`
ParquetQueryableDefaultBlockStore string `yaml:"parquet_queryable_default_block_store"`
ParquetQueryableFallbackDisabled bool `yaml:"parquet_queryable_fallback_disabled"`
DistributedExecEnabled bool `yaml:"distributed_exec_enabled" doc:"hidden"`

DistributedExecEnabled bool `yaml:"distributed_exec_enabled" doc:"hidden"`
}

var (
6 changes: 4 additions & 2 deletions pkg/querier/worker/scheduler_processor.go
@@ -31,7 +31,7 @@ import (
"github.com/cortexproject/cortex/pkg/util/services"
)

func newSchedulerProcessor(cfg Config, handler RequestHandler, log log.Logger, reg prometheus.Registerer) (*schedulerProcessor, []services.Service) {
func newSchedulerProcessor(cfg Config, handler RequestHandler, log log.Logger, reg prometheus.Registerer, querierAddress string) (*schedulerProcessor, []services.Service) {
p := &schedulerProcessor{
log: log,
handler: handler,
@@ -47,6 +47,7 @@ func newSchedulerProcessor(cfg Config, handler RequestHandler, log log.Logger, r
Help: "Time spend doing requests to frontend.",
Buckets: prometheus.ExponentialBuckets(0.001, 4, 6),
}, []string{"operation", "status_code"}),
querierAddress: querierAddress,
}

frontendClientsGauge := promauto.With(reg).NewGauge(prometheus.GaugeOpts{
@@ -71,6 +72,7 @@ type schedulerProcessor struct {
grpcConfig grpcclient.Config
maxMessageSize int
querierID string
querierAddress string

frontendPool *client.Pool
frontendClientRequestDuration *prometheus.HistogramVec
@@ -97,7 +99,7 @@ func (sp *schedulerProcessor) processQueriesOnSingleStream(ctx context.Context,
for backoff.Ongoing() {
c, err := schedulerClient.QuerierLoop(ctx)
if err == nil {
err = c.Send(&schedulerpb.QuerierToScheduler{QuerierID: sp.querierID})
err = c.Send(&schedulerpb.QuerierToScheduler{QuerierID: sp.querierID, QuerierAddress: sp.querierAddress})
}

if err != nil {
2 changes: 1 addition & 1 deletion pkg/querier/worker/scheduler_processor_test.go
@@ -144,7 +144,7 @@ func Test_ToShowNotPanic_RelatedIssue6599(t *testing.T) {
go stat.AddFetchedChunkBytes(10)
}).Return(&httpgrpc.HTTPResponse{}, nil)

sp, _ := newSchedulerProcessor(cfg, requestHandler, log.NewNopLogger(), nil)
sp, _ := newSchedulerProcessor(cfg, requestHandler, log.NewNopLogger(), nil, "")
schedulerClient := &mockSchedulerForQuerierClient{}
schedulerClient.On("QuerierLoop", mock.Anything, mock.Anything).Return(querierLoopClient, nil)

21 changes: 20 additions & 1 deletion pkg/querier/worker/worker.go
@@ -3,7 +3,9 @@ package worker
import (
"context"
"flag"
"net"
"os"
"strconv"
"sync"
"time"

@@ -14,7 +16,9 @@ import (
"github.com/weaveworks/common/httpgrpc"
"google.golang.org/grpc"

"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/flagext"
"github.com/cortexproject/cortex/pkg/util/grpcclient"
"github.com/cortexproject/cortex/pkg/util/services"
)
@@ -33,6 +37,10 @@ type Config struct {
GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config"`

TargetHeaders []string `yaml:"-"` // Propagated by config.

InstanceInterfaceNames []string `yaml:"instance_interface_names"`
ListenPort int `yaml:"-"`
InstanceAddr string `yaml:"instance_addr" doc:"hidden"`
}

func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
@@ -46,6 +54,10 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.StringVar(&cfg.QuerierID, "querier.id", "", "Querier ID, sent to frontend service to identify requests from the same querier. Defaults to hostname.")

cfg.GRPCClientConfig.RegisterFlagsWithPrefix("querier.frontend-client", "", f)

cfg.InstanceInterfaceNames = []string{"eth0", "en0"}
f.Var((*flagext.StringSlice)(&cfg.InstanceInterfaceNames), "querier.instance-interface-names", "Name of network interface to read address from.")
f.StringVar(&cfg.InstanceAddr, "querier.instance-addr", "", "IP address of the querier")
}

func (cfg *Config) Validate(log log.Logger) error {
@@ -109,7 +121,14 @@ func NewQuerierWorker(cfg Config, handler RequestHandler, log log.Logger, reg pr
level.Info(log).Log("msg", "Starting querier worker connected to query-scheduler", "scheduler", cfg.SchedulerAddress)

address = cfg.SchedulerAddress
processor, servs = newSchedulerProcessor(cfg, handler, log, reg)

ipAddr, err := ring.GetInstanceAddr(cfg.InstanceAddr, cfg.InstanceInterfaceNames, log)
if err != nil {
return nil, err
}
querierAddr := net.JoinHostPort(ipAddr, strconv.Itoa(cfg.ListenPort))

processor, servs = newSchedulerProcessor(cfg, handler, log, reg, querierAddr)

case cfg.FrontendAddress != "":
level.Info(log).Log("msg", "Starting querier worker connected to query-frontend", "frontend", cfg.FrontendAddress)
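For reviewers, the new address derivation in NewQuerierWorker condensed into a standalone helper: an explicit instance address wins, otherwise the IP is read from the first matching interface, and the gRPC listen port is appended. The helper name itself is illustrative, not part of this PR.

```go
package example

import (
	"net"
	"strconv"

	"github.com/go-kit/log"

	"github.com/cortexproject/cortex/pkg/ring"
)

// querierAdvertiseAddr resolves the IP the querier should advertise to the
// query-scheduler and joins it with the gRPC listen port, as NewQuerierWorker
// now does before constructing the scheduler processor.
func querierAdvertiseAddr(instanceAddr string, interfaceNames []string, grpcPort int, logger log.Logger) (string, error) {
	ip, err := ring.GetInstanceAddr(instanceAddr, interfaceNames, logger)
	if err != nil {
		return "", err
	}
	return net.JoinHostPort(ip, strconv.Itoa(grpcPort)), nil
}
```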
79 changes: 79 additions & 0 deletions pkg/scheduler/fragment_table/fragment_table.go
@@ -0,0 +1,79 @@
package fragment_table

import (
"sync"
"time"

"github.com/cortexproject/cortex/pkg/distributed_execution"
)

type fragmentEntry struct {
addr string
createdAt time.Time
}

// FragmentTable maintains a mapping between query fragments and their assigned querier addresses.
// Entries automatically expire after a configured duration to prevent stale mappings.
type FragmentTable struct {
mappings map[distributed_execution.FragmentKey]*fragmentEntry
mu sync.RWMutex
expiration time.Duration
}

// NewFragmentTable creates a new FragmentTable with the specified expiration duration.
// It starts a background goroutine that periodically removes expired entries.
// The cleanup interval is set to half of the expiration duration.
func NewFragmentTable(expiration time.Duration) *FragmentTable {
ft := &FragmentTable{
mappings: make(map[distributed_execution.FragmentKey]*fragmentEntry),
expiration: expiration,
}
go ft.periodicCleanup()
return ft
}

// AddAddressByID associates a querier address with a specific fragment of a query.
// The association will automatically expire after the configured duration.
func (f *FragmentTable) AddAddressByID(queryID uint64, fragmentID uint64, addr string) {
f.mu.Lock()
defer f.mu.Unlock()
key := distributed_execution.MakeFragmentKey(queryID, fragmentID)
f.mappings[key] = &fragmentEntry{
addr: addr,
createdAt: time.Now(),
}
}

// GetAddrByID retrieves the querier address associated with a specific fragment.
func (f *FragmentTable) GetAddrByID(queryID uint64, fragmentID uint64) (string, bool) {
f.mu.RLock()
defer f.mu.RUnlock()
key := distributed_execution.MakeFragmentKey(queryID, fragmentID)
if entry, ok := f.mappings[key]; ok {
return entry.addr, true
}
return "", false
}

func (f *FragmentTable) cleanupExpired() {
f.mu.Lock()
defer f.mu.Unlock()
now := time.Now()
keysToDelete := make([]distributed_execution.FragmentKey, 0)
for key, entry := range f.mappings {
if now.Sub(entry.createdAt) > f.expiration {
keysToDelete = append(keysToDelete, key)
}
}
for _, key := range keysToDelete {
delete(f.mappings, key)
}
}

func (f *FragmentTable) periodicCleanup() {
ticker := time.NewTicker(f.expiration / 2)
defer ticker.Stop()
for range ticker.C {
f.cleanupExpired()
}
}
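A minimal usage sketch for the new FragmentTable; the two-minute expiration, IDs, and address are placeholder values.

```go
package example

import (
	"fmt"
	"time"

	"github.com/cortexproject/cortex/pkg/scheduler/fragment_table"
)

func exampleFragmentTable() {
	// Entries expire after two minutes; the background cleanup loop runs
	// every expiration/2, i.e. once a minute here.
	ft := fragment_table.NewFragmentTable(2 * time.Minute)

	// Remember which querier was assigned fragment 1 of query 42.
	ft.AddAddressByID(42, 1, "10.0.0.7:9095")

	if addr, ok := ft.GetAddrByID(42, 1); ok {
		fmt.Println("fragment is running on", addr)
	}
}
```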