diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index c4ef8305ed6..096fc09fca4 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -2940,6 +2940,7 @@ The `consul_config` configures the consul client. The supported CLI flags `` - `distributor.ha-tracker` - `distributor.ring` - `parquet-converter.ring` +- `querier.ring` - `ruler.ring` - `store-gateway.sharding-ring` @@ -3480,6 +3482,120 @@ grpc_client_config: # using default gRPC client connect timeout 20s. # CLI flag: -querier.frontend-client.connect-timeout [connect_timeout: | default = 5s] + +ring: + # The key-value store used to share the hash ring across multiple instances. + kvstore: + # Backend storage to use for the ring. Supported values are: consul, etcd, + # inmemory, memberlist, multi. + # CLI flag: -querier.ring.store + [store: | default = "consul"] + + # The prefix for the keys in the store. Should end with a /. + # CLI flag: -querier.ring.prefix + [prefix: | default = "querier/"] + + dynamodb: + # Region to access dynamodb. + # CLI flag: -querier.ring.dynamodb.region + [region: | default = ""] + + # Table name to use on dynamodb. + # CLI flag: -querier.ring.dynamodb.table-name + [table_name: | default = ""] + + # Time to expire items on dynamodb. + # CLI flag: -querier.ring.dynamodb.ttl-time + [ttl: | default = 0s] + + # Time to refresh local ring with information on dynamodb. + # CLI flag: -querier.ring.dynamodb.puller-sync-time + [puller_sync_time: | default = 1m] + + # Maximum number of retries for DDB KV CAS. + # CLI flag: -querier.ring.dynamodb.max-cas-retries + [max_cas_retries: | default = 10] + + # Timeout of dynamoDbClient requests. Default is 2m. + # CLI flag: -querier.ring.dynamodb.timeout + [timeout: | default = 2m] + + # The consul_config configures the consul client. + # The CLI flags prefix for this block config is: querier.ring + [consul: ] + + # The etcd_config configures the etcd client. + # The CLI flags prefix for this block config is: querier.ring + [etcd: ] + + multi: + # Primary backend storage used by multi-client. + # CLI flag: -querier.ring.multi.primary + [primary: | default = ""] + + # Secondary backend storage used by multi-client. + # CLI flag: -querier.ring.multi.secondary + [secondary: | default = ""] + + # Mirror writes to secondary store. + # CLI flag: -querier.ring.multi.mirror-enabled + [mirror_enabled: | default = false] + + # Timeout for storing value to secondary store. + # CLI flag: -querier.ring.multi.mirror-timeout + [mirror_timeout: | default = 2s] + + # Period at which to heartbeat to the ring. 0 = disabled. + # CLI flag: -querier.ring.heartbeat-period + [heartbeat_period: | default = 5s] + + # The heartbeat timeout after which queriers are considered unhealthy within + # the ring. 0 = never (timeout disabled). + # CLI flag: -querier.ring.heartbeat-timeout + [heartbeat_timeout: | default = 1m] + + # EXPERIMENTAL: The replication factor to use for the querier ring. + # CLI flag: -querier.ring.replication-factor + [replication_factor: | default = 1] + + # EXPERIMENTAL: True to enable zone-awareness for the querier ring. + # CLI flag: -querier.ring.zone-awareness-enabled + [zone_awareness_enabled: | default = false] + + # EXPERIMENTAL: File path where tokens are stored. If empty, tokens are not + # stored at shutdown and restored at startup.
+ # CLI flag: -querier.ring.tokens-file-path + [tokens_file_path: | default = ""] + + # Set to true to enable ring detailed metrics. These metrics provide detailed + # information, such as token count and ownership per tenant. Disabling them + # can significantly decrease the number of metrics emitted. + # CLI flag: -querier.ring.detailed-metrics-enabled + [detailed_metrics_enabled: | default = true] + + # The sleep time when the querier is shutting down. Needs to be close to or + # larger than the KV Store information propagation delay. + # CLI flag: -querier.ring.final-sleep + [final_sleep: | default = 0s] + + # Keep instance in the ring on shut down. + # CLI flag: -querier.ring.keep-instance-in-the-ring-on-shutdown + [keep_instance_in_the_ring_on_shutdown: | default = false] + + # Name of network interface to read address from. + # CLI flag: -querier.ring.instance-interface-names + [instance_interface_names: | default = [eth0 en0]] + + # The availability zone where this instance is running. Required if + # zone-awareness is enabled. + # CLI flag: -querier.ring.instance-availability-zone + [instance_availability_zone: | default = ""] + + # Number of tokens for each querier. + # CLI flag: -querier.ring.num-tokens + [num_tokens: | default = 128] ``` ### `ingester_config` diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index a47888b8267..e38e9cf2f7c 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -4,7 +4,6 @@ import ( "context" "flag" "fmt" - "log/slog" "net/http" "runtime" @@ -414,6 +413,9 @@ func (t *Cortex) initQuerier() (serv services.Service, err error) { t.Cfg.Worker.MaxConcurrentRequests = t.Cfg.Querier.MaxConcurrent t.Cfg.Worker.TargetHeaders = t.Cfg.API.HTTPRequestHeadersToLog + + t.Cfg.Worker.QuerierRing.ListenPort = t.Cfg.Server.GRPCListenPort + return querier_worker.NewQuerierWorker(t.Cfg.Worker, httpgrpc_server.NewServer(internalQuerierRouter), util_log.Logger, prometheus.DefaultRegisterer) } @@ -813,7 +815,7 @@ func (t *Cortex) initQueryScheduler() (services.Service, error) { tenant.WithDefaultResolver(tenantfederation.NewRegexValidator()) } - s, err := scheduler.NewScheduler(t.Cfg.QueryScheduler, t.Overrides, util_log.Logger, prometheus.DefaultRegisterer) + s, err := scheduler.NewScheduler(t.Cfg.QueryScheduler, t.Overrides, util_log.Logger, prometheus.DefaultRegisterer, t.Cfg.Querier.DistributedExecEnabled) if err != nil { return nil, errors.Wrap(err, "query-scheduler init") } diff --git a/pkg/distributed_execution/fragment_key.go b/pkg/distributed_execution/fragment_key.go new file mode 100644 index 00000000000..2f261471b9e --- /dev/null +++ b/pkg/distributed_execution/fragment_key.go @@ -0,0 +1,33 @@ +package distributed_execution + +// FragmentKey uniquely identifies a fragment of a distributed logical query plan. +// It combines a queryID (to identify the overall query) and a fragmentID +// (to identify the specific fragment within that query). +type FragmentKey struct { + // QueryID identifies the distributed query this fragment belongs to + queryID uint64 + + // FragmentID identifies this specific fragment within the query + fragmentID uint64 +} + +// MakeFragmentKey creates a new FragmentKey with the given queryID and fragmentID. +// It's used to track and identify fragments during distributed query execution.
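+// A minimal, illustrative example (not part of this change); the IDs are arbitrary:
+//
+//	key := MakeFragmentKey(42, 7)
+//	_ = key.GetQueryID()    // 42, shared by every fragment of the query
+//	_ = key.GetFragmentID() // 7, unique within that query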
+func MakeFragmentKey(queryID uint64, fragmentID uint64) FragmentKey { + return FragmentKey{ + queryID: queryID, + fragmentID: fragmentID, + } +} + +// GetQueryID returns the queryID for the current key +// This ID is shared across all fragments of the same distributed query. +func (f FragmentKey) GetQueryID() uint64 { + return f.queryID +} + +// GetFragmentID returns the ID for this specific fragment +// within its parent query. +func (f FragmentKey) GetFragmentID() uint64 { + return f.fragmentID +} diff --git a/pkg/distributed_execution/plan_fragments/fragmenter.go b/pkg/distributed_execution/plan_fragments/fragmenter.go new file mode 100644 index 00000000000..7fc474e8e1a --- /dev/null +++ b/pkg/distributed_execution/plan_fragments/fragmenter.go @@ -0,0 +1,49 @@ +package plan_fragments + +import "github.com/thanos-io/promql-engine/logicalplan" + +type Fragmenter interface { + Fragment(node logicalplan.Node) ([]Fragment, error) +} + +type DummyFragmenter struct { +} + +func (f *DummyFragmenter) Fragment(node logicalplan.Node) ([]Fragment, error) { + // simple logic without distributed optimizer + return []Fragment{ + { + Node: node, + FragmentID: uint64(1), + ChildIDs: []uint64{}, + IsRoot: true, + }, + }, nil +} + +type Fragment struct { + Node logicalplan.Node + FragmentID uint64 + ChildIDs []uint64 + IsRoot bool +} + +func (s *Fragment) IsEmpty() bool { + if s.Node != nil { + return false + } + if s.FragmentID != 0 { + return false + } + if s.IsRoot { + return false + } + if len(s.ChildIDs) != 0 { + return false + } + return true +} + +func NewDummyFragmenter() Fragmenter { + return &DummyFragmenter{} +} diff --git a/pkg/distributed_execution/plan_fragments/fragmenter_test.go b/pkg/distributed_execution/plan_fragments/fragmenter_test.go new file mode 100644 index 00000000000..65f4c20022e --- /dev/null +++ b/pkg/distributed_execution/plan_fragments/fragmenter_test.go @@ -0,0 +1,46 @@ +package plan_fragments + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/cortexproject/cortex/pkg/util/logical_plan" +) + +func TestFragmenter(t *testing.T) { + type testCase struct { + name string + query string + start time.Time + end time.Time + expectedFragments int + } + + now := time.Now() + + // more tests will be added when distributed optimizer and fragmenter are implemented + tests := []testCase{ + { + name: "simple logical query plan - no fragmentation", + query: "up", + start: now, + end: now, + expectedFragments: 1, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + lp, err := logical_plan.CreateTestLogicalPlan(tc.query, tc.start, tc.end, 0) + require.NoError(t, err) + + fragmenter := NewDummyFragmenter() + res, err := fragmenter.Fragment((*lp).Root()) + + require.NoError(t, err) + require.Equal(t, tc.expectedFragments, len(res)) + }) + } +} diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index 9160f1c4112..548fda7d823 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -96,7 +96,8 @@ type Config struct { ParquetQueryableShardCacheSize int `yaml:"parquet_queryable_shard_cache_size"` ParquetQueryableDefaultBlockStore string `yaml:"parquet_queryable_default_block_store"` ParquetQueryableFallbackDisabled bool `yaml:"parquet_queryable_fallback_disabled"` - DistributedExecEnabled bool `yaml:"distributed_exec_enabled" doc:"hidden"` + + DistributedExecEnabled bool `yaml:"distributed_exec_enabled" doc:"hidden"` } var ( diff --git a/pkg/querier/worker/scheduler_processor.go 
b/pkg/querier/worker/scheduler_processor.go index 10fd96ab230..3bba5980442 100644 --- a/pkg/querier/worker/scheduler_processor.go +++ b/pkg/querier/worker/scheduler_processor.go @@ -31,7 +31,7 @@ import ( "github.com/cortexproject/cortex/pkg/util/services" ) -func newSchedulerProcessor(cfg Config, handler RequestHandler, log log.Logger, reg prometheus.Registerer) (*schedulerProcessor, []services.Service) { +func newSchedulerProcessor(cfg Config, handler RequestHandler, log log.Logger, reg prometheus.Registerer, querierAddress string) (*schedulerProcessor, []services.Service) { p := &schedulerProcessor{ log: log, handler: handler, @@ -47,6 +47,7 @@ func newSchedulerProcessor(cfg Config, handler RequestHandler, log log.Logger, r Help: "Time spend doing requests to frontend.", Buckets: prometheus.ExponentialBuckets(0.001, 4, 6), }, []string{"operation", "status_code"}), + querierAddress: querierAddress, } frontendClientsGauge := promauto.With(reg).NewGauge(prometheus.GaugeOpts{ @@ -71,6 +72,7 @@ type schedulerProcessor struct { grpcConfig grpcclient.Config maxMessageSize int querierID string + querierAddress string frontendPool *client.Pool frontendClientRequestDuration *prometheus.HistogramVec @@ -97,7 +99,7 @@ func (sp *schedulerProcessor) processQueriesOnSingleStream(ctx context.Context, for backoff.Ongoing() { c, err := schedulerClient.QuerierLoop(ctx) if err == nil { - err = c.Send(&schedulerpb.QuerierToScheduler{QuerierID: sp.querierID}) + err = c.Send(&schedulerpb.QuerierToScheduler{QuerierID: sp.querierID, QuerierAddress: sp.querierAddress}) } if err != nil { diff --git a/pkg/querier/worker/scheduler_processor_test.go b/pkg/querier/worker/scheduler_processor_test.go index c3d2534e441..9931ef9e303 100644 --- a/pkg/querier/worker/scheduler_processor_test.go +++ b/pkg/querier/worker/scheduler_processor_test.go @@ -144,7 +144,7 @@ func Test_ToShowNotPanic_RelatedIssue6599(t *testing.T) { go stat.AddFetchedChunkBytes(10) }).Return(&httpgrpc.HTTPResponse{}, nil) - sp, _ := newSchedulerProcessor(cfg, requestHandler, log.NewNopLogger(), nil) + sp, _ := newSchedulerProcessor(cfg, requestHandler, log.NewNopLogger(), nil, "") schedulerClient := &mockSchedulerForQuerierClient{} schedulerClient.On("QuerierLoop", mock.Anything, mock.Anything).Return(querierLoopClient, nil) diff --git a/pkg/querier/worker/worker.go b/pkg/querier/worker/worker.go index 90e32b7aff5..a5436a877fd 100644 --- a/pkg/querier/worker/worker.go +++ b/pkg/querier/worker/worker.go @@ -3,7 +3,10 @@ package worker import ( "context" "flag" + "fmt" + "net" "os" + "strconv" "sync" "time" @@ -14,7 +17,10 @@ import ( "github.com/weaveworks/common/httpgrpc" "google.golang.org/grpc" + "github.com/cortexproject/cortex/pkg/ring" + "github.com/cortexproject/cortex/pkg/ring/kv" "github.com/cortexproject/cortex/pkg/util" + "github.com/cortexproject/cortex/pkg/util/flagext" "github.com/cortexproject/cortex/pkg/util/grpcclient" "github.com/cortexproject/cortex/pkg/util/services" ) @@ -33,9 +39,70 @@ type Config struct { GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config"` TargetHeaders []string `yaml:"-"` // Propagated by config. 
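+	// QuerierRing carries the instance details (address, interface names, and the
+	// gRPC listen port injected at startup) used to build the querier address that
+	// is advertised to the query-scheduler.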
+ + QuerierRing RingConfig `yaml:"ring"` +} + +type RingConfig struct { + KVStore kv.Config `yaml:"kvstore" doc:"description=The key-value store used to share the hash ring across multiple instances."` + HeartbeatPeriod time.Duration `yaml:"heartbeat_period"` + HeartbeatTimeout time.Duration `yaml:"heartbeat_timeout"` + ReplicationFactor int `yaml:"replication_factor"` + ZoneAwarenessEnabled bool `yaml:"zone_awareness_enabled"` + TokensFilePath string `yaml:"tokens_file_path"` + DetailedMetricsEnabled bool `yaml:"detailed_metrics_enabled"` + + FinalSleep time.Duration `yaml:"final_sleep"` + KeepInstanceInTheRingOnShutdown bool `yaml:"keep_instance_in_the_ring_on_shutdown"` + + // Instance details + InstanceID string `yaml:"instance_id" doc:"hidden"` + InstanceInterfaceNames []string `yaml:"instance_interface_names"` + InstancePort int `yaml:"instance_port" doc:"hidden"` + InstanceAddr string `yaml:"instance_addr" doc:"hidden"` + InstanceZone string `yaml:"instance_availability_zone"` + + NumTokens int `yaml:"num_tokens"` + + // Injected internally + ListenPort int `yaml:"-"` + RingCheckPeriod time.Duration `yaml:"-"` + + // Used for testing + SkipUnregister bool `yaml:"-"` +} + +// RegisterFlags adds the flags required to config this to the given FlagSet +func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet) { + hostname, err := os.Hostname() + if err != nil { + panic(fmt.Errorf("failed to get hostname, %w", err)) + } + + // Ring flags + cfg.KVStore.RegisterFlagsWithPrefix("querier.ring.", "querier/", f) + f.DurationVar(&cfg.HeartbeatPeriod, "querier.ring.heartbeat-period", 5*time.Second, "Period at which to heartbeat to the ring. 0 = disabled.") + f.DurationVar(&cfg.HeartbeatTimeout, "querier.ring.heartbeat-timeout", time.Minute, "The heartbeat timeout after which queriers are considered unhealthy within the ring. 0 = never (timeout disabled).") + f.DurationVar(&cfg.FinalSleep, "querier.ring.final-sleep", 0*time.Second, "The sleep time when the querier is shutting down. Needs to be close to or larger than the KV Store information propagation delay.") + f.IntVar(&cfg.ReplicationFactor, "querier.ring.replication-factor", 1, "EXPERIMENTAL: The replication factor to use for the querier ring.") + f.BoolVar(&cfg.ZoneAwarenessEnabled, "querier.ring.zone-awareness-enabled", false, "EXPERIMENTAL: True to enable zone-awareness for the querier ring.") + f.StringVar(&cfg.TokensFilePath, "querier.ring.tokens-file-path", "", "EXPERIMENTAL: File path where tokens are stored. If empty, tokens are not stored at shutdown and restored at startup.") + f.BoolVar(&cfg.DetailedMetricsEnabled, "querier.ring.detailed-metrics-enabled", true, "Set to true to enable ring detailed metrics. These metrics provide detailed information, such as token count and ownership per tenant.
Disabling them can significantly decrease the number of metrics emitted.") + + // Instance flags + cfg.InstanceInterfaceNames = []string{"eth0", "en0"} + f.Var((*flagext.StringSlice)(&cfg.InstanceInterfaceNames), "querier.ring.instance-interface-names", "Name of network interface to read address from.") + f.StringVar(&cfg.InstanceAddr, "querier.ring.instance-addr", "", "IP address to advertise in the ring.") + f.IntVar(&cfg.InstancePort, "querier.ring.instance-port", 0, "Port to advertise in the ring (defaults to server.grpc-listen-port).") + f.StringVar(&cfg.InstanceID, "querier.ring.instance-id", hostname, "Instance ID to register in the ring.") + f.StringVar(&cfg.InstanceZone, "querier.ring.instance-availability-zone", "", "The availability zone where this instance is running. Required if zone-awareness is enabled.") + f.IntVar(&cfg.NumTokens, "querier.ring.num-tokens", 128, "Number of tokens for each querier.") + f.BoolVar(&cfg.KeepInstanceInTheRingOnShutdown, "querier.ring.keep-instance-in-the-ring-on-shutdown", false, "Keep instance in the ring on shut down.") } func (cfg *Config) RegisterFlags(f *flag.FlagSet) { + cfg.QuerierRing.RegisterFlags(f) + f.StringVar(&cfg.SchedulerAddress, "querier.scheduler-address", "", "Hostname (and port) of scheduler that querier will periodically resolve, connect to and receive queries from. Only one of -querier.frontend-address or -querier.scheduler-address can be set. If neither is set, queries are only received via HTTP endpoint.") f.StringVar(&cfg.FrontendAddress, "querier.frontend-address", "", "Address of query frontend service, in host:port format. If -querier.scheduler-address is set as well, querier will use scheduler instead. Only one of -querier.frontend-address or -querier.scheduler-address can be set.
If neither is set, queries are only received via HTTP endpoint.") @@ -109,7 +176,14 @@ func NewQuerierWorker(cfg Config, handler RequestHandler, log log.Logger, reg pr level.Info(log).Log("msg", "Starting querier worker connected to query-scheduler", "scheduler", cfg.SchedulerAddress) address = cfg.SchedulerAddress - processor, servs = newSchedulerProcessor(cfg, handler, log, reg) + + ipAddr, err := ring.GetInstanceAddr(cfg.QuerierRing.InstanceAddr, cfg.QuerierRing.InstanceInterfaceNames, log) + if err != nil { + return nil, err + } + querierAddr := net.JoinHostPort(ipAddr, strconv.Itoa(cfg.QuerierRing.ListenPort)) + + processor, servs = newSchedulerProcessor(cfg, handler, log, reg, querierAddr) case cfg.FrontendAddress != "": level.Info(log).Log("msg", "Starting querier worker connected to query-frontend", "frontend", cfg.FrontendAddress) diff --git a/pkg/scheduler/fragment_table/fragment_table.go b/pkg/scheduler/fragment_table/fragment_table.go new file mode 100644 index 00000000000..47bf946a7e8 --- /dev/null +++ b/pkg/scheduler/fragment_table/fragment_table.go @@ -0,0 +1,70 @@ +package fragment_table + +import ( + "sync" + + "github.com/cortexproject/cortex/pkg/distributed_execution" +) + +type FragmentTable struct { + mappings map[distributed_execution.FragmentKey]string + mu sync.RWMutex +} + +func NewFragmentTable() *FragmentTable { + return &FragmentTable{ + mappings: make(map[distributed_execution.FragmentKey]string), + } +} + +func (f *FragmentTable) AddMapping(queryID uint64, fragmentID uint64, addr string) { + f.mu.Lock() + defer f.mu.Unlock() + + key := distributed_execution.MakeFragmentKey(queryID, fragmentID) + f.mappings[key] = addr +} + +func (f *FragmentTable) GetAllChildAddresses(queryID uint64, fragmentIDs []uint64) ([]string, bool) { + f.mu.RLock() + defer f.mu.RUnlock() + + addresses := make([]string, 0, len(fragmentIDs)) + + for _, fragmentID := range fragmentIDs { + key := distributed_execution.MakeFragmentKey(queryID, fragmentID) + if addr, ok := f.mappings[key]; ok { + addresses = append(addresses, addr) + } else { + return nil, false + } + } + return addresses, true +} + +func (f *FragmentTable) GetChildAddr(queryID uint64, fragmentID uint64) (string, bool) { + f.mu.RLock() + defer f.mu.RUnlock() + + key := distributed_execution.MakeFragmentKey(queryID, fragmentID) + if addr, ok := f.mappings[key]; ok { + return addr, true + } + return "", false +} + +func (f *FragmentTable) ClearMappings(queryID uint64) { + f.mu.Lock() + defer f.mu.Unlock() + + keysToDelete := make([]distributed_execution.FragmentKey, 0) + for key := range f.mappings { + if key.GetQueryID() == queryID { + keysToDelete = append(keysToDelete, key) + } + } + + for _, key := range keysToDelete { + delete(f.mappings, key) + } +} diff --git a/pkg/scheduler/fragment_table/fragment_table_test.go b/pkg/scheduler/fragment_table/fragment_table_test.go new file mode 100644 index 00000000000..0e7a52c7668 --- /dev/null +++ b/pkg/scheduler/fragment_table/fragment_table_test.go @@ -0,0 +1,116 @@ +package fragment_table + +import ( + "fmt" + "sync" + "testing" + + "github.com/stretchr/testify/require" +) + +// TestSchedulerCoordination checks whether the hashtable for fragment-querier mapping gives the expected value +// It also checks if it remains functional and accurate during a multi-thread/concurrent read & write situation +func TestSchedulerCoordination(t *testing.T) { + t.Run("basic operations", func(t *testing.T) { + table := NewFragmentTable() + table.AddMapping(uint64(0), uint64(1), "localhost:8000") + 
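+		// a second fragment of the same query is mapped to a different querier address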
table.AddMapping(uint64(0), uint64(2), "localhost:8001") + + result, exist := table.GetAllChildAddresses(uint64(0), []uint64{1, 2}) + require.True(t, exist) + require.Equal(t, []string{"localhost:8000", "localhost:8001"}, result) + + result, exist = table.GetAllChildAddresses(uint64(0), []uint64{1, 3}) + require.False(t, exist) + require.Empty(t, result) + + result, exist = table.GetAllChildAddresses(uint64(0), []uint64{1}) + require.True(t, exist) + require.Equal(t, []string{"localhost:8000"}, result) + + table.ClearMappings(uint64(0)) + result, exist = table.GetAllChildAddresses(uint64(0), []uint64{1}) + require.False(t, exist) + require.Empty(t, result) + }) + + t.Run("concurrent operations", func(t *testing.T) { + table := NewFragmentTable() + const numGoroutines = 10 + const numOperations = 100 + + var wg sync.WaitGroup + wg.Add(numGoroutines * 3) + + // write + for i := 0; i < numGoroutines; i++ { + go func(routine int) { + defer wg.Done() + for j := 0; j < numOperations; j++ { + queryID := uint64(routine) + fragmentID := uint64(j) + addr := fmt.Sprintf("localhost:%d", j) + table.AddMapping(queryID, fragmentID, addr) + } + }(i) + } + + // read + for i := 0; i < numGoroutines; i++ { + go func(routine int) { + defer wg.Done() + for j := 0; j < numOperations; j++ { + queryID := uint64(routine) + fragmentIDs := []uint64{uint64(j)} + table.GetAllChildAddresses(queryID, fragmentIDs) + } + }(i) + } + + // clear + for i := 0; i < numGoroutines; i++ { + go func(routine int) { + defer wg.Done() + for j := 0; j < numOperations; j++ { + queryID := uint64(routine) + table.ClearMappings(queryID) + } + }(i) + } + + wg.Wait() + }) + + t.Run("edge cases", func(t *testing.T) { + table := NewFragmentTable() + + // test empty fragment IDs + result, exist := table.GetAllChildAddresses(0, []uint64{}) + require.True(t, exist) + require.Empty(t, result) + + // test clearing non-existent query + table.ClearMappings(999) + require.NotPanics(t, func() { + table.ClearMappings(999) + }) + + // test overwriting mapping + table.AddMapping(1, 1, "addr1") + table.AddMapping(1, 1, "addr2") + result, exist = table.GetAllChildAddresses(1, []uint64{1}) + require.True(t, exist) + require.Equal(t, []string{"addr2"}, result) + + // test multiple queries + table.AddMapping(1, 1, "addr1") + table.AddMapping(2, 1, "addr2") + result, exist = table.GetAllChildAddresses(1, []uint64{1}) + require.True(t, exist) + require.Equal(t, []string{"addr1"}, result) + + result, exist = table.GetAllChildAddresses(2, []uint64{1}) + require.True(t, exist) + require.Equal(t, []string{"addr2"}, result) + }) +} diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 7c7ef4b7b3e..652d886289c 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -5,6 +5,7 @@ import ( "flag" "io" "net/http" + "net/url" "sync" "time" @@ -15,14 +16,16 @@ import ( "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/thanos-io/promql-engine/logicalplan" "github.com/weaveworks/common/httpgrpc" "github.com/weaveworks/common/middleware" "github.com/weaveworks/common/user" "google.golang.org/grpc" + "github.com/cortexproject/cortex/pkg/distributed_execution/plan_fragments" "github.com/cortexproject/cortex/pkg/frontend/v2/frontendv2pb" - //lint:ignore faillint scheduler needs to retrieve priority from the context - "github.com/cortexproject/cortex/pkg/querier/stats" + "github.com/cortexproject/cortex/pkg/querier/stats" //lint:ignore faillint 
scheduler needs to retrieve priority from the context + "github.com/cortexproject/cortex/pkg/scheduler/fragment_table" "github.com/cortexproject/cortex/pkg/scheduler/queue" "github.com/cortexproject/cortex/pkg/scheduler/schedulerpb" "github.com/cortexproject/cortex/pkg/tenant" @@ -55,7 +58,8 @@ type Scheduler struct { activeUsers *util.ActiveUsersCleanupService pendingRequestsMu sync.Mutex - pendingRequests map[requestKey]*schedulerRequest // Request is kept in this map even after being dispatched to querier. It can still be canceled at that time. + + pendingRequests map[requestKey]*schedulerRequest // Request is kept in this map even after being dispatched to querier. It can still be canceled at that time. // Subservices manager. subservices *services.Manager @@ -67,12 +71,26 @@ type Scheduler struct { connectedQuerierClients prometheus.GaugeFunc connectedFrontendClients prometheus.GaugeFunc queueDuration prometheus.Histogram + + // sub-query to querier address mappings + fragmentTable *fragment_table.FragmentTable + distributedExecEnabled bool + + // queryKey <--> fragment-ids lookup table allows faster cancellation of the whole query + // compared to traversing through the pending requests to find matching fragments + queryToFragmentsLookUp map[queryKey][]uint64 } -type requestKey struct { +// additional layer to improve efficiency of deleting fragments of logical query plans +// while maintaining previous logics +type queryKey struct { frontendAddr string queryID uint64 } +type requestKey struct { + queryKey queryKey + fragmentID uint64 +} type connectedFrontend struct { connections int @@ -95,7 +113,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { } // NewScheduler creates a new Scheduler. -func NewScheduler(cfg Config, limits Limits, log log.Logger, registerer prometheus.Registerer) (*Scheduler, error) { +func NewScheduler(cfg Config, limits Limits, log log.Logger, registerer prometheus.Registerer, distributedExecEnabled bool) (*Scheduler, error) { s := &Scheduler{ cfg: cfg, log: log, @@ -103,6 +121,10 @@ func NewScheduler(cfg Config, limits Limits, log log.Logger, registerer promethe pendingRequests: map[requestKey]*schedulerRequest{}, connectedFrontends: map[string]*connectedFrontend{}, + + fragmentTable: fragment_table.NewFragmentTable(), + distributedExecEnabled: distributedExecEnabled, + queryToFragmentsLookUp: map[queryKey][]uint64{}, } s.queueLength = promauto.With(registerer).NewGaugeVec(prometheus.GaugeOpts{ @@ -166,6 +188,8 @@ type schedulerRequest struct { // This is only used for testing. parentSpanContext opentracing.SpanContext + + fragment plan_fragments.Fragment } func (s schedulerRequest) Priority() int64 { @@ -177,6 +201,34 @@ func (s schedulerRequest) Priority() int64 { return priority } +func (s *Scheduler) fragmentLogicalPlan(byteLP []byte) ([]plan_fragments.Fragment, error) { + lpNode, err := logicalplan.Unmarshal(byteLP) + if err != nil { + return nil, err + } + + fragmenter := plan_fragments.NewDummyFragmenter() + fragments, err := fragmenter.Fragment(lpNode) + if err != nil { + return nil, err + } + + return fragments, nil +} + +func (s *Scheduler) getPlanFromHTTPRequest(req *httpgrpc.HTTPRequest) ([]byte, error) { + if req.Body == nil { + return nil, nil + } + + values, err := url.ParseQuery(string(req.Body)) + if err != nil { + return nil, err + } + plan := values.Get("plan") + return []byte(plan), nil +} + // FrontendLoop handles connection from frontend. 
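+// When distributed execution is enabled, an ENQUEUE whose request body carries a
+// url-encoded "plan" field is split into fragments (see enqueueRequest), each
+// queued as its own schedulerRequest; a CANCEL then uses fragmentID 0 to cancel
+// every pending fragment of the query.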
func (s *Scheduler) FrontendLoop(frontend schedulerpb.SchedulerForFrontend_FrontendLoopServer) error { frontendAddress, frontendCtx, err := s.frontendConnected(frontend) @@ -223,7 +275,7 @@ func (s *Scheduler) FrontendLoop(frontend schedulerpb.SchedulerForFrontend_Front } case schedulerpb.CANCEL: - s.cancelRequestAndRemoveFromPending(frontendAddress, msg.QueryID) + s.cancelRequestAndRemoveFromPending(frontendAddress, msg.QueryID, 0) resp = &schedulerpb.SchedulerToFrontend{Status: schedulerpb.OK} default: @@ -279,6 +331,16 @@ func (s *Scheduler) frontendDisconnected(frontendAddress string) { } } +func (s *Scheduler) updatePlanInHTTPRequest(fragment plan_fragments.Fragment) ([]byte, error) { + byteLP, err := logicalplan.Marshal(fragment.Node) + if err != nil { + return nil, err + } + form := url.Values{} + form.Add("plan", string(byteLP)) + return []byte(form.Encode()), nil +} + func (s *Scheduler) enqueueRequest(frontendContext context.Context, frontendAddr string, msg *schedulerpb.FrontendToScheduler) error { // Create new context for this request, to support cancellation. ctx, cancel := context.WithCancel(frontendContext) @@ -299,49 +361,153 @@ func (s *Scheduler) enqueueRequest(frontendContext context.Context, frontendAddr userID := msg.GetUserID() - req := &schedulerRequest{ - frontendAddress: frontendAddr, - userID: msg.UserID, - queryID: msg.QueryID, - request: msg.HttpRequest, - statsEnabled: msg.StatsEnabled, + byteLP, err := s.getPlanFromHTTPRequest(msg.HttpRequest) + if err != nil { + return err + } + var fragments []plan_fragments.Fragment + if byteLP != nil { + if s.distributedExecEnabled { + fragments, err = s.fragmentLogicalPlan(byteLP) + } + if err != nil { + return err + } } - now := time.Now() + if len(fragments) == 0 { + req := &schedulerRequest{ + frontendAddress: frontendAddr, + userID: msg.UserID, + queryID: msg.QueryID, + request: msg.HttpRequest, + statsEnabled: msg.StatsEnabled, + fragment: plan_fragments.Fragment{}, + } - req.parentSpanContext = parentSpanContext - req.queueSpan, req.ctx = opentracing.StartSpanFromContextWithTracer(ctx, tracer, "queued", opentracing.ChildOf(parentSpanContext)) - req.enqueueTime = now - req.ctxCancel = cancel + now := time.Now() - // aggregate the max queriers limit in the case of a multi tenant query - tenantIDs, err := tenant.TenantIDsFromOrgID(userID) - if err != nil { - return err - } - maxQueriers := validation.SmallestPositiveNonZeroFloat64PerTenant(tenantIDs, s.limits.MaxQueriersPerUser) + req.parentSpanContext = parentSpanContext + req.queueSpan, req.ctx = opentracing.StartSpanFromContextWithTracer(ctx, tracer, "queued", opentracing.ChildOf(parentSpanContext)) + req.enqueueTime = now + req.ctxCancel = cancel + + // aggregate the max queriers limit in the case of a multi tenant query + tenantIDs, err := tenant.TenantIDsFromOrgID(userID) + if err != nil { + return err + } + maxQueriers := validation.SmallestPositiveNonZeroFloat64PerTenant(tenantIDs, s.limits.MaxQueriersPerUser) - s.activeUsers.UpdateUserTimestamp(userID, now) - return s.requestQueue.EnqueueRequest(userID, req, maxQueriers, func() { - shouldCancel = false + s.activeUsers.UpdateUserTimestamp(userID, now) + return s.requestQueue.EnqueueRequest(userID, req, maxQueriers, func() { + shouldCancel = false - s.pendingRequestsMu.Lock() - defer s.pendingRequestsMu.Unlock() - s.pendingRequests[requestKey{frontendAddr: frontendAddr, queryID: msg.QueryID}] = req - }) + s.pendingRequestsMu.Lock() + defer s.pendingRequestsMu.Unlock() + + // fragmentID will be 0 when 
distributed execution is not enabled + queryKey := queryKey{frontendAddr: frontendAddr, queryID: msg.QueryID} + s.pendingRequests[requestKey{queryKey: queryKey, fragmentID: 0}] = req + }) + } else { + for _, fragment := range fragments { + frag := fragment + + if err := func() error { + // create new context and cancel func per fragment + ctx, cancel := context.WithCancel(frontendContext) + shouldCancel := true + defer func() { + if shouldCancel { + cancel() + } + }() + + // extract tracing info + tracer := opentracing.GlobalTracer() + parentSpanContext, err := httpgrpcutil.GetParentSpanForRequest(tracer, msg.HttpRequest) + if err != nil { + return err + } + + // modify request with fragment info + msg.HttpRequest.Body, err = s.updatePlanInHTTPRequest(frag) + if err != nil { + return err + } + + req := &schedulerRequest{ + frontendAddress: frontendAddr, + userID: msg.UserID, + queryID: msg.QueryID, + request: msg.HttpRequest, + statsEnabled: msg.StatsEnabled, + fragment: frag, + } + + now := time.Now() + req.parentSpanContext = parentSpanContext + req.queueSpan, req.ctx = opentracing.StartSpanFromContextWithTracer(ctx, tracer, "queued", opentracing.ChildOf(parentSpanContext)) + req.enqueueTime = now + req.ctxCancel = cancel + + tenantIDs, err := tenant.TenantIDsFromOrgID(userID) + if err != nil { + return err + } + maxQueriers := validation.SmallestPositiveNonZeroFloat64PerTenant(tenantIDs, s.limits.MaxQueriersPerUser) + + s.activeUsers.UpdateUserTimestamp(userID, now) + + err = s.requestQueue.EnqueueRequest(userID, req, maxQueriers, func() { + shouldCancel = false + s.pendingRequestsMu.Lock() + defer s.pendingRequestsMu.Unlock() + + queryKey := queryKey{frontendAddr: frontendAddr, queryID: msg.QueryID} + s.queryToFragmentsLookUp[queryKey] = append(s.queryToFragmentsLookUp[queryKey], req.fragment.FragmentID) + s.pendingRequests[requestKey{queryKey: queryKey, fragmentID: req.fragment.FragmentID}] = req + }) + + if err != nil { + return err + } + + return nil + }(); err != nil { + return err + } + } + return nil + } } // This method doesn't do removal from the queue. -func (s *Scheduler) cancelRequestAndRemoveFromPending(frontendAddr string, queryID uint64) { +func (s *Scheduler) cancelRequestAndRemoveFromPending(frontendAddr string, queryID uint64, fragmentID uint64) { s.pendingRequestsMu.Lock() defer s.pendingRequestsMu.Unlock() - key := requestKey{frontendAddr: frontendAddr, queryID: queryID} - req := s.pendingRequests[key] - if req != nil { - req.ctxCancel() + if s.distributedExecEnabled && fragmentID == 0 { + // deleting all the fragments of the query in the queue + querykey := queryKey{frontendAddr: frontendAddr, queryID: queryID} + for _, id := range s.queryToFragmentsLookUp[querykey] { + key := requestKey{queryKey: querykey, fragmentID: id} + req := s.pendingRequests[key] + if req != nil { + req.ctxCancel() + } + delete(s.pendingRequests, key) + } + delete(s.queryToFragmentsLookUp, querykey) // clean out the mappings for this request + } else { + key := requestKey{queryKey: queryKey{frontendAddr: frontendAddr, queryID: queryID}, fragmentID: fragmentID} + req := s.pendingRequests[key] + if req != nil { + req.ctxCancel() + } + delete(s.pendingRequests, key) } - delete(s.pendingRequests, key) } // QuerierLoop is started by querier to receive queries from scheduler. @@ -392,14 +558,13 @@ func (s *Scheduler) QuerierLoop(querier schedulerpb.SchedulerForQuerier_QuerierL */ if r.ctx.Err() != nil { - // Remove from pending requests. 
- s.cancelRequestAndRemoveFromPending(r.frontendAddress, r.queryID) + s.cancelRequestAndRemoveFromPending(r.frontendAddress, r.queryID, r.fragment.FragmentID) lastUserIndex = lastUserIndex.ReuseLastUser() continue } - if err := s.forwardRequestToQuerier(querier, r); err != nil { + if err := s.forwardRequestToQuerier(querier, r, resp.GetQuerierAddress()); err != nil { return err } } @@ -414,21 +579,43 @@ func (s *Scheduler) NotifyQuerierShutdown(_ context.Context, req *schedulerpb.No return &schedulerpb.NotifyQuerierShutdownResponse{}, nil } -func (s *Scheduler) forwardRequestToQuerier(querier schedulerpb.SchedulerForQuerier_QuerierLoopServer, req *schedulerRequest) error { +func (s *Scheduler) forwardRequestToQuerier(querier schedulerpb.SchedulerForQuerier_QuerierLoopServer, req *schedulerRequest, QuerierAddress string) error { // Make sure to cancel request at the end to cleanup resources. - defer s.cancelRequestAndRemoveFromPending(req.frontendAddress, req.queryID) + defer s.cancelRequestAndRemoveFromPending(req.frontendAddress, req.queryID, req.fragment.FragmentID) // Handle the stream sending & receiving on a goroutine so we can // monitoring the contexts in a select and cancel things appropriately. errCh := make(chan error, 1) go func() { + childIDtoAddrs := make(map[uint64]string) + if len(req.fragment.ChildIDs) != 0 { + for _, childID := range req.fragment.ChildIDs { + addr, ok := s.fragmentTable.GetChildAddr(req.queryID, childID) + if !ok { + return + } + childIDtoAddrs[childID] = addr + } + } err := querier.Send(&schedulerpb.SchedulerToQuerier{ UserID: req.userID, QueryID: req.queryID, FrontendAddress: req.frontendAddress, HttpRequest: req.request, StatsEnabled: req.statsEnabled, + FragmentID: req.fragment.FragmentID, + ChildIDtoAddrs: childIDtoAddrs, + IsRoot: req.fragment.IsRoot, }) + + if !req.fragment.IsEmpty() { + if req.fragment.IsRoot { + s.fragmentTable.ClearMappings(req.queryID) + } else { + s.fragmentTable.AddMapping(req.queryID, req.fragment.FragmentID, QuerierAddress) + } + } + if err != nil { errCh <- err return diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index 9c1d75ad51a..9a11023d204 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -5,6 +5,7 @@ import ( "fmt" "net" "net/http" + "net/url" "strings" "sync" "testing" @@ -15,6 +16,7 @@ import ( "github.com/prometheus/client_golang/prometheus" promtest "github.com/prometheus/client_golang/prometheus/testutil" "github.com/stretchr/testify/require" + "github.com/thanos-io/promql-engine/logicalplan" "github.com/uber/jaeger-client-go/config" "github.com/weaveworks/common/httpgrpc" "google.golang.org/grpc" @@ -26,16 +28,17 @@ import ( "github.com/cortexproject/cortex/pkg/scheduler/schedulerpb" "github.com/cortexproject/cortex/pkg/util/flagext" "github.com/cortexproject/cortex/pkg/util/httpgrpcutil" + "github.com/cortexproject/cortex/pkg/util/logical_plan" "github.com/cortexproject/cortex/pkg/util/services" "github.com/cortexproject/cortex/pkg/util/test" ) const testMaxOutstandingPerTenant = 5 -func setupScheduler(t *testing.T, reg prometheus.Registerer) (*Scheduler, schedulerpb.SchedulerForFrontendClient, schedulerpb.SchedulerForQuerierClient) { +func setupScheduler(t *testing.T, reg prometheus.Registerer, distributedExecEnabled bool) (*Scheduler, schedulerpb.SchedulerForFrontendClient, schedulerpb.SchedulerForQuerierClient) { cfg := Config{} flagext.DefaultValues(&cfg) - s, err := NewScheduler(cfg, frontendv1.MockLimits{Queriers: 2, MockLimits: 
queue.MockLimits{MaxOutstanding: testMaxOutstandingPerTenant}}, log.NewNopLogger(), reg) + s, err := NewScheduler(cfg, frontendv1.MockLimits{Queriers: 2, MockLimits: queue.MockLimits{MaxOutstanding: testMaxOutstandingPerTenant}}, log.NewNopLogger(), reg, distributedExecEnabled) require.NoError(t, err) server := grpc.NewServer() @@ -69,7 +72,7 @@ func setupScheduler(t *testing.T, reg prometheus.Registerer) (*Scheduler, schedu } func TestSchedulerBasicEnqueue(t *testing.T) { - scheduler, frontendClient, querierClient := setupScheduler(t, nil) + scheduler, frontendClient, querierClient := setupScheduler(t, nil, false) frontendLoop := initFrontendLoop(t, frontendClient, "frontend-12345") frontendToScheduler(t, frontendLoop, &schedulerpb.FrontendToScheduler{ @@ -97,7 +100,7 @@ func TestSchedulerBasicEnqueue(t *testing.T) { } func TestSchedulerEnqueueWithCancel(t *testing.T) { - scheduler, frontendClient, querierClient := setupScheduler(t, nil) + scheduler, frontendClient, querierClient := setupScheduler(t, nil, false) frontendLoop := initFrontendLoop(t, frontendClient, "frontend-12345") frontendToScheduler(t, frontendLoop, &schedulerpb.FrontendToScheduler{ @@ -127,7 +130,7 @@ func initQuerierLoop(t *testing.T, querierClient schedulerpb.SchedulerForQuerier } func TestSchedulerEnqueueByMultipleFrontendsWithCancel(t *testing.T) { - scheduler, frontendClient, querierClient := setupScheduler(t, nil) + scheduler, frontendClient, querierClient := setupScheduler(t, nil, false) frontendLoop1 := initFrontendLoop(t, frontendClient, "frontend-1") frontendLoop2 := initFrontendLoop(t, frontendClient, "frontend-2") @@ -168,7 +171,7 @@ func TestSchedulerEnqueueByMultipleFrontendsWithCancel(t *testing.T) { } func TestSchedulerEnqueueWithFrontendDisconnect(t *testing.T) { - scheduler, frontendClient, querierClient := setupScheduler(t, nil) + scheduler, frontendClient, querierClient := setupScheduler(t, nil, false) frontendLoop := initFrontendLoop(t, frontendClient, "frontend-12345") frontendToScheduler(t, frontendLoop, &schedulerpb.FrontendToScheduler{ @@ -198,7 +201,7 @@ func TestSchedulerEnqueueWithFrontendDisconnect(t *testing.T) { } func TestCancelRequestInProgress(t *testing.T) { - scheduler, frontendClient, querierClient := setupScheduler(t, nil) + scheduler, frontendClient, querierClient := setupScheduler(t, nil, false) frontendLoop := initFrontendLoop(t, frontendClient, "frontend-12345") frontendToScheduler(t, frontendLoop, &schedulerpb.FrontendToScheduler{ @@ -231,7 +234,7 @@ func TestCancelRequestInProgress(t *testing.T) { } func TestTracingContext(t *testing.T) { - scheduler, frontendClient, _ := setupScheduler(t, nil) + scheduler, frontendClient, _ := setupScheduler(t, nil, false) frontendLoop := initFrontendLoop(t, frontendClient, "frontend-12345") @@ -262,7 +265,7 @@ func TestTracingContext(t *testing.T) { } func TestSchedulerShutdown_FrontendLoop(t *testing.T) { - scheduler, frontendClient, _ := setupScheduler(t, nil) + scheduler, frontendClient, _ := setupScheduler(t, nil, false) frontendLoop := initFrontendLoop(t, frontendClient, "frontend-12345") @@ -283,7 +286,7 @@ func TestSchedulerShutdown_FrontendLoop(t *testing.T) { } func TestSchedulerShutdown_QuerierLoop(t *testing.T) { - scheduler, frontendClient, querierClient := setupScheduler(t, nil) + scheduler, frontendClient, querierClient := setupScheduler(t, nil, false) frontendLoop := initFrontendLoop(t, frontendClient, "frontend-12345") frontendToScheduler(t, frontendLoop, &schedulerpb.FrontendToScheduler{ @@ -315,7 +318,7 @@ func 
TestSchedulerShutdown_QuerierLoop(t *testing.T) { } func TestSchedulerMaxOutstandingRequests(t *testing.T) { - _, frontendClient, _ := setupScheduler(t, nil) + _, frontendClient, _ := setupScheduler(t, nil, false) for i := 0; i < testMaxOutstandingPerTenant; i++ { // coming from different frontends @@ -347,7 +350,7 @@ func TestSchedulerMaxOutstandingRequests(t *testing.T) { } func TestSchedulerForwardsErrorToFrontend(t *testing.T) { - _, frontendClient, querierClient := setupScheduler(t, nil) + _, frontendClient, querierClient := setupScheduler(t, nil, false) fm := &frontendMock{resp: map[uint64]*httpgrpc.HTTPResponse{}} frontendAddress := "" @@ -409,7 +412,7 @@ func TestSchedulerForwardsErrorToFrontend(t *testing.T) { func TestSchedulerMetrics(t *testing.T) { reg := prometheus.NewPedanticRegistry() - scheduler, frontendClient, _ := setupScheduler(t, reg) + scheduler, frontendClient, _ := setupScheduler(t, reg, false) frontendLoop := initFrontendLoop(t, frontendClient, "frontend-12345") frontendToScheduler(t, frontendLoop, &schedulerpb.FrontendToScheduler{ @@ -448,6 +451,70 @@ func TestSchedulerMetrics(t *testing.T) { `), "cortex_query_scheduler_queue_length", "cortex_request_queue_requests_total")) } +// TestQuerierLoopClient_WithLogicalPlan tests to see if the scheduler enqueues the fragment +// with the expected QueryID, logical plan, and other fragment meta-data + +func TestQuerierLoopClient_WithLogicalPlan(t *testing.T) { + reg := prometheus.NewPedanticRegistry() + + _, frontendClient, querierClient := setupScheduler(t, reg, true) + frontendLoop := initFrontendLoop(t, frontendClient, "frontend-12345") + querierLoop, err := querierClient.QuerierLoop(context.Background()) + require.NoError(t, err) + + // CASE 1: request with corrupted logical plan --> expect to fail at un-marshal stage + require.NoError(t, frontendLoop.Send(&schedulerpb.FrontendToScheduler{ + Type: schedulerpb.ENQUEUE, + QueryID: 1, + UserID: "test", + HttpRequest: &httpgrpc.HTTPRequest{Method: "POST", Url: "/hello", Body: []byte("test")}, + })) + msg, err := frontendLoop.Recv() + require.NoError(t, err) + require.True(t, msg.Status == schedulerpb.ERROR) + + // CASE 2: request without logical plan --> expect to not have fragment meta-data + frontendToScheduler(t, frontendLoop, &schedulerpb.FrontendToScheduler{ + Type: schedulerpb.ENQUEUE, + QueryID: 2, + UserID: "test", + HttpRequest: &httpgrpc.HTTPRequest{Method: "POST", Url: "/hello", Body: []byte{}}, // empty logical plan + }) + require.NoError(t, querierLoop.Send(&schedulerpb.QuerierToScheduler{QuerierID: "querier-1", QuerierAddress: "localhost:8000"})) + + s2, err := querierLoop.Recv() + require.NoError(t, err) + require.Equal(t, uint64(2), s2.QueryID) + // (the below fields should be empty because the logical plan is not in the request) + require.Empty(t, s2.FragmentID) + require.Empty(t, s2.ChildIDtoAddrs) + require.Empty(t, s2.HttpRequest.Body) + require.False(t, s2.IsRoot) + + // CASE 3: request with correct logical plan --> expect to have fragment metadata + lp, err := logical_plan.CreateTestLogicalPlan("up", time.Now(), time.Now(), 0) + require.NoError(t, err) + bytesLp, err := logicalplan.Marshal((*lp).Root()) + form := url.Values{} + form.Set("plan", string(bytesLp)) // this is to imitate how the real format of http request body + require.NoError(t, err) + frontendToScheduler(t, frontendLoop, &schedulerpb.FrontendToScheduler{ + Type: schedulerpb.ENQUEUE, + QueryID: 3, + UserID: "test", + HttpRequest: &httpgrpc.HTTPRequest{Method: "POST", Url: "/hello", 
Body: []byte(form.Encode())}, + }) + require.NoError(t, querierLoop.Send(&schedulerpb.QuerierToScheduler{QuerierID: "querier-2", QuerierAddress: "localhost:8000"})) + + s3, err := querierLoop.Recv() + require.NoError(t, err) + require.NotEmpty(t, s3.FragmentID) + require.Equal(t, uint64(3), s3.QueryID) + require.Empty(t, s3.ChildIDtoAddrs) // there is only one fragment for the logical plan, so no child fragments + require.Equal(t, s3.HttpRequest.Body, []byte(form.Encode())) + require.True(t, s3.IsRoot) +} + func initFrontendLoop(t *testing.T, client schedulerpb.SchedulerForFrontendClient, frontendAddr string) schedulerpb.SchedulerForFrontend_FrontendLoopClient { loop, err := client.FrontendLoop(context.Background()) require.NoError(t, err) diff --git a/pkg/scheduler/schedulerpb/scheduler.pb.go b/pkg/scheduler/schedulerpb/scheduler.pb.go index d3288f95b39..7ba5f7774b1 100644 --- a/pkg/scheduler/schedulerpb/scheduler.pb.go +++ b/pkg/scheduler/schedulerpb/scheduler.pb.go @@ -8,6 +8,7 @@ import ( fmt "fmt" _ "github.com/gogo/protobuf/gogoproto" proto "github.com/gogo/protobuf/proto" + github_com_gogo_protobuf_sortkeys "github.com/gogo/protobuf/sortkeys" httpgrpc "github.com/weaveworks/common/httpgrpc" grpc "google.golang.org/grpc" codes "google.golang.org/grpc/codes" @@ -85,7 +86,8 @@ func (SchedulerToFrontendStatus) EnumDescriptor() ([]byte, []int) { // Querier reports its own clientID when it connects, so that scheduler knows how many *different* queriers are connected. // To signal that querier is ready to accept another request, querier sends empty message. type QuerierToScheduler struct { - QuerierID string `protobuf:"bytes,1,opt,name=querierID,proto3" json:"querierID,omitempty"` + QuerierID string `protobuf:"bytes,1,opt,name=querierID,proto3" json:"querierID,omitempty"` + QuerierAddress string `protobuf:"bytes,2,opt,name=querierAddress,proto3" json:"querierAddress,omitempty"` } func (m *QuerierToScheduler) Reset() { *m = QuerierToScheduler{} } @@ -127,6 +129,13 @@ func (m *QuerierToScheduler) GetQuerierID() string { return "" } +func (m *QuerierToScheduler) GetQuerierAddress() string { + if m != nil { + return m.QuerierAddress + } + return "" +} + type SchedulerToQuerier struct { // Query ID as reported by frontend. When querier sends the response back to frontend (using frontendAddress), // it identifies the query by using this ID. @@ -139,6 +148,13 @@ type SchedulerToQuerier struct { // Whether query statistics tracking should be enabled. The response will include // statistics only when this option is enabled. StatsEnabled bool `protobuf:"varint,5,opt,name=statsEnabled,proto3" json:"statsEnabled,omitempty"` + // Below are the meta data that will be used for distributed execution + // The ID of current logical query plan fragment. 
+ FragmentID uint64 `protobuf:"varint,6,opt,name=fragmentID,proto3" json:"fragmentID,omitempty"` + // The IDs and addresses of its child fragments + ChildIDtoAddrs map[uint64]string `protobuf:"bytes,7,rep,name=childIDtoAddrs,proto3" json:"childIDtoAddrs,omitempty" protobuf_key:"varint,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` + // Whether the current fragment is the root + IsRoot bool `protobuf:"varint,8,opt,name=isRoot,proto3" json:"isRoot,omitempty"` } func (m *SchedulerToQuerier) Reset() { *m = SchedulerToQuerier{} } @@ -208,6 +224,27 @@ func (m *SchedulerToQuerier) GetStatsEnabled() bool { return false } +func (m *SchedulerToQuerier) GetFragmentID() uint64 { + if m != nil { + return m.FragmentID + } + return 0 +} + +func (m *SchedulerToQuerier) GetChildIDtoAddrs() map[uint64]string { + if m != nil { + return m.ChildIDtoAddrs + } + return nil +} + +func (m *SchedulerToQuerier) GetIsRoot() bool { + if m != nil { + return m.IsRoot + } + return false +} + type FrontendToScheduler struct { Type FrontendToSchedulerType `protobuf:"varint,1,opt,name=type,proto3,enum=schedulerpb.FrontendToSchedulerType" json:"type,omitempty"` // Used by INIT message. Will be put into all requests passed to querier. @@ -429,6 +466,7 @@ func init() { proto.RegisterEnum("schedulerpb.SchedulerToFrontendStatus", SchedulerToFrontendStatus_name, SchedulerToFrontendStatus_value) proto.RegisterType((*QuerierToScheduler)(nil), "schedulerpb.QuerierToScheduler") proto.RegisterType((*SchedulerToQuerier)(nil), "schedulerpb.SchedulerToQuerier") + proto.RegisterMapType((map[uint64]string)(nil), "schedulerpb.SchedulerToQuerier.ChildIDtoAddrsEntry") proto.RegisterType((*FrontendToScheduler)(nil), "schedulerpb.FrontendToScheduler") proto.RegisterType((*SchedulerToFrontend)(nil), "schedulerpb.SchedulerToFrontend") proto.RegisterType((*NotifyQuerierShutdownRequest)(nil), "schedulerpb.NotifyQuerierShutdownRequest") @@ -438,48 +476,54 @@ func init() { func init() { proto.RegisterFile("scheduler.proto", fileDescriptor_2b3fc28395a6d9c5) } var fileDescriptor_2b3fc28395a6d9c5 = []byte{ - // 644 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x54, 0x4f, 0x4f, 0xdb, 0x4e, - 0x10, 0xf5, 0x86, 0x24, 0xc0, 0x84, 0xdf, 0x0f, 0x77, 0x81, 0x36, 0x8d, 0xe8, 0x12, 0x45, 0x55, - 0x95, 0x72, 0x48, 0xaa, 0xb4, 0x52, 0x7b, 0x40, 0x95, 0x52, 0x30, 0x25, 0x2a, 0x75, 0x60, 0xb3, - 0x51, 0xff, 0x5c, 0x22, 0x92, 0x2c, 0x09, 0x02, 0xbc, 0x66, 0x6d, 0x17, 0xe5, 0xd6, 0x63, 0x8f, - 0xfd, 0x18, 0xfd, 0x28, 0xbd, 0x54, 0xe2, 0xc8, 0xa1, 0x87, 0x62, 0x2e, 0x3d, 0xf2, 0x11, 0xaa, - 0x38, 0x76, 0xea, 0xa4, 0x0e, 0x70, 0x9b, 0x1d, 0xbf, 0xe7, 0x9d, 0xf7, 0x66, 0x66, 0x61, 0xde, - 0x6a, 0x75, 0x79, 0xdb, 0x39, 0xe2, 0xb2, 0x60, 0x4a, 0x61, 0x0b, 0x9c, 0x1a, 0x26, 0xcc, 0x66, - 0x66, 0xb1, 0x23, 0x3a, 0xc2, 0xcb, 0x17, 0xfb, 0xd1, 0x00, 0x92, 0x79, 0xd6, 0x39, 0xb0, 0xbb, - 0x4e, 0xb3, 0xd0, 0x12, 0xc7, 0xc5, 0x53, 0xbe, 0xf7, 0x89, 0x9f, 0x0a, 0x79, 0x68, 0x15, 0x5b, - 0xe2, 0xf8, 0x58, 0x18, 0xc5, 0xae, 0x6d, 0x9b, 0x1d, 0x69, 0xb6, 0x86, 0xc1, 0x80, 0x95, 0x2b, - 0x01, 0xde, 0x75, 0xb8, 0x3c, 0xe0, 0x92, 0x89, 0x5a, 0x70, 0x07, 0x5e, 0x86, 0xd9, 0x93, 0x41, - 0xb6, 0xb2, 0x91, 0x46, 0x59, 0x94, 0x9f, 0xa5, 0x7f, 0x13, 0xb9, 0x1f, 0x08, 0xf0, 0x10, 0xcb, - 0x84, 0xcf, 0xc7, 0x69, 0x98, 0xee, 0x63, 0x7a, 0x3e, 0x25, 0x4e, 0x83, 0x23, 0x7e, 0x0e, 0xa9, - 0xfe, 0xb5, 0x94, 0x9f, 0x38, 0xdc, 0xb2, 0xd3, 0xb1, 0x2c, 0xca, 0xa7, 0x4a, 0x4b, 0x85, 0x61, - 0x29, 0x5b, 0x8c, 0xed, 0xf8, 
0x1f, 0x69, 0x18, 0x89, 0xf3, 0x30, 0xbf, 0x2f, 0x85, 0x61, 0x73, - 0xa3, 0x5d, 0x6e, 0xb7, 0x25, 0xb7, 0xac, 0xf4, 0x94, 0x57, 0xcd, 0x78, 0x1a, 0xdf, 0x85, 0xa4, - 0x63, 0x79, 0xe5, 0xc6, 0x3d, 0x80, 0x7f, 0xc2, 0x39, 0x98, 0xb3, 0xec, 0x3d, 0xdb, 0xd2, 0x8c, - 0xbd, 0xe6, 0x11, 0x6f, 0xa7, 0x13, 0x59, 0x94, 0x9f, 0xa1, 0x23, 0xb9, 0xdc, 0x97, 0x18, 0x2c, - 0x6c, 0xfa, 0xff, 0x0b, 0xbb, 0xf0, 0x02, 0xe2, 0x76, 0xcf, 0xe4, 0x9e, 0x9a, 0xff, 0x4b, 0x0f, - 0x0b, 0xa1, 0x1e, 0x14, 0x22, 0xf0, 0xac, 0x67, 0x72, 0xea, 0x31, 0xa2, 0xea, 0x8e, 0x45, 0xd7, - 0x1d, 0x32, 0x6d, 0x6a, 0xd4, 0xb4, 0x49, 0x8a, 0xc6, 0xcc, 0x4c, 0xdc, 0xda, 0xcc, 0x71, 0x2b, - 0x92, 0x11, 0x56, 0x1c, 0xc2, 0x42, 0xa8, 0xb3, 0x81, 0x48, 0xfc, 0x12, 0x92, 0x7d, 0x98, 0x63, - 0xf9, 0x5e, 0x3c, 0x1a, 0xf1, 0x22, 0x82, 0x51, 0xf3, 0xd0, 0xd4, 0x67, 0xe1, 0x45, 0x48, 0x70, - 0x29, 0x85, 0xf4, 0x5d, 0x18, 0x1c, 0x72, 0x6b, 0xb0, 0xac, 0x0b, 0xfb, 0x60, 0xbf, 0xe7, 0x4f, - 0x50, 0xad, 0xeb, 0xd8, 0x6d, 0x71, 0x6a, 0x04, 0x05, 0x5f, 0x3f, 0x85, 0x2b, 0xf0, 0x60, 0x02, - 0xdb, 0x32, 0x85, 0x61, 0xf1, 0xd5, 0x35, 0xb8, 0x37, 0xa1, 0x4b, 0x78, 0x06, 0xe2, 0x15, 0xbd, - 0xc2, 0x54, 0x05, 0xa7, 0x60, 0x5a, 0xd3, 0x77, 0xeb, 0x5a, 0x5d, 0x53, 0x11, 0x06, 0x48, 0xae, - 0x97, 0xf5, 0x75, 0x6d, 0x5b, 0x8d, 0xad, 0xb6, 0xe0, 0xfe, 0x44, 0x5d, 0x38, 0x09, 0xb1, 0xea, - 0x1b, 0x55, 0xc1, 0x59, 0x58, 0x66, 0xd5, 0x6a, 0xe3, 0x6d, 0x59, 0xff, 0xd0, 0xa0, 0xda, 0x6e, - 0x5d, 0xab, 0xb1, 0x5a, 0x63, 0x47, 0xa3, 0x0d, 0xa6, 0xe9, 0x65, 0x9d, 0xa9, 0x08, 0xcf, 0x42, - 0x42, 0xa3, 0xb4, 0x4a, 0xd5, 0x18, 0xbe, 0x03, 0xff, 0xd5, 0xb6, 0xea, 0x8c, 0x55, 0xf4, 0xd7, - 0x8d, 0x8d, 0xea, 0x3b, 0x5d, 0x9d, 0x2a, 0xfd, 0x44, 0x21, 0xbf, 0x37, 0x85, 0x0c, 0x56, 0xa9, - 0x0e, 0x29, 0x3f, 0xdc, 0x16, 0xc2, 0xc4, 0x2b, 0x23, 0x76, 0xff, 0xbb, 0xaf, 0x99, 0x95, 0x49, - 0xfd, 0xf0, 0xb1, 0x39, 0x25, 0x8f, 0x9e, 0x20, 0x6c, 0xc0, 0x52, 0xa4, 0x65, 0xf8, 0xf1, 0x08, - 0xff, 0xba, 0xa6, 0x64, 0x56, 0x6f, 0x03, 0x1d, 0x74, 0xa0, 0x64, 0xc2, 0x62, 0x58, 0xdd, 0x70, - 0x9c, 0xde, 0xc3, 0x5c, 0x10, 0x7b, 0xfa, 0xb2, 0x37, 0xad, 0x56, 0x26, 0x7b, 0xd3, 0xc0, 0x0d, - 0x14, 0xbe, 0x2a, 0x9f, 0x5d, 0x10, 0xe5, 0xfc, 0x82, 0x28, 0x57, 0x17, 0x04, 0x7d, 0x76, 0x09, - 0xfa, 0xe6, 0x12, 0xf4, 0xdd, 0x25, 0xe8, 0xcc, 0x25, 0xe8, 0x97, 0x4b, 0xd0, 0x6f, 0x97, 0x28, - 0x57, 0x2e, 0x41, 0x5f, 0x2f, 0x89, 0x72, 0x76, 0x49, 0x94, 0xf3, 0x4b, 0xa2, 0x7c, 0x0c, 0xbf, - 0xae, 0xcd, 0xa4, 0xf7, 0x30, 0x3e, 0xfd, 0x13, 0x00, 0x00, 0xff, 0xff, 0x88, 0x0c, 0xfe, 0x56, - 0x84, 0x05, 0x00, 0x00, + // 750 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x55, 0x4d, 0x4f, 0xdb, 0x4c, + 0x10, 0xf6, 0xe6, 0x0b, 0x98, 0xf0, 0x82, 0xdf, 0x05, 0xde, 0x37, 0x8d, 0xa8, 0x89, 0xac, 0x0a, + 0xa5, 0x1c, 0x92, 0x2a, 0x54, 0x2a, 0xaa, 0x50, 0xa5, 0x94, 0x98, 0x12, 0x95, 0x3a, 0xe0, 0x38, + 0x6a, 0x4b, 0x0f, 0x51, 0x3e, 0x96, 0x24, 0x22, 0xf1, 0x1a, 0x7b, 0x0d, 0xca, 0xad, 0xc7, 0x1e, + 0xfb, 0x27, 0x2a, 0xf5, 0xa7, 0xf4, 0xc8, 0x91, 0x43, 0x0f, 0xc5, 0x5c, 0x7a, 0xe4, 0x27, 0x54, + 0x76, 0xec, 0xd4, 0x09, 0x09, 0x70, 0x9b, 0x19, 0x3f, 0x33, 0x3b, 0xf3, 0x3c, 0xb3, 0x5e, 0x58, + 0x34, 0x1b, 0x6d, 0xd2, 0xb4, 0xba, 0xc4, 0xc8, 0xe8, 0x06, 0x65, 0x14, 0xc7, 0x87, 0x01, 0xbd, + 0x9e, 0x5c, 0x6e, 0xd1, 0x16, 0x75, 0xe3, 0x59, 0xc7, 0x1a, 0x40, 0x92, 0xcf, 0x5b, 0x1d, 0xd6, + 0xb6, 0xea, 0x99, 0x06, 0xed, 0x65, 0xcf, 0x49, 0xed, 0x8c, 0x9c, 0x53, 0xe3, 0xc4, 0xcc, 0x36, + 0x68, 0xaf, 0x47, 0xb5, 0x6c, 0x9b, 0x31, 0xbd, 0x65, 0xe8, 0x8d, 0xa1, 0x31, 
0xc8, 0x12, 0x8f, + 0x00, 0x1f, 0x5a, 0xc4, 0xe8, 0x10, 0x43, 0xa5, 0x65, 0xff, 0x0c, 0xbc, 0x0a, 0x73, 0xa7, 0x83, + 0x68, 0xb1, 0x90, 0x40, 0x29, 0x94, 0x9e, 0x53, 0xfe, 0x06, 0xf0, 0x3a, 0x2c, 0x78, 0x4e, 0xbe, + 0xd9, 0x34, 0x88, 0x69, 0x26, 0x42, 0x2e, 0x64, 0x2c, 0x2a, 0x7e, 0x0b, 0x03, 0x1e, 0xd6, 0x54, + 0xa9, 0x77, 0x0e, 0x4e, 0xc0, 0x8c, 0x03, 0xec, 0x7b, 0xa5, 0x23, 0x8a, 0xef, 0xe2, 0x17, 0x10, + 0x77, 0xda, 0x53, 0xc8, 0xa9, 0x45, 0x4c, 0xe6, 0x56, 0x8d, 0xe7, 0x56, 0x32, 0xc3, 0x96, 0xf7, + 0x54, 0xf5, 0xc0, 0xfb, 0xa8, 0x04, 0x91, 0x38, 0x0d, 0x8b, 0xc7, 0x06, 0xd5, 0x18, 0xd1, 0x9a, + 0x7e, 0x4b, 0x61, 0xb7, 0xa5, 0xf1, 0x30, 0xfe, 0x0f, 0x62, 0x96, 0xe9, 0x8e, 0x15, 0x71, 0x01, + 0x9e, 0x87, 0x45, 0x98, 0x37, 0x59, 0x8d, 0x99, 0x92, 0x56, 0xab, 0x77, 0x49, 0x33, 0x11, 0x4d, + 0xa1, 0xf4, 0xac, 0x32, 0x12, 0xc3, 0x02, 0xc0, 0xb1, 0x51, 0x6b, 0xf5, 0x88, 0xc6, 0x8a, 0x85, + 0x44, 0xcc, 0xed, 0x3d, 0x10, 0xc1, 0x9f, 0x60, 0xa1, 0xd1, 0xee, 0x74, 0x9b, 0xc5, 0x02, 0xa3, + 0xce, 0x79, 0x66, 0x62, 0x26, 0x15, 0x4e, 0xc7, 0x73, 0x9b, 0x99, 0x80, 0x7a, 0x99, 0xdb, 0x8c, + 0x64, 0x76, 0x46, 0xb2, 0x24, 0x8d, 0x19, 0x7d, 0x65, 0xac, 0x94, 0xd3, 0x78, 0xc7, 0x54, 0x28, + 0x65, 0x89, 0x59, 0xb7, 0x35, 0xcf, 0x4b, 0xe6, 0x61, 0x69, 0x42, 0x3a, 0xe6, 0x21, 0x7c, 0x42, + 0xfa, 0x1e, 0xc1, 0x8e, 0x89, 0x97, 0x21, 0x7a, 0x56, 0xeb, 0x5a, 0xc4, 0x13, 0x6b, 0xe0, 0xbc, + 0x0c, 0x6d, 0x21, 0xf1, 0x4b, 0x08, 0x96, 0x76, 0x3d, 0x9e, 0x82, 0x5b, 0xb0, 0x05, 0x11, 0xd6, + 0xd7, 0x89, 0x5b, 0x64, 0x21, 0xf7, 0x64, 0x64, 0x8a, 0x09, 0x78, 0xb5, 0xaf, 0x13, 0xc5, 0xcd, + 0x98, 0xa4, 0x47, 0x68, 0xb2, 0x1e, 0x81, 0x65, 0x08, 0x8f, 0x2e, 0xc3, 0x34, 0xa5, 0xc6, 0x96, + 0x24, 0xfa, 0xe0, 0x25, 0x19, 0x97, 0x38, 0x76, 0x5b, 0x62, 0xf1, 0x04, 0x96, 0x02, 0xfa, 0xf8, + 0x43, 0xe2, 0x57, 0x10, 0x73, 0x60, 0x96, 0xe9, 0x71, 0xb1, 0x3e, 0x4d, 0x51, 0x3f, 0xa3, 0xec, + 0xa2, 0x15, 0x2f, 0xcb, 0xe1, 0x9e, 0x18, 0x06, 0x35, 0x7c, 0xee, 0x5d, 0x47, 0xdc, 0x86, 0x55, + 0x99, 0xb2, 0xce, 0x71, 0xdf, 0xdb, 0x83, 0x72, 0xdb, 0x62, 0x4d, 0x7a, 0xae, 0xf9, 0x0d, 0xdf, + 0x79, 0x0b, 0xc5, 0x35, 0x78, 0x3c, 0x25, 0xdb, 0xd4, 0xa9, 0x66, 0x92, 0x8d, 0x6d, 0xf8, 0x7f, + 0x8a, 0x4a, 0x78, 0x16, 0x22, 0x45, 0xb9, 0xa8, 0xf2, 0x1c, 0x8e, 0xc3, 0x8c, 0x24, 0x1f, 0x56, + 0xa4, 0x8a, 0xc4, 0x23, 0x0c, 0x10, 0xdb, 0xc9, 0xcb, 0x3b, 0xd2, 0x3e, 0x1f, 0xda, 0x68, 0xc0, + 0xa3, 0xa9, 0x73, 0xe1, 0x18, 0x84, 0x4a, 0x6f, 0x79, 0x0e, 0xa7, 0x60, 0x55, 0x2d, 0x95, 0xaa, + 0xef, 0xf2, 0xf2, 0xc7, 0xaa, 0x22, 0x1d, 0x56, 0xa4, 0xb2, 0x5a, 0xae, 0x1e, 0x48, 0x4a, 0x55, + 0x95, 0xe4, 0xbc, 0xac, 0xf2, 0x08, 0xcf, 0x41, 0x54, 0x52, 0x94, 0x92, 0xc2, 0x87, 0xf0, 0xbf, + 0xf0, 0x4f, 0x79, 0xaf, 0xa2, 0xaa, 0x45, 0xf9, 0x4d, 0xb5, 0x50, 0x7a, 0x2f, 0xf3, 0xe1, 0xdc, + 0x4f, 0x14, 0xe0, 0x7b, 0x97, 0x1a, 0xfe, 0x2f, 0xa2, 0x02, 0x71, 0xcf, 0xdc, 0xa7, 0x54, 0xc7, + 0x6b, 0x23, 0x74, 0xdf, 0xfe, 0x5f, 0x25, 0xd7, 0xee, 0xb9, 0x61, 0x22, 0x97, 0x46, 0xcf, 0x10, + 0xd6, 0x60, 0x65, 0x22, 0x65, 0xf8, 0xe9, 0x48, 0xfe, 0x5d, 0xa2, 0x24, 0x37, 0x1e, 0x02, 0x1d, + 0x28, 0x90, 0xd3, 0x61, 0x39, 0x38, 0xdd, 0x70, 0x9d, 0x3e, 0xc0, 0xbc, 0x6f, 0xbb, 0xf3, 0xa5, + 0xee, 0xbb, 0x5a, 0xc9, 0xd4, 0x7d, 0x0b, 0x37, 0x98, 0xf0, 0x75, 0xfe, 0xe2, 0x4a, 0xe0, 0x2e, + 0xaf, 0x04, 0xee, 0xe6, 0x4a, 0x40, 0x9f, 0x6d, 0x01, 0x7d, 0xb7, 0x05, 0xf4, 0xc3, 0x16, 0xd0, + 0x85, 0x2d, 0xa0, 0x5f, 0xb6, 0x80, 0x7e, 0xdb, 0x02, 0x77, 0x63, 0x0b, 0xe8, 0xeb, 0xb5, 0xc0, + 0x5d, 0x5c, 0x0b, 0xdc, 0xe5, 0xb5, 0xc0, 0x1d, 0x05, 0x5f, 0x97, 0x7a, 0xcc, 0x7d, 0x18, 0x36, + 0xff, 
0x04, 0x00, 0x00, 0xff, 0xff, 0xee, 0xbe, 0x64, 0x48, 0x84, 0x06, 0x00, 0x00, } func (x FrontendToSchedulerType) String() string { @@ -518,6 +562,9 @@ func (this *QuerierToScheduler) Equal(that interface{}) bool { if this.QuerierID != that1.QuerierID { return false } + if this.QuerierAddress != that1.QuerierAddress { + return false + } return true } func (this *SchedulerToQuerier) Equal(that interface{}) bool { @@ -554,6 +601,20 @@ func (this *SchedulerToQuerier) Equal(that interface{}) bool { if this.StatsEnabled != that1.StatsEnabled { return false } + if this.FragmentID != that1.FragmentID { + return false + } + if len(this.ChildIDtoAddrs) != len(that1.ChildIDtoAddrs) { + return false + } + for i := range this.ChildIDtoAddrs { + if this.ChildIDtoAddrs[i] != that1.ChildIDtoAddrs[i] { + return false + } + } + if this.IsRoot != that1.IsRoot { + return false + } return true } func (this *FrontendToScheduler) Equal(that interface{}) bool { @@ -671,9 +732,10 @@ func (this *QuerierToScheduler) GoString() string { if this == nil { return "nil" } - s := make([]string, 0, 5) + s := make([]string, 0, 6) s = append(s, "&schedulerpb.QuerierToScheduler{") s = append(s, "QuerierID: "+fmt.Sprintf("%#v", this.QuerierID)+",\n") + s = append(s, "QuerierAddress: "+fmt.Sprintf("%#v", this.QuerierAddress)+",\n") s = append(s, "}") return strings.Join(s, "") } @@ -681,7 +743,7 @@ func (this *SchedulerToQuerier) GoString() string { if this == nil { return "nil" } - s := make([]string, 0, 9) + s := make([]string, 0, 12) s = append(s, "&schedulerpb.SchedulerToQuerier{") s = append(s, "QueryID: "+fmt.Sprintf("%#v", this.QueryID)+",\n") if this.HttpRequest != nil { @@ -690,6 +752,21 @@ func (this *SchedulerToQuerier) GoString() string { s = append(s, "FrontendAddress: "+fmt.Sprintf("%#v", this.FrontendAddress)+",\n") s = append(s, "UserID: "+fmt.Sprintf("%#v", this.UserID)+",\n") s = append(s, "StatsEnabled: "+fmt.Sprintf("%#v", this.StatsEnabled)+",\n") + s = append(s, "FragmentID: "+fmt.Sprintf("%#v", this.FragmentID)+",\n") + keysForChildIDtoAddrs := make([]uint64, 0, len(this.ChildIDtoAddrs)) + for k, _ := range this.ChildIDtoAddrs { + keysForChildIDtoAddrs = append(keysForChildIDtoAddrs, k) + } + github_com_gogo_protobuf_sortkeys.Uint64s(keysForChildIDtoAddrs) + mapStringForChildIDtoAddrs := "map[uint64]string{" + for _, k := range keysForChildIDtoAddrs { + mapStringForChildIDtoAddrs += fmt.Sprintf("%#v: %#v,", k, this.ChildIDtoAddrs[k]) + } + mapStringForChildIDtoAddrs += "}" + if this.ChildIDtoAddrs != nil { + s = append(s, "ChildIDtoAddrs: "+mapStringForChildIDtoAddrs+",\n") + } + s = append(s, "IsRoot: "+fmt.Sprintf("%#v", this.IsRoot)+",\n") s = append(s, "}") return strings.Join(s, "") } @@ -1048,6 +1125,13 @@ func (m *QuerierToScheduler) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + if len(m.QuerierAddress) > 0 { + i -= len(m.QuerierAddress) + copy(dAtA[i:], m.QuerierAddress) + i = encodeVarintScheduler(dAtA, i, uint64(len(m.QuerierAddress))) + i-- + dAtA[i] = 0x12 + } if len(m.QuerierID) > 0 { i -= len(m.QuerierID) copy(dAtA[i:], m.QuerierID) @@ -1078,6 +1162,38 @@ func (m *SchedulerToQuerier) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + if m.IsRoot { + i-- + if m.IsRoot { + dAtA[i] = 1 + } else { + dAtA[i] = 0 + } + i-- + dAtA[i] = 0x40 + } + if len(m.ChildIDtoAddrs) > 0 { + for k := range m.ChildIDtoAddrs { + v := m.ChildIDtoAddrs[k] + baseI := i + i -= len(v) + copy(dAtA[i:], v) + i = encodeVarintScheduler(dAtA, i, uint64(len(v))) + i-- + 
dAtA[i] = 0x12 + i = encodeVarintScheduler(dAtA, i, uint64(k)) + i-- + dAtA[i] = 0x8 + i = encodeVarintScheduler(dAtA, i, uint64(baseI-i)) + i-- + dAtA[i] = 0x3a + } + } + if m.FragmentID != 0 { + i = encodeVarintScheduler(dAtA, i, uint64(m.FragmentID)) + i-- + dAtA[i] = 0x30 + } if m.StatsEnabled { i-- if m.StatsEnabled { @@ -1300,6 +1416,10 @@ func (m *QuerierToScheduler) Size() (n int) { if l > 0 { n += 1 + l + sovScheduler(uint64(l)) } + l = len(m.QuerierAddress) + if l > 0 { + n += 1 + l + sovScheduler(uint64(l)) + } return n } @@ -1327,6 +1447,20 @@ func (m *SchedulerToQuerier) Size() (n int) { if m.StatsEnabled { n += 2 } + if m.FragmentID != 0 { + n += 1 + sovScheduler(uint64(m.FragmentID)) + } + if len(m.ChildIDtoAddrs) > 0 { + for k, v := range m.ChildIDtoAddrs { + _ = k + _ = v + mapEntrySize := 1 + sovScheduler(uint64(k)) + 1 + len(v) + sovScheduler(uint64(len(v))) + n += mapEntrySize + 1 + sovScheduler(uint64(mapEntrySize)) + } + } + if m.IsRoot { + n += 2 + } return n } @@ -1410,6 +1544,7 @@ func (this *QuerierToScheduler) String() string { } s := strings.Join([]string{`&QuerierToScheduler{`, `QuerierID:` + fmt.Sprintf("%v", this.QuerierID) + `,`, + `QuerierAddress:` + fmt.Sprintf("%v", this.QuerierAddress) + `,`, `}`, }, "") return s @@ -1418,12 +1553,25 @@ func (this *SchedulerToQuerier) String() string { if this == nil { return "nil" } + keysForChildIDtoAddrs := make([]uint64, 0, len(this.ChildIDtoAddrs)) + for k, _ := range this.ChildIDtoAddrs { + keysForChildIDtoAddrs = append(keysForChildIDtoAddrs, k) + } + github_com_gogo_protobuf_sortkeys.Uint64s(keysForChildIDtoAddrs) + mapStringForChildIDtoAddrs := "map[uint64]string{" + for _, k := range keysForChildIDtoAddrs { + mapStringForChildIDtoAddrs += fmt.Sprintf("%v: %v,", k, this.ChildIDtoAddrs[k]) + } + mapStringForChildIDtoAddrs += "}" s := strings.Join([]string{`&SchedulerToQuerier{`, `QueryID:` + fmt.Sprintf("%v", this.QueryID) + `,`, `HttpRequest:` + strings.Replace(fmt.Sprintf("%v", this.HttpRequest), "HTTPRequest", "httpgrpc.HTTPRequest", 1) + `,`, `FrontendAddress:` + fmt.Sprintf("%v", this.FrontendAddress) + `,`, `UserID:` + fmt.Sprintf("%v", this.UserID) + `,`, `StatsEnabled:` + fmt.Sprintf("%v", this.StatsEnabled) + `,`, + `FragmentID:` + fmt.Sprintf("%v", this.FragmentID) + `,`, + `ChildIDtoAddrs:` + mapStringForChildIDtoAddrs + `,`, + `IsRoot:` + fmt.Sprintf("%v", this.IsRoot) + `,`, `}`, }, "") return s @@ -1542,6 +1690,38 @@ func (m *QuerierToScheduler) Unmarshal(dAtA []byte) error { } m.QuerierID = string(dAtA[iNdEx:postIndex]) iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field QuerierAddress", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowScheduler + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthScheduler + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthScheduler + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.QuerierAddress = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipScheduler(dAtA[iNdEx:]) @@ -1734,6 +1914,158 @@ func (m *SchedulerToQuerier) Unmarshal(dAtA []byte) error { } } m.StatsEnabled = bool(v != 0) + case 6: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for 
field FragmentID", wireType) + } + m.FragmentID = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowScheduler + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.FragmentID |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 7: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field ChildIDtoAddrs", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowScheduler + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthScheduler + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthScheduler + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.ChildIDtoAddrs == nil { + m.ChildIDtoAddrs = make(map[uint64]string) + } + var mapkey uint64 + var mapvalue string + for iNdEx < postIndex { + entryPreIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowScheduler + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + if fieldNum == 1 { + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowScheduler + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + mapkey |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + } else if fieldNum == 2 { + var stringLenmapvalue uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowScheduler + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLenmapvalue |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLenmapvalue := int(stringLenmapvalue) + if intStringLenmapvalue < 0 { + return ErrInvalidLengthScheduler + } + postStringIndexmapvalue := iNdEx + intStringLenmapvalue + if postStringIndexmapvalue < 0 { + return ErrInvalidLengthScheduler + } + if postStringIndexmapvalue > l { + return io.ErrUnexpectedEOF + } + mapvalue = string(dAtA[iNdEx:postStringIndexmapvalue]) + iNdEx = postStringIndexmapvalue + } else { + iNdEx = entryPreIndex + skippy, err := skipScheduler(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthScheduler + } + if (iNdEx + skippy) > postIndex { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + m.ChildIDtoAddrs[mapkey] = mapvalue + iNdEx = postIndex + case 8: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field IsRoot", wireType) + } + var v int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowScheduler + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + m.IsRoot = bool(v != 0) default: iNdEx = preIndex skippy, err := skipScheduler(dAtA[iNdEx:]) diff --git a/pkg/scheduler/schedulerpb/scheduler.proto b/pkg/scheduler/schedulerpb/scheduler.proto index eea28717b83..6df2049790b 100644 --- a/pkg/scheduler/schedulerpb/scheduler.proto +++ b/pkg/scheduler/schedulerpb/scheduler.proto @@ -9,6 +9,7 @@ import "github.com/weaveworks/common/httpgrpc/httpgrpc.proto"; option (gogoproto.marshaler_all) = true; option (gogoproto.unmarshaler_all) = true; +option (gogoproto.sizer_all) = true; // 
Scheduler interface exposed to Queriers.
 service SchedulerForQuerier {
@@ -27,7 +28,10 @@ service SchedulerForQuerier {
 // Querier reports its own clientID when it connects, so that scheduler knows how many *different* queriers are connected.
 // To signal that querier is ready to accept another request, querier sends empty message.
 message QuerierToScheduler {
+  string querierID = 1;
+
+  string querierAddress = 2;
 }
 message SchedulerToQuerier {
@@ -45,6 +49,16 @@ message SchedulerToQuerier {
   // Whether query statistics tracking should be enabled. The response will include
   // statistics only when this option is enabled.
   bool statsEnabled = 5;
+
+  // Below is the metadata used for distributed execution.
+  // The ID of the current logical query plan fragment.
+  uint64 fragmentID = 6;
+
+  // The IDs and addresses of its child fragments.
+  map<uint64, string> childIDtoAddrs = 7;
+
+  // Whether the current fragment is the root of the query plan.
+  bool isRoot = 8;
 }
 // Scheduler interface exposed to Frontend. Frontend can enqueue and cancel requests.
diff --git a/pkg/util/logical_plan/test_logicalplan_utils.go b/pkg/util/logical_plan/test_logicalplan_utils.go
new file mode 100644
index 00000000000..49bd8da286d
--- /dev/null
+++ b/pkg/util/logical_plan/test_logicalplan_utils.go
@@ -0,0 +1,50 @@
+package logical_plan
+
+import (
+	"time"
+
+	"github.com/prometheus/prometheus/promql/parser"
+	"github.com/thanos-io/promql-engine/logicalplan"
+	"github.com/thanos-io/promql-engine/query"
+)
+
+func getStartAndEnd(start time.Time, end time.Time, step time.Duration) (time.Time, time.Time) {
+	if step == 0 {
+		return start, start
+	}
+	return start, end
+}
+
+func CreateTestLogicalPlan(qs string, start time.Time, end time.Time, step time.Duration) (*logicalplan.Plan, error) {
+
+	start, end = getStartAndEnd(start, end, step)
+
+	qOpts := query.Options{
+		Start:      start,
+		End:        end,
+		Step:       step,
+		StepsBatch: 10,
+		NoStepSubqueryIntervalFn: func(duration time.Duration) time.Duration {
+			return 0
+		},
+		LookbackDelta:      0,
+		EnablePerStepStats: false,
+	}
+
+	expr, err := parser.NewParser(qs, parser.WithFunctions(parser.Functions)).ParseExpr()
+	if err != nil {
+		return nil, err
+	}
+
+	planOpts := logicalplan.PlanOptions{
+		DisableDuplicateLabelCheck: false,
+	}
+
+	logicalPlan, err := logicalplan.NewFromAST(expr, &qOpts, planOpts)
+	if err != nil {
+		return nil, err
+	}
+	optimizedPlan, _ := logicalPlan.Optimize(logicalplan.DefaultOptimizers)
+
+	return &optimizedPlan, nil
+}
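
To make the new wire fields concrete, below is a minimal, hypothetical sketch (not code from this change) of how a querier-side handler could read the distributed-execution metadata carried by `SchedulerToQuerier`. Only the message and field names come from the proto/generated code above; `handleFragmentAssignment`, the addresses, and all field values are invented for illustration.

```go
package main

import (
	"fmt"

	"github.com/cortexproject/cortex/pkg/scheduler/schedulerpb"
)

// handleFragmentAssignment is a hypothetical helper: it prints how a querier
// could interpret the fragmentID, childIDtoAddrs, and isRoot fields.
func handleFragmentAssignment(msg *schedulerpb.SchedulerToQuerier) {
	if msg.IsRoot {
		// The root fragment is the one whose result goes back to the frontend.
		fmt.Printf("query %d: executing root fragment %d, reporting to %s\n",
			msg.QueryID, msg.FragmentID, msg.FrontendAddress)
	}
	// Child fragment IDs map to the querier addresses where their
	// intermediate results can be fetched.
	for childID, addr := range msg.ChildIDtoAddrs {
		fmt.Printf("query %d: fragment %d reads child fragment %d from %s\n",
			msg.QueryID, msg.FragmentID, childID, addr)
	}
}

func main() {
	// Example message; values are illustrative only.
	msg := &schedulerpb.SchedulerToQuerier{
		QueryID:         42,
		FrontendAddress: "frontend-1:9095",
		UserID:          "tenant-a",
		FragmentID:      1,
		ChildIDtoAddrs:  map[uint64]string{2: "querier-2:9095", 3: "querier-3:9095"},
		IsRoot:          true,
	}
	handleFragmentAssignment(msg)
}
```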