Skip to content

Commit 3b1d1ab

Browse files
committed
Polishing in WebSocket support
1 parent 84f8cfe commit 3b1d1ab

File tree

8 files changed

+68
-97
lines changed

8 files changed

+68
-97
lines changed

spring-graphql/src/main/java/org/springframework/graphql/client/DefaultWebSocketGraphQlClientBuilder.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2022 the original author or authors.
2+
* Copyright 2002-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -132,7 +132,7 @@ private WebSocketGraphQlClientInterceptor getInterceptor() {
132132
List<WebSocketGraphQlClientInterceptor> interceptors = getInterceptors().stream()
133133
.filter(interceptor -> interceptor instanceof WebSocketGraphQlClientInterceptor)
134134
.map(interceptor -> (WebSocketGraphQlClientInterceptor) interceptor)
135-
.collect(Collectors.toList());
135+
.toList();
136136

137137
Assert.state(interceptors.size() <= 1,
138138
"Only a single interceptor of type WebSocketGraphQlClientInterceptor may be configured");

spring-graphql/src/main/java/org/springframework/graphql/client/WebSocketGraphQlClientInterceptor.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2020-2022 the original author or authors.
2+
* Copyright 2020-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -27,6 +27,10 @@
2727
* for WebSocket interception points. Only a single interceptor of type
2828
* {@link WebSocketGraphQlClientInterceptor} may be configured.
2929
*
30+
* <p>Use {@link GraphQlClient.Builder#interceptor(GraphQlClientInterceptor...)}
31+
* to configure the interceptor chain. Only one interceptor in the chain may be
32+
* of type {@code WebSocketGraphQlClientInterceptor}.
33+
*
3034
* @author Rossen Stoyanchev
3135
* @since 1.0.0
3236
*/

spring-graphql/src/main/java/org/springframework/graphql/client/WebSocketGraphQlTransport.java

Lines changed: 16 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2022 the original author or authors.
2+
* Copyright 2002-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -87,6 +87,7 @@ final class WebSocketGraphQlTransport implements GraphQlTransport {
8787
.cacheInvalidateWhen(GraphQlSession::notifyWhenClosed);
8888
}
8989

90+
@SuppressWarnings({"CallingSubscribeInNonBlockingScope", "ReactorTransformationOnMonoVoid"})
9091
private static Mono<GraphQlSession> initGraphQlSession(
9192
URI uri, HttpHeaders headers, WebSocketClient client, GraphQlSessionHandler handler) {
9293

@@ -219,6 +220,7 @@ public boolean isStopped() {
219220
}
220221

221222

223+
@SuppressWarnings({"ReactorZipWithMonoVoid", "ReactiveStreamsThrowInOperator"})
222224
@Override
223225
public Mono<Void> handle(WebSocketSession session) {
224226

@@ -265,21 +267,12 @@ public Mono<Void> handle(WebSocketSession session) {
265267
try {
266268
GraphQlWebSocketMessage message = this.codecDelegate.decode(webSocketMessage);
267269
switch (message.resolvedType()) {
268-
case NEXT:
269-
graphQlSession.handleNext(message);
270-
break;
271-
case PING:
272-
graphQlSession.sendPong(null);
273-
break;
274-
case ERROR:
275-
graphQlSession.handleError(message);
276-
break;
277-
case COMPLETE:
278-
graphQlSession.handleComplete(message);
279-
break;
280-
default:
281-
throw new IllegalStateException(
282-
"Unexpected message type: '" + message.getType() + "'");
270+
case NEXT -> graphQlSession.handleNext(message);
271+
case PING -> graphQlSession.sendPong(null);
272+
case ERROR -> graphQlSession.handleError(message);
273+
case COMPLETE -> graphQlSession.handleComplete(message);
274+
default -> throw new IllegalStateException(
275+
"Unexpected message type: '" + message.getType() + "'");
283276
}
284277
}
285278
catch (Exception ex) {
@@ -501,7 +494,7 @@ public void handleError(GraphQlWebSocketMessage message) {
501494
}
502495
else {
503496
List<ResponseError> errors = response.getErrors();
504-
Exception ex = new SubscriptionErrorException(requestState.getRequest(), errors);
497+
Exception ex = new SubscriptionErrorException(requestState.request(), errors);
505498
requestState.handlerError(ex);
506499
}
507500
}
@@ -619,7 +612,7 @@ public void sendRequest(GraphQlWebSocketMessage message) {
619612
*/
620613
private interface RequestState {
621614

622-
GraphQlRequest getRequest();
615+
GraphQlRequest request();
623616

624617
void handleResponse(GraphQlResponse response);
625618

@@ -628,7 +621,7 @@ private interface RequestState {
628621
void handleCompletion();
629622

630623
default void emitDisconnectError(String message, CloseStatus closeStatus) {
631-
emitDisconnectError(new WebSocketDisconnectedException(message, getRequest(), closeStatus));
624+
emitDisconnectError(new WebSocketDisconnectedException(message, request(), closeStatus));
632625
}
633626

634627
void emitDisconnectError(WebSocketDisconnectedException ex);
@@ -639,21 +632,8 @@ default void emitDisconnectError(String message, CloseStatus closeStatus) {
639632
/**
640633
* State container for a request that emits a single response.
641634
*/
642-
private static class SingleResponseRequestState implements RequestState {
643-
644-
private final GraphQlRequest request;
645-
646-
private final MonoSink<GraphQlResponse> responseSink;
647-
648-
SingleResponseRequestState(GraphQlRequest request, MonoSink<GraphQlResponse> responseSink) {
649-
this.request = request;
650-
this.responseSink = responseSink;
651-
}
652-
653-
@Override
654-
public GraphQlRequest getRequest() {
655-
return this.request;
656-
}
635+
private record SingleResponseRequestState(
636+
GraphQlRequest request, MonoSink<GraphQlResponse> responseSink) implements RequestState {
657637

658638
@Override
659639
public void handleResponse(GraphQlResponse response) {
@@ -681,21 +661,8 @@ public void emitDisconnectError(WebSocketDisconnectedException ex) {
681661
/**
682662
* State container for a subscription request that emits a stream of responses.
683663
*/
684-
private static class SubscriptionRequestState implements RequestState {
685-
686-
private final GraphQlRequest request;
687-
688-
private final FluxSink<GraphQlResponse> responseSink;
689-
690-
SubscriptionRequestState(GraphQlRequest request, FluxSink<GraphQlResponse> responseSink) {
691-
this.request = request;
692-
this.responseSink = responseSink;
693-
}
694-
695-
@Override
696-
public GraphQlRequest getRequest() {
697-
return request;
698-
}
664+
private record SubscriptionRequestState(
665+
GraphQlRequest request, FluxSink<GraphQlResponse> responseSink) implements RequestState {
699666

700667
@Override
701668
public void handleResponse(GraphQlResponse response) {

spring-graphql/src/main/java/org/springframework/graphql/server/WebSocketGraphQlInterceptor.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2022 the original author or authors.
2+
* Copyright 2002-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -22,9 +22,11 @@
2222

2323
/**
2424
* An extension of {@link WebGraphQlInterceptor} with additional methods
25-
* to handle the start and end of a WebSocket connection. Only a single
26-
* interceptor of type {@link WebSocketGraphQlInterceptor} may be
27-
* declared.
25+
* to handle the start and end of a WebSocket connection.
26+
*
27+
* <p>Use {@link WebGraphQlHandler.Builder#interceptor(WebGraphQlInterceptor...)}
28+
* to configure the interceptor chain. Only one interceptor in the chain may be
29+
* of type {@code WebSocketGraphQlInterceptor}.
2830
*
2931
* @author Rossen Stoyanchev
3032
* @since 1.0.0
@@ -57,8 +59,8 @@ default Mono<Object> handleConnectionInitialization(
5759
* additional, or more centralized handling across subscriptions.
5860
* @param sessionInfo information about the underlying WebSocket session
5961
* @param subscriptionId the unique id for the subscription; correlates to the
60-
* {@link WebGraphQlRequest#getId() requestId} from the original {@code "subscribe"}
61-
* message that started the subscription
62+
* {@link WebGraphQlRequest#getId() requestId} from the {@code "subscribe"}
63+
* message that started the subscription stream
6264
* @return {@code Mono} for the completion of handling
6365
*/
6466
default Mono<Void> handleCancelledSubscription(WebSocketSessionInfo sessionInfo, String subscriptionId) {

spring-graphql/src/main/java/org/springframework/graphql/server/webflux/GraphQlWebSocketHandler.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ public List<String> getSubProtocols() {
101101
}
102102

103103

104+
@SuppressWarnings("CallingSubscribeInNonBlockingScope")
104105
@Override
105106
public Mono<Void> handle(WebSocketSession session) {
106107
HandshakeInfo handshakeInfo = session.getHandshakeInfo();
@@ -140,7 +141,7 @@ public Mono<Void> handle(WebSocketSession session) {
140141
String id = message.getId();
141142
Map<String, Object> payload = message.getPayload();
142143
switch (message.resolvedType()) {
143-
case SUBSCRIBE:
144+
case SUBSCRIBE -> {
144145
if (connectionInitPayloadRef.get() == null) {
145146
return GraphQlStatus.close(session, GraphQlStatus.UNAUTHORIZED_STATUS);
146147
}
@@ -156,9 +157,11 @@ public Mono<Void> handle(WebSocketSession session) {
156157
return this.graphQlHandler.handleRequest(request)
157158
.flatMapMany(response -> handleResponse(session, id, subscriptions, response))
158159
.doOnTerminate(() -> subscriptions.remove(id));
159-
case PING:
160+
}
161+
case PING -> {
160162
return Flux.just(this.codecDelegate.encode(session, GraphQlWebSocketMessage.pong(null)));
161-
case COMPLETE:
163+
}
164+
case COMPLETE -> {
162165
if (id != null) {
163166
Subscription subscription = subscriptions.remove(id);
164167
if (subscription != null) {
@@ -168,7 +171,8 @@ public Mono<Void> handle(WebSocketSession session) {
168171
.thenMany(Flux.empty());
169172
}
170173
return Flux.empty();
171-
case CONNECTION_INIT:
174+
}
175+
case CONNECTION_INIT -> {
172176
if (!connectionInitPayloadRef.compareAndSet(null, payload)) {
173177
return GraphQlStatus.close(session, GraphQlStatus.TOO_MANY_INIT_REQUESTS_STATUS);
174178
}
@@ -177,8 +181,10 @@ public Mono<Void> handle(WebSocketSession session) {
177181
.map(ackPayload -> this.codecDelegate.encodeConnectionAck(session, ackPayload))
178182
.flux()
179183
.onErrorResume(ex -> GraphQlStatus.close(session, GraphQlStatus.UNAUTHORIZED_STATUS));
180-
default:
184+
}
185+
default -> {
181186
return GraphQlStatus.close(session, GraphQlStatus.INVALID_MESSAGE_STATUS);
187+
}
182188
}
183189
}));
184190
}

spring-graphql/src/main/java/org/springframework/graphql/server/webmvc/GraphQlWebSocketHandler.java

Lines changed: 8 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ private void handleInternal(WebSocketSession session, TextMessage webSocketMessa
191191
Map<String, Object> payload = message.getPayload();
192192
SessionState state = getSessionInfo(session);
193193
switch (message.resolvedType()) {
194-
case SUBSCRIBE:
194+
case SUBSCRIBE -> {
195195
if (state.getConnectionInitPayload() == null) {
196196
GraphQlStatus.closeSession(session, GraphQlStatus.UNAUTHORIZED_STATUS);
197197
return;
@@ -212,11 +212,9 @@ private void handleInternal(WebSocketSession session, TextMessage webSocketMessa
212212
.flatMapMany((response) -> handleResponse(session, request.getId(), response))
213213
.publishOn(state.getScheduler()) // Serial blocking send via single thread
214214
.subscribe(new SendMessageSubscriber(id, session, state));
215-
return;
216-
case PING:
217-
session.sendMessage(encode(GraphQlWebSocketMessage.pong(null)));
218-
return;
219-
case COMPLETE:
215+
}
216+
case PING -> session.sendMessage(encode(GraphQlWebSocketMessage.pong(null)));
217+
case COMPLETE -> {
220218
if (id != null) {
221219
Subscription subscription = state.getSubscriptions().remove(id);
222220
if (subscription != null) {
@@ -225,8 +223,8 @@ private void handleInternal(WebSocketSession session, TextMessage webSocketMessa
225223
this.webSocketGraphQlInterceptor.handleCancelledSubscription(state.getSessionInfo(), id)
226224
.block(Duration.ofSeconds(10));
227225
}
228-
return;
229-
case CONNECTION_INIT:
226+
}
227+
case CONNECTION_INIT -> {
230228
if (!state.setConnectionInitPayload(payload)) {
231229
GraphQlStatus.closeSession(session, GraphQlStatus.TOO_MANY_INIT_REQUESTS_STATUS);
232230
return;
@@ -248,9 +246,8 @@ private void handleInternal(WebSocketSession session, TextMessage webSocketMessa
248246
return Mono.empty();
249247
})
250248
.block(Duration.ofSeconds(10));
251-
return;
252-
default:
253-
GraphQlStatus.closeSession(session, GraphQlStatus.INVALID_MESSAGE_STATUS);
249+
}
250+
default -> GraphQlStatus.closeSession(session, GraphQlStatus.INVALID_MESSAGE_STATUS);
254251
}
255252
}
256253

@@ -343,11 +340,6 @@ public void afterConnectionClosed(WebSocketSession session, CloseStatus closeSta
343340
}
344341
}
345342

346-
@Override
347-
public boolean supportsPartialMessages() {
348-
return false;
349-
}
350-
351343

352344
/**
353345
* {@code HandshakeInterceptor} that propagates ThreadLocal context through

spring-graphql/src/test/java/org/springframework/graphql/client/MockGraphQlWebSocketServer.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2022 the original author or authors.
2+
* Copyright 2002-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -88,15 +88,16 @@ public Mono<Void> handle(WebSocketSession session) {
8888
@SuppressWarnings("SuspiciousMethodCalls")
8989
private Publisher<GraphQlWebSocketMessage> handleMessage(GraphQlWebSocketMessage message) {
9090
switch (message.resolvedType()) {
91-
case CONNECTION_INIT:
91+
case CONNECTION_INIT -> {
9292
if (this.connectionInitHandler == null) {
9393
return Flux.just(GraphQlWebSocketMessage.connectionAck(null));
9494
}
9595
else {
9696
Map<String, Object> payload = message.getPayload();
9797
return this.connectionInitHandler.apply(payload).map(GraphQlWebSocketMessage::connectionAck);
9898
}
99-
case SUBSCRIBE:
99+
}
100+
case SUBSCRIBE -> {
100101
String id = message.getId();
101102
Exchange request = expectedExchanges.get(message.getPayload());
102103
if (id == null || request == null) {
@@ -108,10 +109,13 @@ private Publisher<GraphQlWebSocketMessage> handleMessage(GraphQlWebSocketMessage
108109
request.getError() != null ?
109110
GraphQlWebSocketMessage.error(id, Collections.singletonList(request.getError())) :
110111
GraphQlWebSocketMessage.complete(id));
111-
case COMPLETE:
112+
}
113+
case COMPLETE -> {
112114
return Flux.empty();
113-
default:
115+
}
116+
default -> {
114117
return Flux.error(new IllegalStateException("Unexpected message: " + message));
118+
}
115119
}
116120
}
117121

spring-graphql/src/test/java/org/springframework/graphql/client/WebSocketGraphQlTransportTests.java

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2022 the original author or authors.
2+
* Copyright 2002-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -360,18 +360,14 @@ public Mono<Void> handle(WebSocketSession session) {
360360
return session.send(session.receive()
361361
.flatMap(webSocketMessage -> {
362362
GraphQlWebSocketMessage message = this.codecDelegate.decode(webSocketMessage);
363-
switch (message.resolvedType()) {
364-
case CONNECTION_INIT:
365-
return Flux.just(
366-
GraphQlWebSocketMessage.connectionAck(null),
367-
GraphQlWebSocketMessage.ping(null));
368-
case SUBSCRIBE:
369-
return Flux.just(GraphQlWebSocketMessage.next("1", this.response.toMap()));
370-
case PONG:
371-
return Flux.empty();
372-
default:
373-
return Flux.error(new IllegalStateException("Unexpected message: " + message));
374-
}
363+
return switch (message.resolvedType()) {
364+
case CONNECTION_INIT -> Flux.just(
365+
GraphQlWebSocketMessage.connectionAck(null),
366+
GraphQlWebSocketMessage.ping(null));
367+
case SUBSCRIBE -> Flux.just(GraphQlWebSocketMessage.next("1", this.response.toMap()));
368+
case PONG -> Flux.empty();
369+
default -> Flux.error(new IllegalStateException("Unexpected message: " + message));
370+
};
375371
})
376372
.map(graphQlMessage -> this.codecDelegate.encode(session, graphQlMessage))
377373
);

0 commit comments

Comments
 (0)