Skip to content

Commit 8200315

Browse files
authored
chore: simplify Processor.Process() (#1354)
Do no return anything from it since it is mutation of a plan anyway. Cosmo already does not use the returned value. <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit * **Refactor** * Restructured query plan processing workflow to separate caching and post-processing logic. * Updated plan processor architecture with new internal components for enhanced plan optimization. * **Tests** * Updated test cases to reflect changes in plan processing flow. <sub>✏️ Tip: You can customize this high-level summary in your review settings.</sub> <!-- end of auto-generated comment: release notes by coderabbit.ai -->
1 parent 33b847e commit 8200315

File tree

4 files changed

+26
-20
lines changed

4 files changed

+26
-20
lines changed

execution/engine/execution_engine.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -260,9 +260,9 @@ func (e *ExecutionEngine) getCachedPlan(ctx *internalExecutionContext, operation
260260
return nil
261261
}
262262

263-
p := ctx.postProcessor.Process(planResult)
264-
e.executionPlanCache.Add(cacheKey, p)
265-
return p
263+
ctx.postProcessor.Process(planResult)
264+
e.executionPlanCache.Add(cacheKey, planResult)
265+
return planResult
266266
}
267267

268268
func (e *ExecutionEngine) GetWebsocketBeforeStartHook() WebsocketBeforeStartHook {

v2/pkg/engine/plan/visitor.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1313,6 +1313,7 @@ func (v *Visitor) configureObjectFetch(config *objectFetchConfiguration) {
13131313
v.response.RawFetches = append(v.response.RawFetches, fetchItem)
13141314
}
13151315

1316+
// configureFetch builds and assembles all fields of resolve.SingleFetch.
13161317
func (v *Visitor) configureFetch(internal *objectFetchConfiguration, external resolve.FetchConfiguration) *resolve.SingleFetch {
13171318
dataSourceType := reflect.TypeOf(external.DataSource).String()
13181319
dataSourceType = strings.TrimPrefix(dataSourceType, "*")

v2/pkg/engine/postprocess/postprocess.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ type FetchTreeProcessor interface {
1616
ProcessFetchTree(root *resolve.FetchTreeNode)
1717
}
1818

19+
// Processor transforms and optimizes the query plan after
20+
// it's been created by the planner but before execution.
1921
type Processor struct {
2022
disableExtractFetches bool
2123
collectDataSourceInfo bool
@@ -131,7 +133,11 @@ func NewProcessor(options ...ProcessorOption) *Processor {
131133
}
132134
}
133135

134-
func (p *Processor) Process(pre plan.Plan) plan.Plan {
136+
// Process takes a raw query plan and optimizes it by deduplicating fetches,
137+
// ordering them correctly by dependencies, and resolving any templated inputs.
138+
// It groups already-ordered fetches into parallel execution batches
139+
// when they have the same dependency requirements satisfied.
140+
func (p *Processor) Process(pre plan.Plan) {
135141
switch t := pre.(type) {
136142
case *plan.SynchronousResponsePlan:
137143
for i := range p.processResponseTree {
@@ -162,10 +168,9 @@ func (p *Processor) Process(pre plan.Plan) plan.Plan {
162168
p.processFetchTree[i].ProcessFetchTree(t.Response.Response.Fetches)
163169
}
164170
}
165-
return pre
166171
}
167172

168-
// createFetchTree creates an inital fetch tree from the raw fetches in the GraphQL response.
173+
// createFetchTree creates an initial fetch tree from the raw fetches in the GraphQL response.
169174
// The initial fetch tree is a node of sequence fetch kind, with a flat list of fetches as children.
170175
func (p *Processor) createFetchTree(res *resolve.GraphQLResponse) {
171176
if p.disableExtractFetches {

v2/pkg/engine/postprocess/postprocess_test.go

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,14 @@ import (
1616
func TestProcess_ExtractFetches(t *testing.T) {
1717
type TestCase struct {
1818
name string
19-
pre plan.Plan
19+
plan plan.Plan
2020
expected plan.Plan
2121
}
2222

2323
cases := []TestCase{
2424
{
2525
name: "1",
26-
pre: &plan.SynchronousResponsePlan{
26+
plan: &plan.SynchronousResponsePlan{
2727
Response: &resolve.GraphQLResponse{
2828
RawFetches: []*resolve.FetchItem{
2929
{Fetch: &resolve.SingleFetch{FetchDependencies: resolve.FetchDependencies{FetchID: 1}}},
@@ -64,7 +64,7 @@ func TestProcess_ExtractFetches(t *testing.T) {
6464
},
6565
{
6666
name: "2",
67-
pre: &plan.SynchronousResponsePlan{
67+
plan: &plan.SynchronousResponsePlan{
6868
Response: &resolve.GraphQLResponse{
6969
RawFetches: []*resolve.FetchItem{
7070
{
@@ -161,7 +161,7 @@ func TestProcess_ExtractFetches(t *testing.T) {
161161
},
162162
{
163163
name: "3",
164-
pre: &plan.SynchronousResponsePlan{
164+
plan: &plan.SynchronousResponsePlan{
165165
Response: &resolve.GraphQLResponse{
166166
RawFetches: []*resolve.FetchItem{
167167
{
@@ -274,7 +274,7 @@ func TestProcess_ExtractFetches(t *testing.T) {
274274
},
275275
{
276276
name: "4",
277-
pre: &plan.SynchronousResponsePlan{
277+
plan: &plan.SynchronousResponsePlan{
278278
Response: &resolve.GraphQLResponse{
279279
RawFetches: []*resolve.FetchItem{
280280
{
@@ -368,9 +368,9 @@ func TestProcess_ExtractFetches(t *testing.T) {
368368

369369
for _, c := range cases {
370370
t.Run(c.name, func(t *testing.T) {
371-
actual := processor.Process(c.pre)
371+
processor.Process(c.plan)
372372

373-
if !assert.Equal(t, c.expected, actual) {
373+
if !assert.Equal(t, c.expected, c.plan) {
374374
formatterConfig := map[reflect.Type]interface{}{
375375
reflect.TypeOf([]byte{}): func(b []byte) string { return fmt.Sprintf(`"%s"`, string(b)) },
376376
}
@@ -381,7 +381,7 @@ func TestProcess_ExtractFetches(t *testing.T) {
381381
Formatter: formatterConfig,
382382
}
383383

384-
if diff := prettyCfg.Compare(c.expected, actual); diff != "" {
384+
if diff := prettyCfg.Compare(c.expected, c.plan); diff != "" {
385385
t.Errorf("Plan does not match(-want +got)\n%s", diff)
386386
}
387387
}
@@ -392,14 +392,14 @@ func TestProcess_ExtractFetches(t *testing.T) {
392392
func TestProcess_ExtractServiceNames(t *testing.T) {
393393
type TestCase struct {
394394
name string
395-
pre plan.Plan
395+
plan plan.Plan
396396
expected plan.Plan
397397
}
398398

399399
cases := []TestCase{
400400
{
401401
name: "Collect all service names",
402-
pre: &plan.SynchronousResponsePlan{
402+
plan: &plan.SynchronousResponsePlan{
403403
Response: &resolve.GraphQLResponse{
404404
RawFetches: []*resolve.FetchItem{
405405
{
@@ -533,7 +533,7 @@ func TestProcess_ExtractServiceNames(t *testing.T) {
533533
},
534534
{
535535
name: "Deduplicate the same service names",
536-
pre: &plan.SynchronousResponsePlan{
536+
plan: &plan.SynchronousResponsePlan{
537537
Response: &resolve.GraphQLResponse{
538538
RawFetches: []*resolve.FetchItem{
539539
{
@@ -679,9 +679,9 @@ func TestProcess_ExtractServiceNames(t *testing.T) {
679679

680680
for _, c := range cases {
681681
t.Run(c.name, func(t *testing.T) {
682-
actual := processor.Process(c.pre)
682+
processor.Process(c.plan)
683683

684-
if !assert.Equal(t, c.expected, actual) {
684+
if !assert.Equal(t, c.expected, c.plan) {
685685
formatterConfig := map[reflect.Type]interface{}{
686686
reflect.TypeOf([]byte{}): func(b []byte) string { return fmt.Sprintf(`"%s"`, string(b)) },
687687
}
@@ -692,7 +692,7 @@ func TestProcess_ExtractServiceNames(t *testing.T) {
692692
Formatter: formatterConfig,
693693
}
694694

695-
if diff := prettyCfg.Compare(c.expected, actual); diff != "" {
695+
if diff := prettyCfg.Compare(c.expected, c.plan); diff != "" {
696696
t.Errorf("Plan does not match(-want +got)\n%s", diff)
697697
}
698698
}

0 commit comments

Comments
 (0)