From 02fd6071b4a8da3d97bb6f80d7a8c1ce8b4e7bcf Mon Sep 17 00:00:00 2001 From: rubywtl Date: Fri, 5 Sep 2025 14:32:31 -0700 Subject: [PATCH] logical plan fragmentation implementation Signed-off-by: rubywtl --- pkg/distributed_execution/codec_test.go | 2 +- .../distributed_optimizer.go | 4 +- .../distributed_optimizer_test.go | 50 +------- .../plan_fragments/fragmenter.go | 100 ++++++++++++++-- .../plan_fragments/fragmenter_test.go | 32 ++++- .../test_logicalplan_utils.go | 4 +- pkg/scheduler/scheduler.go | 4 +- pkg/scheduler/scheduler_test.go | 113 +++++++++++++----- 8 files changed, 208 insertions(+), 101 deletions(-) rename pkg/{util/logical_plan => distributed_execution}/test_logicalplan_utils.go (88%) diff --git a/pkg/distributed_execution/codec_test.go b/pkg/distributed_execution/codec_test.go index 1f294f7bc7f..915f00c720f 100644 --- a/pkg/distributed_execution/codec_test.go +++ b/pkg/distributed_execution/codec_test.go @@ -34,7 +34,7 @@ func TestUnmarshalWithLogicalPlan(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - plan, _, err := CreateTestLogicalPlan(tc.query, start, end, step) + plan, err := CreateTestLogicalPlan(tc.query, start, end, step) require.NoError(t, err) require.NotNil(t, plan) diff --git a/pkg/distributed_execution/distributed_optimizer.go b/pkg/distributed_execution/distributed_optimizer.go index 4d0fdbe1d5c..c63a8d05031 100644 --- a/pkg/distributed_execution/distributed_optimizer.go +++ b/pkg/distributed_execution/distributed_optimizer.go @@ -1,10 +1,9 @@ package distributed_execution import ( - "github.com/thanos-io/promql-engine/query" - "github.com/prometheus/prometheus/util/annotations" "github.com/thanos-io/promql-engine/logicalplan" + "github.com/thanos-io/promql-engine/query" ) // This is a simplified implementation that only handles binary aggregation cases @@ -18,6 +17,7 @@ type DistributedOptimizer struct{} func (d *DistributedOptimizer) Optimize(root logicalplan.Node, opts *query.Options) (logicalplan.Node, annotations.Annotations) { warns := annotations.New() + // insert remote nodes logicalplan.TraverseBottomUp(nil, &root, func(parent, current *logicalplan.Node) bool { if (*current).Type() == logicalplan.BinaryNode && d.hasAggregation(current) { diff --git a/pkg/distributed_execution/distributed_optimizer_test.go b/pkg/distributed_execution/distributed_optimizer_test.go index 73e818cc6aa..7504d178749 100644 --- a/pkg/distributed_execution/distributed_optimizer_test.go +++ b/pkg/distributed_execution/distributed_optimizer_test.go @@ -4,10 +4,8 @@ import ( "testing" "time" - "github.com/prometheus/prometheus/promql/parser" "github.com/stretchr/testify/require" "github.com/thanos-io/promql-engine/logicalplan" - "github.com/thanos-io/promql-engine/query" ) func TestDistributedOptimizer(t *testing.T) { @@ -58,7 +56,7 @@ func TestDistributedOptimizer(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - lp, _, err := CreateTestLogicalPlan(tc.query, now, now, time.Minute) + lp, err := CreateTestLogicalPlan(tc.query, now, now, time.Minute) require.NoError(t, err) node := (*lp).Root() @@ -75,49 +73,3 @@ func TestDistributedOptimizer(t *testing.T) { }) } } - -func getStartAndEnd(start time.Time, end time.Time, step time.Duration) (time.Time, time.Time) { - if step == 0 { - return start, start - } - return start, end -} - -func CreateTestLogicalPlan(qs string, start time.Time, end time.Time, step time.Duration) (*logicalplan.Plan, query.Options, error) { - - start, end = getStartAndEnd(start, end, 
step) - - qOpts := query.Options{ - Start: start, - End: end, - Step: step, - StepsBatch: 10, - NoStepSubqueryIntervalFn: func(duration time.Duration) time.Duration { - return 0 - }, - LookbackDelta: 0, - EnablePerStepStats: false, - } - - expr, err := parser.NewParser(qs, parser.WithFunctions(parser.Functions)).ParseExpr() - if err != nil { - return nil, qOpts, err - } - - planOpts := logicalplan.PlanOptions{ - DisableDuplicateLabelCheck: false, - } - - logicalPlan, err := logicalplan.NewFromAST(expr, &qOpts, planOpts) - if err != nil { - return nil, qOpts, err - } - optimizedPlan, _ := logicalPlan.Optimize(logicalplan.DefaultOptimizers) - - distributedOptimizer := DistributedOptimizer{} - dOptimizedNode, _ := distributedOptimizer.Optimize(optimizedPlan.Root(), &qOpts) - - plan := logicalplan.New(dOptimizedNode, &qOpts, planOpts) - - return &plan, qOpts, nil -} diff --git a/pkg/distributed_execution/plan_fragments/fragmenter.go b/pkg/distributed_execution/plan_fragments/fragmenter.go index d2bd187dfa6..2d3e037e9ee 100644 --- a/pkg/distributed_execution/plan_fragments/fragmenter.go +++ b/pkg/distributed_execution/plan_fragments/fragmenter.go @@ -1,27 +1,103 @@ package plan_fragments -import "github.com/thanos-io/promql-engine/logicalplan" +import ( + "encoding/binary" + + "github.com/google/uuid" + "github.com/thanos-io/promql-engine/logicalplan" + + "github.com/cortexproject/cortex/pkg/distributed_execution" +) // Fragmenter interface type Fragmenter interface { // Fragment function fragments the logical query plan and will always return the fragment in the order of child-to-root // in other words, the order of the fragment in the array will be the order they are being scheduled - Fragment(node logicalplan.Node) ([]Fragment, error) + Fragment(queryID uint64, node logicalplan.Node) ([]Fragment, error) } -type DummyFragmenter struct { +func getNewID() uint64 { + id := uuid.New() + return binary.BigEndian.Uint64(id[:8]) } -func (f *DummyFragmenter) Fragment(node logicalplan.Node) ([]Fragment, error) { - // simple logic without distributed optimizer - return []Fragment{ - { +type PlanFragmenter struct { +} + +func (f *PlanFragmenter) Fragment(queryID uint64, node logicalplan.Node) ([]Fragment, error) { + fragments := []Fragment{} + + nodeToFragmentID := make(map[*logicalplan.Node]uint64) + nodeToSubtreeFragmentIDs := make(map[*logicalplan.Node][]uint64) + + logicalplan.TraverseBottomUp(nil, &node, func(parent, current *logicalplan.Node) bool { + childFragmentIDs := make(map[uint64]bool) + children := (*current).Children() + + for _, child := range children { + if subtreeIDs, exists := nodeToSubtreeFragmentIDs[child]; exists { + for _, fragmentID := range subtreeIDs { + childFragmentIDs[fragmentID] = true + } + } + } + + childIDs := make([]uint64, 0, len(childFragmentIDs)) + for fragmentID := range childFragmentIDs { + childIDs = append(childIDs, fragmentID) + } + + if parent == nil { // root fragment + newFragment := Fragment{ + Node: *current, + FragmentID: getNewID(), + ChildIDs: childIDs, + IsRoot: true, + } + fragments = append(fragments, newFragment) + + // cache subtree fragment IDs for this node + nodeToSubtreeFragmentIDs[current] = childIDs + + } else if distributed_execution.RemoteNode == (*current).Type() { + remoteNode := (*current).(*distributed_execution.Remote) + fragmentID := getNewID() + nodeToFragmentID[current] = fragmentID + + // Set the fragment key for the remote node + key := distributed_execution.MakeFragmentKey(queryID, fragmentID) + remoteNode.FragmentKey = key + + 
newFragment := Fragment{ + Node: remoteNode.Expr, + FragmentID: fragmentID, + ChildIDs: childIDs, + IsRoot: false, + } + + fragments = append(fragments, newFragment) + + subtreeIDs := append([]uint64{fragmentID}, childIDs...) + nodeToSubtreeFragmentIDs[current] = subtreeIDs + } else { + nodeToSubtreeFragmentIDs[current] = childIDs + } + + return false + }) + + if len(fragments) > 0 { + return fragments, nil + } else { + // for non-query API calls + // --> treat as root fragment and immediately return the result + return []Fragment{{ Node: node, - FragmentID: uint64(1), + FragmentID: uint64(0), ChildIDs: []uint64{}, IsRoot: true, - }, - }, nil + }}, nil + } } type Fragment struct { @@ -47,6 +123,6 @@ func (s *Fragment) IsEmpty() bool { return true } -func NewDummyFragmenter() Fragmenter { - return &DummyFragmenter{} +func NewPlanFragmenter() Fragmenter { + return &PlanFragmenter{} } diff --git a/pkg/distributed_execution/plan_fragments/fragmenter_test.go b/pkg/distributed_execution/plan_fragments/fragmenter_test.go index 65f4c20022e..4a1b8f1dcae 100644 --- a/pkg/distributed_execution/plan_fragments/fragmenter_test.go +++ b/pkg/distributed_execution/plan_fragments/fragmenter_test.go @@ -6,9 +6,12 @@ import ( "github.com/stretchr/testify/require" - "github.com/cortexproject/cortex/pkg/util/logical_plan" + "github.com/cortexproject/cortex/pkg/distributed_execution" ) +// Tests fragmentation of logical plans, verifying that the fragments contain correct metadata. +// Note: The number of fragments is determined by the distributed optimizer's strategy - +// if the optimizer logic changes, this test will need to be updated accordingly. func TestFragmenter(t *testing.T) { type testCase struct { name string @@ -19,8 +22,6 @@ func TestFragmenter(t *testing.T) { } now := time.Now() - - // more tests will be added when distributed optimizer and fragmenter are implemented tests := []testCase{ { name: "simple logical query plan - no fragmentation", @@ -29,18 +30,37 @@ func TestFragmenter(t *testing.T) { end: now, expectedFragments: 1, }, + { + name: "binary operation with aggregations", + query: "sum(rate(node_cpu_seconds_total{mode!=\"idle\"}[5m])) + sum(rate(node_memory_Active_bytes[5m]))", + start: now, + end: now, + expectedFragments: 3, + }, + { + name: "multiple binary operation with aggregations", + query: "sum(rate(http_requests_total{job=\"api\"}[5m])) + sum(rate(http_requests_total{job=\"web\"}[5m])) + sum(rate(http_requests_total{job=\"cache\"}[5m])) + sum(rate(http_requests_total{job=\"db\"}[5m]))", + start: now, + end: now, + expectedFragments: 7, + }, } for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - lp, err := logical_plan.CreateTestLogicalPlan(tc.query, tc.start, tc.end, 0) + lp, err := distributed_execution.CreateTestLogicalPlan(tc.query, tc.start, tc.end, 0) require.NoError(t, err) - fragmenter := NewDummyFragmenter() - res, err := fragmenter.Fragment((*lp).Root()) + fragmenter := NewPlanFragmenter() + res, err := fragmenter.Fragment(uint64(1), (*lp).Root()) require.NoError(t, err) require.Equal(t, tc.expectedFragments, len(res)) + + // check the metadata of the fragments of binary expressions + if len(res) == 3 { + require.Equal(t, []uint64{res[0].FragmentID, res[1].FragmentID}, res[2].ChildIDs) + } }) } } diff --git a/pkg/util/logical_plan/test_logicalplan_utils.go b/pkg/distributed_execution/test_logicalplan_utils.go similarity index 88% rename from pkg/util/logical_plan/test_logicalplan_utils.go rename to pkg/distributed_execution/test_logicalplan_utils.go index 
49bd8da286d..d1717068488 100644
--- a/pkg/util/logical_plan/test_logicalplan_utils.go
+++ b/pkg/distributed_execution/test_logicalplan_utils.go
@@ -1,4 +1,4 @@
-package logical_plan
+package distributed_execution
 
 import (
 	"time"
@@ -44,7 +44,7 @@ func CreateTestLogicalPlan(qs string, start time.Time, end time.Time, step time.
 	if err != nil {
 		return nil, err
 	}
-	optimizedPlan, _ := logicalPlan.Optimize(logicalplan.DefaultOptimizers)
+	optimizedPlan, _ := logicalPlan.Optimize(append(logicalplan.DefaultOptimizers, &DistributedOptimizer{}))
 
 	return &optimizedPlan, nil
 }
diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go
index 8223884b26f..d9334819de9 100644
--- a/pkg/scheduler/scheduler.go
+++ b/pkg/scheduler/scheduler.go
@@ -125,7 +125,7 @@ func NewScheduler(cfg Config, limits Limits, log log.Logger, registerer promethe
 		connectedFrontends: map[string]*connectedFrontend{},
 
 		fragmentTable: fragment_table.NewFragmentTable(2 * time.Minute),
-		fragmenter:    plan_fragments.NewDummyFragmenter(),
+		fragmenter:    plan_fragments.NewPlanFragmenter(),
 
 		distributedExecEnabled: distributedExecEnabled,
 		queryFragmentRegistry:  map[queryKey][]uint64{},
	}
@@ -351,7 +351,7 @@ func (s *Scheduler) fragmentAndEnqueueRequest(frontendContext context.Context, f
 		return err
 	}
 
-	fragments, err := s.fragmenter.Fragment(lpNode)
+	fragments, err := s.fragmenter.Fragment(msg.QueryID, lpNode)
 	if err != nil {
 		return err
 	}
diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go
index afdb8c000a1..8ba5c8df0f5 100644
--- a/pkg/scheduler/scheduler_test.go
+++ b/pkg/scheduler/scheduler_test.go
@@ -15,10 +15,8 @@ import (
 	"github.com/opentracing/opentracing-go"
 	"github.com/prometheus/client_golang/prometheus"
 	promtest "github.com/prometheus/client_golang/prometheus/testutil"
-	"github.com/prometheus/prometheus/promql/parser"
 	"github.com/stretchr/testify/require"
 	"github.com/thanos-io/promql-engine/logicalplan"
-	"github.com/thanos-io/promql-engine/query"
 	"github.com/uber/jaeger-client-go/config"
 	"github.com/weaveworks/common/httpgrpc"
 	"google.golang.org/grpc"
@@ -504,8 +502,9 @@ func TestQuerierLoopClient_WithLogicalPlan(t *testing.T) {
 
 	// CASE 3: request with correct logical plan --> expect to have fragment metadata
 	scheduler.cleanupMetricsForInactiveUser("test2")
-	lp := createTestLogicalPlan(t, time.Now(), time.Now(), 0, "up")
-	bytesLp, err := logicalplan.Marshal(lp.Root())
+	lp, err := distributed_execution.CreateTestLogicalPlan("up", time.Now(), time.Now(), 0)
+	require.NoError(t, err)
+	bytesLp, err := logicalplan.Marshal((*lp).Root())
 	form := url.Values{}
 	form.Set("plan", string(bytesLp)) // this imitates the format of a real HTTP request body
 	require.NoError(t, err)
@@ -535,35 +534,95 @@ func TestQuerierLoopClient_WithLogicalPlan(t *testing.T) {
 	require.True(t, s3.IsRoot)
 }
 
-func createTestLogicalPlan(t *testing.T, startTime time.Time, endTime time.Time, step time.Duration, q string) logicalplan.Plan {
-	qOpts := query.Options{
-		Start:              startTime,
-		End:                startTime,
-		Step:               0,
-		StepsBatch:         10,
-		LookbackDelta:      0,
-		EnablePerStepStats: false,
-	}
+// TestQuerierLoopClient_WithLogicalPlan_Fragmented checks that fragments of the logical plan
+// are picked up successfully and carry the correct metadata.
+// It also exercises the scheduler's fragment coordination map.
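+// The binary-expression query below is expected to fragment into two leaf fragments plus a root
+// fragment; the leaves are dispatched first, and the root receives a map of child fragment IDs
+// to the querier addresses that picked them up.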
+// It acts as an integration test of the scheduler's distributed query execution path
+// (it depends on the design of the distributed optimizer and fragmenter, so it needs to be adjusted if they change).
+func TestQuerierLoopClient_WithLogicalPlan_Fragmented(t *testing.T) {
+	reg := prometheus.NewPedanticRegistry()
 
-	if step != 0 {
-		qOpts.End = endTime
-		qOpts.Step = step
-	}
+	scheduler, frontendClient, querierClient := setupScheduler(t, reg, true)
+	scheduler.distributedExecEnabled = true
 
-	expr, err := parser.NewParser(q, parser.WithFunctions(parser.Functions)).ParseExpr()
+	frontendLoop := initFrontendLoop(t, frontendClient, "frontend-12345")
+	querierLoop, err := querierClient.QuerierLoop(context.Background())
 	require.NoError(t, err)
 
-	planOpts := logicalplan.PlanOptions{
-		DisableDuplicateLabelCheck: false,
-	}
+	lpLong, err := distributed_execution.CreateTestLogicalPlan("sum(rate(node_cpu_seconds_total{mode!=\"idle\"}[5m])) + sum(rate(node_memory_Active_bytes[5m]))", time.Now(), time.Now(), 0)
+	require.NoError(t, err)
+
+	bytesLpLong, err := logicalplan.Marshal((*lpLong).Root())
+	formLong := url.Values{}
+	formLong.Set("plan", string(bytesLpLong)) // this imitates the format of a real HTTP request body
+	require.NoError(t, err)
+	frontendToScheduler(t, frontendLoop, &schedulerpb.FrontendToScheduler{
+		Type:        schedulerpb.ENQUEUE,
+		QueryID:     4,
+		UserID:      "test",
+		HttpRequest: &httpgrpc.HTTPRequest{Method: "POST", Url: "/hello", Body: []byte(formLong.Encode())},
+	})
 
-	logicalPlan, _ := logicalplan.NewFromAST(expr, &qOpts, planOpts)
-	optimizedPlan, _ := logicalPlan.Optimize(logicalplan.DefaultOptimizers)
-	dOptimizer := distributed_execution.DistributedOptimizer{}
-	dOptimizedPlanNode, _ := dOptimizer.Optimize(optimizedPlan.Root(), &qOpts)
-	lp := logicalplan.New(dOptimizedPlanNode, &qOpts, planOpts)
+	// check that there are three fragments enqueued
+	require.NoError(t, promtest.GatherAndCompare(reg, strings.NewReader(`
+	# HELP cortex_query_scheduler_queue_length Number of queries in the queue.
+	# TYPE cortex_query_scheduler_queue_length gauge
+	cortex_query_scheduler_queue_length{priority="0",type="fifo",user="test"} 3
+	# HELP cortex_request_queue_requests_total Total number of query requests going to the request queue.
+	# TYPE cortex_request_queue_requests_total counter
+	cortex_request_queue_requests_total{priority="0",user="test"} 3
+	`), "cortex_query_scheduler_queue_length", "cortex_request_queue_requests_total"))
 
-	return lp
+	// fragment 1
+	require.NoError(t, querierLoop.Send(&schedulerpb.QuerierToScheduler{QuerierID: "querier-1", QuerierAddress: "localhost:8000"}))
+	s1, err := querierLoop.Recv()
+
+	require.NoError(t, err)
+	require.NotEmpty(t, s1.FragmentID)
+	require.Equal(t, uint64(4), s1.QueryID)
+	require.Empty(t, s1.ChildIDtoAddrs) // s1 is a leaf fragment, so it has no child fragments
+	require.False(t, s1.IsRoot)
+
+	// check that the querier's address was added to the scheduler's fragment table
+	addr1, exist := scheduler.fragmentTable.GetAddrByID(s1.QueryID, s1.FragmentID)
+	require.True(t, exist)
+	require.Equal(t, addr1, "localhost:8000")
+	require.NoError(t, querierLoop.Send(&schedulerpb.QuerierToScheduler{})) // mark ready for the next task
+
+	// fragment 2
+	require.NoError(t, querierLoop.Send(&schedulerpb.QuerierToScheduler{QuerierID: "querier-1", QuerierAddress: "localhost:8000"}))
+	s2, err := querierLoop.Recv()
+
+	require.NoError(t, err)
+	require.NotEmpty(t, s2.FragmentID)
+	require.Equal(t, uint64(4), s2.QueryID)
+	require.Empty(t, s2.ChildIDtoAddrs) // s2 is a leaf fragment, so it has no child fragments
+	require.False(t, s2.IsRoot)
+
+	addr2, exist := scheduler.fragmentTable.GetAddrByID(s2.QueryID, s2.FragmentID)
+	require.True(t, exist)
+	require.Equal(t, addr2, "localhost:8000")
+	require.NoError(t, querierLoop.Send(&schedulerpb.QuerierToScheduler{})) // mark ready for the next task
+
+	// fragment 3
+	require.NoError(t, querierLoop.Send(&schedulerpb.QuerierToScheduler{QuerierID: "querier-1", QuerierAddress: "localhost:8000"}))
+	s3, err := querierLoop.Recv()
+	require.NoError(t, err)
+	require.NotEmpty(t, s3.FragmentID)
+	require.Equal(t, uint64(4), s3.QueryID)
+	require.Equal(t, s3.ChildIDtoAddrs, map[uint64]string{s1.FragmentID: addr1, s2.FragmentID: addr2}) // the root fragment maps its child fragment IDs to the querier addresses that picked them up
+	require.True(t, s3.IsRoot)
+	require.NoError(t, querierLoop.Send(&schedulerpb.QuerierToScheduler{}))
+
+	// check that all three fragments have been picked up
+	require.NoError(t, promtest.GatherAndCompare(reg, strings.NewReader(`
+	# HELP cortex_query_scheduler_queue_length Number of queries in the queue.
+	# TYPE cortex_query_scheduler_queue_length gauge
+	cortex_query_scheduler_queue_length{priority="0",type="fifo",user="test"} 0
+	# HELP cortex_request_queue_requests_total Total number of query requests going to the request queue.
+	# TYPE cortex_request_queue_requests_total counter
+	cortex_request_queue_requests_total{priority="0",user="test"} 3
+	`), "cortex_query_scheduler_queue_length", "cortex_request_queue_requests_total"))
 }
 
 func initFrontendLoop(t *testing.T, client schedulerpb.SchedulerForFrontendClient, frontendAddr string) schedulerpb.SchedulerForFrontend_FrontendLoopClient {