Skip to content

Commit 5b93c1f

Browse files
committed
RSocket support for SubscriptionErrorException
See gh-339
1 parent 3090328 commit 5b93c1f

11 files changed

+172
-41
lines changed

spring-graphql/src/main/java/org/springframework/graphql/client/AbstractGraphQlClientBuilder.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,13 +118,29 @@ protected void setJsonEncoder(Encoder<?> encoder) {
118118
this.jsonEncoder = encoder;
119119
}
120120

121+
/**
122+
* Access to the configured JSON encoder.
123+
*/
124+
protected Encoder<?> getJsonEncoder() {
125+
Assert.notNull(this.jsonEncoder, "JSON Encoder not set");
126+
return this.jsonEncoder;
127+
}
128+
121129
/**
122130
* Variant of {@link #setJsonCodecs} for setting each codec individually.
123131
*/
124132
protected void setJsonDecoder(Decoder<?> decoder) {
125133
this.jsonDecoder = decoder;
126134
}
127135

136+
/**
137+
* Access to the configured JSON encoder.
138+
*/
139+
protected Decoder<?> getJsonDecoder() {
140+
Assert.notNull(this.jsonDecoder, "JSON Encoder not set");
141+
return this.jsonDecoder;
142+
}
143+
128144
/**
129145
* Return the configured interceptors. For subclasses that look for a
130146
* transport specific interceptor extensions.

spring-graphql/src/main/java/org/springframework/graphql/client/CodecDelegate.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import org.springframework.core.io.buffer.DataBufferUtils;
2626
import org.springframework.graphql.server.support.GraphQlWebSocketMessage;
2727
import org.springframework.http.MediaType;
28-
import org.springframework.http.codec.ClientCodecConfigurer;
2928
import org.springframework.http.codec.CodecConfigurer;
3029
import org.springframework.http.codec.DecoderHttpMessageReader;
3130
import org.springframework.http.codec.EncoderHttpMessageWriter;
@@ -52,10 +51,6 @@ final class CodecDelegate {
5251
private final Encoder<?> encoder;
5352

5453

55-
CodecDelegate() {
56-
this(ClientCodecConfigurer.create());
57-
}
58-
5954
CodecDelegate(CodecConfigurer configurer) {
6055
Assert.notNull(configurer, "CodecConfigurer is required");
6156
this.codecConfigurer = configurer;

spring-graphql/src/main/java/org/springframework/graphql/client/DefaultRSocketGraphQlClient.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -147,16 +147,16 @@ public Builder rsocketRequester(Consumer<RSocketRequester.Builder> requesterCons
147147
@Override
148148
public RSocketGraphQlClient build() {
149149

150-
Assert.state(this.clientTransport != null, "Neither WebSocket nor TCP networking configured");
151-
RSocketRequester requester = this.requesterBuilder.transport(this.clientTransport);
152-
RSocketGraphQlTransport graphQlTransport = new RSocketGraphQlTransport(this.route, requester);
153-
154150
// Pass the codecs to the parent for response decoding
155151
this.requesterBuilder.rsocketStrategies(builder -> {
156152
builder.decoders(decoders -> setJsonDecoder(CodecDelegate.findJsonDecoder(decoders)));
157153
builder.encoders(encoders -> setJsonEncoder(CodecDelegate.findJsonEncoder(encoders)));
158154
});
159155

156+
Assert.state(this.clientTransport != null, "Neither WebSocket nor TCP networking configured");
157+
RSocketRequester requester = this.requesterBuilder.transport(this.clientTransport);
158+
RSocketGraphQlTransport graphQlTransport = new RSocketGraphQlTransport(this.route, requester, getJsonDecoder());
159+
160160
return new DefaultRSocketGraphQlClient(
161161
super.buildGraphQlClient(graphQlTransport),
162162
this.requesterBuilder, this.clientTransport, this.route, getBuilderInitializer());

spring-graphql/src/main/java/org/springframework/graphql/client/RSocketGraphQlTransport.java

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,21 @@
1616

1717
package org.springframework.graphql.client;
1818

19+
import java.nio.charset.StandardCharsets;
20+
import java.util.Collections;
21+
import java.util.List;
1922
import java.util.Map;
2023

24+
import graphql.GraphQLError;
25+
import io.rsocket.exceptions.RejectedException;
2126
import reactor.core.publisher.Flux;
2227
import reactor.core.publisher.Mono;
2328

2429
import org.springframework.core.ParameterizedTypeReference;
30+
import org.springframework.core.ResolvableType;
31+
import org.springframework.core.codec.Decoder;
32+
import org.springframework.core.codec.DecodingException;
33+
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
2534
import org.springframework.graphql.GraphQlRequest;
2635
import org.springframework.graphql.GraphQlResponse;
2736
import org.springframework.messaging.rsocket.RSocketRequester;
@@ -43,17 +52,23 @@ final class RSocketGraphQlTransport implements GraphQlTransport {
4352
private static final ParameterizedTypeReference<Map<String, Object>> MAP_TYPE =
4453
new ParameterizedTypeReference<Map<String, Object>>() {};
4554

55+
private static final ResolvableType LIST_TYPE = ResolvableType.forClass(List.class);
56+
4657

4758
private final String route;
4859

4960
private final RSocketRequester rsocketRequester;
5061

62+
private final Decoder<?> jsonDecoder;
63+
5164

52-
RSocketGraphQlTransport(String route, RSocketRequester requester) {
65+
RSocketGraphQlTransport(String route, RSocketRequester requester, Decoder<?> jsonDecoder) {
5366
Assert.notNull(route, "'route' is required");
5467
Assert.notNull(requester, "RSocketRequester is required");
68+
Assert.notNull(jsonDecoder, "JSON Decoder is required");
5569
this.route = route;
5670
this.rsocketRequester = requester;
71+
this.jsonDecoder = jsonDecoder;
5772
}
5873

5974

@@ -68,7 +83,23 @@ public Mono<GraphQlResponse> execute(GraphQlRequest request) {
6883
public Flux<GraphQlResponse> executeSubscription(GraphQlRequest request) {
6984
return this.rsocketRequester.route(this.route).data(request.toMap())
7085
.retrieveFlux(MAP_TYPE)
86+
.doOnError(ex -> System.out.println(ex))
87+
.onErrorResume(RejectedException.class, ex -> Flux.error(decodeErrors(request, ex)))
7188
.map(ResponseMapGraphQlResponse::new);
7289
}
7390

91+
@SuppressWarnings("unchecked")
92+
private Exception decodeErrors(GraphQlRequest request, RejectedException ex) {
93+
try {
94+
byte[] errorData = ex.getMessage().getBytes(StandardCharsets.UTF_8);
95+
List<GraphQLError> errors = (List<GraphQLError>) this.jsonDecoder.decode(
96+
DefaultDataBufferFactory.sharedInstance.wrap(errorData), LIST_TYPE, null, null);
97+
GraphQlResponse response = new ResponseMapGraphQlResponse(Collections.singletonMap("errors", errors));
98+
return new SubscriptionErrorException(request, response.getErrors());
99+
}
100+
catch (DecodingException ex2) {
101+
return ex;
102+
}
103+
}
104+
74105
}

spring-graphql/src/main/java/org/springframework/graphql/client/ResponseMapGraphQlResponse.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -136,10 +136,13 @@ private static List<SourceLocation> initLocations(Map<String, Object> errorMap)
136136

137137
@SuppressWarnings("unchecked")
138138
private static String initPath(Map<String, Object> errorMap) {
139-
return ((List<Object>) errorMap.getOrDefault("path", Collections.emptyList())).stream()
140-
.reduce("",
141-
(s, o) -> s + (o instanceof Integer ? "[" + o + "]" : (s.isEmpty() ? o : "." + o)),
142-
(s, s2) -> null);
139+
List<Object> path = (List<Object>) errorMap.get("path");
140+
if (path == null) {
141+
return "";
142+
}
143+
return path.stream().reduce("",
144+
(s, o) -> s + (o instanceof Integer ? "[" + o + "]" : (s.isEmpty() ? o : "." + o)),
145+
(s, s2) -> null);
143146
}
144147

145148
@Override

spring-graphql/src/main/java/org/springframework/graphql/server/DefaultWebGraphQlHandlerBuilder.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.springframework.graphql.ExecutionGraphQlService;
2626
import org.springframework.graphql.execution.ReactorContextManager;
2727
import org.springframework.graphql.execution.ThreadLocalAccessor;
28+
import org.springframework.graphql.server.WebGraphQlInterceptor.Chain;
2829
import org.springframework.lang.Nullable;
2930
import org.springframework.util.Assert;
3031
import org.springframework.util.CollectionUtils;
@@ -88,18 +89,18 @@ public WebGraphQlHandler.Builder threadLocalAccessors(List<ThreadLocalAccessor>
8889
@Override
8990
public WebGraphQlHandler build() {
9091

91-
WebGraphQlInterceptor.Chain endOfChain = request -> this.service.execute(request).map(WebGraphQlResponse::new);
92+
Chain endOfChain = request -> this.service.execute(request).map(WebGraphQlResponse::new);
9293

93-
WebGraphQlInterceptor.Chain chain = this.interceptors.stream()
94+
Chain executionChain = this.interceptors.stream()
9495
.reduce(WebGraphQlInterceptor::andThen)
95-
.map(interceptor -> (WebGraphQlInterceptor.Chain) (request) -> interceptor.intercept(request, endOfChain))
96+
.map(interceptor -> (Chain) (request) -> interceptor.intercept(request, endOfChain))
9697
.orElse(endOfChain);
9798

9899
return new WebGraphQlHandler() {
99100

100101
@Override
101102
public Mono<WebGraphQlResponse> handleRequest(WebGraphQlRequest request) {
102-
return chain.next(request)
103+
return executionChain.next(request)
103104
.contextWrite(context -> {
104105
if (!CollectionUtils.isEmpty(accessors)) {
105106
ThreadLocalAccessor accessor = ThreadLocalAccessor.composite(accessors);

spring-graphql/src/main/java/org/springframework/graphql/server/GraphQlRSocketHandler.java

Lines changed: 51 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,19 +17,29 @@
1717
package org.springframework.graphql.server;
1818

1919

20+
import java.nio.charset.StandardCharsets;
2021
import java.util.List;
2122
import java.util.Map;
2223

2324
import graphql.ExecutionResult;
25+
import graphql.GraphQLError;
26+
import io.rsocket.exceptions.InvalidException;
2427
import io.rsocket.exceptions.RejectedException;
2528
import org.reactivestreams.Publisher;
2629
import reactor.core.publisher.Flux;
2730
import reactor.core.publisher.Mono;
2831

32+
import org.springframework.core.ResolvableType;
33+
import org.springframework.core.codec.Encoder;
34+
import org.springframework.core.io.buffer.DataBuffer;
35+
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
2936
import org.springframework.graphql.ExecutionGraphQlResponse;
3037
import org.springframework.graphql.ExecutionGraphQlService;
38+
import org.springframework.graphql.server.RSocketGraphQlInterceptor.Chain;
3139
import org.springframework.util.AlternativeJdkIdGenerator;
40+
import org.springframework.util.Assert;
3241
import org.springframework.util.IdGenerator;
42+
import org.springframework.util.MimeTypeUtils;
3343

3444

3545
/**
@@ -67,25 +77,46 @@
6777
*/
6878
public class GraphQlRSocketHandler {
6979

70-
private final RSocketGraphQlInterceptor.Chain executionChain;
80+
private static final ResolvableType LIST_TYPE = ResolvableType.forClass(List.class);
81+
82+
83+
private final Chain executionChain;
84+
85+
private final Encoder<?> jsonEncoder;
7186

7287
private final IdGenerator idGenerator = new AlternativeJdkIdGenerator();
7388

7489

7590
/**
7691
* Create a new instance that handles requests through a chain of interceptors
7792
* followed by the given {@link ExecutionGraphQlService}.
93+
* @param graphQlService the service that will execute the request
94+
* @param interceptors interceptors to form the processing chain
95+
* @param jsonEncoder a JSON encoder for serializing a
96+
* {@link graphql.GraphQLError} list for a failed subscription
7897
*/
7998
public GraphQlRSocketHandler(
80-
ExecutionGraphQlService service, List<RSocketGraphQlInterceptor> interceptors) {
99+
ExecutionGraphQlService graphQlService, List<RSocketGraphQlInterceptor> interceptors,
100+
Encoder<?> jsonEncoder) {
101+
102+
Assert.notNull(graphQlService, "ExecutionGraphQlService is required");
103+
Assert.notNull(jsonEncoder, "JSON Encoder is required");
104+
105+
this.executionChain = initExecutionChain(graphQlService, interceptors);
106+
this.jsonEncoder = jsonEncoder;
107+
}
81108

82-
RSocketGraphQlInterceptor.Chain endOfChain = request -> service.execute(request).map(RSocketGraphQlResponse::new);
109+
private static Chain initExecutionChain(
110+
ExecutionGraphQlService graphQlService, List<RSocketGraphQlInterceptor> interceptors) {
83111

84-
this.executionChain = (interceptors.isEmpty() ? endOfChain :
112+
Chain endOfChain = request ->
113+
graphQlService.execute(request).map(RSocketGraphQlResponse::new);
114+
115+
return interceptors.isEmpty() ? endOfChain :
85116
interceptors.stream()
86117
.reduce(RSocketGraphQlInterceptor::andThen)
87-
.map(interceptor -> (RSocketGraphQlInterceptor.Chain) request -> interceptor.intercept(request, endOfChain))
88-
.orElse(endOfChain));
118+
.map(interceptor -> (Chain) request -> interceptor.intercept(request, endOfChain))
119+
.orElse(endOfChain);
89120
}
90121

91122

@@ -106,12 +137,13 @@ public Flux<Map<String, Object>> handleSubscription(Map<String, Object> payload)
106137
Publisher<ExecutionResult> publisher = response.getData();
107138
return Flux.from(publisher).map(ExecutionResult::toSpecification);
108139
}
109-
110-
String message = (!response.isValid() ?
111-
response.toMap().get("errors").toString() :
112-
"Response is not a stream, is the operation actually a subscription?");
113-
114-
return Flux.error(new RejectedException(message));
140+
else if (response.isValid()) {
141+
return Flux.error(new InvalidException(
142+
"Expected a Publisher for a subscription operation. " +
143+
"This is either a server error or the operation is not a subscription"));
144+
}
145+
String errorData = encodeErrors(response).toString(StandardCharsets.UTF_8);
146+
return Flux.error(new RejectedException(errorData));
115147
});
116148
}
117149

