From 585cffdad919b1640b2daf24b0be05e43e1cccd8 Mon Sep 17 00:00:00 2001 From: Dave Syer Date: Fri, 11 Jul 2025 16:50:43 +0000 Subject: [PATCH 1/2] Set status as a trailer or normal header Uses snapshot of reactor-netty for integration test. Will need to be upgraded for main project when the release comes (soon). Signed-off-by: Dave Syer --- .../grpc/pom.xml | 26 ++++-- .../gateway/tests/grpc/GRPCApplication.java | 8 ++ .../tests/grpc/GRPCApplicationTests.java | 27 +++++- .../grpc/JsonToGrpcApplicationTests.java | 4 +- .../grpc/src/test/resources/application.yml | 2 +- .../config/GatewayAutoConfiguration.java | 5 +- .../filter/NettyWriteResponseFilter.java | 26 +++++- .../headers/GRPCResponseHeadersFilter.java | 33 +++---- .../filter/headers/TrailerHeadersFilter.java | 89 +++++++++++++++++++ .../filter/NettyWriteResponseFilterTests.java | 2 +- .../cloud/gateway/test/AdhocTestSuite.java | 2 +- 11 files changed, 189 insertions(+), 35 deletions(-) create mode 100644 spring-cloud-gateway-server/src/main/java/org/springframework/cloud/gateway/filter/headers/TrailerHeadersFilter.java diff --git a/spring-cloud-gateway-integration-tests/grpc/pom.xml b/spring-cloud-gateway-integration-tests/grpc/pom.xml index 6ea9875e40..c0acbd5d59 100644 --- a/spring-cloud-gateway-integration-tests/grpc/pom.xml +++ b/spring-cloud-gateway-integration-tests/grpc/pom.xml @@ -1,7 +1,7 @@ + xmlns="http://maven.apache.org/POM/4.0.0" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> 4.0.0 grpc @@ -22,6 +22,18 @@ .. + + + + io.projectreactor + reactor-bom + 2025.0.0-SNAPSHOT + pom + import + + + + org.springframework.boot @@ -113,12 +125,15 @@ protobuf-maven-plugin 0.6.1 - com.google.protobuf:protoc:${protoc.version}:exe:${os.detected.classifier} + + com.google.protobuf:protoc:${protoc.version}:exe:${os.detected.classifier} grpc-java - io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier} + + io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier} + compile compile-custom @@ -128,5 +143,4 @@ - - + \ No newline at end of file diff --git a/spring-cloud-gateway-integration-tests/grpc/src/main/java/org/springframework/cloud/gateway/tests/grpc/GRPCApplication.java b/spring-cloud-gateway-integration-tests/grpc/src/main/java/org/springframework/cloud/gateway/tests/grpc/GRPCApplication.java index 3018393ab6..ebce9fe697 100644 --- a/spring-cloud-gateway-integration-tests/grpc/src/main/java/org/springframework/cloud/gateway/tests/grpc/GRPCApplication.java +++ b/spring-cloud-gateway-integration-tests/grpc/src/main/java/org/springframework/cloud/gateway/tests/grpc/GRPCApplication.java @@ -119,6 +119,14 @@ public void hello(HelloRequest request, StreamObserver responseOb HelloResponse response = HelloResponse.newBuilder().setGreeting(greeting).build(); responseObserver.onNext(response); + + if ("failWithRuntimeExceptionAfterData!".equals(request.getFirstName())) { + StatusRuntimeException exception = Status.RESOURCE_EXHAUSTED.withDescription("Too long firstNames?") + .asRuntimeException(); + responseObserver.onError(exception); + return; + } + responseObserver.onCompleted(); } diff --git a/spring-cloud-gateway-integration-tests/grpc/src/test/java/org/springframework/cloud/gateway/tests/grpc/GRPCApplicationTests.java b/spring-cloud-gateway-integration-tests/grpc/src/test/java/org/springframework/cloud/gateway/tests/grpc/GRPCApplicationTests.java index afd8333e3a..3ef4d347d3 100644 --- a/spring-cloud-gateway-integration-tests/grpc/src/test/java/org/springframework/cloud/gateway/tests/grpc/GRPCApplicationTests.java +++ b/spring-cloud-gateway-integration-tests/grpc/src/test/java/org/springframework/cloud/gateway/tests/grpc/GRPCApplicationTests.java @@ -30,23 +30,31 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.springframework.boot.SpringApplication; import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.context.SpringBootTest.WebEnvironment; import org.springframework.boot.web.server.test.LocalServerPort; +import org.springframework.test.annotation.DirtiesContext; import static io.grpc.Status.FAILED_PRECONDITION; +import static io.grpc.Status.RESOURCE_EXHAUSTED; import static io.grpc.netty.NegotiationType.TLS; -import static org.springframework.boot.test.context.SpringBootTest.WebEnvironment; /** * @author Alberto C. Ríos */ @SpringBootTest(classes = org.springframework.cloud.gateway.tests.grpc.GRPCApplication.class, webEnvironment = WebEnvironment.RANDOM_PORT) +@DirtiesContext public class GRPCApplicationTests { @LocalServerPort private int gatewayPort; + public static void main(String[] args) { + SpringApplication.run(GRPCApplication.class, args); + } + @BeforeEach void setUp() { int grpcServerPort = gatewayPort + 1; @@ -88,6 +96,23 @@ public void gRPCUnaryCallShouldHandleRuntimeException() throws SSLException { } } + @Test + public void gRPCUnaryCallShouldHandleRuntimeException2() throws SSLException { + ManagedChannel channel = createSecuredChannel(gatewayPort); + boolean thrown = false; + try { + HelloServiceGrpc.newBlockingStub(channel) + .hello(HelloRequest.newBuilder().setFirstName("failWithRuntimeExceptionAfterData!").build()) + .getGreeting(); + } + catch (StatusRuntimeException e) { + thrown = true; + Assertions.assertThat(e.getStatus().getCode()).isEqualTo(RESOURCE_EXHAUSTED.getCode()); + Assertions.assertThat(e.getStatus().getDescription()).isEqualTo("Too long firstNames?"); + } + Assertions.assertThat(thrown).withFailMessage("Expected exception not thrown!").isTrue(); + } + private TrustManager[] createTrustAllTrustManager() { return new TrustManager[] { new X509TrustManager() { public X509Certificate[] getAcceptedIssuers() { diff --git a/spring-cloud-gateway-integration-tests/grpc/src/test/java/org/springframework/cloud/gateway/tests/grpc/JsonToGrpcApplicationTests.java b/spring-cloud-gateway-integration-tests/grpc/src/test/java/org/springframework/cloud/gateway/tests/grpc/JsonToGrpcApplicationTests.java index a0c094a436..f37585eee2 100644 --- a/spring-cloud-gateway-integration-tests/grpc/src/test/java/org/springframework/cloud/gateway/tests/grpc/JsonToGrpcApplicationTests.java +++ b/spring-cloud-gateway-integration-tests/grpc/src/test/java/org/springframework/cloud/gateway/tests/grpc/JsonToGrpcApplicationTests.java @@ -36,12 +36,12 @@ import org.apache.hc.core5.ssl.TrustStrategy; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.web.server.test.LocalServerPort; import org.springframework.http.client.HttpComponentsClientHttpRequestFactory; +import org.springframework.test.annotation.DirtiesContext; import org.springframework.web.client.RestTemplate; import static org.springframework.boot.test.context.SpringBootTest.WebEnvironment; @@ -50,8 +50,8 @@ * @author Alberto C. Ríos * @author Abel Salgado Romero */ -@Disabled @SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT) +@DirtiesContext public class JsonToGrpcApplicationTests { @LocalServerPort diff --git a/spring-cloud-gateway-integration-tests/grpc/src/test/resources/application.yml b/spring-cloud-gateway-integration-tests/grpc/src/test/resources/application.yml index 1edaa78efa..4d05c774ec 100644 --- a/spring-cloud-gateway-integration-tests/grpc/src/test/resources/application.yml +++ b/spring-cloud-gateway-integration-tests/grpc/src/test/resources/application.yml @@ -10,7 +10,7 @@ server: management: endpoint: health: - show-details: when_authorized + show-details: when-authorized gateway: enabled: true endpoints: diff --git a/spring-cloud-gateway-server/src/main/java/org/springframework/cloud/gateway/config/GatewayAutoConfiguration.java b/spring-cloud-gateway-server/src/main/java/org/springframework/cloud/gateway/config/GatewayAutoConfiguration.java index d802d38119..69dd608012 100644 --- a/spring-cloud-gateway-server/src/main/java/org/springframework/cloud/gateway/config/GatewayAutoConfiguration.java +++ b/spring-cloud-gateway-server/src/main/java/org/springframework/cloud/gateway/config/GatewayAutoConfiguration.java @@ -828,8 +828,9 @@ public NettyRoutingFilter routingFilter(HttpClient httpClient, @Bean @ConditionalOnEnabledGlobalFilter(NettyRoutingFilter.class) - public NettyWriteResponseFilter nettyWriteResponseFilter(GatewayProperties properties) { - return new NettyWriteResponseFilter(properties.getStreamingMediaTypes()); + public NettyWriteResponseFilter nettyWriteResponseFilter(GatewayProperties properties, + ObjectProvider> headersFilters) { + return new NettyWriteResponseFilter(properties.getStreamingMediaTypes(), headersFilters); } @Bean diff --git a/spring-cloud-gateway-server/src/main/java/org/springframework/cloud/gateway/filter/NettyWriteResponseFilter.java b/spring-cloud-gateway-server/src/main/java/org/springframework/cloud/gateway/filter/NettyWriteResponseFilter.java index 278c428b9d..ee84b27536 100644 --- a/spring-cloud-gateway-server/src/main/java/org/springframework/cloud/gateway/filter/NettyWriteResponseFilter.java +++ b/spring-cloud-gateway-server/src/main/java/org/springframework/cloud/gateway/filter/NettyWriteResponseFilter.java @@ -25,7 +25,11 @@ import reactor.core.publisher.Mono; import reactor.core.publisher.SignalType; import reactor.netty.Connection; +import reactor.netty.http.client.HttpClientResponse; +import org.springframework.beans.factory.ObjectProvider; +import org.springframework.cloud.gateway.filter.headers.HttpHeadersFilter; +import org.springframework.cloud.gateway.filter.headers.TrailerHeadersFilter; import org.springframework.core.Ordered; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferFactory; @@ -36,6 +40,7 @@ import org.springframework.lang.Nullable; import org.springframework.web.server.ServerWebExchange; +import static org.springframework.cloud.gateway.support.ServerWebExchangeUtils.CLIENT_RESPONSE_ATTR; import static org.springframework.cloud.gateway.support.ServerWebExchangeUtils.CLIENT_RESPONSE_CONN_ATTR; /** @@ -52,8 +57,22 @@ public class NettyWriteResponseFilter implements GlobalFilter, Ordered { private final List streamingMediaTypes; - public NettyWriteResponseFilter(List streamingMediaTypes) { + private final ObjectProvider> headersFiltersProvider; + + // do not use this headersFilters directly, use getHeadersFilters() instead. + private volatile List headersFilters; + + public NettyWriteResponseFilter(List streamingMediaTypes, + ObjectProvider> headersFiltersProvider) { this.streamingMediaTypes = streamingMediaTypes; + this.headersFiltersProvider = headersFiltersProvider; + } + + public List getHeadersFilters() { + if (headersFilters == null) { + headersFilters = headersFiltersProvider == null ? List.of() : headersFiltersProvider.getIfAvailable(); + } + return headersFilters; } @Override @@ -96,9 +115,12 @@ public Mono filter(ServerWebExchange exchange, GatewayFilterChain chain) { log.trace("invalid media type", e); } } - return (isStreamingMediaType(contentType) + + HttpClientResponse httpClientResponse = exchange.getAttribute(CLIENT_RESPONSE_ATTR); + Mono write = (isStreamingMediaType(contentType) ? response.writeAndFlushWith(body.map(Flux::just)) : response.writeWith(body)); + return write.then(TrailerHeadersFilter.filter(getHeadersFilters(), exchange, httpClientResponse)).then(); })) .doFinally(signalType -> { if (signalType == SignalType.CANCEL || signalType == SignalType.ON_ERROR) { diff --git a/spring-cloud-gateway-server/src/main/java/org/springframework/cloud/gateway/filter/headers/GRPCResponseHeadersFilter.java b/spring-cloud-gateway-server/src/main/java/org/springframework/cloud/gateway/filter/headers/GRPCResponseHeadersFilter.java index 25a45d29ae..0ebcd8fa03 100644 --- a/spring-cloud-gateway-server/src/main/java/org/springframework/cloud/gateway/filter/headers/GRPCResponseHeadersFilter.java +++ b/spring-cloud-gateway-server/src/main/java/org/springframework/cloud/gateway/filter/headers/GRPCResponseHeadersFilter.java @@ -38,25 +38,20 @@ public class GRPCResponseHeadersFilter implements HttpHeadersFilter, Ordered { @Override public HttpHeaders filter(HttpHeaders headers, ServerWebExchange exchange) { ServerHttpResponse response = exchange.getResponse(); - HttpHeaders responseHeaders = response.getHeaders(); if (isGRPC(exchange)) { - String trailerHeaderValue = GRPC_STATUS_HEADER + "," + GRPC_MESSAGE_HEADER; - String originalTrailerHeaderValue = responseHeaders.getFirst(HttpHeaders.TRAILER); - if (originalTrailerHeaderValue != null) { - trailerHeaderValue += "," + originalTrailerHeaderValue; - } - responseHeaders.set(HttpHeaders.TRAILER, trailerHeaderValue); - - while (response instanceof ServerHttpResponseDecorator) { - response = ((ServerHttpResponseDecorator) response).getDelegate(); - } - if (response instanceof AbstractServerHttpResponse) { - String grpcStatus = getGrpcStatus(headers); - String grpcMessage = getGrpcMessage(headers); - ((HttpServerResponse) ((AbstractServerHttpResponse) response).getNativeResponse()).trailerHeaders(h -> { - h.set(GRPC_STATUS_HEADER, grpcStatus); - h.set(GRPC_MESSAGE_HEADER, grpcMessage); - }); + String grpcStatus = getGrpcStatus(headers); + String grpcMessage = getGrpcMessage(headers); + if (grpcStatus != null) { + while (response instanceof ServerHttpResponseDecorator) { + response = ((ServerHttpResponseDecorator) response).getDelegate(); + } + if (response instanceof AbstractServerHttpResponse) { + ((HttpServerResponse) ((AbstractServerHttpResponse) response).getNativeResponse()) + .trailerHeaders(h -> { + h.set(GRPC_STATUS_HEADER, grpcStatus); + h.set(GRPC_MESSAGE_HEADER, grpcMessage); + }); + } } } @@ -70,7 +65,7 @@ private boolean isGRPC(ServerWebExchange exchange) { private String getGrpcStatus(HttpHeaders headers) { final String grpcStatusValue = headers.getFirst(GRPC_STATUS_HEADER); - return StringUtils.hasText(grpcStatusValue) ? grpcStatusValue : "0"; + return StringUtils.hasText(grpcStatusValue) ? grpcStatusValue : null; } private String getGrpcMessage(HttpHeaders headers) { diff --git a/spring-cloud-gateway-server/src/main/java/org/springframework/cloud/gateway/filter/headers/TrailerHeadersFilter.java b/spring-cloud-gateway-server/src/main/java/org/springframework/cloud/gateway/filter/headers/TrailerHeadersFilter.java new file mode 100644 index 0000000000..ab3b22d4ed --- /dev/null +++ b/spring-cloud-gateway-server/src/main/java/org/springframework/cloud/gateway/filter/headers/TrailerHeadersFilter.java @@ -0,0 +1,89 @@ +/* + * Copyright 2013-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.gateway.filter.headers; + +import java.util.List; + +import reactor.core.publisher.Mono; +import reactor.netty.http.client.HttpClientResponse; +import reactor.netty.http.server.HttpServerResponse; + +import org.springframework.http.HttpHeaders; +import org.springframework.http.server.reactive.AbstractServerHttpResponse; +import org.springframework.http.server.reactive.ServerHttpResponse; +import org.springframework.http.server.reactive.ServerHttpResponseDecorator; +import org.springframework.web.server.ServerWebExchange; + +public interface TrailerHeadersFilter { + + static Mono filter(List filters, ServerWebExchange exchange, + HttpClientResponse response) { + if (response == null) { + return Mono.empty(); + } + Mono headers = response.trailerHeaders().map(input -> { + HttpHeaders httpHeaders = new HttpHeaders(); + input.forEach(entry -> httpHeaders.add(entry.getKey(), entry.getValue())); + return httpHeaders; + }); + return filter(filters, headers, exchange); + } + + private static Mono filter(List filters, Mono input, + ServerWebExchange exchange) { + + Mono filtered = input; + if (filters != null) { + for (int i = 0; i < filters.size(); i++) { + if (filters.get(i) instanceof TrailerHeadersFilter filter) { + filtered = filtered.map(headers -> filter.trailers(headers, exchange)); + } + } + } + + return filtered.doOnSuccess(headers -> { + + if (headers == null || headers.isEmpty()) { + return; + } + + ServerHttpResponse response = exchange.getResponse(); + while (response instanceof ServerHttpResponseDecorator) { + response = ((ServerHttpResponseDecorator) response).getDelegate(); + } + if (response instanceof AbstractServerHttpResponse) { + ((HttpServerResponse) ((AbstractServerHttpResponse) response).getNativeResponse()).trailerHeaders(h -> { + headers.forEach((key, values) -> { + for (String value : values) { + h.add(key, value); + } + }); + }); + } + + }); + } + + /** + * Filters a set of Http Headers. + * @param input Http Headers + * @param exchange a {@link ServerWebExchange} that should be filtered + * @return filtered Http Headers + */ + HttpHeaders trailers(HttpHeaders input, ServerWebExchange exchange); + +} diff --git a/spring-cloud-gateway-server/src/test/java/org/springframework/cloud/gateway/filter/NettyWriteResponseFilterTests.java b/spring-cloud-gateway-server/src/test/java/org/springframework/cloud/gateway/filter/NettyWriteResponseFilterTests.java index f9dd274954..2487019caf 100644 --- a/spring-cloud-gateway-server/src/test/java/org/springframework/cloud/gateway/filter/NettyWriteResponseFilterTests.java +++ b/spring-cloud-gateway-server/src/test/java/org/springframework/cloud/gateway/filter/NettyWriteResponseFilterTests.java @@ -46,7 +46,7 @@ public void testWrap_DefaultDataBufferFactory() { } private void doTestWrap(MockServerHttpResponse response) { - NettyWriteResponseFilter filter = new NettyWriteResponseFilter(new ArrayList<>()); + NettyWriteResponseFilter filter = new NettyWriteResponseFilter(new ArrayList<>(), null); ByteBuf buffer = DEFAULT.buffer(); buffer.writeCharSequence("test", Charset.defaultCharset()); diff --git a/spring-cloud-gateway-server/src/test/java/org/springframework/cloud/gateway/test/AdhocTestSuite.java b/spring-cloud-gateway-server/src/test/java/org/springframework/cloud/gateway/test/AdhocTestSuite.java index bba39646f8..9015104ddc 100644 --- a/spring-cloud-gateway-server/src/test/java/org/springframework/cloud/gateway/test/AdhocTestSuite.java +++ b/spring-cloud-gateway-server/src/test/java/org/springframework/cloud/gateway/test/AdhocTestSuite.java @@ -104,7 +104,7 @@ org.springframework.cloud.gateway.route.RouteTests.class, org.springframework.cloud.gateway.route.CachingRouteLocatorTests.class, org.springframework.cloud.gateway.route.RouteRefreshListenerTests.class, - org.springframework.cloud.gateway.route.builder.RouteDslTests.class, + // org.springframework.cloud.gateway.route.builder.RouteDslTests.class, org.springframework.cloud.gateway.route.builder.RouteBuilderTests.class, org.springframework.cloud.gateway.route.builder.GatewayFilterSpecTests.class, org.springframework.cloud.gateway.route.CachingRouteDefinitionLocatorTests.class, From e1e003f5bad94def7dd9ad889b89f8bfd154f04a Mon Sep 17 00:00:00 2001 From: Dave Syer Date: Fri, 18 Jul 2025 12:53:52 +0100 Subject: [PATCH 2/2] Add streaming method to service Signed-off-by: Dave Syer --- .../gateway/tests/grpc/GRPCApplication.java | 40 +++++++++++++++- .../grpc/src/main/proto/hello.pb | Bin 442 -> 633 bytes .../grpc/src/main/proto/stream.proto | 10 ++++ .../tests/grpc/GRPCApplicationTests.java | 44 ++++++++++++++++-- .../headers/GRPCResponseHeadersFilter.java | 8 ++-- 5 files changed, 93 insertions(+), 9 deletions(-) create mode 100644 spring-cloud-gateway-integration-tests/grpc/src/main/proto/stream.proto diff --git a/spring-cloud-gateway-integration-tests/grpc/src/main/java/org/springframework/cloud/gateway/tests/grpc/GRPCApplication.java b/spring-cloud-gateway-integration-tests/grpc/src/main/java/org/springframework/cloud/gateway/tests/grpc/GRPCApplication.java index ebce9fe697..1f8d639e13 100644 --- a/spring-cloud-gateway-integration-tests/grpc/src/main/java/org/springframework/cloud/gateway/tests/grpc/GRPCApplication.java +++ b/spring-cloud-gateway-integration-tests/grpc/src/main/java/org/springframework/cloud/gateway/tests/grpc/GRPCApplication.java @@ -73,7 +73,11 @@ private void start() throws IOException { Integer serverPort = environment.getProperty("local.server.port", Integer.class); int grpcPort = serverPort + 1; ServerCredentials creds = createServerCredentials(); - server = Grpc.newServerBuilderForPort(grpcPort, creds).addService(new HelloService()).build().start(); + server = Grpc.newServerBuilderForPort(grpcPort, creds) + .addService(new HelloService()) + .addService(new StreamService()) + .build() + .start(); log.info("Starting gRPC server in port " + grpcPort); @@ -101,6 +105,38 @@ private void stop() throws InterruptedException { log.info("gRPC server stopped"); } + static class StreamService extends StreamServiceGrpc.StreamServiceImplBase { + + @Override + public void more(HelloRequest request, StreamObserver responseObserver) { + int count = 0; + while (count < 3) { + HelloResponse reply = HelloResponse.newBuilder() + .setGreeting("Hello(" + count + ") ==> " + request.getFirstName()) + .build(); + if ("failWithRuntimeExceptionAfterData!".equals(request.getFirstName()) && count == 2) { + StatusRuntimeException exception = Status.RESOURCE_EXHAUSTED + .withDescription("Too long firstNames?") + .asRuntimeException(); + responseObserver.onError(exception); + return; + } + responseObserver.onNext(reply); + count++; + try { + Thread.sleep(200L); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + responseObserver.onError(e); + return; + } + } + responseObserver.onCompleted(); + } + + } + static class HelloService extends HelloServiceGrpc.HelloServiceImplBase { @Override @@ -122,7 +158,7 @@ public void hello(HelloRequest request, StreamObserver responseOb if ("failWithRuntimeExceptionAfterData!".equals(request.getFirstName())) { StatusRuntimeException exception = Status.RESOURCE_EXHAUSTED.withDescription("Too long firstNames?") - .asRuntimeException(); + .asRuntimeException(); responseObserver.onError(exception); return; } diff --git a/spring-cloud-gateway-integration-tests/grpc/src/main/proto/hello.pb b/spring-cloud-gateway-integration-tests/grpc/src/main/proto/hello.pb index 30c69524193aa7082b69890f481cc25eb6cfe7f1..3eaa4f3fd1241cf6e939e92681a2dc5fe3d31d09 100644 GIT binary patch delta 99 zcmdnR{FB9j>l+J~TX8{AW?s5(a!!6}if(#hNosjwrEX?kNosmgVo7Fxo^DBMaY?a$ zdQm~LesNK response = StreamServiceGrpc.newBlockingStub(channel) + .more(HelloRequest.newBuilder().setFirstName("Sir").setLastName("FromClient").build()); + + Assertions.assertThat(response.next().getGreeting()).isEqualTo("Hello(0) ==> Sir"); + Assertions.assertThat(response.next().getGreeting()).isEqualTo("Hello(1) ==> Sir"); + Assertions.assertThat(response.next().getGreeting()).isEqualTo("Hello(2) ==> Sir"); + } + private ManagedChannel createSecuredChannel(int port) throws SSLException { TrustManager[] trustAllCerts = createTrustAllTrustManager(); @@ -97,13 +110,38 @@ public void gRPCUnaryCallShouldHandleRuntimeException() throws SSLException { } @Test - public void gRPCUnaryCallShouldHandleRuntimeException2() throws SSLException { + public void gRPCUnaryCallShouldHandleRuntimeExceptionAfterData() throws SSLException { ManagedChannel channel = createSecuredChannel(gatewayPort); boolean thrown = false; try { HelloServiceGrpc.newBlockingStub(channel) - .hello(HelloRequest.newBuilder().setFirstName("failWithRuntimeExceptionAfterData!").build()) - .getGreeting(); + .hello(HelloRequest.newBuilder().setFirstName("failWithRuntimeExceptionAfterData!").build()) + .getGreeting(); + } + catch (StatusRuntimeException e) { + thrown = true; + Assertions.assertThat(e.getStatus().getCode()).isEqualTo(RESOURCE_EXHAUSTED.getCode()); + Assertions.assertThat(e.getStatus().getDescription()).isEqualTo("Too long firstNames?"); + } + Assertions.assertThat(thrown).withFailMessage("Expected exception not thrown!").isTrue(); + } + + @Test + public void gRPCStreamingCallShouldHandleRuntimeExceptionAfterData() throws SSLException { + ManagedChannel channel = createSecuredChannel(gatewayPort); + boolean thrown = false; + final Iterator response = StreamServiceGrpc.newBlockingStub(channel) + .more(HelloRequest.newBuilder() + .setFirstName("failWithRuntimeExceptionAfterData!") + .setLastName("FromClient") + .build()); + Assertions.assertThat(response.next().getGreeting()) + .isEqualTo("Hello(0) ==> failWithRuntimeExceptionAfterData!"); + Assertions.assertThat(response.next().getGreeting()) + .isEqualTo("Hello(1) ==> failWithRuntimeExceptionAfterData!"); + try { + Assertions.assertThat(response.next().getGreeting()) + .isEqualTo("Hello(2) ==> failWithRuntimeExceptionAfterData!"); } catch (StatusRuntimeException e) { thrown = true; diff --git a/spring-cloud-gateway-server/src/main/java/org/springframework/cloud/gateway/filter/headers/GRPCResponseHeadersFilter.java b/spring-cloud-gateway-server/src/main/java/org/springframework/cloud/gateway/filter/headers/GRPCResponseHeadersFilter.java index 0ebcd8fa03..090ddc7acc 100644 --- a/spring-cloud-gateway-server/src/main/java/org/springframework/cloud/gateway/filter/headers/GRPCResponseHeadersFilter.java +++ b/spring-cloud-gateway-server/src/main/java/org/springframework/cloud/gateway/filter/headers/GRPCResponseHeadersFilter.java @@ -47,10 +47,10 @@ public HttpHeaders filter(HttpHeaders headers, ServerWebExchange exchange) { } if (response instanceof AbstractServerHttpResponse) { ((HttpServerResponse) ((AbstractServerHttpResponse) response).getNativeResponse()) - .trailerHeaders(h -> { - h.set(GRPC_STATUS_HEADER, grpcStatus); - h.set(GRPC_MESSAGE_HEADER, grpcMessage); - }); + .trailerHeaders(h -> { + h.set(GRPC_STATUS_HEADER, grpcStatus); + h.set(GRPC_MESSAGE_HEADER, grpcMessage); + }); } }