Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions execution/engine/execution_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,9 +260,9 @@ func (e *ExecutionEngine) getCachedPlan(ctx *internalExecutionContext, operation
return nil
}

p := ctx.postProcessor.Process(planResult)
e.executionPlanCache.Add(cacheKey, p)
return p
ctx.postProcessor.Process(planResult)
e.executionPlanCache.Add(cacheKey, planResult)
return planResult
}

func (e *ExecutionEngine) GetWebsocketBeforeStartHook() WebsocketBeforeStartHook {
Expand Down
1 change: 1 addition & 0 deletions v2/pkg/engine/plan/visitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1313,6 +1313,7 @@ func (v *Visitor) configureObjectFetch(config *objectFetchConfiguration) {
v.response.RawFetches = append(v.response.RawFetches, fetchItem)
}

// configureFetch builds and assembles all fields of resolve.SingleFetch.
func (v *Visitor) configureFetch(internal *objectFetchConfiguration, external resolve.FetchConfiguration) *resolve.SingleFetch {
dataSourceType := reflect.TypeOf(external.DataSource).String()
dataSourceType = strings.TrimPrefix(dataSourceType, "*")
Expand Down
11 changes: 8 additions & 3 deletions v2/pkg/engine/postprocess/postprocess.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ type FetchTreeProcessor interface {
ProcessFetchTree(root *resolve.FetchTreeNode)
}

// Processor transforms and optimizes the query plan after
// it's been created by the planner but before execution.
type Processor struct {
disableExtractFetches bool
collectDataSourceInfo bool
Expand Down Expand Up @@ -131,7 +133,11 @@ func NewProcessor(options ...ProcessorOption) *Processor {
}
}

func (p *Processor) Process(pre plan.Plan) plan.Plan {
// Process takes a raw query plan and optimizes it by deduplicating fetches,
// ordering them correctly by dependencies, and resolving any templated inputs.
// It groups already-ordered fetches into parallel execution batches
// when they have the same dependency requirements satisfied.
func (p *Processor) Process(pre plan.Plan) {
switch t := pre.(type) {
case *plan.SynchronousResponsePlan:
for i := range p.processResponseTree {
Expand Down Expand Up @@ -162,10 +168,9 @@ func (p *Processor) Process(pre plan.Plan) plan.Plan {
p.processFetchTree[i].ProcessFetchTree(t.Response.Response.Fetches)
}
}
return pre
}

// createFetchTree creates an inital fetch tree from the raw fetches in the GraphQL response.
// createFetchTree creates an initial fetch tree from the raw fetches in the GraphQL response.
// The initial fetch tree is a node of sequence fetch kind, with a flat list of fetches as children.
func (p *Processor) createFetchTree(res *resolve.GraphQLResponse) {
if p.disableExtractFetches {
Expand Down
28 changes: 14 additions & 14 deletions v2/pkg/engine/postprocess/postprocess_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@ import (
func TestProcess_ExtractFetches(t *testing.T) {
type TestCase struct {
name string
pre plan.Plan
plan plan.Plan
expected plan.Plan
}

cases := []TestCase{
{
name: "1",
pre: &plan.SynchronousResponsePlan{
plan: &plan.SynchronousResponsePlan{
Response: &resolve.GraphQLResponse{
RawFetches: []*resolve.FetchItem{
{Fetch: &resolve.SingleFetch{FetchDependencies: resolve.FetchDependencies{FetchID: 1}}},
Expand Down Expand Up @@ -64,7 +64,7 @@ func TestProcess_ExtractFetches(t *testing.T) {
},
{
name: "2",
pre: &plan.SynchronousResponsePlan{
plan: &plan.SynchronousResponsePlan{
Response: &resolve.GraphQLResponse{
RawFetches: []*resolve.FetchItem{
{
Expand Down Expand Up @@ -161,7 +161,7 @@ func TestProcess_ExtractFetches(t *testing.T) {
},
{
name: "3",
pre: &plan.SynchronousResponsePlan{
plan: &plan.SynchronousResponsePlan{
Response: &resolve.GraphQLResponse{
RawFetches: []*resolve.FetchItem{
{
Expand Down Expand Up @@ -274,7 +274,7 @@ func TestProcess_ExtractFetches(t *testing.T) {
},
{
name: "4",
pre: &plan.SynchronousResponsePlan{
plan: &plan.SynchronousResponsePlan{
Response: &resolve.GraphQLResponse{
RawFetches: []*resolve.FetchItem{
{
Expand Down Expand Up @@ -368,9 +368,9 @@ func TestProcess_ExtractFetches(t *testing.T) {

for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
actual := processor.Process(c.pre)
processor.Process(c.plan)

if !assert.Equal(t, c.expected, actual) {
if !assert.Equal(t, c.expected, c.plan) {
formatterConfig := map[reflect.Type]interface{}{
reflect.TypeOf([]byte{}): func(b []byte) string { return fmt.Sprintf(`"%s"`, string(b)) },
}
Expand All @@ -381,7 +381,7 @@ func TestProcess_ExtractFetches(t *testing.T) {
Formatter: formatterConfig,
}

if diff := prettyCfg.Compare(c.expected, actual); diff != "" {
if diff := prettyCfg.Compare(c.expected, c.plan); diff != "" {
t.Errorf("Plan does not match(-want +got)\n%s", diff)
}
}
Expand All @@ -392,14 +392,14 @@ func TestProcess_ExtractFetches(t *testing.T) {
func TestProcess_ExtractServiceNames(t *testing.T) {
type TestCase struct {
name string
pre plan.Plan
plan plan.Plan
expected plan.Plan
}

cases := []TestCase{
{
name: "Collect all service names",
pre: &plan.SynchronousResponsePlan{
plan: &plan.SynchronousResponsePlan{
Response: &resolve.GraphQLResponse{
RawFetches: []*resolve.FetchItem{
{
Expand Down Expand Up @@ -533,7 +533,7 @@ func TestProcess_ExtractServiceNames(t *testing.T) {
},
{
name: "Deduplicate the same service names",
pre: &plan.SynchronousResponsePlan{
plan: &plan.SynchronousResponsePlan{
Response: &resolve.GraphQLResponse{
RawFetches: []*resolve.FetchItem{
{
Expand Down Expand Up @@ -679,9 +679,9 @@ func TestProcess_ExtractServiceNames(t *testing.T) {

for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
actual := processor.Process(c.pre)
processor.Process(c.plan)

if !assert.Equal(t, c.expected, actual) {
if !assert.Equal(t, c.expected, c.plan) {
formatterConfig := map[reflect.Type]interface{}{
reflect.TypeOf([]byte{}): func(b []byte) string { return fmt.Sprintf(`"%s"`, string(b)) },
}
Expand All @@ -692,7 +692,7 @@ func TestProcess_ExtractServiceNames(t *testing.T) {
Formatter: formatterConfig,
}

if diff := prettyCfg.Compare(c.expected, actual); diff != "" {
if diff := prettyCfg.Compare(c.expected, c.plan); diff != "" {
t.Errorf("Plan does not match(-want +got)\n%s", diff)
}
}
Expand Down