@@ -120,4 +152,11 @@ private Mono<RSocketGraphQlResponse> handleInternal(Map<String, Object> payload)
120152
return this.executionChain.next(new RSocketGraphQlRequest(payload, requestId, null));
121153
}
122154

155+
@SuppressWarnings("unchecked")
156+
private DataBuffer encodeErrors(RSocketGraphQlResponse response) {
157+
return ((Encoder<List<GraphQLError>>) this.jsonEncoder).encodeValue(
158+
response.getExecutionResult().getErrors(),
159+
DefaultDataBufferFactory.sharedInstance, LIST_TYPE, MimeTypeUtils.APPLICATION_JSON, null);
160+
}
161+
123162
}

spring-graphql/src/main/java/org/springframework/graphql/server/webflux/CodecDelegate.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,7 @@
3636
import org.springframework.web.reactive.socket.WebSocketSession;
3737

3838
/**
39-
* Delegate that can be embedded in a class to help with encoding and decoding
40-
* GraphQL over WebSocket messages.
39+
* Helper class for encoding and decoding GraphQL messages.
4140
*
4241
* @author Rossen Stoyanchev
4342
* @since 1.0.0

spring-graphql/src/test/java/org/springframework/graphql/client/MockGraphQlWebSocketServer.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.springframework.graphql.GraphQlResponse;
3232
import org.springframework.graphql.support.DefaultGraphQlRequest;
3333
import org.springframework.graphql.server.support.GraphQlWebSocketMessage;
34+
import org.springframework.http.codec.ClientCodecConfigurer;
3435
import org.springframework.lang.Nullable;
3536
import org.springframework.web.reactive.socket.WebSocketHandler;
3637
import org.springframework.web.reactive.socket.WebSocketSession;
@@ -52,7 +53,7 @@ public final class MockGraphQlWebSocketServer implements WebSocketHandler {
5253

5354
private final Map<Map<String, Object>, Exchange> expectedExchanges = new LinkedHashMap<>();
5455

55-
private final CodecDelegate codecDelegate = new CodecDelegate();
56+
private final CodecDelegate codecDelegate = new CodecDelegate(ClientCodecConfigurer.create());
5657

5758

5859
/**

0 commit comments

Comments
 (0)