@@ -7,8 +7,10 @@ import NIO
7
7
import RxSwift
8
8
9
9
/// Adds server-side [graphql-ws subprotocol](https://github.com/apollographql/subscriptions-transport-ws/blob/master/PROTOCOL.md)
10
- /// support, namely parsing and adding callbacks for each type of client request .
10
+ /// support. This handles the majority of query processing according to the procol definition, allowing a few callbacks for customization .
11
11
class Server {
12
+ let messenger : Messenger
13
+
12
14
let auth : ( ConnectionInitRequest ) throws -> Void
13
15
let onExecute : ( GraphQLRequest ) -> EventLoopFuture < GraphQLResult >
14
16
let onSubscribe : ( GraphQLRequest ) -> EventLoopFuture < SubscriptionResult >
@@ -21,24 +23,33 @@ class Server {
21
23
let decoder = JSONDecoder ( )
22
24
let encoder = GraphQLJSONEncoder ( )
23
25
26
+ /// Create a new server
27
+ ///
28
+ /// - Parameters:
29
+ /// - messenger: The messenger to bind the server to.
30
+ /// - auth: Callback run during `connection_init` resolution that allows authorization using the `payload`. Throw to indicate that authorization has failed.
31
+ /// - onExecute: Callback run during `start` resolution for non-streaming queries. Typically this is `API.execute`.
32
+ /// - onSubscribe: Callback run during `start` resolution for streaming queries. Typically this is `API.subscribe`.
33
+ /// - onExit: Callback run when the communication is shut down, either by the client or server
34
+ /// - onMessage: callback run on receipt of any message
24
35
init (
36
+ messenger: Messenger ,
25
37
auth: @escaping ( ConnectionInitRequest ) throws -> Void ,
26
38
onExecute: @escaping ( GraphQLRequest ) -> EventLoopFuture < GraphQLResult > ,
27
39
onSubscribe: @escaping ( GraphQLRequest ) -> EventLoopFuture < SubscriptionResult > ,
28
40
onExit: @escaping ( ) -> Void ,
29
41
onMessage: @escaping ( String ) -> Void = { _ in ( ) }
30
42
) {
43
+ self . messenger = messenger
31
44
self . auth = auth
32
45
self . onExecute = onExecute
33
46
self . onSubscribe = onSubscribe
34
47
self . onExit = onExit
35
48
self . onMessage = onMessage
36
- }
37
-
38
- /// Attaches the responder to the provided Messenger in order to recieve and transmit messages
39
- /// - Parameter messenger: The Messenger to use for communication
40
- func attach( to messenger: Messenger ) {
41
- messenger. onRecieve { message in
49
+
50
+ self . messenger. onRecieve { [ weak self] message in
51
+ guard let self = self else { return }
52
+
42
53
self . onMessage ( message)
43
54
44
55
// Detect and ignore error responses.
@@ -49,7 +60,7 @@ class Server {
49
60
50
61
guard let json = message. data ( using: . utf8) else {
51
62
let error = GraphQLWSError . invalidEncoding ( )
52
- messenger. error ( error. message, code: error. code)
63
+ self . messenger. error ( error. message, code: error. code)
53
64
return
54
65
}
55
66
@@ -59,53 +70,53 @@ class Server {
59
70
}
60
71
catch {
61
72
let error = GraphQLWSError . noType ( )
62
- messenger. error ( error. message, code: error. code)
73
+ self . messenger. error ( error. message, code: error. code)
63
74
return
64
75
}
65
76
66
77
switch request. type {
67
78
case . GQL_CONNECTION_INIT:
68
79
guard let connectionInitRequest = try ? self . decoder. decode ( ConnectionInitRequest . self, from: json) else {
69
80
let error = GraphQLWSError . invalidRequestFormat ( messageType: . GQL_CONNECTION_INIT)
70
- messenger. error ( error. message, code: error. code)
81
+ self . messenger. error ( error. message, code: error. code)
71
82
return
72
83
}
73
- self . onConnectionInit ( connectionInitRequest, messenger )
84
+ self . onConnectionInit ( connectionInitRequest)
74
85
case . GQL_START:
75
86
guard let startRequest = try ? self . decoder. decode ( StartRequest . self, from: json) else {
76
87
let error = GraphQLWSError . invalidRequestFormat ( messageType: . GQL_START)
77
- messenger. error ( error. message, code: error. code)
88
+ self . messenger. error ( error. message, code: error. code)
78
89
return
79
90
}
80
- self . onStart ( startRequest, messenger )
91
+ self . onStart ( startRequest)
81
92
case . GQL_STOP:
82
93
guard let stopRequest = try ? self . decoder. decode ( StopRequest . self, from: json) else {
83
94
let error = GraphQLWSError . invalidRequestFormat ( messageType: . GQL_STOP)
84
- messenger. error ( error. message, code: error. code)
95
+ self . messenger. error ( error. message, code: error. code)
85
96
return
86
97
}
87
- self . onStop ( stopRequest, messenger)
98
+ self . onStop ( stopRequest, self . messenger)
88
99
case . GQL_CONNECTION_TERMINATE:
89
100
guard let connectionTerminateRequest = try ? self . decoder. decode ( ConnectionTerminateRequest . self, from: json) else {
90
101
let error = GraphQLWSError . invalidRequestFormat ( messageType: . GQL_CONNECTION_TERMINATE)
91
- messenger. error ( error. message, code: error. code)
102
+ self . messenger. error ( error. message, code: error. code)
92
103
return
93
104
}
94
- self . onConnectionTerminate ( connectionTerminateRequest, messenger )
105
+ self . onConnectionTerminate ( connectionTerminateRequest)
95
106
case . unknown:
96
107
let error = GraphQLWSError . invalidType ( )
97
- messenger. error ( error. message, code: error. code)
108
+ self . messenger. error ( error. message, code: error. code)
98
109
}
99
110
}
100
111
101
112
// Clean up any uncompleted subscriptions
102
113
// TODO: Re-enable this
103
- // messenger.onClose {
104
- // _ = self.context?.cleanupSubscription()
105
- // }
114
+ // messenger.onClose {
115
+ // _ = self.context?.cleanupSubscription()
116
+ // }
106
117
}
107
118
108
- private func onConnectionInit( _ connectionInitRequest: ConnectionInitRequest , _ messenger : Messenger ) {
119
+ private func onConnectionInit( _ connectionInitRequest: ConnectionInitRequest ) {
109
120
guard !initialized else {
110
121
let error = GraphQLWSError . tooManyInitializations ( )
111
122
messenger. error ( error. message, code: error. code)
@@ -127,7 +138,7 @@ class Server {
127
138
// TODO: Should we send the `ka` message?
128
139
}
129
140
130
- private func onStart( _ startRequest: StartRequest , _ messenger : Messenger ) {
141
+ private func onStart( _ startRequest: StartRequest ) {
131
142
guard initialized else {
132
143
let error = GraphQLWSError . notInitialized ( )
133
144
messenger. error ( error. message, code: error. code)
@@ -153,43 +164,43 @@ class Server {
153
164
guard let streamOpt = result. stream else {
154
165
// API issue - subscribe resolver isn't stream
155
166
let error = GraphQLWSError . internalAPIStreamIssue ( )
156
- messenger. error ( error. message, code: error. code)
167
+ self . messenger. error ( error. message, code: error. code)
157
168
return
158
169
}
159
170
let stream = streamOpt as! ObservableSubscriptionEventStream
160
171
let observable = stream. observable
161
172
observable. subscribe (
162
173
onNext: { resultFuture in
163
174
resultFuture. whenSuccess { result in
164
- messenger. send ( DataResponse ( result, id: id) . toJSON ( self . encoder) )
175
+ self . messenger. send ( DataResponse ( result, id: id) . toJSON ( self . encoder) )
165
176
}
166
177
resultFuture. whenFailure { error in
167
- messenger. send ( ErrorResponse ( error, id: id) . toJSON ( self . encoder) )
178
+ self . messenger. send ( ErrorResponse ( error, id: id) . toJSON ( self . encoder) )
168
179
}
169
180
} ,
170
181
onError: { error in
171
- messenger. send ( ErrorResponse ( error, id: id) . toJSON ( self . encoder) )
182
+ self . messenger. send ( ErrorResponse ( error, id: id) . toJSON ( self . encoder) )
172
183
} ,
173
184
onCompleted: {
174
- messenger. send ( CompleteResponse ( id: id) . toJSON ( self . encoder) )
175
- _ = messenger. close ( )
185
+ self . messenger. send ( CompleteResponse ( id: id) . toJSON ( self . encoder) )
186
+ _ = self . messenger. close ( )
176
187
}
177
188
) . disposed ( by: self . disposeBag)
178
189
}
179
190
subscribeFuture. whenFailure { error in
180
191
let error = GraphQLWSError . graphQLError ( error)
181
- _ = messenger. error ( error. message, code: error. code)
192
+ _ = self . messenger. error ( error. message, code: error. code)
182
193
}
183
194
}
184
195
else {
185
196
let executeFuture = onExecute ( graphQLRequest)
186
197
executeFuture. whenSuccess { result in
187
- messenger. send ( DataResponse ( result, id: id) . toJSON ( self . encoder) )
188
- messenger. send ( CompleteResponse ( id: id) . toJSON ( self . encoder) )
198
+ self . messenger. send ( DataResponse ( result, id: id) . toJSON ( self . encoder) )
199
+ self . messenger. send ( CompleteResponse ( id: id) . toJSON ( self . encoder) )
189
200
}
190
201
executeFuture. whenFailure { error in
191
- messenger. send ( ErrorResponse ( error, id: id) . toJSON ( self . encoder) )
192
- messenger. send ( CompleteResponse ( id: id) . toJSON ( self . encoder) )
202
+ self . messenger. send ( ErrorResponse ( error, id: id) . toJSON ( self . encoder) )
203
+ self . messenger. send ( CompleteResponse ( id: id) . toJSON ( self . encoder) )
193
204
}
194
205
}
195
206
}
@@ -203,7 +214,7 @@ class Server {
203
214
onExit ( )
204
215
}
205
216
206
- private func onConnectionTerminate( _: ConnectionTerminateRequest , _ messenger : Messenger ) {
217
+ private func onConnectionTerminate( _: ConnectionTerminateRequest ) {
207
218
onExit ( )
208
219
_ = messenger. close ( )
209
220
}
0 commit comments