34
34
import com .fasterxml .jackson .databind .SerializationFeature ;
35
35
import com .fasterxml .jackson .databind .node .ObjectNode ;
36
36
import com .google .protobuf .DescriptorProtos ;
37
+ import com .google .protobuf .DescriptorProtos .FileDescriptorProto ;
38
+ import com .google .protobuf .DescriptorProtos .FileDescriptorSet ;
37
39
import com .google .protobuf .Descriptors ;
40
+ import com .google .protobuf .Descriptors .DescriptorValidationException ;
41
+ import com .google .protobuf .Descriptors .FileDescriptor ;
38
42
import com .google .protobuf .DynamicMessage ;
43
+ import com .google .protobuf .ProtocolStringList ;
39
44
import com .google .protobuf .util .JsonFormat ;
40
45
import io .grpc .CallOptions ;
41
46
import io .grpc .Channel ;
69
74
import static org .springframework .cloud .gateway .support .GatewayToStringStyler .filterToStringCreator ;
70
75
71
76
/**
72
- * This filter takes a JSON payload, transform it into a protobuf object, send it to a
77
+ * This filter takes a JSON payload, transform it into a protobuf object, send
78
+ * it to a
73
79
* given gRPC channel, and transform the response back to JSON.
74
80
*
75
- * Making it transparent for the consumer that the service under the gateway is a gRPC
81
+ * Making it transparent for the consumer that the service under the gateway is
82
+ * a gRPC
76
83
* one.
77
84
*
78
85
* @author Alberto C. Ríos
@@ -104,7 +111,7 @@ public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
104
111
105
112
ServerWebExchangeUtils .setAlreadyRouted (exchange );
106
113
return modifiedResponse .writeWith (exchange .getRequest ().getBody ())
107
- .then (chain .filter (exchange .mutate ().response (modifiedResponse ).build ()));
114
+ .then (chain .filter (exchange .mutate ().response (modifiedResponse ).build ()));
108
115
}
109
116
110
117
@ Override
@@ -193,8 +200,7 @@ class GRPCResponseDecorator extends ServerHttpResponseDecorator {
193
200
objectReader = objectMapper .readerFor (JsonNode .class );
194
201
objectNode = objectMapper .createObjectNode ();
195
202
196
- }
197
- catch (IOException | Descriptors .DescriptorValidationException e ) {
203
+ } catch (IOException | Descriptors .DescriptorValidationException e ) {
198
204
throw new RuntimeException (e );
199
205
}
200
206
}
@@ -204,24 +210,25 @@ public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
204
210
exchange .getResponse ().getHeaders ().set ("Content-Type" , "application/json" );
205
211
206
212
return getDelegate ().writeWith (deserializeJSONRequest ().map (callGRPCServer ())
207
- .map (serialiseGRPCResponse ())
208
- .map (wrapGRPCResponse ())
209
- .cast (DataBuffer .class )
210
- .last ());
213
+ .map (serialiseGRPCResponse ())
214
+ .map (wrapGRPCResponse ())
215
+ .cast (DataBuffer .class )
216
+ .last ());
211
217
}
212
218
213
219
private ClientCall <DynamicMessage , DynamicMessage > createClientCallForType (Config config ,
214
220
Descriptors .ServiceDescriptor serviceDescriptor , Descriptors .Descriptor outputType ) {
215
221
MethodDescriptor .Marshaller <DynamicMessage > marshaller = ProtoUtils
216
- .marshaller (DynamicMessage .newBuilder (outputType ).build ());
222
+ .marshaller (DynamicMessage .newBuilder (outputType ).build ());
217
223
MethodDescriptor <DynamicMessage , DynamicMessage > methodDescriptor = MethodDescriptor
218
- .<DynamicMessage , DynamicMessage >newBuilder ()
219
- .setType (MethodDescriptor .MethodType .UNKNOWN )
220
- .setFullMethodName (
221
- MethodDescriptor .generateFullMethodName (serviceDescriptor .getFullName (), config .getMethod ()))
222
- .setRequestMarshaller (marshaller )
223
- .setResponseMarshaller (marshaller )
224
- .build ();
224
+ .<DynamicMessage , DynamicMessage >newBuilder ()
225
+ .setType (MethodDescriptor .MethodType .UNKNOWN )
226
+ .setFullMethodName (
227
+ MethodDescriptor .generateFullMethodName (serviceDescriptor .getFullName (),
228
+ config .getMethod ()))
229
+ .setRequestMarshaller (marshaller )
230
+ .setResponseMarshaller (marshaller )
231
+ .build ();
225
232
Channel channel = createChannel ();
226
233
return channel .newCall (methodDescriptor , CallOptions .DEFAULT );
227
234
}
@@ -230,10 +237,10 @@ private Descriptors.MethodDescriptor getMethodDescriptor(Config config)
230
237
throws IOException , Descriptors .DescriptorValidationException {
231
238
Resource descriptorFile = resourceLoader .getResource (config .getProtoDescriptor ());
232
239
DescriptorProtos .FileDescriptorSet fileDescriptorSet = DescriptorProtos .FileDescriptorSet
233
- .parseFrom (descriptorFile .getInputStream ());
240
+ .parseFrom (descriptorFile .getInputStream ());
234
241
DescriptorProtos .FileDescriptorProto fileProto = fileDescriptorSet .getFile (0 );
235
242
Descriptors .FileDescriptor fileDescriptor = Descriptors .FileDescriptor .buildFrom (fileProto ,
236
- new Descriptors . FileDescriptor [ 0 ] );
243
+ dependencies ( fileDescriptorSet , fileProto . getDependencyList ()) );
237
244
238
245
Descriptors .ServiceDescriptor serviceDescriptor = fileDescriptor .findServiceByName (config .getService ());
239
246
if (serviceDescriptor == null ) {
@@ -243,9 +250,35 @@ private Descriptors.MethodDescriptor getMethodDescriptor(Config config)
243
250
List <Descriptors .MethodDescriptor > methods = serviceDescriptor .getMethods ();
244
251
245
252
return methods .stream ()
246
- .filter (method -> method .getName ().equals (config .getMethod ()))
247
- .findFirst ()
248
- .orElseThrow (() -> new NoSuchElementException ("No Method found" ));
253
+ .filter (method -> method .getName ().equals (config .getMethod ()))
254
+ .findFirst ()
255
+ .orElseThrow (() -> new NoSuchElementException ("No Method found" ));
256
+ }
257
+
258
+ private FileDescriptor [] dependencies (FileDescriptorSet input , ProtocolStringList list ) {
259
+ FileDescriptor [] deps = new FileDescriptor [list .size ()];
260
+ for (int i = 0 ; i < list .size (); i ++) {
261
+ String name = list .get (i );
262
+ FileDescriptorProto file = findFileByName (input , name );
263
+ if (file == null ) {
264
+ throw new IllegalStateException ("Missing dependency: " + name );
265
+ }
266
+ try {
267
+ deps [i ] = FileDescriptor .buildFrom (file , dependencies (input , file .getDependencyList ()));
268
+ } catch (DescriptorValidationException e ) {
269
+ throw new IllegalStateException ("Invalid descriptor: " + file .getName (), e );
270
+ }
271
+ }
272
+ return deps ;
273
+ }
274
+
275
+ private FileDescriptorProto findFileByName (FileDescriptorSet input , String name ) {
276
+ for (FileDescriptorProto file : input .getFileList ()) {
277
+ if (file .getName ().equals (name )) {
278
+ return file ;
279
+ }
280
+ }
281
+ return null ;
249
282
}
250
283
251
284
private ManagedChannel createChannel () {
@@ -259,8 +292,7 @@ private Function<JsonNode, DynamicMessage> callGRPCServer() {
259
292
DynamicMessage .Builder builder = DynamicMessage .newBuilder (descriptor );
260
293
JsonFormat .parser ().merge (jsonRequest .toString (), builder );
261
294
return ClientCalls .blockingUnaryCall (clientCall , builder .build ());
262
- }
263
- catch (IOException e ) {
295
+ } catch (IOException e ) {
264
296
throw new RuntimeException (e );
265
297
}
266
298
};
@@ -270,9 +302,8 @@ private Function<DynamicMessage, Object> serialiseGRPCResponse() {
270
302
return gRPCResponse -> {
271
303
try {
272
304
return objectReader
273
- .readValue (JsonFormat .printer ().omittingInsignificantWhitespace ().print (gRPCResponse ));
274
- }
275
- catch (IOException e ) {
305
+ .readValue (JsonFormat .printer ().omittingInsignificantWhitespace ().print (gRPCResponse ));
306
+ } catch (IOException e ) {
276
307
throw new RuntimeException (e );
277
308
}
278
309
};
@@ -292,9 +323,8 @@ private Function<Object, DataBuffer> wrapGRPCResponse() {
292
323
return jsonResponse -> {
293
324
try {
294
325
return new NettyDataBufferFactory (new PooledByteBufAllocator ())
295
- .wrap (Objects .requireNonNull (new ObjectMapper ().writeValueAsBytes (jsonResponse )));
296
- }
297
- catch (JsonProcessingException e ) {
326
+ .wrap (Objects .requireNonNull (new ObjectMapper ().writeValueAsBytes (jsonResponse )));
327
+ } catch (JsonProcessingException e ) {
298
328
return new NettyDataBufferFactory (new PooledByteBufAllocator ()).allocateBuffer ();
299
329
}
300
330
};
@@ -305,8 +335,7 @@ private ManagedChannel createChannelChannel(String host, int port) {
305
335
NettyChannelBuilder nettyChannelBuilder = NettyChannelBuilder .forAddress (host , port );
306
336
try {
307
337
return grpcSslConfigurer .configureSsl (nettyChannelBuilder );
308
- }
309
- catch (SSLException e ) {
338
+ } catch (SSLException e ) {
310
339
throw new RuntimeException (e );
311
340
}
312
341
}
0 commit comments