@@ -100,12 +100,6 @@ public class Server {
100
100
messenger. error ( error. message, code: error. code)
101
101
}
102
102
}
103
-
104
- // Clean up any uncompleted subscriptions
105
- // TODO: Re-enable this
106
- // messenger.onClose {
107
- // _ = self.context?.cleanupSubscription()
108
- // }
109
103
}
110
104
111
105
/// Define the callback run during `connection_init` resolution that allows authorization using the `payload`.
@@ -143,9 +137,7 @@ public class Server {
143
137
return
144
138
}
145
139
initialized = true
146
- messenger. send (
147
- ConnectionAckResponse ( ) . toJSON ( self . encoder)
148
- )
140
+ self . sendConnectionAck ( )
149
141
// TODO: Should we send the `ka` message?
150
142
}
151
143
@@ -164,7 +156,7 @@ public class Server {
164
156
isStreaming = try graphQLRequest. isSubscription ( )
165
157
}
166
158
catch {
167
- messenger . send ( ErrorResponse ( error, id: id) . toJSON ( self . encoder ) )
159
+ self . sendError ( error, id: id)
168
160
return
169
161
}
170
162
@@ -181,22 +173,22 @@ public class Server {
181
173
let observable = stream. observable
182
174
observable. subscribe (
183
175
onNext: { [ weak self] resultFuture in
184
- guard let self = self , let messenger = self . messenger else { return }
176
+ guard let self = self else { return }
185
177
resultFuture. whenSuccess { result in
186
- messenger . send ( DataResponse ( result, id: id) . toJSON ( self . encoder ) )
178
+ self . sendData ( result, id: id)
187
179
}
188
180
resultFuture. whenFailure { error in
189
- messenger . send ( ErrorResponse ( error, id: id) . toJSON ( self . encoder ) )
181
+ self . sendError ( error, id: id)
190
182
}
191
183
} ,
192
184
onError: { [ weak self] error in
193
- guard let self = self , let messenger = self . messenger else { return }
194
- messenger . send ( ErrorResponse ( error, id: id) . toJSON ( self . encoder ) )
185
+ guard let self = self else { return }
186
+ self . sendError ( error, id: id)
195
187
} ,
196
188
onCompleted: { [ weak self] in
197
- guard let self = self , let messenger = self . messenger else { return }
198
- messenger . send ( CompleteResponse ( id: id) . toJSON ( self . encoder ) )
199
- _ = messenger. close ( )
189
+ guard let self = self else { return }
190
+ self . sendComplete ( id: id)
191
+ self . messenger? . close ( )
200
192
}
201
193
) . disposed ( by: self . disposeBag)
202
194
}
@@ -208,12 +200,14 @@ public class Server {
208
200
else {
209
201
let executeFuture = onExecute ( graphQLRequest)
210
202
executeFuture. whenSuccess { result in
211
- messenger. send ( DataResponse ( result, id: id) . toJSON ( self . encoder) )
212
- messenger. send ( CompleteResponse ( id: id) . toJSON ( self . encoder) )
203
+ self . sendData ( result, id: id)
204
+ self . sendComplete ( id: id)
205
+ messenger. close ( )
213
206
}
214
207
executeFuture. whenFailure { error in
215
- messenger. send ( ErrorResponse ( error, id: id) . toJSON ( self . encoder) )
216
- messenger. send ( CompleteResponse ( id: id) . toJSON ( self . encoder) )
208
+ self . sendError ( error, id: id)
209
+ self . sendComplete ( id: id)
210
+ messenger. close ( )
217
211
}
218
212
}
219
213
}
@@ -231,4 +225,70 @@ public class Server {
231
225
onExit ( )
232
226
_ = messenger. close ( )
233
227
}
228
+
229
+ /// Send a `connection_ack` response through the messenger
230
+ private func sendConnectionAck( _ payload: [ String : Map ] ? = nil ) {
231
+ guard let messenger = messenger else { return }
232
+ messenger. send (
233
+ ConnectionAckResponse ( payload) . toJSON ( encoder)
234
+ )
235
+ }
236
+
237
+ /// Send a `connection_error` response through the messenger
238
+ private func sendConnectionError( _ payload: [ String : Map ] ? = nil ) {
239
+ guard let messenger = messenger else { return }
240
+ messenger. send (
241
+ ConnectionErrorResponse ( payload) . toJSON ( encoder)
242
+ )
243
+ }
244
+
245
+ /// Send a `ka` response through the messenger
246
+ private func sendConnectionKeepAlive( _ payload: [ String : Map ] ? = nil ) {
247
+ guard let messenger = messenger else { return }
248
+ messenger. send (
249
+ ConnectionKeepAliveResponse ( payload) . toJSON ( encoder)
250
+ )
251
+ }
252
+
253
+ /// Send a `data` response through the messenger
254
+ private func sendData( _ payload: GraphQLResult ? = nil , id: String ) {
255
+ guard let messenger = messenger else { return }
256
+ messenger. send (
257
+ DataResponse (
258
+ payload,
259
+ id: id
260
+ ) . toJSON ( encoder)
261
+ )
262
+ }
263
+
264
+ /// Send a `complete` response through the messenger
265
+ private func sendComplete( id: String ) {
266
+ guard let messenger = messenger else { return }
267
+ messenger. send (
268
+ CompleteResponse (
269
+ id: id
270
+ ) . toJSON ( encoder)
271
+ )
272
+ }
273
+
274
+ /// Send an `error` response through the messenger
275
+ private func sendError( _ errors: [ Error ] , id: String ) {
276
+ guard let messenger = messenger else { return }
277
+ messenger. send (
278
+ ErrorResponse (
279
+ errors,
280
+ id: id
281
+ ) . toJSON ( encoder)
282
+ )
283
+ }
284
+
285
+ /// Send an `error` response through the messenger
286
+ private func sendError( _ error: Error , id: String ) {
287
+ self . sendError ( [ error] , id: id)
288
+ }
289
+
290
+ /// Send an `error` response through the messenger
291
+ private func sendError( _ errorMessage: String , id: String ) {
292
+ self . sendError ( GraphQLError ( message: errorMessage) , id: id)
293
+ }
234
294
}
0 commit comments