22
22
import java .util .concurrent .ConcurrentHashMap ;
23
23
import java .util .concurrent .atomic .AtomicBoolean ;
24
24
import java .util .concurrent .atomic .AtomicLong ;
25
- import java .util .function .Consumer ;
26
25
27
26
import org .apache .commons .logging .Log ;
28
27
import org .apache .commons .logging .LogFactory ;
@@ -69,17 +68,18 @@ final class WebSocketGraphQlTransport implements GraphQlTransport {
69
68
70
69
WebSocketGraphQlTransport (
71
70
URI url , @ Nullable HttpHeaders headers , WebSocketClient client , CodecConfigurer codecConfigurer ,
72
- @ Nullable Object connectionInitPayload , Consumer < Map < String , Object >> connectionAckHandler ) {
71
+ WebSocketGraphQlClientInterceptor interceptor ) {
73
72
74
73
Assert .notNull (url , "URI is required" );
75
- Assert .notNull (url , "URI is required" );
74
+ Assert .notNull (client , "WebSocketClient is required" );
75
+ Assert .notNull (codecConfigurer , "CodecConfigurer is required" );
76
+ Assert .notNull (interceptor , "WebSocketGraphQlClientInterceptor is required" );
76
77
77
78
this .url = url ;
78
79
this .headers .putAll (headers != null ? headers : HttpHeaders .EMPTY );
79
80
this .webSocketClient = client ;
80
81
81
- this .graphQlSessionHandler = new GraphQlSessionHandler (
82
- codecConfigurer , connectionInitPayload , connectionAckHandler );
82
+ this .graphQlSessionHandler = new GraphQlSessionHandler (codecConfigurer , interceptor );
83
83
84
84
this .graphQlSessionMono = initGraphQlSession (this .url , this .headers , client , this .graphQlSessionHandler )
85
85
.cacheInvalidateWhen (GraphQlSession ::notifyWhenClosed );
@@ -167,21 +167,16 @@ private static class GraphQlSessionHandler implements WebSocketHandler {
167
167
168
168
private final CodecDelegate codecDelegate ;
169
169
170
- private final GraphQlMessage connectionInitMessage ;
171
-
172
- private final Consumer <Map <String , Object >> connectionAckHandler ;
170
+ private final WebSocketGraphQlClientInterceptor interceptor ;
173
171
174
172
private Sinks .One <GraphQlSession > graphQlSessionSink ;
175
173
176
174
private final AtomicBoolean stopped = new AtomicBoolean ();
177
175
178
176
179
- GraphQlSessionHandler (CodecConfigurer codecConfigurer ,
180
- @ Nullable Object connectionInitPayload , Consumer <Map <String , Object >> connectionAckHandler ) {
181
-
177
+ GraphQlSessionHandler (CodecConfigurer codecConfigurer , WebSocketGraphQlClientInterceptor interceptor ) {
182
178
this .codecDelegate = new CodecDelegate (codecConfigurer );
183
- this .connectionInitMessage = GraphQlMessage .connectionInit (connectionInitPayload );
184
- this .connectionAckHandler = connectionAckHandler ;
179
+ this .interceptor = interceptor ;
185
180
this .graphQlSessionSink = Sinks .unsafe ().one ();
186
181
}
187
182
@@ -231,8 +226,12 @@ public Mono<Void> handle(WebSocketSession session) {
231
226
GraphQlSession graphQlSession = new GraphQlSession (session );
232
227
registerCloseStatusHandling (graphQlSession , session );
233
228
229
+ Mono <GraphQlMessage > connectionInitMono = this .interceptor .connectionInitPayload ()
230
+ .defaultIfEmpty (Collections .emptyMap ())
231
+ .map (GraphQlMessage ::connectionInit );
232
+
234
233
Mono <Void > sendCompletion =
235
- session .send (Flux . just ( this . connectionInitMessage ) .concatWith (graphQlSession .getRequestFlux ())
234
+ session .send (connectionInitMono .concatWith (graphQlSession .getRequestFlux ())
236
235
.map (message -> this .codecDelegate .encode (session , message )));
237
236
238
237
Mono <Void > receiveCompletion = session .receive ()
@@ -242,20 +241,23 @@ public Mono<Void> handle(WebSocketSession session) {
242
241
GraphQlMessage message = this .codecDelegate .decode (webSocketMessage );
243
242
Assert .state (message .resolvedType () == GraphQlMessageType .CONNECTION_ACK ,
244
243
() -> "Unexpected message before connection_ack: " + message );
245
- this .connectionAckHandler .accept (message .getPayload ());
246
- if (logger .isDebugEnabled ()) {
247
- logger .debug (graphQlSession + " initialized" );
248
- }
244
+ return this .interceptor .handleConnectionAck (message .getPayload ())
245
+ .then (Mono .defer (() -> {
246
+ if (logger .isDebugEnabled ()) {
247
+ logger .debug (graphQlSession + " initialized" );
248
+ }
249
+ Sinks .EmitResult result = this .graphQlSessionSink .tryEmitValue (graphQlSession );
250
+ if (result .isFailure ()) {
251
+ return Mono .error (new IllegalStateException (
252
+ "GraphQlSession initialized but could not be emitted: " + result ));
253
+ }
254
+ return Mono .empty ();
255
+ }));
249
256
}
250
257
catch (Throwable ex ) {
251
258
this .graphQlSessionSink .tryEmitError (ex );
252
259
return Mono .error (ex );
253
260
}
254
- Sinks .EmitResult emitResult = this .graphQlSessionSink .tryEmitValue (graphQlSession );
255
- if (emitResult .isFailure ()) {
256
- return Mono .error (new IllegalStateException (
257
- "GraphQlSession initialized but could not be emitted: " + emitResult ));
258
- }
259
261
}
260
262
else {
261
263
try {
0 commit comments