@@ -96,10 +96,15 @@ private static Mono<GraphQlSession> initGraphQlSession(
96
96
return Mono .error (new IllegalStateException ("WebSocketGraphQlTransport has been stopped" ));
97
97
}
98
98
99
+ // Get the session Mono before connecting
100
+ Mono <GraphQlSession > sessionMono = handler .getGraphQlSession ();
101
+
99
102
client .execute (uri , headers , handler )
100
- .subscribe (aVoid -> {}, handler ::handleWebSocketSessionError , () -> {});
103
+ .subscribe (aVoid -> {},
104
+ handler ::handleWebSocketSessionError ,
105
+ handler ::handleWebSocketSessionClosed );
101
106
102
- return handler . getGraphQlSession () ;
107
+ return sessionMono ;
103
108
});
104
109
}
105
110
@@ -311,10 +316,6 @@ private void registerCloseStatusHandling(GraphQlSession graphQlSession, WebSocke
311
316
}
312
317
graphQlSession .terminateRequests (closeStatusMessage , closeStatus );
313
318
})
314
- .doOnTerminate (() -> {
315
- // Reset GraphQlSession sink to be ready to connect again
316
- this .graphQlSessionSink = Sinks .unsafe ().one ();
317
- })
318
319
.subscribe ();
319
320
}
320
321
@@ -356,6 +357,15 @@ else if (logger.isErrorEnabled()) {
356
357
}
357
358
358
359
this .graphQlSessionSink .tryEmitError (ex );
360
+ this .graphQlSessionSink = Sinks .unsafe ().one ();
361
+ }
362
+
363
+ /**
364
+ * This must be called from code that calls the {@code WebSocketClient}
365
+ * when execution completes.
366
+ */
367
+ public void handleWebSocketSessionClosed () {
368
+ this .graphQlSessionSink = Sinks .unsafe ().one ();
359
369
}
360
370
361
371
}
0 commit comments