@@ -8,7 +8,8 @@ import RxSwift
8
8
9
9
/// Server implements the server-side portion of the protocol, allowing a few callbacks for customization.
10
10
public class Server {
11
- let messenger : Messenger
11
+ // We keep this weak because we strongly inject this object into the messenger callback
12
+ weak var messenger : Messenger ?
12
13
13
14
let onExecute : ( GraphQLRequest ) -> EventLoopFuture < GraphQLResult >
14
15
let onSubscribe : ( GraphQLRequest ) -> EventLoopFuture < SubscriptionResult >
@@ -38,8 +39,8 @@ public class Server {
38
39
self . onExecute = onExecute
39
40
self . onSubscribe = onSubscribe
40
41
41
- self . messenger. onRecieve { [ weak self ] message in
42
- guard let self = self else { return }
42
+ messenger. onRecieve { message in
43
+ guard let messenger = self . messenger else { return }
43
44
44
45
self . onMessage ( message)
45
46
@@ -51,7 +52,7 @@ public class Server {
51
52
52
53
guard let json = message. data ( using: . utf8) else {
53
54
let error = GraphQLWSError . invalidEncoding ( )
54
- self . messenger. error ( error. message, code: error. code)
55
+ messenger. error ( error. message, code: error. code)
55
56
return
56
57
}
57
58
@@ -61,42 +62,42 @@ public class Server {
61
62
}
62
63
catch {
63
64
let error = GraphQLWSError . noType ( )
64
- self . messenger. error ( error. message, code: error. code)
65
+ messenger. error ( error. message, code: error. code)
65
66
return
66
67
}
67
68
68
69
switch request. type {
69
70
case . GQL_CONNECTION_INIT:
70
71
guard let connectionInitRequest = try ? self . decoder. decode ( ConnectionInitRequest . self, from: json) else {
71
72
let error = GraphQLWSError . invalidRequestFormat ( messageType: . GQL_CONNECTION_INIT)
72
- self . messenger. error ( error. message, code: error. code)
73
+ messenger. error ( error. message, code: error. code)
73
74
return
74
75
}
75
- self . onConnectionInit ( connectionInitRequest)
76
+ self . onConnectionInit ( connectionInitRequest, messenger )
76
77
case . GQL_START:
77
78
guard let startRequest = try ? self . decoder. decode ( StartRequest . self, from: json) else {
78
79
let error = GraphQLWSError . invalidRequestFormat ( messageType: . GQL_START)
79
- self . messenger. error ( error. message, code: error. code)
80
+ messenger. error ( error. message, code: error. code)
80
81
return
81
82
}
82
- self . onStart ( startRequest)
83
+ self . onStart ( startRequest, messenger )
83
84
case . GQL_STOP:
84
85
guard let stopRequest = try ? self . decoder. decode ( StopRequest . self, from: json) else {
85
86
let error = GraphQLWSError . invalidRequestFormat ( messageType: . GQL_STOP)
86
- self . messenger. error ( error. message, code: error. code)
87
+ messenger. error ( error. message, code: error. code)
87
88
return
88
89
}
89
- self . onStop ( stopRequest, self . messenger)
90
+ self . onStop ( stopRequest, messenger)
90
91
case . GQL_CONNECTION_TERMINATE:
91
92
guard let connectionTerminateRequest = try ? self . decoder. decode ( ConnectionTerminateRequest . self, from: json) else {
92
93
let error = GraphQLWSError . invalidRequestFormat ( messageType: . GQL_CONNECTION_TERMINATE)
93
- self . messenger. error ( error. message, code: error. code)
94
+ messenger. error ( error. message, code: error. code)
94
95
return
95
96
}
96
- self . onConnectionTerminate ( connectionTerminateRequest)
97
+ self . onConnectionTerminate ( connectionTerminateRequest, messenger )
97
98
case . unknown:
98
99
let error = GraphQLWSError . invalidType ( )
99
- self . messenger. error ( error. message, code: error. code)
100
+ messenger. error ( error. message, code: error. code)
100
101
}
101
102
}
102
103
@@ -126,7 +127,7 @@ public class Server {
126
127
self . onMessage = callback
127
128
}
128
129
129
- private func onConnectionInit( _ connectionInitRequest: ConnectionInitRequest ) {
130
+ private func onConnectionInit( _ connectionInitRequest: ConnectionInitRequest , _ messenger : Messenger ) {
130
131
guard !initialized else {
131
132
let error = GraphQLWSError . tooManyInitializations ( )
132
133
messenger. error ( error. message, code: error. code)
@@ -148,7 +149,7 @@ public class Server {
148
149
// TODO: Should we send the `ka` message?
149
150
}
150
151
151
- private func onStart( _ startRequest: StartRequest ) {
152
+ private func onStart( _ startRequest: StartRequest , _ messenger : Messenger ) {
152
153
guard initialized else {
153
154
let error = GraphQLWSError . notInitialized ( )
154
155
messenger. error ( error. message, code: error. code)
@@ -169,48 +170,50 @@ public class Server {
169
170
170
171
if isStreaming {
171
172
let subscribeFuture = onSubscribe ( graphQLRequest)
172
- subscribeFuture. whenSuccess { [ weak self] result in
173
- guard let self = self else { return }
173
+ subscribeFuture. whenSuccess { result in
174
174
guard let streamOpt = result. stream else {
175
175
// API issue - subscribe resolver isn't stream
176
176
let error = GraphQLWSError . internalAPIStreamIssue ( )
177
- self . messenger. error ( error. message, code: error. code)
177
+ messenger. error ( error. message, code: error. code)
178
178
return
179
179
}
180
180
let stream = streamOpt as! ObservableSubscriptionEventStream
181
181
let observable = stream. observable
182
182
observable. subscribe (
183
- onNext: { resultFuture in
183
+ onNext: { [ weak self] resultFuture in
184
+ guard let self = self , let messenger = self . messenger else { return }
184
185
resultFuture. whenSuccess { result in
185
- self . messenger. send ( DataResponse ( result, id: id) . toJSON ( self . encoder) )
186
+ messenger. send ( DataResponse ( result, id: id) . toJSON ( self . encoder) )
186
187
}
187
188
resultFuture. whenFailure { error in
188
- self . messenger. send ( ErrorResponse ( error, id: id) . toJSON ( self . encoder) )
189
+ messenger. send ( ErrorResponse ( error, id: id) . toJSON ( self . encoder) )
189
190
}
190
191
} ,
191
- onError: { error in
192
- self . messenger. send ( ErrorResponse ( error, id: id) . toJSON ( self . encoder) )
192
+ onError: { [ weak self] error in
193
+ guard let self = self , let messenger = self . messenger else { return }
194
+ messenger. send ( ErrorResponse ( error, id: id) . toJSON ( self . encoder) )
193
195
} ,
194
- onCompleted: {
195
- self . messenger. send ( CompleteResponse ( id: id) . toJSON ( self . encoder) )
196
- _ = self . messenger. close ( )
196
+ onCompleted: { [ weak self] in
197
+ guard let self = self , let messenger = self . messenger else { return }
198
+ messenger. send ( CompleteResponse ( id: id) . toJSON ( self . encoder) )
199
+ _ = messenger. close ( )
197
200
}
198
201
) . disposed ( by: self . disposeBag)
199
202
}
200
203
subscribeFuture. whenFailure { error in
201
204
let error = GraphQLWSError . graphQLError ( error)
202
- _ = self . messenger. error ( error. message, code: error. code)
205
+ _ = messenger. error ( error. message, code: error. code)
203
206
}
204
207
}
205
208
else {
206
209
let executeFuture = onExecute ( graphQLRequest)
207
210
executeFuture. whenSuccess { result in
208
- self . messenger. send ( DataResponse ( result, id: id) . toJSON ( self . encoder) )
209
- self . messenger. send ( CompleteResponse ( id: id) . toJSON ( self . encoder) )
211
+ messenger. send ( DataResponse ( result, id: id) . toJSON ( self . encoder) )
212
+ messenger. send ( CompleteResponse ( id: id) . toJSON ( self . encoder) )
210
213
}
211
214
executeFuture. whenFailure { error in
212
- self . messenger. send ( ErrorResponse ( error, id: id) . toJSON ( self . encoder) )
213
- self . messenger. send ( CompleteResponse ( id: id) . toJSON ( self . encoder) )
215
+ messenger. send ( ErrorResponse ( error, id: id) . toJSON ( self . encoder) )
216
+ messenger. send ( CompleteResponse ( id: id) . toJSON ( self . encoder) )
214
217
}
215
218
}
216
219
}
@@ -224,7 +227,7 @@ public class Server {
224
227
onExit ( )
225
228
}
226
229
227
- private func onConnectionTerminate( _: ConnectionTerminateRequest ) {
230
+ private func onConnectionTerminate( _: ConnectionTerminateRequest , _ messenger : Messenger ) {
228
231
onExit ( )
229
232
_ = messenger. close ( )
230
233
}
0 commit comments