16
16
17
17
package org .springframework .cloud .gateway .filter .factory ;
18
18
19
- import static org .springframework .cloud .gateway .support .GatewayToStringStyler .filterToStringCreator ;
20
-
21
19
import java .io .IOException ;
22
20
import java .net .URI ;
23
21
import java .util .Arrays ;
28
26
29
27
import javax .net .ssl .SSLException ;
30
28
31
- import org .reactivestreams .Publisher ;
32
- import org .springframework .cloud .gateway .config .GrpcSslConfigurer ;
33
- import org .springframework .cloud .gateway .filter .GatewayFilter ;
34
- import org .springframework .cloud .gateway .filter .GatewayFilterChain ;
35
- import org .springframework .cloud .gateway .filter .NettyWriteResponseFilter ;
36
- import org .springframework .cloud .gateway .filter .OrderedGatewayFilter ;
37
- import org .springframework .cloud .gateway .route .Route ;
38
- import org .springframework .cloud .gateway .support .ServerWebExchangeUtils ;
39
- import org .springframework .core .ResolvableType ;
40
- import org .springframework .core .io .Resource ;
41
- import org .springframework .core .io .ResourceLoader ;
42
- import org .springframework .core .io .buffer .DataBuffer ;
43
- import org .springframework .core .io .buffer .NettyDataBufferFactory ;
44
- import org .springframework .http .codec .json .Jackson2JsonDecoder ;
45
- import org .springframework .http .server .reactive .ServerHttpResponseDecorator ;
46
- import org .springframework .web .server .ServerWebExchange ;
47
-
48
29
import com .fasterxml .jackson .core .JsonProcessingException ;
49
30
import com .fasterxml .jackson .databind .JsonNode ;
50
31
import com .fasterxml .jackson .databind .ObjectMapper ;
60
41
import com .google .protobuf .DynamicMessage ;
61
42
import com .google .protobuf .ProtocolStringList ;
62
43
import com .google .protobuf .util .JsonFormat ;
63
-
64
44
import io .grpc .CallOptions ;
65
45
import io .grpc .Channel ;
66
46
import io .grpc .ClientCall ;
70
50
import io .grpc .protobuf .ProtoUtils ;
71
51
import io .grpc .stub .ClientCalls ;
72
52
import io .netty .buffer .PooledByteBufAllocator ;
53
+ import org .reactivestreams .Publisher ;
73
54
import reactor .core .publisher .Flux ;
74
55
import reactor .core .publisher .Mono ;
75
56
57
+ import org .springframework .cloud .gateway .config .GrpcSslConfigurer ;
58
+ import org .springframework .cloud .gateway .filter .GatewayFilter ;
59
+ import org .springframework .cloud .gateway .filter .GatewayFilterChain ;
60
+ import org .springframework .cloud .gateway .filter .NettyWriteResponseFilter ;
61
+ import org .springframework .cloud .gateway .filter .OrderedGatewayFilter ;
62
+ import org .springframework .cloud .gateway .route .Route ;
63
+ import org .springframework .cloud .gateway .support .ServerWebExchangeUtils ;
64
+ import org .springframework .core .ResolvableType ;
65
+ import org .springframework .core .io .Resource ;
66
+ import org .springframework .core .io .ResourceLoader ;
67
+ import org .springframework .core .io .buffer .DataBuffer ;
68
+ import org .springframework .core .io .buffer .NettyDataBufferFactory ;
69
+ import org .springframework .http .codec .json .Jackson2JsonDecoder ;
70
+ import org .springframework .http .server .reactive .ServerHttpResponseDecorator ;
71
+ import org .springframework .web .server .ServerWebExchange ;
72
+
73
+ import static org .springframework .cloud .gateway .support .GatewayToStringStyler .filterToStringCreator ;
74
+
76
75
/**
77
- * This filter takes a JSON payload, transform it into a protobuf object, send
78
- * it to a
76
+ * This filter takes a JSON payload, transform it into a protobuf object, send it to a
79
77
* given gRPC channel, and transform the response back to JSON.
80
78
*
81
- * Making it transparent for the consumer that the service under the gateway is
82
- * a gRPC
79
+ * Making it transparent for the consumer that the service under the gateway is a gRPC
83
80
* one.
84
81
*
85
82
* @author Alberto C. Ríos
@@ -111,7 +108,7 @@ public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
111
108
112
109
ServerWebExchangeUtils .setAlreadyRouted (exchange );
113
110
return modifiedResponse .writeWith (exchange .getRequest ().getBody ())
114
- .then (chain .filter (exchange .mutate ().response (modifiedResponse ).build ()));
111
+ .then (chain .filter (exchange .mutate ().response (modifiedResponse ).build ()));
115
112
}
116
113
117
114
@ Override
@@ -189,7 +186,8 @@ class GRPCResponseDecorator extends ServerHttpResponseDecorator {
189
186
objectReader = objectMapper .readerFor (JsonNode .class );
190
187
objectNode = objectMapper .createObjectNode ();
191
188
192
- } catch (IOException | Descriptors .DescriptorValidationException e ) {
189
+ }
190
+ catch (IOException | Descriptors .DescriptorValidationException e ) {
193
191
throw new RuntimeException (e );
194
192
}
195
193
}
@@ -199,25 +197,24 @@ public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
199
197
exchange .getResponse ().getHeaders ().set ("Content-Type" , "application/json" );
200
198
201
199
return getDelegate ().writeWith (deserializeJSONRequest ().map (callGRPCServer ())
202
- .map (serialiseGRPCResponse ())
203
- .map (wrapGRPCResponse ())
204
- .cast (DataBuffer .class )
205
- .last ());
200
+ .map (serialiseGRPCResponse ())
201
+ .map (wrapGRPCResponse ())
202
+ .cast (DataBuffer .class )
203
+ .last ());
206
204
}
207
205
208
206
private ClientCall <DynamicMessage , DynamicMessage > createClientCallForType (Config config ,
209
207
Descriptors .ServiceDescriptor serviceDescriptor , Descriptors .Descriptor outputType ) {
210
208
MethodDescriptor .Marshaller <DynamicMessage > marshaller = ProtoUtils
211
- .marshaller (DynamicMessage .newBuilder (outputType ).build ());
209
+ .marshaller (DynamicMessage .newBuilder (outputType ).build ());
212
210
MethodDescriptor <DynamicMessage , DynamicMessage > methodDescriptor = MethodDescriptor
213
- .<DynamicMessage , DynamicMessage >newBuilder ()
214
- .setType (MethodDescriptor .MethodType .UNKNOWN )
215
- .setFullMethodName (
216
- MethodDescriptor .generateFullMethodName (serviceDescriptor .getFullName (),
217
- config .getMethod ()))
218
- .setRequestMarshaller (marshaller )
219
- .setResponseMarshaller (marshaller )
220
- .build ();
211
+ .<DynamicMessage , DynamicMessage >newBuilder ()
212
+ .setType (MethodDescriptor .MethodType .UNKNOWN )
213
+ .setFullMethodName (
214
+ MethodDescriptor .generateFullMethodName (serviceDescriptor .getFullName (), config .getMethod ()))
215
+ .setRequestMarshaller (marshaller )
216
+ .setResponseMarshaller (marshaller )
217
+ .build ();
221
218
Channel channel = createChannel ();
222
219
return channel .newCall (methodDescriptor , CallOptions .DEFAULT );
223
220
}
@@ -226,7 +223,7 @@ private Descriptors.MethodDescriptor getMethodDescriptor(Config config)
226
223
throws IOException , Descriptors .DescriptorValidationException {
227
224
Resource descriptorFile = resourceLoader .getResource (config .getProtoDescriptor ());
228
225
DescriptorProtos .FileDescriptorSet fileDescriptorSet = DescriptorProtos .FileDescriptorSet
229
- .parseFrom (descriptorFile .getInputStream ());
226
+ .parseFrom (descriptorFile .getInputStream ());
230
227
DescriptorProtos .FileDescriptorProto fileProto = fileDescriptorSet .getFile (0 );
231
228
Descriptors .FileDescriptor fileDescriptor = Descriptors .FileDescriptor .buildFrom (fileProto ,
232
229
dependencies (fileDescriptorSet , fileProto .getDependencyList ()));
@@ -239,9 +236,9 @@ private Descriptors.MethodDescriptor getMethodDescriptor(Config config)
239
236
List <Descriptors .MethodDescriptor > methods = serviceDescriptor .getMethods ();
240
237
241
238
return methods .stream ()
242
- .filter (method -> method .getName ().equals (config .getMethod ()))
243
- .findFirst ()
244
- .orElseThrow (() -> new NoSuchElementException ("No Method found" ));
239
+ .filter (method -> method .getName ().equals (config .getMethod ()))
240
+ .findFirst ()
241
+ .orElseThrow (() -> new NoSuchElementException ("No Method found" ));
245
242
}
246
243
247
244
private FileDescriptor [] dependencies (FileDescriptorSet input , ProtocolStringList list ) {
@@ -254,7 +251,8 @@ private FileDescriptor[] dependencies(FileDescriptorSet input, ProtocolStringLis
254
251
}
255
252
try {
256
253
deps [i ] = FileDescriptor .buildFrom (file , dependencies (input , file .getDependencyList ()));
257
- } catch (DescriptorValidationException e ) {
254
+ }
255
+ catch (DescriptorValidationException e ) {
258
256
throw new IllegalStateException ("Invalid descriptor: " + file .getName (), e );
259
257
}
260
258
}
@@ -281,7 +279,8 @@ private Function<JsonNode, DynamicMessage> callGRPCServer() {
281
279
DynamicMessage .Builder builder = DynamicMessage .newBuilder (descriptor );
282
280
JsonFormat .parser ().merge (jsonRequest .toString (), builder );
283
281
return ClientCalls .blockingUnaryCall (clientCall , builder .build ());
284
- } catch (IOException e ) {
282
+ }
283
+ catch (IOException e ) {
285
284
throw new RuntimeException (e );
286
285
}
287
286
};
@@ -291,8 +290,9 @@ private Function<DynamicMessage, Object> serialiseGRPCResponse() {
291
290
return gRPCResponse -> {
292
291
try {
293
292
return objectReader
294
- .readValue (JsonFormat .printer ().omittingInsignificantWhitespace ().print (gRPCResponse ));
295
- } catch (IOException e ) {
293
+ .readValue (JsonFormat .printer ().omittingInsignificantWhitespace ().print (gRPCResponse ));
294
+ }
295
+ catch (IOException e ) {
296
296
throw new RuntimeException (e );
297
297
}
298
298
};
@@ -312,8 +312,9 @@ private Function<Object, DataBuffer> wrapGRPCResponse() {
312
312
return jsonResponse -> {
313
313
try {
314
314
return new NettyDataBufferFactory (new PooledByteBufAllocator ())
315
- .wrap (Objects .requireNonNull (new ObjectMapper ().writeValueAsBytes (jsonResponse )));
316
- } catch (JsonProcessingException e ) {
315
+ .wrap (Objects .requireNonNull (new ObjectMapper ().writeValueAsBytes (jsonResponse )));
316
+ }
317
+ catch (JsonProcessingException e ) {
317
318
return new NettyDataBufferFactory (new PooledByteBufAllocator ()).allocateBuffer ();
318
319
}
319
320
};
@@ -324,7 +325,8 @@ private ManagedChannel createChannelChannel(String host, int port) {
324
325
NettyChannelBuilder nettyChannelBuilder = NettyChannelBuilder .forAddress (host , port );
325
326
try {
326
327
return grpcSslConfigurer .configureSsl (nettyChannelBuilder );
327
- } catch (SSLException e ) {
328
+ }
329
+ catch (SSLException e ) {
328
330
throw new RuntimeException (e );
329
331
}
330
332
}
0 commit comments