Skip to content

Commit fd2a748

Browse files
authored
Merge pull request #1 from remorses/subscription-execution
Subscription execution
2 parents a55996a + d0a91ba commit fd2a748

File tree

3 files changed

+519
-157
lines changed

3 files changed

+519
-157
lines changed

subscription.go

Lines changed: 112 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -5,66 +5,128 @@ import (
55
"fmt"
66

77
"github.com/graphql-go/graphql/gqlerrors"
8-
"github.com/graphql-go/graphql/language/ast"
8+
"github.com/graphql-go/graphql/language/parser"
9+
"github.com/graphql-go/graphql/language/source"
910
)
1011

1112
// SubscribeParams parameters for subscribing
1213
type SubscribeParams struct {
13-
Schema Schema
14-
Document *ast.Document
15-
RootValue interface{}
16-
ContextValue context.Context
14+
Schema Schema
15+
RequestString string
16+
RootValue interface{}
17+
// ContextValue context.Context
1718
VariableValues map[string]interface{}
1819
OperationName string
1920
FieldResolver FieldResolveFn
2021
FieldSubscriber FieldResolveFn
2122
}
2223

23-
// Subscribe performs a subscribe operation
24-
func Subscribe(ctx context.Context, p SubscribeParams) chan *Result {
25-
resultChannel := make(chan *Result)
24+
// Subscribe performs a subscribe operation on the given query and schema
25+
// To finish a subscription you can simply close the channel from inside the `Subscribe` function
26+
// currently does not support extensions hooks
27+
func Subscribe(p Params) chan *Result {
28+
29+
source := source.NewSource(&source.Source{
30+
Body: []byte(p.RequestString),
31+
Name: "GraphQL request",
32+
})
33+
34+
// TODO run extensions hooks
35+
36+
// parse the source
37+
AST, err := parser.Parse(parser.ParseParams{Source: source})
38+
if err != nil {
39+
40+
// merge the errors from extensions and the original error from parser
41+
return sendOneResultAndClose(&Result{
42+
Errors: gqlerrors.FormatErrors(err),
43+
})
44+
}
45+
46+
// validate document
47+
validationResult := ValidateDocument(&p.Schema, AST, nil)
48+
49+
if !validationResult.IsValid {
50+
// run validation finish functions for extensions
51+
return sendOneResultAndClose(&Result{
52+
Errors: validationResult.Errors,
53+
})
54+
55+
}
56+
return ExecuteSubscription(ExecuteParams{
57+
Schema: p.Schema,
58+
Root: p.RootObject,
59+
AST: AST,
60+
OperationName: p.OperationName,
61+
Args: p.VariableValues,
62+
Context: p.Context,
63+
})
64+
}
65+
66+
func sendOneResultAndClose(res *Result) chan *Result {
67+
resultChannel := make(chan *Result, 1)
68+
resultChannel <- res
69+
close(resultChannel)
70+
return resultChannel
71+
}
72+
73+
// ExecuteSubscription is similar to graphql.Execute but returns a channel instead of a Result
74+
// currently does not support extensions
75+
func ExecuteSubscription(p ExecuteParams) chan *Result {
76+
77+
if p.Context == nil {
78+
p.Context = context.Background()
79+
}
2680

2781
var mapSourceToResponse = func(payload interface{}) *Result {
2882
return Execute(ExecuteParams{
2983
Schema: p.Schema,
3084
Root: payload,
31-
AST: p.Document,
85+
AST: p.AST,
3286
OperationName: p.OperationName,
33-
Args: p.VariableValues,
34-
Context: p.ContextValue,
87+
Args: p.Args,
88+
Context: p.Context,
3589
})
3690
}
37-
91+
var resultChannel = make(chan *Result)
3892
go func() {
39-
result := &Result{}
93+
defer close(resultChannel)
4094
defer func() {
4195
if err := recover(); err != nil {
42-
result.Errors = append(result.Errors, gqlerrors.FormatError(err.(error)))
43-
resultChannel <- result
96+
e, ok := err.(error)
97+
if !ok {
98+
return
99+
}
100+
resultChannel <- &Result{
101+
Errors: gqlerrors.FormatErrors(e),
102+
}
44103
}
45-
close(resultChannel)
104+
return
46105
}()
47106

48107
exeContext, err := buildExecutionContext(buildExecutionCtxParams{
49108
Schema: p.Schema,
50-
Root: p.RootValue,
51-
AST: p.Document,
109+
Root: p.Root,
110+
AST: p.AST,
52111
OperationName: p.OperationName,
53-
Args: p.VariableValues,
54-
Result: result,
55-
Context: p.ContextValue,
112+
Args: p.Args,
113+
Context: p.Context,
56114
})
57115

58116
if err != nil {
59-
result.Errors = append(result.Errors, gqlerrors.FormatError(err.(error)))
60-
resultChannel <- result
117+
resultChannel <- &Result{
118+
Errors: gqlerrors.FormatErrors(err),
119+
}
120+
61121
return
62122
}
63123

64124
operationType, err := getOperationRootType(p.Schema, exeContext.Operation)
65125
if err != nil {
66-
result.Errors = append(result.Errors, gqlerrors.FormatError(err.(error)))
67-
resultChannel <- result
126+
resultChannel <- &Result{
127+
Errors: gqlerrors.FormatErrors(err),
128+
}
129+
68130
return
69131
}
70132

@@ -85,18 +147,20 @@ func Subscribe(ctx context.Context, p SubscribeParams) chan *Result {
85147
fieldDef := getFieldDef(p.Schema, operationType, fieldName)
86148

87149
if fieldDef == nil {
88-
err := fmt.Errorf("the subscription field %q is not defined", fieldName)
89-
result.Errors = append(result.Errors, gqlerrors.FormatError(err.(error)))
90-
resultChannel <- result
150+
resultChannel <- &Result{
151+
Errors: gqlerrors.FormatErrors(fmt.Errorf("the subscription field %q is not defined", fieldName)),
152+
}
153+
91154
return
92155
}
93156

94-
resolveFn := p.FieldSubscriber
157+
resolveFn := fieldDef.Subscribe
158+
95159
if resolveFn == nil {
96-
resolveFn = DefaultResolveFn
97-
}
98-
if fieldDef.Subscribe != nil {
99-
resolveFn = fieldDef.Subscribe
160+
resultChannel <- &Result{
161+
Errors: gqlerrors.FormatErrors(fmt.Errorf("the subscription function %q is not defined", fieldName)),
162+
}
163+
return
100164
}
101165
fieldPath := &ResponsePath{
102166
Key: responseName,
@@ -117,21 +181,24 @@ func Subscribe(ctx context.Context, p SubscribeParams) chan *Result {
117181
}
118182

119183
fieldResult, err := resolveFn(ResolveParams{
120-
Source: p.RootValue,
184+
Source: p.Root,
121185
Args: args,
122186
Info: info,
123-
Context: p.ContextValue,
187+
Context: p.Context,
124188
})
125189
if err != nil {
126-
result.Errors = append(result.Errors, gqlerrors.FormatError(err.(error)))
127-
resultChannel <- result
190+
resultChannel <- &Result{
191+
Errors: gqlerrors.FormatErrors(err),
192+
}
193+
128194
return
129195
}
130196

131197
if fieldResult == nil {
132-
err := fmt.Errorf("no field result")
133-
result.Errors = append(result.Errors, gqlerrors.FormatError(err.(error)))
134-
resultChannel <- result
198+
resultChannel <- &Result{
199+
Errors: gqlerrors.FormatErrors(fmt.Errorf("no field result")),
200+
}
201+
135202
return
136203
}
137204

@@ -140,10 +207,13 @@ func Subscribe(ctx context.Context, p SubscribeParams) chan *Result {
140207
sub := fieldResult.(chan interface{})
141208
for {
142209
select {
143-
case <-ctx.Done():
210+
case <-p.Context.Done():
144211
return
145212

146-
case res := <-sub:
213+
case res, more := <-sub:
214+
if !more {
215+
return
216+
}
147217
resultChannel <- mapSourceToResponse(res)
148218
}
149219
}

0 commit comments

Comments
 (0)