@@ -5,66 +5,128 @@ import (
5
5
"fmt"
6
6
7
7
"github.com/graphql-go/graphql/gqlerrors"
8
- "github.com/graphql-go/graphql/language/ast"
8
+ "github.com/graphql-go/graphql/language/parser"
9
+ "github.com/graphql-go/graphql/language/source"
9
10
)
10
11
11
12
// SubscribeParams parameters for subscribing
12
13
type SubscribeParams struct {
13
- Schema Schema
14
- Document * ast. Document
15
- RootValue interface {}
16
- ContextValue context.Context
14
+ Schema Schema
15
+ RequestString string
16
+ RootValue interface {}
17
+ // ContextValue context.Context
17
18
VariableValues map [string ]interface {}
18
19
OperationName string
19
20
FieldResolver FieldResolveFn
20
21
FieldSubscriber FieldResolveFn
21
22
}
22
23
23
- // Subscribe performs a subscribe operation
24
- func Subscribe (ctx context.Context , p SubscribeParams ) chan * Result {
25
- resultChannel := make (chan * Result )
24
+ // Subscribe performs a subscribe operation on the given query and schema
25
+ // To finish a subscription you can simply close the channel from inside the `Subscribe` function
26
+ // currently does not support extensions hooks
27
+ func Subscribe (p Params ) chan * Result {
28
+
29
+ source := source .NewSource (& source.Source {
30
+ Body : []byte (p .RequestString ),
31
+ Name : "GraphQL request" ,
32
+ })
33
+
34
+ // TODO run extensions hooks
35
+
36
+ // parse the source
37
+ AST , err := parser .Parse (parser.ParseParams {Source : source })
38
+ if err != nil {
39
+
40
+ // merge the errors from extensions and the original error from parser
41
+ return sendOneResultAndClose (& Result {
42
+ Errors : gqlerrors .FormatErrors (err ),
43
+ })
44
+ }
45
+
46
+ // validate document
47
+ validationResult := ValidateDocument (& p .Schema , AST , nil )
48
+
49
+ if ! validationResult .IsValid {
50
+ // run validation finish functions for extensions
51
+ return sendOneResultAndClose (& Result {
52
+ Errors : validationResult .Errors ,
53
+ })
54
+
55
+ }
56
+ return ExecuteSubscription (ExecuteParams {
57
+ Schema : p .Schema ,
58
+ Root : p .RootObject ,
59
+ AST : AST ,
60
+ OperationName : p .OperationName ,
61
+ Args : p .VariableValues ,
62
+ Context : p .Context ,
63
+ })
64
+ }
65
+
66
+ func sendOneResultAndClose (res * Result ) chan * Result {
67
+ resultChannel := make (chan * Result , 1 )
68
+ resultChannel <- res
69
+ close (resultChannel )
70
+ return resultChannel
71
+ }
72
+
73
+ // ExecuteSubscription is similar to graphql.Execute but returns a channel instead of a Result
74
+ // currently does not support extensions
75
+ func ExecuteSubscription (p ExecuteParams ) chan * Result {
76
+
77
+ if p .Context == nil {
78
+ p .Context = context .Background ()
79
+ }
26
80
27
81
var mapSourceToResponse = func (payload interface {}) * Result {
28
82
return Execute (ExecuteParams {
29
83
Schema : p .Schema ,
30
84
Root : payload ,
31
- AST : p .Document ,
85
+ AST : p .AST ,
32
86
OperationName : p .OperationName ,
33
- Args : p .VariableValues ,
34
- Context : p .ContextValue ,
87
+ Args : p .Args ,
88
+ Context : p .Context ,
35
89
})
36
90
}
37
-
91
+ var resultChannel = make ( chan * Result )
38
92
go func () {
39
- result := & Result {}
93
+ defer close ( resultChannel )
40
94
defer func () {
41
95
if err := recover (); err != nil {
42
- result .Errors = append (result .Errors , gqlerrors .FormatError (err .(error )))
43
- resultChannel <- result
96
+ e , ok := err .(error )
97
+ if ! ok {
98
+ return
99
+ }
100
+ resultChannel <- & Result {
101
+ Errors : gqlerrors .FormatErrors (e ),
102
+ }
44
103
}
45
- close ( resultChannel )
104
+ return
46
105
}()
47
106
48
107
exeContext , err := buildExecutionContext (buildExecutionCtxParams {
49
108
Schema : p .Schema ,
50
- Root : p .RootValue ,
51
- AST : p .Document ,
109
+ Root : p .Root ,
110
+ AST : p .AST ,
52
111
OperationName : p .OperationName ,
53
- Args : p .VariableValues ,
54
- Result : result ,
55
- Context : p .ContextValue ,
112
+ Args : p .Args ,
113
+ Context : p .Context ,
56
114
})
57
115
58
116
if err != nil {
59
- result .Errors = append (result .Errors , gqlerrors .FormatError (err .(error )))
60
- resultChannel <- result
117
+ resultChannel <- & Result {
118
+ Errors : gqlerrors .FormatErrors (err ),
119
+ }
120
+
61
121
return
62
122
}
63
123
64
124
operationType , err := getOperationRootType (p .Schema , exeContext .Operation )
65
125
if err != nil {
66
- result .Errors = append (result .Errors , gqlerrors .FormatError (err .(error )))
67
- resultChannel <- result
126
+ resultChannel <- & Result {
127
+ Errors : gqlerrors .FormatErrors (err ),
128
+ }
129
+
68
130
return
69
131
}
70
132
@@ -85,18 +147,20 @@ func Subscribe(ctx context.Context, p SubscribeParams) chan *Result {
85
147
fieldDef := getFieldDef (p .Schema , operationType , fieldName )
86
148
87
149
if fieldDef == nil {
88
- err := fmt .Errorf ("the subscription field %q is not defined" , fieldName )
89
- result .Errors = append (result .Errors , gqlerrors .FormatError (err .(error )))
90
- resultChannel <- result
150
+ resultChannel <- & Result {
151
+ Errors : gqlerrors .FormatErrors (fmt .Errorf ("the subscription field %q is not defined" , fieldName )),
152
+ }
153
+
91
154
return
92
155
}
93
156
94
- resolveFn := p .FieldSubscriber
157
+ resolveFn := fieldDef .Subscribe
158
+
95
159
if resolveFn == nil {
96
- resolveFn = DefaultResolveFn
97
- }
98
- if fieldDef . Subscribe != nil {
99
- resolveFn = fieldDef . Subscribe
160
+ resultChannel <- & Result {
161
+ Errors : gqlerrors . FormatErrors ( fmt . Errorf ( "the subscription function %q is not defined" , fieldName )),
162
+ }
163
+ return
100
164
}
101
165
fieldPath := & ResponsePath {
102
166
Key : responseName ,
@@ -117,21 +181,24 @@ func Subscribe(ctx context.Context, p SubscribeParams) chan *Result {
117
181
}
118
182
119
183
fieldResult , err := resolveFn (ResolveParams {
120
- Source : p .RootValue ,
184
+ Source : p .Root ,
121
185
Args : args ,
122
186
Info : info ,
123
- Context : p .ContextValue ,
187
+ Context : p .Context ,
124
188
})
125
189
if err != nil {
126
- result .Errors = append (result .Errors , gqlerrors .FormatError (err .(error )))
127
- resultChannel <- result
190
+ resultChannel <- & Result {
191
+ Errors : gqlerrors .FormatErrors (err ),
192
+ }
193
+
128
194
return
129
195
}
130
196
131
197
if fieldResult == nil {
132
- err := fmt .Errorf ("no field result" )
133
- result .Errors = append (result .Errors , gqlerrors .FormatError (err .(error )))
134
- resultChannel <- result
198
+ resultChannel <- & Result {
199
+ Errors : gqlerrors .FormatErrors (fmt .Errorf ("no field result" )),
200
+ }
201
+
135
202
return
136
203
}
137
204
@@ -140,10 +207,13 @@ func Subscribe(ctx context.Context, p SubscribeParams) chan *Result {
140
207
sub := fieldResult .(chan interface {})
141
208
for {
142
209
select {
143
- case <- ctx .Done ():
210
+ case <- p . Context .Done ():
144
211
return
145
212
146
- case res := <- sub :
213
+ case res , more := <- sub :
214
+ if ! more {
215
+ return
216
+ }
147
217
resultChannel <- mapSourceToResponse (res )
148
218
}
149
219
}
0 commit comments