@@ -5,66 +5,143 @@ import (
5
5
"fmt"
6
6
7
7
"github.com/graphql-go/graphql/gqlerrors"
8
- "github.com/graphql-go/graphql/language/ast"
8
+ "github.com/graphql-go/graphql/language/parser"
9
+ "github.com/graphql-go/graphql/language/source"
9
10
)
10
11
11
12
// SubscribeParams parameters for subscribing
12
13
type SubscribeParams struct {
13
- Schema Schema
14
- Document * ast. Document
15
- RootValue interface {}
16
- ContextValue context.Context
14
+ Schema Schema
15
+ RequestString string
16
+ RootValue interface {}
17
+ // ContextValue context.Context
17
18
VariableValues map [string ]interface {}
18
19
OperationName string
19
20
FieldResolver FieldResolveFn
20
21
FieldSubscriber FieldResolveFn
21
22
}
22
23
24
+ // SubscriptableSchema implements `graphql-transport-ws` `GraphQLService` interface: https://github.com/graph-gophers/graphql-transport-ws/blob/40c0484322990a129cac2f2d2763c3315230280c/graphqlws/internal/connection/connection.go#L53
25
+ type SubscriptableSchema struct {
26
+ Schema Schema
27
+ RootObject map [string ]interface {}
28
+ }
29
+
30
+ func (self * SubscriptableSchema ) Subscribe (ctx context.Context , queryString string , operationName string , variables map [string ]interface {}) (<- chan * Result , error ) {
31
+ c := Subscribe (Params {
32
+ Schema : self .Schema ,
33
+ Context : ctx ,
34
+ OperationName : operationName ,
35
+ RequestString : queryString ,
36
+ RootObject : self .RootObject ,
37
+ VariableValues : variables ,
38
+ })
39
+ return c , nil
40
+ }
41
+
23
42
// Subscribe performs a subscribe operation
24
- func Subscribe (ctx context.Context , p SubscribeParams ) chan * Result {
43
+ func Subscribe (p Params ) chan * Result {
44
+
45
+ source := source .NewSource (& source.Source {
46
+ Body : []byte (p .RequestString ),
47
+ Name : "GraphQL request" ,
48
+ })
49
+
50
+ // TODO run extensions hooks
51
+
52
+ // parse the source
53
+ AST , err := parser .Parse (parser.ParseParams {Source : source })
54
+ if err != nil {
55
+
56
+ // merge the errors from extensions and the original error from parser
57
+ return sendOneResultandClose (& Result {
58
+ Errors : gqlerrors .FormatErrors (err ),
59
+ })
60
+ }
61
+
62
+ // validate document
63
+ validationResult := ValidateDocument (& p .Schema , AST , nil )
64
+
65
+ if ! validationResult .IsValid {
66
+ // run validation finish functions for extensions
67
+ return sendOneResultandClose (& Result {
68
+ Errors : validationResult .Errors ,
69
+ })
70
+
71
+ }
72
+ return ExecuteSubscription (ExecuteParams {
73
+ Schema : p .Schema ,
74
+ Root : p .RootObject ,
75
+ AST : AST ,
76
+ OperationName : p .OperationName ,
77
+ Args : p .VariableValues ,
78
+ Context : p .Context ,
79
+ })
80
+ }
81
+
82
+ func sendOneResultandClose (res * Result ) chan * Result {
25
83
resultChannel := make (chan * Result )
84
+ resultChannel <- res
85
+ close (resultChannel )
86
+ return resultChannel
87
+ }
88
+
89
+ func ExecuteSubscription (p ExecuteParams ) chan * Result {
90
+
91
+ if p .Context == nil {
92
+ p .Context = context .Background ()
93
+ }
94
+
95
+ // TODO run executionDidStart functions from extensions
26
96
27
97
var mapSourceToResponse = func (payload interface {}) * Result {
28
98
return Execute (ExecuteParams {
29
99
Schema : p .Schema ,
30
100
Root : payload ,
31
- AST : p .Document ,
101
+ AST : p .AST ,
32
102
OperationName : p .OperationName ,
33
- Args : p .VariableValues ,
34
- Context : p .ContextValue ,
103
+ Args : p .Args ,
104
+ Context : p .Context ,
35
105
})
36
106
}
37
-
107
+ var resultChannel = make ( chan * Result )
38
108
go func () {
39
- result := & Result {}
40
109
defer func () {
41
110
if err := recover (); err != nil {
42
- result .Errors = append (result .Errors , gqlerrors .FormatError (err .(error )))
43
- resultChannel <- result
111
+ e , ok := err .(error )
112
+ if ! ok {
113
+ return
114
+ }
115
+ sendOneResultandClose (& Result {
116
+ Errors : gqlerrors .FormatErrors (e ),
117
+ })
44
118
}
45
- close (resultChannel )
119
+ // close(resultChannel)
120
+ return
46
121
}()
47
122
48
123
exeContext , err := buildExecutionContext (buildExecutionCtxParams {
49
124
Schema : p .Schema ,
50
- Root : p .RootValue ,
51
- AST : p .Document ,
125
+ Root : p .Root ,
126
+ AST : p .AST ,
52
127
OperationName : p .OperationName ,
53
- Args : p .VariableValues ,
54
- Result : result ,
55
- Context : p .ContextValue ,
128
+ Args : p .Args ,
129
+ Result : & Result {}, // TODO what is this?
130
+ Context : p .Context ,
56
131
})
57
132
58
133
if err != nil {
59
- result .Errors = append (result .Errors , gqlerrors .FormatError (err .(error )))
60
- resultChannel <- result
134
+ sendOneResultandClose (& Result {
135
+ Errors : gqlerrors .FormatErrors (err ),
136
+ })
61
137
return
62
138
}
63
139
64
140
operationType , err := getOperationRootType (p .Schema , exeContext .Operation )
65
141
if err != nil {
66
- result .Errors = append (result .Errors , gqlerrors .FormatError (err .(error )))
67
- resultChannel <- result
142
+ sendOneResultandClose (& Result {
143
+ Errors : gqlerrors .FormatErrors (err ),
144
+ })
68
145
return
69
146
}
70
147
@@ -85,18 +162,19 @@ func Subscribe(ctx context.Context, p SubscribeParams) chan *Result {
85
162
fieldDef := getFieldDef (p .Schema , operationType , fieldName )
86
163
87
164
if fieldDef == nil {
88
- err := fmt . Errorf ( "the subscription field %q is not defined" , fieldName )
89
- result . Errors = append ( result . Errors , gqlerrors .FormatError ( err .( error )))
90
- resultChannel <- result
165
+ sendOneResultandClose ( & Result {
166
+ Errors : gqlerrors .FormatErrors ( fmt . Errorf ( "the subscription field %q is not defined" , fieldName )),
167
+ })
91
168
return
92
169
}
93
170
94
- resolveFn := p .FieldSubscriber
171
+ resolveFn := fieldDef .Subscribe
172
+
95
173
if resolveFn == nil {
96
- resolveFn = DefaultResolveFn
97
- }
98
- if fieldDef . Subscribe != nil {
99
- resolveFn = fieldDef . Subscribe
174
+ sendOneResultandClose ( & Result {
175
+ Errors : gqlerrors . FormatErrors ( fmt . Errorf ( "the subscription function %q is not defined" , fieldName )),
176
+ })
177
+ return
100
178
}
101
179
fieldPath := & ResponsePath {
102
180
Key : responseName ,
@@ -117,38 +195,47 @@ func Subscribe(ctx context.Context, p SubscribeParams) chan *Result {
117
195
}
118
196
119
197
fieldResult , err := resolveFn (ResolveParams {
120
- Source : p .RootValue ,
198
+ Source : p .Root ,
121
199
Args : args ,
122
200
Info : info ,
123
- Context : p .ContextValue ,
201
+ Context : p .Context ,
124
202
})
125
203
if err != nil {
126
- result .Errors = append (result .Errors , gqlerrors .FormatError (err .(error )))
127
- resultChannel <- result
204
+ sendOneResultandClose (& Result {
205
+ Errors : gqlerrors .FormatErrors (err ),
206
+ })
128
207
return
129
208
}
130
209
131
210
if fieldResult == nil {
132
- err := fmt . Errorf ( "no field result" )
133
- result . Errors = append ( result . Errors , gqlerrors .FormatError ( err .( error )))
134
- resultChannel <- result
211
+ sendOneResultandClose ( & Result {
212
+ Errors : gqlerrors .FormatErrors ( fmt . Errorf ( "no field result" )),
213
+ })
135
214
return
136
215
}
137
216
138
217
switch fieldResult .(type ) {
139
218
case chan interface {}:
140
219
sub := fieldResult .(chan interface {})
220
+ defer close (resultChannel )
141
221
for {
142
222
select {
143
- case <- ctx .Done ():
223
+ case <- p .Context .Done ():
224
+ println ("context cancelled" )
225
+ // TODO send the context error to the resultchannel
144
226
return
145
227
146
- case res := <- sub :
228
+ case res , more := <- sub :
229
+ if ! more {
230
+ return
231
+ }
147
232
resultChannel <- mapSourceToResponse (res )
148
233
}
149
234
}
150
235
default :
236
+ fmt .Println (fieldResult )
151
237
resultChannel <- mapSourceToResponse (fieldResult )
238
+ close (resultChannel )
152
239
return
153
240
}
154
241
}()
0 commit comments